[med-svn] [snakemake] 01/04: New upstream version 3.10.0
Kevin Murray
daube-guest at moszumanska.debian.org
Thu Jan 19 00:46:41 UTC 2017
This is an automated email from the git hooks/post-receive script.
daube-guest pushed a commit to branch master
in repository snakemake.
commit 91fdd01c8fed7ae121e4b2e55f2d930b2ec761c4
Author: Kevin Murray <kdmfoss at gmail.com>
Date: Thu Jan 19 11:27:46 2017 +1100
New upstream version 3.10.0
---
CHANGELOG.md | 14 ++-
docs/index.rst | 2 +-
docs/project_info/faq.rst | 4 +-
docs/snakefiles/rules.rst | 41 +++++++
docs/tutorial/welcome.rst | 2 +-
snakemake/__init__.py | 22 +++-
snakemake/common.py | 24 ++++
snakemake/conda.py | 121 +++++++++++++++++----
snakemake/dag.py | 99 ++++++++++++++++-
snakemake/executors.py | 15 ++-
snakemake/io.py | 32 +++++-
snakemake/jobs.py | 65 ++++++++---
snakemake/logging.py | 72 ++++++------
snakemake/persistence.py | 4 +-
snakemake/remote/HTTP.py | 3 +
snakemake/rules.py | 57 +++++++---
snakemake/script.py | 13 ++-
snakemake/version.py | 2 +-
snakemake/workflow.py | 11 +-
tests/test_parser/Snakefile | 3 +-
tests/test_unpack_dict/Snakefile | 27 +++++
.../expected-results/prefix.out1.txt | 1 +
.../expected-results/prefix.out2.txt | 1 +
tests/test_unpack_dict/prefix.in1.txt | 1 +
tests/test_unpack_dict/prefix.in2.txt | 1 +
tests/test_unpack_list/Snakefile | 27 +++++
.../expected-results/prefix.out1.txt | 1 +
.../expected-results/prefix.out2.txt | 1 +
tests/test_unpack_list/prefix.in1.txt | 1 +
tests/test_unpack_list/prefix.in2.txt | 1 +
tests/tests.py | 8 ++
31 files changed, 553 insertions(+), 123 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f6c547a..8710eb4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,20 @@
# Change Log
+## [3.10.0] - 2017-01-18
+### Added
+- Workflows can now be archived to a tarball with `snakemake --archive my-workflow.tar.gz`. The archive contains all input files, source code versioned with git and all software packages that are defined via conda environments. Hence, the archive makes it possible to fully reproduce a workflow on a different machine. Such an archive can be uploaded to Zenodo, so that your workflow is preserved in a self-contained, executable form for the future.
+### Changed
+- Improved logging.
+- Reduced memory footprint.
+- Added a flag to automatically unpack the output of input functions.
+- Improved handling of HTTP redirects with remote files.
+- Improved exception handling with DRMAA.
+- Scripts referred to by the script directive can now use locally defined external Python modules.
+
+
## [3.9.1] - 2016-12-23
### Added
-- Jobs can be restarted upon failure (--restart-times).
+- Jobs can be restarted upon failure (--restart-times).
### Changed
- The docs have been restructured and improved. Now available under snakemake.readthedocs.org.
- Changes in scripts show up with --list-code-changes.
diff --git a/docs/index.rst b/docs/index.rst
index 9cd466b..aef65fc 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -63,7 +63,7 @@ Quick Example
Getting started
---------------
-To get started, consider the :ref:`tutorial <tutorial-welcome>`, the `introductory slides <http://slides.com/johanneskoester/deck-1>`_, and the :ref:`FAQ <project_info-faq>`.
+To get started, consider the :ref:`tutorial <tutorial-welcome>`, the `introductory slides <http://slides.com/johanneskoester/snakemake-tutorial-2016>`_, and the :ref:`FAQ <project_info-faq>`.
.. _main-support:
diff --git a/docs/project_info/faq.rst b/docs/project_info/faq.rst
index a256295..e2597f4 100644
--- a/docs/project_info/faq.rst
+++ b/docs/project_info/faq.rst
@@ -248,8 +248,8 @@ Here is an example where you want to merge N files together, but if N == 1 a sym
output: "{foo}/all_merged.txt"
input: my_input_func # some function that yields 1 or more files to merge
run:
- if len(output) > 1:
- shell("cat {input} | sort > {out}")
+ if len(input) > 1:
+ shell("cat {input} | sort > {output}")
else:
shell("ln -sr {input} {output}")
diff --git a/docs/snakefiles/rules.rst b/docs/snakefiles/rules.rst
index 28287fd..55ace0a 100644
--- a/docs/snakefiles/rules.rst
+++ b/docs/snakefiles/rules.rst
@@ -523,6 +523,47 @@ Note that the function will be executed when the rule is evaluated and before th
Finally, when implementing the input function, it is best practice to make sure that it can properly handle all possible wildcard values your rule can have.
In particular, input files should not be combined with very general rules that can be applied to create almost any file: Snakemake will try to apply the rule, and will report the exceptions of your input function as errors.
+.. _snakefiles-unpack:
+
+Input Functions and ``unpack()``
+--------------------------------
+
+In some cases, you might want to have your input functions return named input files.
+This can be done by having them return ``dict()`` objects with the names as the dict keys and the file names as the dict values and using the ``unpack()`` keyword.
+
+.. code-block:: python
+
+ def myfunc(wildcards):
+ return { 'foo': '{wildcards.token}.txt'.format(wildcards=wildcards) }
+
+ rule:
+ input: unpack(myfunc)
+ output: "someoutput.{token}.txt"
+ shell: "..."
+
+Note that ``unpack()`` is only necessary for input functions returning ``dict``.
+While it also works for ``list``, remember that lists (and nested lists) of strings are automatically flattened.
+
+Also note that if you do not pass a *function* to the input list but instead *call a function* directly, you do not need ``unpack()`` either.
+Here, you can simply use Python's double-star (``**``) operator for unpacking the parameters.
+
+Note that as Snakefiles are translated into Python for execution, the same rules as for using the `star and double-star unpacking Python operators <https://docs.python.org/3/tutorial/controlflow.html#unpacking-argument-lists>`_ apply.
+These restrictions do not apply when using ``unpack()``.
+
+.. code-block:: python
+
+ def myfunc1():
+ return ['foo.txt']
+
+ def myfunc2():
+ return {'foo': 'nowildcards.txt'}
+
+ rule:
+ input:
+ *myfunc1(),
+ **myfunc2(),
+ output: "..."
+ shell: "..."
.. _snakefiles-version_tracking:
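The star and double-star unpacking described in the rules.rst hunk above is ordinary Python argument unpacking. A minimal sketch, assuming a hypothetical `collect_inputs` helper that stands in for how a rule's input section receives positional and named items (it is not part of Snakemake):

```python
def collect_inputs(*positional, **named):
    """Hypothetical stand-in for a rule's ``input:`` section."""
    return list(positional), dict(named)


def myfunc1():
    return ["foo.txt"]


def myfunc2():
    return {"foo": "nowildcards.txt"}


# Directly calling the functions, so plain * and ** unpacking applies.
positional, named = collect_inputs(*myfunc1(), **myfunc2())
print(positional)  # ['foo.txt']
print(named)       # {'foo': 'nowildcards.txt'}
```

The list ends up as positional (unnamed) inputs and the dict keys become input names, which is the effect the documentation describes for directly called functions.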
diff --git a/docs/tutorial/welcome.rst b/docs/tutorial/welcome.rst
index 28307b4..18c95eb 100644
--- a/docs/tutorial/welcome.rst
+++ b/docs/tutorial/welcome.rst
@@ -32,7 +32,7 @@ Tutorial Setup
.. _Vagrant: https://www.vagrantup.com
.. _Vagrant Documentation: https://docs.vagrantup.com
.. _Blogpost: http://blog.osteel.me/posts/2015/01/25/how-to-use-vagrant-on-windows.html
-.. _slides: http://slides.com/johanneskoester/deck-1
+.. _slides: http://slides.com/johanneskoester/snakemake-tutorial-2016
This tutorial introduces the text-based workflow system Snakemake_.
Snakemake follows the `GNU Make`_ paradigm: workflows are defined in terms of rules that define how to create output files from input files.
diff --git a/snakemake/__init__.py b/snakemake/__init__.py
index 2293ac8..6c802a2 100644
--- a/snakemake/__init__.py
+++ b/snakemake/__init__.py
@@ -74,6 +74,7 @@ def snakemake(snakefile,
list_params_changes=False,
list_resources=False,
summary=False,
+ archive=None,
detailed_summary=False,
latency_wait=3,
benchmark_repeats=1,
@@ -150,6 +151,7 @@ def snakemake(snakefile,
list_input_changes (bool): list output files with changed input files (default False)
list_params_changes (bool): list output files with changed params (default False)
summary (bool): list summary of all output files and their status (default False)
+ archive (str): archive workflow into the given tarball
latency_wait (int): how many seconds to wait for an output file to appear after the execution of a job, e.g. to handle filesystem latency (default 3)
benchmark_repeats (int): number of repeated runs of a job if declared for benchmarking (default 1)
wait_for_files (list): wait for given files to be present before executing the workflow
@@ -401,6 +403,7 @@ def snakemake(snakefile,
list_input_changes=list_input_changes,
list_params_changes=list_params_changes,
summary=summary,
+ archive=archive,
latency_wait=latency_wait,
benchmark_repeats=benchmark_repeats,
wait_for_files=wait_for_files,
@@ -628,6 +631,20 @@ def get_argument_parser():
"explanatory. Finally the last column denotes whether the file "
"will be updated or created during the next workflow execution.")
parser.add_argument(
+ "--archive",
+ metavar="FILE",
+ help="Archive the workflow into the given tar archive FILE. The archive "
+ "will be created such that the workflow can be re-executed on a vanilla "
+ "system. The function needs conda and git to be installed. "
+ "It will archive every file that is under git version control. "
+ "Note that it is best practice to have the Snakefile, config files, and "
+ "scripts under version control. Hence, they will be included in the archive. "
+ "Further, it will add input files that are not generated by "
+ "the workflow itself and conda environments. Note that symlinks are "
+ "dereferenced. Supported "
+ "formats are .tar, .tar.gz, .tar.bz2 and .tar.xz."
+ )
+ parser.add_argument(
"--touch", "-t",
action="store_true",
help=("Touch output files (mark them up to date without really "
@@ -939,10 +956,10 @@ def get_argument_parser():
return parser
-def main():
+def main(argv=None):
"""Main entry point."""
parser = get_argument_parser()
- args = parser.parse_args()
+ args = parser.parse_args(argv)
if args.bash_completion:
cmd = b"complete -o bashdefault -C snakemake-bash-completion snakemake"
@@ -1055,6 +1072,7 @@ def main():
list_params_changes=args.list_params_changes,
summary=args.summary,
detailed_summary=args.detailed_summary,
+ archive=args.archive,
print_compilation=args.print_compilation,
verbose=args.verbose,
debug=args.debug,
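Passing `argv` through to `parse_args()` lets `main()` be called programmatically, for example from tests, while `argv=None` keeps the default of reading `sys.argv`. A minimal sketch, assuming a heavily reduced parser with only the `--archive` option (the real parser defines many more arguments):

```python
import argparse


def get_argument_parser():
    # Reduced, illustrative parser; only --archive is modelled here.
    parser = argparse.ArgumentParser()
    parser.add_argument("--archive", metavar="FILE")
    return parser


def main(argv=None):
    # argv=None makes argparse fall back to sys.argv[1:].
    args = get_argument_parser().parse_args(argv)
    return args.archive


archive_target = main(["--archive", "my-workflow.tar.gz"])
```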
diff --git a/snakemake/common.py b/snakemake/common.py
index 4c74a37..e210db0 100644
--- a/snakemake/common.py
+++ b/snakemake/common.py
@@ -1,3 +1,10 @@
+__author__ = "Johannes Köster"
+__copyright__ = "Copyright 2016, Johannes Köster"
+__email__ = "johannes.koester at protonmail.com"
+__license__ = "MIT"
+
+from functools import update_wrapper
+
DYNAMIC_FILL = "__snakemake_dynamic__"
@@ -9,3 +16,20 @@ class Mode:
default = 0
subprocess = 1
cluster = 2
+
+
+class lazy_property(property):
+ __slots__ = ["method", "cached", "__doc__"]
+
+ def __init__(self, method):
+ self.method = method
+ self.cached = "_{}".format(method.__name__)
+ super().__init__(method, doc=method.__doc__)
+
+ def __get__(self, instance, owner):
+ cached = getattr(instance, self.cached) if hasattr(instance, self.cached) else None
+ if cached is not None:
+ return cached
+ value = self.method(instance)
+ setattr(instance, self.cached, value)
+ return value
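The idea behind `lazy_property` is to defer creating a value until it is first read, which saves memory when most instances never touch it. A simplified sketch of that pattern, assuming a hypothetical `lazy_attribute` descriptor and a cut-down `Reason` class (neither is the code from this commit):

```python
class lazy_attribute:
    """Hypothetical descriptor that computes a value on first access."""

    def __init__(self, method):
        self.method = method
        # The value is stored on the instance under "_<name>".
        self.cached = "_{}".format(method.__name__)

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        value = getattr(instance, self.cached, None)
        if value is None:
            value = self.method(instance)
            setattr(instance, self.cached, value)
        return value


class Reason:
    __slots__ = ["_updated_input"]

    def __init__(self):
        # No set is allocated until updated_input is first read.
        self._updated_input = None

    @lazy_attribute
    def updated_input(self):
        return set()


reason = Reason()
untouched = reason._updated_input   # still None here
reason.updated_input.add("a.txt")   # first access creates the set
```

Checking the underscore-prefixed slot directly, as the `Reason.__str__` changes further down do, avoids triggering the allocation just to test for emptiness.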
diff --git a/snakemake/conda.py b/snakemake/conda.py
index a206f71..e80dc82 100644
--- a/snakemake/conda.py
+++ b/snakemake/conda.py
@@ -1,15 +1,85 @@
import os
import subprocess
import tempfile
-from urllib.request import urlopen
+from urllib.request import urlopen, urlretrieve
+from urllib.parse import urlparse
import hashlib
import shutil
from distutils.version import StrictVersion
+import json
+from glob import glob
-from snakemake.exceptions import CreateCondaEnvironmentException
+from snakemake.exceptions import CreateCondaEnvironmentException, WorkflowError
from snakemake.logging import logger
+def get_env_archive(job, env_hash):
+ """Get path to archived environment derived from given environment file."""
+ return os.path.join(job.rule.workflow.persistence.conda_env_archive_path, env_hash)
+
+
+def archive_env(job):
+ """Create self-contained archive of environment."""
+ try:
+ import yaml
+ except ImportError:
+ raise WorkflowError("Error importing PyYAML. "
+ "Please install PyYAML to archive workflows.")
+
+ env_archive = get_env_archive(job, get_env_hash(job.conda_env_file))
+ if os.path.exists(env_archive):
+ return env_archive
+
+ try:
+ # Download
+ logger.info("Downloading packages for conda environment {}...".format(job.conda_env_file))
+ os.makedirs(env_archive, exist_ok=True)
+ try:
+ out = subprocess.check_output(["conda", "list", "--explicit",
+ "--prefix", job.conda_env],
+ stderr=subprocess.STDOUT)
+ logger.debug(out.decode())
+ except subprocess.CalledProcessError as e:
+ raise WorkflowError("Error exporting conda packages:\n" +
+ e.output.decode())
+ for l in out.decode().split("\n"):
+ if l and not l.startswith("#") and not l.startswith("@"):
+ pkg_url = l
+ logger.info(pkg_url)
+ parsed = urlparse(pkg_url)
+ pkg_name = os.path.basename(parsed.path)
+ urlretrieve(pkg_url, os.path.join(env_archive, pkg_name))
+ except (Exception, BaseException) as e:
+ shutil.rmtree(env_archive)
+ raise e
+ return env_archive
+
+
+def is_remote_env_file(env_file):
+ return urlparse(env_file).scheme
+
+
+def get_env_hash(env_file):
+ md5hash = hashlib.md5()
+ if is_remote_env_file(env_file):
+ md5hash.update(urlopen(env_file).read())
+ else:
+ with open(env_file, 'rb') as f:
+ md5hash.update(f.read())
+ return md5hash.hexdigest()
+
+
+def get_env_path(job, env_hash):
+ """Return environment path from hash.
+ Tries the full hash first; if no such path exists, the 8-character
+ prefix is used as default."""
+ for h in [env_hash, env_hash[:8]]:
+ path = os.path.join(job.rule.workflow.persistence.conda_env_path, h)
+ if os.path.exists(path):
+ return path
+ return path
+
+
def create_env(job):
""" Create conda environment for the given job. """
if shutil.which("conda") is None:
@@ -25,31 +95,34 @@ def create_env(job):
"Unable to check conda version:\n" + e.output.decode()
)
- md5hash = hashlib.md5()
+ # Read env file and create hash.
env_file = job.conda_env_file
- if os.path.exists(env_file):
- with open(env_file, 'rb') as f:
- md5hash.update(f.read())
- else:
- if env_file.startswith('file://'):
- with open(env_file.replace('file://', ''), 'rb') as handle:
- content = handle.read()
- else:
- content = urlopen(env_file).read()
- md5hash.update(content)
+ tmp_file = None
+ if is_remote_env_file(env_file):
with tempfile.NamedTemporaryFile(delete=False) as tmp:
- tmp.write(content)
+ tmp.write(urlopen(env_file).read())
env_file = tmp.name
-
- env_path = os.path.join(job.rule.workflow.persistence.conda_env_path, md5hash.hexdigest()[:8])
+ tmp_file = tmp.name
+ env_hash = get_env_hash(env_file)
+ env_path = get_env_path(job, env_hash)
+ # Create environment if not already present.
if not os.path.exists(env_path):
- logger.info("Creating conda environment for {}...".format(job.conda_env_file))
+ logger.info("Creating conda environment {}...".format(job.conda_env_file))
+ # Check if env archive exists. Use that if present.
+ env_archive = get_env_archive(job, env_hash)
try:
- out = subprocess.check_output(["conda", "env", "create",
- "--file", env_file,
- "--prefix", env_path],
- stderr=subprocess.STDOUT)
- logger.debug(out)
+ if os.path.exists(env_archive):
+ # install packages manually from env archive
+ out = subprocess.check_output(["conda", "create", "--prefix", env_path] +
+ glob(os.path.join(env_archive, "*.tar.bz2")),
+ stderr=subprocess.STDOUT
+ )
+ else:
+ out = subprocess.check_output(["conda", "env", "create",
+ "--file", env_file,
+ "--prefix", env_path],
+ stderr=subprocess.STDOUT)
+ logger.debug(out.decode())
logger.info("Environment for {} created.".format(job.conda_env_file))
except subprocess.CalledProcessError as e:
# remove potential partially installed environment
@@ -58,8 +131,8 @@ def create_env(job):
"Could not create conda environment from {}:\n".format(job.conda_env_file) +
e.output.decode())
- if env_file != job.conda_env_file:
+ if tmp_file:
# temporary file was created
- os.remove(env_file)
+ os.remove(tmp_file)
return env_path
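The environment directory is derived from the MD5 hash of the environment file, with a fallback to the older 8-character prefix. A standalone sketch of that lookup, assuming hypothetical helper names `file_md5` and `resolve_env_dir` and a temporary directory in place of the persistence path:

```python
import hashlib
import os
import tempfile


def file_md5(path):
    """Return the hex MD5 digest of a local file."""
    md5hash = hashlib.md5()
    with open(path, "rb") as f:
        md5hash.update(f.read())
    return md5hash.hexdigest()


def resolve_env_dir(base_dir, env_hash):
    """Prefer the full hash; fall back to the 8-character prefix."""
    for h in (env_hash, env_hash[:8]):
        path = os.path.join(base_dir, h)
        if os.path.exists(path):
            return path
    # Neither exists: the short form is returned as the default.
    return path


with tempfile.TemporaryDirectory() as base_dir:
    env_file = os.path.join(base_dir, "env.yaml")
    with open(env_file, "w") as f:
        f.write("dependencies:\n  - python\n")
    env_hash = file_md5(env_file)
    # Simulate an environment created under the short prefix.
    os.mkdir(os.path.join(base_dir, env_hash[:8]))
    resolved = os.path.basename(resolve_env_dir(base_dir, env_hash))
    found_short = resolved == env_hash[:8]
```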
diff --git a/snakemake/dag.py b/snakemake/dag.py
index 5e0d2ff..9ebcc79 100644
--- a/snakemake/dag.py
+++ b/snakemake/dag.py
@@ -7,11 +7,14 @@ import os
import shutil
import textwrap
import time
+import tarfile
from collections import defaultdict, Counter
from itertools import chain, combinations, filterfalse, product, groupby
from functools import partial, lru_cache
from inspect import isfunction, ismethod
from operator import itemgetter, attrgetter
+from pathlib import Path
+import subprocess
from snakemake.io import IOFile, _IOFile, PeriodicityDetector, wait_for_files, is_flagged, contains_wildcard
from snakemake.jobs import Job, Reason
@@ -25,6 +28,7 @@ from snakemake.exceptions import UnexpectedOutputException, InputFunctionExcepti
from snakemake.logging import logger
from snakemake.output_index import OutputIndex
from snakemake.common import DYNAMIC_FILL
+from snakemake import conda
# Workaround for Py <3.5 prior to existence of RecursionError
try:
@@ -121,13 +125,31 @@ class DAG:
job = self.update(self.file2jobs(file), file=file)
self.targetjobs.add(job)
+ self.cleanup()
+
self.update_needrun()
self.set_until_jobs()
self.delete_omitfrom_jobs()
+ self.update_jobids()
# check if remaining jobs are valid
- for job in self.jobs:
+ for i, job in enumerate(self.jobs):
job.is_valid()
+ def update_jobids(self):
+ for job in self.jobs:
+ if job not in self._jobid:
+ self._jobid[job] = len(self._jobid)
+
+ def cleanup(self):
+ final_jobs = set(self.jobs)
+ todelete = [job for job in self.dependencies if job not in final_jobs]
+ for job in todelete:
+ del self.dependencies[job]
+ try:
+ del self.depending[job]
+ except KeyError:
+ pass
+
def update_output_index(self):
"""Update the OutputIndex."""
self.output_index = OutputIndex(self.rules)
@@ -270,7 +292,9 @@ class DAG:
try:
wait_for_files(expanded_output, latency_wait=wait)
except IOError as e:
- raise MissingOutputException(str(e), rule=job.rule)
+ raise MissingOutputException(str(e) + "\nThis might be due to "
+ "filesystem latency. If that is the case, consider increasing the "
+ "wait time with --latency-wait.", rule=job.rule)
#It is possible, due to archive expansion or cluster clock skew, that
#the files appear older than the input. But we know they must be new,
@@ -407,8 +431,6 @@ class DAG:
def jobid(self, job):
"""Return job id of given job."""
- if job not in self._jobid:
- self._jobid[job] = len(self._jobid)
return self._jobid[job]
def update(self, jobs, file=None, visited=None, skip_until_dynamic=False):
@@ -480,12 +502,13 @@ class DAG:
exceptions = dict()
for file, jobs in potential_dependencies:
try:
- producer[file] = self.update(
+ selected_job = self.update(
jobs,
file=file,
visited=visited,
skip_until_dynamic=skip_until_dynamic or file in
job.dynamic_input)
+ producer[file] = selected_job
except (MissingInputException, CyclicGraphException,
PeriodicWildcardError) as ex:
if file in missing_input:
@@ -665,6 +688,7 @@ class DAG:
def postprocess(self):
"""Postprocess the DAG. This has to be invoked after any change to the
DAG topology."""
+ self.update_jobids()
self.update_needrun()
self.update_priority()
self.update_ready()
@@ -1063,6 +1087,71 @@ class DAG:
else:
yield "\t".join((f, date, rule, version, log, status, pending))
+ def archive(self, path):
+ """Archives workflow such that it can be re-run on a different system.
+
+ Archiving includes git versioned files (i.e. Snakefiles, config files, ...),
+ ancestral input files and conda environments.
+ """
+ if path.endswith(".tar"):
+ mode = "x"
+ elif path.endswith("tar.bz2"):
+ mode = "x:bz2"
+ elif path.endswith("tar.xz"):
+ mode = "x:xz"
+ elif path.endswith("tar.gz"):
+ mode = "x:gz"
+ else:
+ raise WorkflowError("Unsupported archive format "
+ "(supported: .tar, .tar.gz, .tar.bz2, .tar.xz)")
+ if os.path.exists(path):
+ raise WorkflowError("Archive already exists:\n" + path)
+
+ try:
+ workdir = Path(os.path.abspath(os.getcwd()))
+ with tarfile.open(path, mode=mode, dereference=True) as archive:
+
+ def add(path):
+ if workdir not in Path(os.path.abspath(path)).parents:
+ logger.warning("Path {} cannot be archived: "
+ "not within working directory.".format(path))
+ else:
+ archive.add(os.path.relpath(path))
+
+ logger.info("Archiving files under version control...")
+ try:
+ out = subprocess.check_output(["git", "ls-files", "."])
+ for f in out.decode().split("\n"):
+ if f:
+ logger.info(f)
+ add(f)
+ except subprocess.CalledProcessError as e:
+ raise WorkflowError("Error executing git.")
+
+ logger.info("Archiving external input files...")
+ for job in self.jobs:
+ # input files
+ for f in job.input:
+ if not any(f in files for files in self.dependencies[job].values()):
+ # this is an input file that is not created by any job
+ logger.info(f)
+ add(f)
+
+ logger.info("Archiving conda environments...")
+ envs = set()
+ for job in self.jobs:
+ if job.conda_env_file:
+ job.create_conda_env()
+ env_archive = job.archive_conda_env()
+ envs.add(env_archive)
+ for env in envs:
+ add(env)
+
+ except (Exception, BaseException) as e:
+ os.remove(path)
+ raise e
+
+
def d3dag(self, max_jobs=10000):
def node(job):
jobid = self.jobid(job)
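The archive format is chosen from the file suffix and mapped to a `tarfile` mode. The `x` modes create the file exclusively and fail if it already exists (available since Python 3.5). A table-driven sketch of that mapping, assuming hypothetical names `TAR_MODES` and `tar_mode` and a temporary directory for the demonstration:

```python
import os
import tarfile
import tempfile

# Suffix -> tarfile mode; longer suffixes first, plain ".tar" last.
TAR_MODES = [
    (".tar.gz", "x:gz"),
    (".tar.bz2", "x:bz2"),
    (".tar.xz", "x:xz"),
    (".tar", "x"),
]


def tar_mode(path):
    """Return the tarfile mode for path, or raise ValueError."""
    for suffix, mode in TAR_MODES:
        if path.endswith(suffix):
            return mode
    raise ValueError("Unsupported archive format: {}".format(path))


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "input.txt")
    with open(src, "w") as f:
        f.write("data\n")
    archive_path = os.path.join(tmp, "workflow.tar.gz")
    # dereference=True stores the target of a symlink, not the link.
    with tarfile.open(archive_path, mode=tar_mode(archive_path),
                      dereference=True) as archive:
        archive.add(src, arcname="input.txt")
    with tarfile.open(archive_path) as archive:
        names = archive.getnames()
```

Keeping the mapping in one table makes it harder for a suffix to be paired with the wrong compression mode.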
diff --git a/snakemake/executors.py b/snakemake/executors.py
index 12ed07b..dd9a5d7 100644
--- a/snakemake/executors.py
+++ b/snakemake/executors.py
@@ -256,7 +256,10 @@ class CPUExecutor(RealExecutor):
if self.use_threads or (not job.is_shadow and (job.is_shell or job.is_norun or job.is_script or job.is_wrapper)):
job.prepare()
- job.create_conda_env()
+ conda_env = None
+ if self.workflow.use_conda:
+ job.create_conda_env()
+ conda_env = job.conda_env
benchmark = None
if job.benchmark is not None:
@@ -265,7 +268,7 @@ class CPUExecutor(RealExecutor):
run_wrapper, job.rule.run_func, job.input.plainstrings(),
job.output.plainstrings(), job.params, job.wildcards, job.threads,
job.resources, job.log.plainstrings(), job.rule.version, benchmark,
- self.benchmark_repeats, job.rule.name, job.conda_env,
+ self.benchmark_repeats, job.rule.name, conda_env,
self.workflow.linemaps, self.workflow.debug,
shadow_dir=job.shadow_dir)
else:
@@ -746,8 +749,8 @@ class DRMAAExecutor(ClusterExecutor):
jt.jobName = os.path.basename(jobscript)
jobid = self.session.runJob(jt)
- except (drmaa.errors.InternalException,
- drmaa.errors.InvalidAttributeValueException) as e:
+ except (drmaa.InternalException,
+ drmaa.InvalidAttributeValueException) as e:
print_exception(WorkflowError("DRMAA Error: {}".format(e)),
self.workflow.linemaps)
error_callback(job)
@@ -777,11 +780,11 @@ class DRMAAExecutor(ClusterExecutor):
try:
retval = self.session.wait(active_job.jobid,
drmaa.Session.TIMEOUT_NO_WAIT)
- except drmaa.errors.ExitTimeoutException as e:
+ except drmaa.ExitTimeoutException as e:
# job still active
self.active_jobs.append(active_job)
continue
- except (drmaa.errors.InternalException, Exception) as e:
+ except (drmaa.InternalException, Exception) as e:
print_exception(WorkflowError("DRMAA Error: {}".format(e)),
self.workflow.linemaps)
os.remove(active_job.jobscript)
diff --git a/snakemake/io.py b/snakemake/io.py
index d60a055..8c1b5de 100644
--- a/snakemake/io.py
+++ b/snakemake/io.py
@@ -3,6 +3,7 @@ __copyright__ = "Copyright 2015, Johannes Köster"
__email__ = "koester at jimmy.harvard.edu"
__license__ = "MIT"
+import collections
import os
import shutil
import re
@@ -72,9 +73,13 @@ class _IOFile(str):
A file that is either input or output of a rule.
"""
+ __slots__ = ["_is_function", "_file", "rule", "_regex"]
+
def __new__(cls, file):
obj = str.__new__(cls, file)
obj._is_function = isfunction(file) or ismethod(file)
+ obj._is_function = obj._is_function or (
+ isinstance(file, AnnotatedString) and bool(file.callable))
obj._file = file
obj.rule = None
obj._regex = None
@@ -450,17 +455,20 @@ def apply_wildcards(pattern,
return re.sub(_wildcard_regex, format_match, pattern)
-
-
-
def not_iterable(value):
return isinstance(value, str) or isinstance(value, dict) or not isinstance(
value, Iterable)
+def is_callable(value):
+ return (callable(value) or
+ (isinstance(value, _IOFile) and value._is_function))
+
+
class AnnotatedString(str):
def __init__(self, value):
self.flags = dict()
+ self.callable = value if is_callable(value) else None
def flag(value, flag_type, flag_value=True):
@@ -547,6 +555,8 @@ def dynamic(value):
def touch(value):
return flag(value, "touch")
+def unpack(value):
+ return flag(value, "unpack")
def expand(*args, **wildcards):
"""
@@ -758,7 +768,7 @@ class Namedlist(list):
add = len(items) - 1
for name, (i, j) in self._names.items():
if i > index:
- self._names[name] = (i + add, j + add)
+ self._names[name] = (i + add, None if j is None else j + add)
elif i == index:
self.set_name(name, i, end=i + len(items))
@@ -814,7 +824,7 @@ def _load_configfile(configpath):
try:
with open(configpath) as f:
try:
- return json.load(f)
+ return json.load(f, object_pairs_hook=collections.OrderedDict)
except ValueError:
f.seek(0) # try again
try:
@@ -824,7 +834,17 @@ def _load_configfile(configpath):
"has not been installed. Please install "
"PyYAML to use YAML config files.")
try:
- return yaml.load(f)
+ # From http://stackoverflow.com/a/21912744/84349
+ class OrderedLoader(yaml.Loader):
+ pass
+ def construct_mapping(loader, node):
+ loader.flatten_mapping(node)
+ return collections.OrderedDict(
+ loader.construct_pairs(node))
+ OrderedLoader.add_constructor(
+ yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
+ construct_mapping)
+ return yaml.load(f, OrderedLoader)
except yaml.YAMLError:
raise WorkflowError("Config file is not valid JSON or YAML. "
"In case of YAML, make sure to not mix "
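The `object_pairs_hook` argument makes the JSON loader build ordered mappings, so config entries keep the order in which they appear in the file. A small sketch with a made-up config string (the keys and file names are illustrative only):

```python
import collections
import json

# Illustrative config text; key order is deliberately not alphabetical.
text = '{"samples": {"b": "b.fastq", "a": "a.fastq"}}'

config = json.loads(text, object_pairs_hook=collections.OrderedDict)
sample_names = list(config["samples"])
print(sample_names)  # ['b', 'a']
```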
diff --git a/snakemake/jobs.py b/snakemake/jobs.py
index e8ad1ce..0cf824d 100644
--- a/snakemake/jobs.py
+++ b/snakemake/jobs.py
@@ -20,7 +20,7 @@ from snakemake.utils import format, listfiles
from snakemake.exceptions import RuleException, ProtectedOutputException, WorkflowError
from snakemake.exceptions import UnexpectedOutputException, CreateCondaEnvironmentException
from snakemake.logging import logger
-from snakemake.common import DYNAMIC_FILL
+from snakemake.common import DYNAMIC_FILL, lazy_property
from snakemake import conda, wrapper
@@ -31,6 +31,14 @@ def jobfiles(jobs, type):
class Job:
HIGHEST_PRIORITY = sys.maxsize
+ __slots__ = ["rule", "dag", "targetfile", "wildcards_dict", "wildcards",
+ "_format_wildcards", "input", "dependencies", "output",
+ "_params", "_log", "_benchmark", "_resources",
+ "_conda_env_file", "_conda_env", "shadow_dir", "_inputsize",
+ "restart_times", "dynamic_output", "dynamic_input",
+ "temp_output", "protected_output", "touch_output",
+ "subworkflow_input", "_hash"]
+
def __init__(self, rule, dag, targetfile=None, format_wildcards=None):
self.rule = rule
self.dag = dag
@@ -77,9 +85,8 @@ class Job:
if f_ in self.rule.subworkflow_input:
self.subworkflow_input[f] = self.rule.subworkflow_input[f_]
self._hash = self.rule.__hash__()
- if True or not self.dynamic_output:
- for o in self.output:
- self._hash ^= o.__hash__()
+ for o in self.output:
+ self._hash ^= o.__hash__()
def is_valid(self):
"""Check if job is valid"""
@@ -133,10 +140,6 @@ class Job:
@property
def conda_env_file(self):
- if not self.rule.workflow.use_conda:
- # if use_conda is False, ignore conda_env_file definition
- return None
-
if self._conda_env_file is None:
self._conda_env_file = self.rule.expand_conda_env(self.wildcards_dict)
return self._conda_env_file
@@ -157,6 +160,13 @@ class Job:
except CreateCondaEnvironmentException as e:
raise WorkflowError(e, rule=self.rule)
+ def archive_conda_env(self):
+ """Archive a conda environment into a custom local channel."""
+ if self.conda_env_file:
+ self.create_conda_env()
+ return conda.archive_env(self)
+ return None
+
@property
def is_shadow(self):
return self.rule.shadow_depth is not None
@@ -575,6 +585,7 @@ class Job:
"params": params,
"threads": self.threads,
"resources": resources,
+ "jobid": self.dag.jobid(self)
}
properties.update(aux_properties)
return properties
@@ -607,16 +618,36 @@ class Job:
class Reason:
+
+ __slots__ = ["_updated_input", "_updated_input_run", "_missing_output",
+ "_incomplete_output", "forced", "noio", "nooutput", "derived"]
+
def __init__(self):
- self.updated_input = set()
- self.updated_input_run = set()
- self.missing_output = set()
- self.incomplete_output = set()
+ self._updated_input = None
+ self._updated_input_run = None
+ self._missing_output = None
+ self._incomplete_output = None
self.forced = False
self.noio = False
self.nooutput = False
self.derived = True
+ @lazy_property
+ def updated_input(self):
+ return set()
+
+ @lazy_property
+ def updated_input_run(self):
+ return set()
+
+ @lazy_property
+ def missing_output(self):
+ return set()
+
+ @lazy_property
+ def incomplete_output(self):
+ return set()
+
def __str__(self):
s = list()
if self.forced:
@@ -629,17 +660,17 @@ class Reason:
s.append("Rules with a run or shell declaration but no output "
"are always executed.")
else:
- if self.missing_output:
+ if self._missing_output:
s.append("Missing output files: {}".format(", ".join(
self.missing_output)))
- if self.incomplete_output:
+ if self._incomplete_output:
s.append("Incomplete output files: {}".format(", ".join(
self.incomplete_output)))
- updated_input = self.updated_input - self.updated_input_run
- if updated_input:
+ if self._updated_input:
+ updated_input = self.updated_input - self.updated_input_run
s.append("Updated input files: {}".format(", ".join(
updated_input)))
- if self.updated_input_run:
+ if self._updated_input_run:
s.append("Input files updated by another job: {}".format(
", ".join(self.updated_input_run)))
s = "; ".join(s)
diff --git a/snakemake/logging.py b/snakemake/logging.py
index 46788da..0dfcb0b 100644
--- a/snakemake/logging.py
+++ b/snakemake/logging.py
@@ -88,6 +88,7 @@ class Logger:
self.printreason = False
self.quiet = False
self.logfile = None
+ self.last_msg_was_job_info = False
def setup(self):
# logfile output is done always
@@ -185,7 +186,7 @@ class Logger:
if fmt != None:
yield fmt
- singleitems = ["benchmark"]
+ singleitems = ["jobid", "benchmark"]
if self.printreason:
singleitems.append("reason")
for item in singleitems:
@@ -207,26 +208,9 @@ class Logger:
yield " resources: " + resources
level = msg["level"]
- if level == "info" and not self.quiet:
- self.logger.warning(msg["msg"])
- if level == "warning":
- self.logger.warning(msg["msg"])
- elif level == "error":
- self.logger.error(msg["msg"])
- elif level == "debug":
- self.logger.debug(msg["msg"])
- elif level == "resources_info" and not self.quiet:
- self.logger.warning(msg["msg"])
- elif level == "run_info" and not self.quiet:
- self.logger.warning(msg["msg"])
- elif level == "progress" and not self.quiet:
- done = msg["done"]
- total = msg["total"]
- p = done / total
- percent_fmt = ("{:.2%}" if p < 0.01 else "{:.0%}").format(p)
- self.logger.info("{} of {} steps ({}) done".format(
- done, total, percent_fmt))
- elif level == "job_info" and not self.quiet:
+ if level == "job_info" and not self.quiet:
+ if not self.last_msg_was_job_info:
+ self.logger.info("")
if msg["msg"] is not None:
self.logger.info(msg["msg"])
if self.printreason:
@@ -234,18 +218,42 @@ class Logger:
else:
self.logger.info("\n".join(job_info(msg)))
self.logger.info("")
- elif level == "shellcmd":
- if self.printshellcmds:
+
+ self.last_msg_was_job_info = True
+ else:
+ if level == "info" and not self.quiet:
+ self.logger.warning(msg["msg"])
+ if level == "warning":
+ self.logger.warning(msg["msg"])
+ elif level == "error":
+ self.logger.error(msg["msg"])
+ elif level == "debug":
+ self.logger.debug(msg["msg"])
+ elif level == "resources_info" and not self.quiet:
self.logger.warning(msg["msg"])
- elif level == "job_finished":
- # do not display this on the console for now
- pass
- elif level == "rule_info":
- self.logger.info(msg["name"])
- if msg["docstring"]:
- self.logger.info(" " + msg["docstring"])
- elif level == "d3dag":
- print(json.dumps({"nodes": msg["nodes"], "links": msg["edges"]}))
+ elif level == "run_info" and not self.quiet:
+ self.logger.warning(msg["msg"])
+ elif level == "progress" and not self.quiet:
+ done = msg["done"]
+ total = msg["total"]
+ p = done / total
+ percent_fmt = ("{:.2%}" if p < 0.01 else "{:.0%}").format(p)
+ self.logger.info("{} of {} steps ({}) done".format(
+ done, total, percent_fmt))
+ elif level == "shellcmd":
+ if self.printshellcmds:
+ self.logger.warning(msg["msg"])
+ elif level == "job_finished" and not self.quiet:
+ self.logger.info("Finished job {}.".format(msg["jobid"]))
+ pass
+ elif level == "rule_info":
+ self.logger.info(msg["name"])
+ if msg["docstring"]:
+ self.logger.info(" " + msg["docstring"])
+ elif level == "d3dag":
+ print(json.dumps({"nodes": msg["nodes"], "links": msg["edges"]}))
+
+ self.last_msg_was_job_info = False
def format_dict(dict, omit_keys=[], omit_values=[]):
diff --git a/snakemake/persistence.py b/snakemake/persistence.py
index b395731..5fd903f 100644
--- a/snakemake/persistence.py
+++ b/snakemake/persistence.py
@@ -39,10 +39,12 @@ class Persistence:
self._shellcmd_path = os.path.join(self.path, "shellcmd_tracking")
self.shadow_path = os.path.join(self.path, "shadow")
self.conda_env_path = os.path.join(self.path, "conda")
+ self.conda_env_archive_path = os.path.join(self.path, "conda-archive")
for d in (self._incomplete_path, self._version_path, self._code_path,
self._rule_path, self._input_path, self._log_path, self._params_path,
- self._shellcmd_path, self.shadow_path, self.conda_env_path):
+ self._shellcmd_path, self.shadow_path, self.conda_env_path,
+ self.conda_env_archive_path):
if not os.path.exists(d):
os.mkdir(d)
diff --git a/snakemake/remote/HTTP.py b/snakemake/remote/HTTP.py
index 5d1bae5..d9941cd 100644
--- a/snakemake/remote/HTTP.py
+++ b/snakemake/remote/HTTP.py
@@ -87,6 +87,9 @@ class RemoteObject(DomainObject):
def exists(self):
if self._matched_address:
with self.httpr(verb="HEAD") as httpr:
+ # if a file redirect was found
+ if httpr.status_code in range(300,308):
+ raise HTTPFileException("The file specified appears to have been moved (HTTP %s), check the URL or try adding 'allow_redirects=True' to the remote() file object: %s" % (httpr.status_code, httpr.url))
return httpr.status_code == requests.codes.ok
return False
else:
diff --git a/snakemake/rules.py b/snakemake/rules.py
index 22911d0..308b2b2 100644
--- a/snakemake/rules.py
+++ b/snakemake/rules.py
@@ -12,7 +12,7 @@ from collections import defaultdict, Iterable
from snakemake.io import IOFile, _IOFile, protected, temp, dynamic, Namedlist, AnnotatedString, contains_wildcard_constraints, update_wildcard_constraints
from snakemake.io import expand, InputFiles, OutputFiles, Wildcards, Params, Log, Resources
-from snakemake.io import apply_wildcards, is_flagged, not_iterable
+from snakemake.io import apply_wildcards, is_flagged, not_iterable, is_callable
from snakemake.exceptions import RuleException, IOFileException, WildcardError, InputFunctionException, WorkflowError
from snakemake.logging import logger
from snakemake.common import Mode
@@ -415,6 +415,8 @@ class Rule:
snakefile=self.snakefile)
def apply_input_function(self, func, wildcards, **aux_params):
+ if isinstance(func, _IOFile):
+ func = func._file.callable
sig = inspect.signature(func)
_aux_params = {k: v for k, v in aux_params.items() if k in sig.parameters}
try:
@@ -434,26 +436,47 @@ class Rule:
for name, item in olditems.allitems():
start = len(newitems)
is_iterable = True
+ is_unpack = is_flagged(item, "unpack")
- if callable(item):
+ if is_callable(item):
item = self.apply_input_function(item, wildcards, **aux_params)
- if not_iterable(item) or no_flattening:
- item = [item]
- is_iterable = False
- for item_ in item:
- if check_return_type and not isinstance(item_, str):
- raise WorkflowError("Function did not return str or list "
- "of str.", rule=self)
- concrete = concretize(item_, wildcards)
- newitems.append(concrete)
- if mapping is not None:
- mapping[concrete] = item_
+ if is_unpack:
+ # Sanity checks before interpreting unpack()
+ if not isinstance(item, (list, dict)):
+ raise WorkflowError(
+ "Can only use unpack() on list and dict", rule=self)
+ if name:
+ raise WorkflowError(
+ "Cannot combine named input file with unpack()",
+ rule=self)
+ # Allow streamlined code with/without unpack
+ if isinstance(item, list):
+ pairs = zip([None] * len(item), item)
+ else:
+ assert isinstance(item, dict)
+ pairs = item.items()
+ else:
+ pairs = [(name, item)]
+
+ for name, item in pairs:
+ if not_iterable(item) or no_flattening:
+ item = [item]
+ is_iterable = False
+ for item_ in item:
+ if check_return_type and not isinstance(item_, str):
+ raise WorkflowError("Function did not return str or list "
+ "of str.", rule=self)
+ concrete = concretize(item_, wildcards)
+ newitems.append(concrete)
+ if mapping is not None:
+ mapping[concrete] = item_
- if name:
- newitems.set_name(
- name, start,
- end=len(newitems) if is_iterable else None)
+ if name:
+ newitems.set_name(
+ name, start,
+ end=len(newitems) if is_iterable else None)
+ start = len(newitems)
def expand_input(self, wildcards):
def concretize_iofile(f, wildcards):
diff --git a/snakemake/script.py b/snakemake/script.py
index 8b79e2d..ff39823 100644
--- a/snakemake/script.py
+++ b/snakemake/script.py
@@ -220,11 +220,18 @@ def script(path, basedir, input, output, params, wildcards, threads, resources,
raise ValueError(
"Unsupported script: Expecting either Python (.py) or R (.R) script.")
- dir = ".snakemake/scripts"
- os.makedirs(dir, exist_ok=True)
+ if path.startswith("file://"):
+ # in case of local path, use the same directory
+ dir = os.path.dirname(path)[7:]
+ prefix = ".snakemake."
+ else:
+ dir = ".snakemake/scripts"
+ prefix = ""
+ os.makedirs(dir, exist_ok=True)
+
with tempfile.NamedTemporaryFile(
suffix="." + os.path.basename(path),
- prefix="",
+ prefix=prefix,
dir=dir,
delete=False) as f:
f.write(preamble.encode())
diff --git a/snakemake/version.py b/snakemake/version.py
index 2916409..11acea2 100644
--- a/snakemake/version.py
+++ b/snakemake/version.py
@@ -1,3 +1,3 @@
-__version__ = "3.9.1"
+__version__ = "3.10.0"
MIN_PY_VERSION = (3, 3)
diff --git a/snakemake/workflow.py b/snakemake/workflow.py
index 28d5840..c11f1fe 100644
--- a/snakemake/workflow.py
+++ b/snakemake/workflow.py
@@ -23,7 +23,7 @@ from snakemake.dag import DAG
from snakemake.scheduler import JobScheduler
from snakemake.parser import parse
import snakemake.io
-from snakemake.io import protected, temp, temporary, ancient, expand, dynamic, glob_wildcards, flag, not_iterable, touch
+from snakemake.io import protected, temp, temporary, ancient, expand, dynamic, glob_wildcards, flag, not_iterable, touch, unpack
from snakemake.persistence import Persistence
from snakemake.utils import update_config
from snakemake.script import script
@@ -209,6 +209,7 @@ class Workflow:
list_input_changes=False,
list_params_changes=False,
summary=False,
+ archive=None,
detailed_summary=False,
latency_wait=3,
benchmark_repeats=3,
@@ -312,7 +313,7 @@ class Workflow:
self.persistence = Persistence(
nolock=nolock,
dag=dag,
- warn_only=dryrun or printrulegraph or printdag or summary or
+ warn_only=dryrun or printrulegraph or printdag or summary or archive or
list_version_changes or list_code_changes or list_input_changes or
list_params_changes)
@@ -406,6 +407,9 @@ class Workflow:
elif detailed_summary:
print("\n".join(dag.summary(detailed=True)))
return True
+ elif archive:
+ dag.archive(archive)
+ return True
elif list_version_changes:
items = list(
chain(*map(self.persistence.version_changed, dag.jobs)))
@@ -583,6 +587,9 @@ class Workflow:
self._ruleorder.add(*rulenames)
def subworkflow(self, name, snakefile=None, workdir=None, configfile=None):
+ # Take absolute path of config file, because it is relative to current
+ # workdir, which could be changed for the subworkflow.
+ configfile = os.path.abspath(configfile)
sw = Subworkflow(self, name, snakefile, workdir, configfile)
self._subworkflows[name] = sw
self.globals[name] = sw.target
diff --git a/tests/test_parser/Snakefile b/tests/test_parser/Snakefile
index 22d414f..8a5cd14 100644
--- a/tests/test_parser/Snakefile
+++ b/tests/test_parser/Snakefile
@@ -7,7 +7,6 @@ class Test:
rule:
output: Test().include
- shell:
- "touch {output}"
+ shell: "touch {output}"
diff --git a/tests/test_unpack_dict/Snakefile b/tests/test_unpack_dict/Snakefile
new file mode 100644
index 0000000..6c50dc3
--- /dev/null
+++ b/tests/test_unpack_dict/Snakefile
@@ -0,0 +1,27 @@
+rule all:
+ input:
+ 'prefix.out1.txt',
+ 'prefix.out2.txt',
+
+
+def get_in(wildcards):
+ result = {'one': '{wildcards.prefix}.in1.txt', 'two': '{wildcards.prefix}.in2.txt'}
+ return {key: value.format(wildcards=wildcards) for key, value in result.items()}
+
+
+def get_out():
+ return {'one': '{prefix}.out1.txt', 'two': '{prefix}.out2.txt'}
+
+
+rule copy_files:
+ input:
+ unpack(get_in)
+ output:
+ **get_out()
+ shell:
+ r"""
+ cp {input.one} {output.one}
+ cp {input.two} {output.two}
+ """
+
+
diff --git a/tests/test_unpack_dict/expected-results/prefix.out1.txt b/tests/test_unpack_dict/expected-results/prefix.out1.txt
new file mode 100644
index 0000000..d00491f
--- /dev/null
+++ b/tests/test_unpack_dict/expected-results/prefix.out1.txt
@@ -0,0 +1 @@
+1
diff --git a/tests/test_unpack_dict/expected-results/prefix.out2.txt b/tests/test_unpack_dict/expected-results/prefix.out2.txt
new file mode 100644
index 0000000..0cfbf08
--- /dev/null
+++ b/tests/test_unpack_dict/expected-results/prefix.out2.txt
@@ -0,0 +1 @@
+2
diff --git a/tests/test_unpack_dict/prefix.in1.txt b/tests/test_unpack_dict/prefix.in1.txt
new file mode 100644
index 0000000..d00491f
--- /dev/null
+++ b/tests/test_unpack_dict/prefix.in1.txt
@@ -0,0 +1 @@
+1
diff --git a/tests/test_unpack_dict/prefix.in2.txt b/tests/test_unpack_dict/prefix.in2.txt
new file mode 100644
index 0000000..0cfbf08
--- /dev/null
+++ b/tests/test_unpack_dict/prefix.in2.txt
@@ -0,0 +1 @@
+2
diff --git a/tests/test_unpack_list/Snakefile b/tests/test_unpack_list/Snakefile
new file mode 100644
index 0000000..03dba57
--- /dev/null
+++ b/tests/test_unpack_list/Snakefile
@@ -0,0 +1,27 @@
+rule all:
+ input:
+ 'prefix.out1.txt',
+ 'prefix.out2.txt',
+
+
+def get_in(wildcards):
+ result = ['{wildcards.prefix}.in1.txt', '{wildcards.prefix}.in2.txt']
+ return [x.format(wildcards=wildcards) for x in result]
+
+
+def get_out():
+ return ['{prefix}.out1.txt', '{prefix}.out2.txt']
+
+
+rule copy_files:
+ input:
+ unpack(get_in)
+ output:
+ *get_out()
+ shell:
+ r"""
+ cp {input[0]} {output[0]}
+ cp {input[1]} {output[1]}
+ """
+
+
diff --git a/tests/test_unpack_list/expected-results/prefix.out1.txt b/tests/test_unpack_list/expected-results/prefix.out1.txt
new file mode 100644
index 0000000..d00491f
--- /dev/null
+++ b/tests/test_unpack_list/expected-results/prefix.out1.txt
@@ -0,0 +1 @@
+1
diff --git a/tests/test_unpack_list/expected-results/prefix.out2.txt b/tests/test_unpack_list/expected-results/prefix.out2.txt
new file mode 100644
index 0000000..0cfbf08
--- /dev/null
+++ b/tests/test_unpack_list/expected-results/prefix.out2.txt
@@ -0,0 +1 @@
+2
diff --git a/tests/test_unpack_list/prefix.in1.txt b/tests/test_unpack_list/prefix.in1.txt
new file mode 100644
index 0000000..d00491f
--- /dev/null
+++ b/tests/test_unpack_list/prefix.in1.txt
@@ -0,0 +1 @@
+1
diff --git a/tests/test_unpack_list/prefix.in2.txt b/tests/test_unpack_list/prefix.in2.txt
new file mode 100644
index 0000000..0cfbf08
--- /dev/null
+++ b/tests/test_unpack_list/prefix.in2.txt
@@ -0,0 +1 @@
+2
diff --git a/tests/tests.py b/tests/tests.py
index 9d57da9..f27d531 100644
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -191,6 +191,14 @@ def test_conditional():
targets="test.out test.0.out test.1.out test.2.out".split())
+def test_unpack_dict():
+ run(dpath("test_unpack_dict"))
+
+
+def test_unpack_list():
+ run(dpath("test_unpack_list"))
+
+
def test_shell():
run(dpath("test_shell"))
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/snakemake.git
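
Note on the snakemake/rules.py hunk above: the new unpack() branch turns a list or dict returned by an input function into (name, item) pairs before the existing flattening loop runs. The following is a minimal standalone sketch of that pairing step only, not the code from the commit. The function name expand_pairs is hypothetical, and built-in exceptions stand in for snakemake's WorkflowError so that the block runs without snakemake installed.

```python
def expand_pairs(name, item, is_unpack):
    """Return the (name, value) pairs that the flattening loop would iterate.

    Hypothetical helper for illustration; the commit does this inline.
    """
    if not is_unpack:
        # Without unpack(), the item keeps whatever name it was given.
        return [(name, item)]
    # Sanity checks mirroring the ones in the diff (the original code
    # raises WorkflowError; built-ins are used here as an assumption).
    if not isinstance(item, (list, dict)):
        raise TypeError("Can only use unpack() on list and dict")
    if name:
        raise ValueError("Cannot combine named input file with unpack()")
    if isinstance(item, list):
        # List entries become unnamed (positional) inputs.
        return [(None, value) for value in item]
    # Dict entries become named inputs, keyed by the dict keys.
    return list(item.items())


if __name__ == "__main__":
    print(expand_pairs(None, ["a.in1.txt", "a.in2.txt"], True))
    print(expand_pairs(None, {"one": "a.in1.txt", "two": "a.in2.txt"}, True))
```

Under these assumptions, a list yields positional inputs (as in tests/test_unpack_list, which uses {input[0]}) and a dict yields named inputs (as in tests/test_unpack_dict, which uses {input.one}).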