[med-svn] [snakemake] 01/04: Imported Upstream version 3.6.1+dfsg

Kevin Murray daube-guest at moszumanska.debian.org
Tue Apr 19 05:18:03 UTC 2016


This is an automated email from the git hooks/post-receive script.

daube-guest pushed a commit to branch master
in repository snakemake.

commit fa7a4fd6ea524cac6957f71ded4408646adea7e9
Author: Kevin Murray <spam at kdmurray.id.au>
Date:   Tue Apr 19 12:12:55 2016 +1000

    Imported Upstream version 3.6.1+dfsg
---
 CHANGELOG.md                                       |  71 ++++++++++++
 LICENSE => LICENSE.md                              |   2 +-
 README => README.md                                |   2 +-
 misc/vim/syntax/snakemake.vim                      |   6 +-
 setup.py                                           |   2 +-
 snakemake-tutorial.html                            |   2 +-
 snakemake/__init__.py                              |  18 +--
 snakemake/dag.py                                   | 127 ++++++++++++---------
 snakemake/exceptions.py                            |  18 +--
 snakemake/executors.py                             |  26 +++--
 snakemake/io.py                                    | 106 ++++++++++-------
 snakemake/jobs.py                                  |  86 ++++++++------
 snakemake/logging.py                               |   5 +-
 snakemake/output_index.py                          |   8 +-
 snakemake/parser.py                                |   8 +-
 snakemake/persistence.py                           |   2 +
 snakemake/remote/FTP.py                            |  12 +-
 snakemake/remote/dropbox.py                        |  33 ++++--
 snakemake/rules.py                                 |  39 +++----
 snakemake/script.py                                |  22 +++-
 snakemake/shell.py                                 |   2 +-
 snakemake/version.py                               |   2 +-
 snakemake/workflow.py                              |   7 ++
 tests/test01/Snakefile                             |   5 +
 tests/test_input_generator/Snakefile               |  33 ++++++
 .../test_input_generator/expected-results/foo.out1 |   3 +
 .../test_input_generator/expected-results/foo.out2 |   3 +
 .../test_input_generator/expected-results/foo.out3 |   3 +
 tests/test_input_generator/foo.1.in                |   1 +
 tests/test_input_generator/foo.2.in                |   1 +
 tests/test_input_generator/foo.3.in                |   1 +
 tests/test_script/Snakefile                        |   5 +
 tests/test_script/config.yaml                      |   1 +
 tests/test_script/scripts/test.R                   |   6 +
 tests/testapi.py                                   |  13 +++
 tests/tests.py                                     |   3 +
 36 files changed, 469 insertions(+), 215 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..581916d
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,71 @@
+# Change Log
+
+## [3.6.1] - 2016-04-08
+### Changed
+- Work around missing RecursionError in Python < 3.5
+- Improved conversion of numpy and pandas data structures to R scripts.
+- Fixed locking of working directory.
+
+## [3.6.0] - 2016-03-10
+### Added
+- onstart handler, which allows adding code that is executed only before the actual workflow execution (not on dry run).
+- Parameters defined in the cluster config file are now accessible in the job properties under the key "cluster".
+- The wrapper directive can be considered stable.
+### Changed
+- Allow the use of rule/job parameters with braces notation in the cluster config.
+- Show a proper error message in case of recursion errors.
+- Remove non-empty temp dirs.
+- Don't set the process group of Snakemake, so that kill signals from parent processes are propagated.
+- Fixed various corner-case bugs.
+- The params directive no longer converts a list ``l`` implicitly to ``" ".join(l)``.
+
+## [3.5.5] - 2016-01-23
+### Added
+- New experimental wrapper directive, which allows referring to re-usable [wrapper scripts](https://bitbucket.org/snakemake/snakemake/wiki/Documentation#markdown-header-wrappers). Wrappers are provided in the [Snakemake Wrapper Repository](https://bitbucket.org/snakemake/snakemake-wrappers).
+- David Koppstein implemented two new command line options to constrain the execution of the DAG of jobs to sub-DAGs (--until and --omit-from).
+### Changed
+- Fixed various bugs, e.g. with shadow jobs and --latency-wait.
+
+## [3.5.4] - 2015-12-04
+### Changed
+- The params directive now fully supports non-string parameters. Several bugs in the remote support were fixed.
+
+## [3.5.3] - 2015-11-24
+### Changed
+- The missing remote module was added to the package.
+
+## [3.5.2] - 2015-11-24
+### Added
+- Support for easy integration of external R and Python scripts via the new [script directive](https://bitbucket.org/snakemake/snakemake/wiki/Documentation#markdown-header-external-scripts).
+- Chris Tomkins-Tinch has implemented support for remote files: Snakemake can now handle input and output files from Amazon S3, Google Storage, FTP, SFTP, HTTP and Dropbox.
+- Simon Ye has implemented support for sandboxing jobs with [shadow rules](https://bitbucket.org/snakemake/snakemake/wiki/Documentation#markdown-header-shadow-rules).
+### Changed
+- Manuel Holtgrewe has fixed dynamic output files in combination with multiple wildcards.
+- It is now possible to add suffixes to all shell commands with shell.suffix("mysuffix").
+- Job execution has been refactored to spawn processes only when necessary, resolving several problems in combination with huge workflows consisting of thousands of jobs and reducing the memory footprint.
+- In order to reflect the new collaborative development model, Snakemake has moved from my personal bitbucket account to http://snakemake.bitbucket.org.
+
+## [3.4.2] - 2015-09-12
+### Changed
+- Willem Ligtenberg has reduced the memory usage of Snakemake.
+- Per Unneberg has improved config file handling to provide a more intuitive overwrite behavior.
+- Simon Ye has improved the test suite of Snakemake and helped with setting up continuous integration via Codeship.
+- The cluster implementation has been rewritten to use only a single thread to wait for jobs. This avoids failures with large numbers of jobs.
+- Benchmarks are now writing tab-delimited text files instead of JSON.
+- Snakemake now always requires the number of jobs to be set with -j when in cluster mode. Set this to a high value if your cluster does not have restrictions.
+- The Snakemake Conda package has been moved to the bioconda channel.
+- The handling of symlinks was improved, which made it necessary to raise the minimum required Python version to 3.3.
+
+## [3.4.1] - 2015-08-05
+### Changed
+- This release fixes a bug that caused named input or output files to always be returned as lists instead of single files.
+
+## [3.4] - 2015-07-18
+### Added
+- This release adds support for executing jobs on clusters in synchronous mode (e.g. qsub -sync). Thanks to David Alexander for implementing this.
+- There is now vim syntax highlighting support (thanks to Jay Hesselberth).
+- Snakemake is now available as Conda package.
+### Changed
+- Lots of bugs have been fixed. Thanks go to e.g. David Koppstein, Marcel Martin, John Huddleston and Tao Wen for helping with useful reports and debugging.
+
+See [here](https://bitbucket.org/snakemake/snakemake/wiki/News-Archive) for older changes.
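For readers of the changelog: the script directive (3.5.2) and the onstart handler (3.6.0) combine as in this minimal Snakefile sketch; the rule, file, and script names here are hypothetical:

    onstart:
        print("workflow starting")  # runs once before actual execution, skipped on dry runs

    rule plot:
        input:
            "data/{sample}.tsv"
        output:
            "plots/{sample}.pdf"
        script:
            "scripts/plot.R"  # the script sees inputs as snakemake@input (R) or snakemake.input (Python)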
diff --git a/LICENSE b/LICENSE.md
similarity index 94%
rename from LICENSE
rename to LICENSE.md
index 7472113..4c51a3a 100644
--- a/LICENSE
+++ b/LICENSE.md
@@ -1,4 +1,4 @@
-Copyright (c) 2015 Johannes Köster <johannes.koester at tu-dortmund.de>
+Copyright (c) 2016 Johannes Köster <johannes.koester at tu-dortmund.de>
 
 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 
diff --git a/README b/README.md
similarity index 88%
rename from README
rename to README.md
index 6fea197..b488bdd 100644
--- a/README
+++ b/README.md
@@ -4,4 +4,4 @@ Snakemake is a workflow management system that aims to reduce the complexity of
 
 Homepage: http://snakemake.bitbucket.org
 
-Copyright (c) 2015 Johannes Köster <johannes.koester at tu-dortmund.de> (see LICENSE)
+Copyright (c) 2016 Johannes Köster <johannes.koester at tu-dortmund.de> (see LICENSE)
diff --git a/misc/vim/syntax/snakemake.vim b/misc/vim/syntax/snakemake.vim
index e0f59f8..d8aef59 100644
--- a/misc/vim/syntax/snakemake.vim
+++ b/misc/vim/syntax/snakemake.vim
@@ -1,7 +1,7 @@
 " Vim syntax file
 " Language:	Snakemake (extended from python.vim)
 " Maintainer:	Jay Hesselberth (jay.hesselberth at gmail.com)
-" Last Change:	2015 Jul 1 
+" Last Change:	2016 Jan 23
 "
 " Usage
 "
@@ -43,9 +43,9 @@ source $VIMRUNTIME/syntax/python.vim
 
 syn keyword pythonStatement	include workdir onsuccess onerror
 syn keyword pythonStatement	ruleorder localrules configfile
-syn keyword pythonStatement	touch protected temp
+syn keyword pythonStatement	touch protected temp wrapper
 syn keyword pythonStatement	input output params message threads resources
-syn keyword pythonStatement	version run shell benchmark snakefile log
+syn keyword pythonStatement	version run shell benchmark snakefile log script
 syn keyword pythonStatement	rule subworkflow nextgroup=pythonFunction skipwhite
 
 " similar to special def and class treatment from python.vim, except
diff --git a/setup.py b/setup.py
index de6757a..6e8abeb 100644
--- a/setup.py
+++ b/setup.py
@@ -54,7 +54,7 @@ setup(
     },
     package_data={'': ['*.css', '*.sh', '*.html']},
     tests_require=['rpy2', 'docutils', 'nose>=1.3', 'boto>=2.38.0', 'filechunkio>=1.6', 
-                     'moto>=0.4.14', 'ftputil>=3.2', 'pysftp>=0.2.8', 'requests>=2.8.1', 'dropbox>=3.38'],
+                     'moto>=0.4.14', 'ftputil>=3.2', 'pysftp>=0.2.8', 'requests>=2.8.1', 'dropbox>=5.2'],
     cmdclass={'test': NoseTestCommand},
     classifiers=
     ["Development Status :: 5 - Production/Stable", "Environment :: Console",
diff --git a/snakemake-tutorial.html b/snakemake-tutorial.html
index c712169..7e6ef63 100644
--- a/snakemake-tutorial.html
+++ b/snakemake-tutorial.html
@@ -72,7 +72,7 @@ code > span.er { color: #ff0000; font-weight: bold; }
 <p>answer with <strong>yes</strong>. Along with a minimal Python 3 environment, Miniconda contains the package manager <a href="http://conda.pydata.org">Conda</a>. After opening a <strong>new terminal</strong>, you can use the new <code>conda</code> command to install software packages and create isolated environments to, e.g., use different versions of the same package. We will later use <a href="http://conda.pydata.org">Conda</a> to create an isolated environment with all required softw [...]
 <h2 id="step-2-preparing-a-working-directory">Step 2: Preparing a working directory</h2>
 <p>First, <strong>create a new directory</strong> <code>snakemake-tutorial</code> at a reasonable place and <strong>change into that directory</strong> in your terminal. If you use a Vagrant Linux VM from Windows as described above, create the directory under <code>/vagrant/</code>, so that the contents are shared with your host system (you can then edit all files from within Windows with an editor that supports Unix line breaks). In this directory, we will later create an example workfl [...]
-<pre class="sourceCode bash"><code class="sourceCode bash"><span class="kw">wget</span> https://bitbucket.org/johanneskoester/snakemake/downloads/snakemake-tutorial-data.tar.gz
+<pre class="sourceCode bash"><code class="sourceCode bash"><span class="kw">wget</span> https://bitbucket.org/snakemake/snakemake/downloads/snakemake-tutorial-data.tar.gz
 <span class="kw">tar</span> -xf snakemake-tutorial-data.tar.gz</code></pre>
 <p>This will create a <code>data</code> folder and a <code>requirements.txt</code> file in the working directory.</p>
 <h2 id="step-3-creating-an-environment-with-the-required-software">Step 3: Creating an environment with the required software</h2>
diff --git a/snakemake/__init__.py b/snakemake/__init__.py
index 33ba87c..2e8a2ad 100644
--- a/snakemake/__init__.py
+++ b/snakemake/__init__.py
@@ -130,7 +130,7 @@ def snakemake(snakefile,
         drmaa (str):                if not None use DRMAA for cluster support, str specifies native args passed to the cluster when submitting a job
         jobname (str):              naming scheme for cluster job scripts (default "snakejob.{rulename}.{jobid}.sh")
         immediate_submit (bool):    immediately submit all cluster jobs, regardless of dependencies (default False)
-        standalone (bool):          kill all processes very rudely in case of failure (do not use this if you use this API) (default False)
+        standalone (bool):          kill all processes very rudely in case of failure (do not use this if you use this API) (default False) (deprecated)
         ignore_ambiguity (bool):    ignore ambiguous rules and always take the first possible one (default False)
         snakemakepath (str):        path to the snakemake executable (default None)
         lock (bool):                lock the working directory when executing the workflow (default True)
@@ -277,15 +277,6 @@ def snakemake(snakefile,
                         overwrite_clusterconfig=cluster_config,
                         config_args=config_args,
                         debug=debug)
-
-    if standalone:
-        try:
-            # set the process group
-            os.setpgrp()
-        except:
-            # ignore: if it does not work we can still work without it
-            pass
-
     success = True
     try:
         workflow.include(snakefile,
@@ -739,7 +730,7 @@ def get_argument_parser():
                         help="Remove a lock on the working directory.")
     parser.add_argument(
         "--cleanup-metadata", "--cm",
-        nargs="*",
+        nargs="+",
         metavar="FILE",
         help="Cleanup the metadata "
         "of given files. That means that snakemake removes any tracked "
@@ -751,8 +742,7 @@ def get_argument_parser():
         "jobs the output of which is recognized as incomplete.")
     parser.add_argument("--ignore-incomplete", "--ii",
                         action="store_true",
-                        help="Ignore "
-                        "any incomplete jobs.")
+                        help="Do not check for incomplete output files.")
     parser.add_argument(
         "--list-version-changes", "--lv",
         action="store_true",
@@ -835,7 +825,7 @@ def get_argument_parser():
     parser.add_argument(
         "--no-hooks",
         action="store_true",
-        help="Do not invoke onsuccess or onerror hooks after execution.")
+        help="Do not invoke onstart, onsuccess or onerror hooks after execution.")
     parser.add_argument(
         "--print-compilation",
         action="store_true",
diff --git a/snakemake/dag.py b/snakemake/dag.py
index eb2e8b5..1b33df1 100644
--- a/snakemake/dag.py
+++ b/snakemake/dag.py
@@ -20,14 +20,21 @@ from snakemake.exceptions import MissingRuleException, AmbiguousRuleException
 from snakemake.exceptions import CyclicGraphException, MissingOutputException
 from snakemake.exceptions import IncompleteFilesException
 from snakemake.exceptions import PeriodicWildcardError
-from snakemake.exceptions import RemoteFileException
+from snakemake.exceptions import RemoteFileException, WorkflowError
 from snakemake.exceptions import UnexpectedOutputException, InputFunctionException
 from snakemake.logging import logger
 from snakemake.output_index import OutputIndex
 
+# Workaround for Py <3.5 prior to existence of RecursionError
+try:
+    RecursionError
+except NameError:
+    RecursionError = RuntimeError
+
 
 class DAG:
-    def __init__(self, workflow,
+    def __init__(self,
+                 workflow,
                  rules=None,
                  dryrun=False,
                  targetfiles=None,
@@ -83,7 +90,7 @@ class DAG:
             self.forcerules.update(forcerules)
         if forcefiles:
             self.forcefiles.update(forcefiles)
-        if untilrules: # keep only the rule names
+        if untilrules:  # keep only the rule names
             self.untilrules.update(set(rule.name for rule in untilrules))
         if untilfiles:
             self.untilfiles.update(untilfiles)
@@ -115,7 +122,6 @@ class DAG:
         self.set_until_jobs()
         self.delete_omitfrom_jobs()
 
-
     def update_output_index(self):
         self.output_index = OutputIndex(self.rules)
 
@@ -131,9 +137,9 @@ class DAG:
                     raise IncompleteFilesException(incomplete)
 
     def check_dynamic(self):
-        for job in filter(lambda job: (
-            job.dynamic_output and not self.needrun(job)
-        ), self.jobs):
+        for job in filter(
+                lambda job: (job.dynamic_output and not self.needrun(job)),
+                self.jobs):
             self.update_dynamic(job)
 
     @property
@@ -150,7 +156,8 @@ class DAG:
     def needrun_jobs(self):
         """ Jobs that need to be executed. """
         for job in filter(self.needrun,
-                          self.bfs(self.dependencies, *self.targetjobs,
+                          self.bfs(self.dependencies,
+                                   *self.targetjobs,
                                    stop=self.noneedrun_finished)):
             yield job
 
@@ -217,17 +224,17 @@ class DAG:
 
     @property
     def incomplete_files(self):
-        return list(chain(*(
-            job.output for job in filter(self.workflow.persistence.incomplete,
-                                         filterfalse(self.needrun, self.jobs))
-        )))
+        return list(chain(*(job.output
+                            for job in
+                            filter(self.workflow.persistence.incomplete,
+                                   filterfalse(self.needrun, self.jobs)))))
 
     @property
     def newversion_files(self):
-        return list(chain(*(
-            job.output
-            for job in filter(self.workflow.persistence.newversion, self.jobs)
-        )))
+        return list(chain(*(job.output
+                            for job in
+                            filter(self.workflow.persistence.newversion,
+                                   self.jobs))))
 
     def missing_temp(self, job):
         """
@@ -247,14 +254,14 @@ class DAG:
 
         input_maxtime = job.input_maxtime
         if input_maxtime is not None:
-            output_mintime = job.output_mintime
+            output_mintime = job.output_mintime_local
             if output_mintime is not None and output_mintime < input_maxtime:
                 raise RuleException(
                     "Output files {} are older than input "
                     "files. Did you extract an archive? Make sure that output "
                     "files have a more recent modification date than the "
-                    "archive, e.g. by using 'touch'.".format(
-                        ", ".join(job.expanded_output)),
+                    "archive, e.g. by using 'touch'.".format(", ".join(
+                        job.expanded_output)),
                     rule=job.rule)
 
     def unshadow_output(self, job):
@@ -264,7 +271,8 @@ class DAG:
         cwd = os.getcwd()
         for real_output in job.expanded_output:
             shadow_output = os.path.join(job.shadow_dir, real_output)
-            if os.path.realpath(shadow_output) == os.path.realpath(real_output):
+            if os.path.realpath(shadow_output) == os.path.realpath(
+                    real_output):
                 continue
             logger.info("Moving shadow output {} to destination {}".format(
                 shadow_output, real_output))
@@ -304,7 +312,6 @@ class DAG:
             for f in filter(job_.temp_output.__contains__, files):
                 yield f
 
-
     def handle_temp(self, job):
         """ Remove temp files if they are no longer needed. """
         if self.notemp:
@@ -325,7 +332,7 @@ class DAG:
 
         for f in unneeded_files():
             logger.info("Removing temporary output file {}.".format(f))
-            f.remove()
+            f.remove(remove_non_empty_dir=True)
 
     def handle_remote(self, job):
         """ Remove local files if they are no longer needed, and upload to S3. """
@@ -352,7 +359,7 @@ class DAG:
                     yield f
 
         for f in job.expanded_output:
-            if f.is_remote and not f.exists_remote:
+            if f.is_remote:
                 f.upload_to_remote()
                 remote_mtime = f.mtime
                 # immediately force local mtime to match remote,
@@ -372,7 +379,6 @@ class DAG:
 
         job.rmdir_empty_remote_dirs()
 
-
     def jobid(self, job):
         if job not in self._jobid:
             self._jobid[job] = len(self._jobid)
@@ -410,6 +416,17 @@ class DAG:
             except (MissingInputException, CyclicGraphException,
                     PeriodicWildcardError) as ex:
                 exceptions.append(ex)
+            except RecursionError as e:
+                raise WorkflowError(
+                    e, "If building the DAG exceeds the recursion limit, "
+                    "this is likely due to a cyclic dependency. "
+                    "E.g. you might have a sequence of rules that "
+                    "can generate their own input. Try to make "
+                    "the output files more specific. "
+                    "A common pattern is to have different prefixes "
+                    "in the output files of different rules." +
+                    ("\nProblematic file pattern: {}".format(file)
+                     if file else ""))
         if producer is None:
             if cycles:
                 job = cycles[0]
@@ -476,7 +493,7 @@ class DAG:
             updated_subworkflow_input = self.updated_subworkflow_files.intersection(
                 job.input)
             if (job not in self.omitforce and job.rule in self.forcerules or
-                not self.forcefiles.isdisjoint(job.output)):
+                    not self.forcefiles.isdisjoint(job.output)):
                 reason.forced = True
             elif updated_subworkflow_input:
                 reason.updated_input.update(updated_subworkflow_input)
@@ -485,9 +502,8 @@ class DAG:
                 if not job.output and not job.benchmark:
                     if job.input:
                         if job.rule.norun:
-                            reason.updated_input_run.update([f
-                                                             for f in job.input
-                                                             if not f.exists])
+                            reason.updated_input_run.update(
+                                [f for f in job.input if not f.exists])
                         else:
                             reason.nooutput = True
                     else:
@@ -497,14 +513,15 @@ class DAG:
                         missing_output = job.missing_output()
                     else:
                         missing_output = job.missing_output(
-                            requested=set(chain(*self.depending[job].values()))
-                            | self.targetfiles)
+                            requested=set(chain(*self.depending[job].values(
+                            ))) | self.targetfiles)
                     reason.missing_output.update(missing_output)
             if not reason:
                 output_mintime_ = output_mintime(job)
                 if output_mintime_:
                     updated_input = [
-                        f for f in job.input
+                        f
+                        for f in job.input
                         if f.exists and f.is_newer(output_mintime_)
                     ]
                     reason.updated_input.update(updated_input)
@@ -542,12 +559,10 @@ class DAG:
 
         self._len = len(_needrun)
 
-
     def in_until(self, job):
         return (job.rule.name in self.untilrules or
                 not self.untilfiles.isdisjoint(job.output))
 
-
     def in_omitfrom(self, job):
         return (job.rule.name in self.omitrules or
                 not self.omitfiles.isdisjoint(job.output))
@@ -569,11 +584,10 @@ class DAG:
         "Removes jobs downstream of jobs specified by --omit-from."
         if not self.omitrules and not self.omitfiles:
             return
-        downstream_jobs = list(self.downstream_of_omitfrom()) # need to cast as list before deleting jobs
+        downstream_jobs = list(self.downstream_of_omitfrom()
+                               )  # need to cast as list before deleting jobs
         for job in downstream_jobs:
-            self.delete_job(job,
-                            recursive=False,
-                            add_dependencies=True)
+            self.delete_job(job, recursive=False, add_dependencies=True)
 
     def set_until_jobs(self):
         "Removes jobs downstream of jobs specified by --omit-from."
@@ -583,8 +597,9 @@ class DAG:
 
     def update_priority(self):
         """ Update job priorities. """
-        prioritized = (lambda job: job.rule in self.priorityrules or
-                       not self.priorityfiles.isdisjoint(job.output))
+        prioritized = (
+            lambda job: job.rule in self.priorityrules or not self.priorityfiles.isdisjoint(job.output)
+        )
         for job in self.needrun_jobs:
             self._priority[job] = job.rule.priority
         for job in self.bfs(self.dependencies,
@@ -601,8 +616,10 @@ class DAG:
     def update_downstream_size(self):
         for job in self.needrun_jobs:
             self._downstream_size[job] = sum(
-                1 for _ in self.bfs(self.depending, job,
-                                    stop=self.noneedrun_finished)) - 1
+                1
+                for _ in self.bfs(self.depending,
+                                  job,
+                                  stop=self.noneedrun_finished)) - 1
 
     def update_temp_input_count(self):
         for job in self.needrun_jobs:
@@ -616,8 +633,8 @@ class DAG:
         self.update_temp_input_count()
 
     def _ready(self, job):
-        return self._finished.issuperset(
-            filter(self.needrun, self.dependencies[job]))
+        return self._finished.issuperset(filter(self.needrun,
+                                                self.dependencies[job]))
 
     def finish(self, job, update_dynamic=True):
         self._finished.add(job)
@@ -651,8 +668,8 @@ class DAG:
             # this happens e.g. in dryrun if output is not yet present
             return
 
-        depending = list(filter(lambda job_: not self.finished(job_),
-                                self.bfs(self.depending, job)))
+        depending = list(filter(lambda job_: not self.finished(job_), self.bfs(
+            self.depending, job)))
         newrule, non_dynamic_wildcards = job.rule.dynamic_branch(
             dynamic_wildcards,
             input=False)
@@ -668,7 +685,8 @@ class DAG:
                     self.specialize_rule(job_.rule, newrule_)
                     if not self.dynamic(job_):
                         logger.debug("Updating job {}.".format(job_))
-                        newjob_ = Job(newrule_, self,
+                        newjob_ = Job(newrule_,
+                                      self,
                                       targetfile=job_.targetfile)
 
                         unexpected_output = self.reason(
@@ -683,9 +701,7 @@ class DAG:
                         self.replace_job(job_, newjob_)
         return newjob
 
-    def delete_job(self, job,
-                   recursive=True,
-                   add_dependencies=False):
+    def delete_job(self, job, recursive=True, add_dependencies=False):
         if job in self.targetjobs:
             self.targetjobs.remove(job)
         if add_dependencies:
@@ -783,7 +799,9 @@ class DAG:
     def dfs(self, direction, *jobs, stop=lambda job: False, post=True):
         visited = set()
         for job in jobs:
-            for job_ in self._dfs(direction, job, visited,
+            for job_ in self._dfs(direction,
+                                  job,
+                                  visited,
                                   stop=stop,
                                   post=post):
                 yield job_
@@ -801,7 +819,6 @@ class DAG:
         if post:
             yield job
 
-
     def is_isomorph(self, job1, job2):
         if job1.rule != job2.rule:
             return False
@@ -935,7 +952,8 @@ class DAG:
                          node2style=node2style,
                          node2label=node2label)
 
-    def _dot(self, graph,
+    def _dot(self,
+             graph,
              node2rule=lambda node: node,
              node2style=lambda node: "rounded",
              node2label=lambda node: node):
@@ -1042,8 +1060,9 @@ class DAG:
                     max_jobs))
         else:
             logger.d3dag(nodes=[node(job) for job in jobs],
-                         edges=[edge(dep, job) for job in jobs for dep in
-                                self.dependencies[job] if self.needrun(dep)])
+                         edges=[edge(dep, job)
+                                for job in jobs for dep in self.dependencies[
+                                    job] if self.needrun(dep)])
 
     def stats(self):
         rules = Counter()
diff --git a/snakemake/exceptions.py b/snakemake/exceptions.py
index 2e564c3..be14fd0 100644
--- a/snakemake/exceptions.py
+++ b/snakemake/exceptions.py
@@ -111,15 +111,14 @@ def print_exception(ex, linemaps):
 
 class WorkflowError(Exception):
     @staticmethod
-    def format_args(args):
-        for arg in args:
-            if isinstance(arg, str):
-                yield arg
-            else:
-                yield "{}: {}".format(arg.__class__.__name__, str(arg))
+    def format_arg(arg):
+        if isinstance(arg, str):
+            return arg
+        else:
+            return "{}: {}".format(arg.__class__.__name__, str(arg))
 
     def __init__(self, *args, lineno=None, snakefile=None, rule=None):
-        super().__init__("\n".join(self.format_args(args)))
+        super().__init__("\n".join(self.format_arg(arg) for arg in args))
         if rule is not None:
             self.lineno = rule.lineno
             self.snakefile = rule.snakefile
@@ -177,7 +176,10 @@ class RuleException(Exception):
 
 
 class InputFunctionException(WorkflowError):
-    pass
+    def __init__(self, msg, wildcards=None, lineno=None, snakefile=None,
+                 rule=None):
+        msg = self.format_arg(msg)
+        if wildcards:
+            # only append the wildcard listing if wildcards were given
+            msg += "\nWildcards:\n" + "\n".join(
+                "{}={}".format(name, value)
+                for name, value in wildcards.items())
+        super().__init__(msg, lineno=lineno, snakefile=snakefile, rule=rule)
 
 
 class MissingOutputException(RuleException):
diff --git a/snakemake/executors.py b/snakemake/executors.py
index 42931ae..27f1061 100644
--- a/snakemake/executors.py
+++ b/snakemake/executors.py
@@ -377,7 +377,7 @@ class ClusterExecutor(RealExecutor):
                          overwrite_config=overwrite_config,
                          workflow=self.workflow,
                          cores=self.cores,
-                         properties=job.json(),
+                         properties=json.dumps(job.properties(cluster=self.cluster_params(job))),
                          latency_wait=self.latency_wait,
                          benchmark_repeats=self.benchmark_repeats,
                          target=target, wait_for_files=" ".join(wait_for_files),
@@ -392,10 +392,20 @@ class ClusterExecutor(RealExecutor):
                 "Make sure that your custom jobscript it up to date.".format(e))
         os.chmod(jobscript, os.stat(jobscript).st_mode | stat.S_IXUSR)
 
-    def cluster_wildcards(self, job):
+    def cluster_params(self, job):
+        """Return cluster parameters for the job, from cluster_config,
+        with job wildcards formatted into string values."""
+
         cluster = self.cluster_config.get("__default__", dict()).copy()
         cluster.update(self.cluster_config.get(job.rule.name, dict()))
-        return Wildcards(fromdict=cluster)
+        # Format values with available parameters from the job.
+        for key, value in list(cluster.items()):
+            if isinstance(value, str):
+                cluster[key] = job.format_wildcards(value)
+
+        return cluster
+
+    def cluster_wildcards(self, job):
+        return Wildcards(fromdict=self.cluster_params(job))
 
 
 GenericClusterJob = namedtuple("GenericClusterJob", "job callback error_callback jobscript jobfinished jobfailed")
@@ -680,16 +690,16 @@ class DRMAAExecutor(ClusterExecutor):
                     try:
                         retval = self.session.wait(active_job.jobid,
                                                    drmaa.Session.TIMEOUT_NO_WAIT)
-                    except drmaa.errors.InternalException as e:
+                    except drmaa.errors.ExitTimeoutException as e:
+                        # job still active
+                        self.active_jobs.append(active_job)
+                        continue
+                    except Exception as e:
                         print_exception(WorkflowError("DRMAA Error: {}".format(e)),
                                         self.workflow.linemaps)
                         os.remove(active_job.jobscript)
                         active_job.error_callback(active_job.job)
                         continue
-                    except drmaa.errors.ExitTimeoutException as e:
-                        # job still active
-                        self.active_jobs.append(active_job)
-                        continue
                     # job exited
                     os.remove(active_job.jobscript)
                     if retval.hasExited and retval.exitStatus == 0:
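Since job.json() is gone and the executor now embeds json.dumps(job.properties(cluster=...)) in the jobscript, the formatted cluster-config values travel with the job under the "cluster" key. A submission wrapper can read them back; a sketch (the queue name and qsub command are made up, read_job_properties is provided by snakemake.utils):

    #!/usr/bin/env python3
    # hypothetical qsub wrapper, used as: snakemake --cluster ./submit.py
    import os
    import sys
    from snakemake.utils import read_job_properties

    jobscript = sys.argv[1]
    props = read_job_properties(jobscript)  # parses the properties JSON embedded in the jobscript
    cluster = props.get("cluster", {})      # cluster-config values, wildcards already formatted
    queue = cluster.get("queue", "short.q")
    os.system("qsub -q {} {}".format(queue, jobscript))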
diff --git a/snakemake/io.py b/snakemake/io.py
index 0e7999b..16ee45c 100644
--- a/snakemake/io.py
+++ b/snakemake/io.py
@@ -4,6 +4,7 @@ __email__ = "koester at jimmy.harvard.edu"
 __license__ = "MIT"
 
 import os
+import shutil
 import re
 import stat
 import time
@@ -15,16 +16,24 @@ from snakemake.exceptions import MissingOutputException, WorkflowError, Wildcard
 from snakemake.logging import logger
 from inspect import isfunction, ismethod
 
+
 def lstat(f):
-    return os.stat(f, follow_symlinks=os.stat not in os.supports_follow_symlinks)
+    return os.stat(f,
+                   follow_symlinks=os.stat not in os.supports_follow_symlinks)
 
 
 def lutime(f, times):
-    return os.utime(f, times, follow_symlinks=os.utime not in os.supports_follow_symlinks)
+    return os.utime(
+        f,
+        times,
+        follow_symlinks=os.utime not in os.supports_follow_symlinks)
 
 
 def lchmod(f, mode):
-    return os.chmod(f, mode, follow_symlinks=os.chmod not in os.supports_follow_symlinks)
+    return os.chmod(
+        f,
+        mode,
+        follow_symlinks=os.chmod not in os.supports_follow_symlinks)
 
 
 def IOFile(file, rule=None):
@@ -54,13 +63,16 @@ class _IOFile(str):
             A decorator so that if the file is remote and has a version
             of the same file-related function, call that version instead.
         """
+
         @functools.wraps(func)
         def wrapper(self, *args, **kwargs):
             if self.is_remote:
                 self.update_remote_filepath()
-                if hasattr( self.remote_object, func.__name__):
-                    return getattr( self.remote_object, func.__name__)(*args, **kwargs)
+                if hasattr(self.remote_object, func.__name__):
+                    return getattr(self.remote_object,
+                                   func.__name__)(*args, **kwargs)
             return func(self, *args, **kwargs)
+
         return wrapper
 
     @property
@@ -107,7 +119,7 @@ class _IOFile(str):
     @property
     def protected(self):
         return self.exists_local and not os.access(self.file, os.W_OK)
-    
+
     @property
     @_refer_to_remote
     def mtime(self):
@@ -136,7 +148,8 @@ class _IOFile(str):
     def check_broken_symlink(self):
         """ Raise WorkflowError if file is a broken symlink. """
         if not self.exists_local and lstat(self.file):
-            raise WorkflowError("File {} seems to be a broken symlink.".format(self.file))
+            raise WorkflowError("File {} seems to be a broken symlink.".format(
+                self.file))
 
     def is_newer(self, time):
         return self.mtime > time
@@ -146,14 +159,13 @@ class _IOFile(str):
             logger.info("Downloading from remote: {}".format(self.file))
             self.remote_object.download()
         else:
-            raise RemoteFileException("The file to be downloaded does not seem to exist remotely.")
- 
+            raise RemoteFileException(
+                "The file to be downloaded does not seem to exist remotely.")
+
     def upload_to_remote(self):
-        if self.is_remote and not self.remote_object.exists():
+        if self.is_remote:
             logger.info("Uploading to remote: {}".format(self.file))
             self.remote_object.upload()
-        else:
-            raise RemoteFileException("The file to be uploaded does not seem to exist remotely.")
 
     def prepare(self):
         path_until_wildcard = re.split(self.dynamic_fill, self.file)[0]
@@ -167,8 +179,8 @@ class _IOFile(str):
                     raise e
 
     def protect(self):
-        mode = (lstat(self.file).st_mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~
-                stat.S_IWOTH)
+        mode = (lstat(self.file).st_mode & ~stat.S_IWUSR & ~stat.S_IWGRP
+                & ~stat.S_IWOTH)
         if os.path.isdir(self.file):
             for root, dirs, files in os.walk(self.file):
                 for d in dirs:
@@ -178,8 +190,8 @@ class _IOFile(str):
         else:
             lchmod(self.file, mode)
 
-    def remove(self):
-        remove(self.file)
+    def remove(self, remove_non_empty_dir=False):
+        remove(self.file, remove_non_empty_dir=remove_non_empty_dir)
 
     def touch(self, times=None):
         """ times must be 2-tuple: (atime, mtime) """
@@ -203,7 +215,8 @@ class _IOFile(str):
             with open(self.file, "w") as f:
                 pass
 
-    def apply_wildcards(self, wildcards,
+    def apply_wildcards(self,
+                        wildcards,
                         fill_missing=False,
                         fail_dynamic=False):
         f = self._file
@@ -213,13 +226,15 @@ class _IOFile(str):
         # this bit ensures flags are transferred over to files after
         # wildcards are applied
 
-        file_with_wildcards_applied = IOFile(apply_wildcards(f, wildcards,
-                                      fill_missing=fill_missing,
-                                      fail_dynamic=fail_dynamic,
-                                      dynamic_fill=self.dynamic_fill),
-                                      rule=self.rule)
+        file_with_wildcards_applied = IOFile(
+            apply_wildcards(f,
+                            wildcards,
+                            fill_missing=fill_missing,
+                            fail_dynamic=fail_dynamic,
+                            dynamic_fill=self.dynamic_fill),
+            rule=self.rule)
 
-        file_with_wildcards_applied.clone_flags( self )
+        file_with_wildcards_applied.clone_flags(self)
 
         return file_with_wildcards_applied
 
@@ -297,14 +312,17 @@ def contains_wildcard(path):
     return _wildcard_regex.search(path) is not None
 
 
-def remove(file):
+def remove(file, remove_non_empty_dir=False):
     if os.path.exists(file):
         if os.path.isdir(file):
-            try:
-                os.removedirs(file)
-            except OSError:
-                # ignore non empty directories
-                pass
+            if remove_non_empty_dir:
+                shutil.rmtree(file)
+            else:
+                try:
+                    os.removedirs(file)
+                except OSError:
+                    # ignore non empty directories
+                    pass
         else:
             os.remove(file)
 
@@ -333,7 +351,8 @@ def regex(filepattern):
     return "".join(f)
 
 
-def apply_wildcards(pattern, wildcards,
+def apply_wildcards(pattern,
+                    wildcards,
                     fill_missing=False,
                     fail_dynamic=False,
                     dynamic_fill=None,
@@ -357,7 +376,8 @@ def apply_wildcards(pattern, wildcards,
 
 
 def not_iterable(value):
-    return isinstance(value, str) or not isinstance(value, Iterable)
+    return isinstance(value, str) or isinstance(value, dict) or not isinstance(
+        value, Iterable)
 
 
 class AnnotatedString(str):
@@ -383,6 +403,7 @@ def is_flagged(value, flag):
         return flag in value.flags and value.flags[flag]
     return False
 
+
 def get_flag_value(value, flag_type):
     if isinstance(value, AnnotatedString):
         if flag_type in value.flags:
@@ -390,6 +411,7 @@ def get_flag_value(value, flag_type):
         else:
             return None
 
+
 def temp(value):
     """
     A flag for an input or output file that shall be removed after usage.
@@ -398,8 +420,7 @@ def temp(value):
         raise SyntaxError(
             "Protected and temporary flags are mutually exclusive.")
     if is_flagged(value, "remote"):
-        raise SyntaxError(
-            "Remote and temporary flags are mutually exclusive.")
+        raise SyntaxError("Remote and temporary flags are mutually exclusive.")
     return flag(value, "temp")
 
 
@@ -414,8 +435,7 @@ def protected(value):
         raise SyntaxError(
             "Protected and temporary flags are mutually exclusive.")
     if is_flagged(value, "remote"):
-        raise SyntaxError(
-            "Remote and protected flags are mutually exclusive.")
+        raise SyntaxError("Remote and protected flags are mutually exclusive.")
     return flag(value, "protected")
 
 
@@ -440,6 +460,7 @@ def dynamic(value):
 def touch(value):
     return flag(value, "touch")
 
+
 def expand(*args, **wildcards):
     """
     Expand wildcards in given filepatterns.
@@ -467,8 +488,8 @@ def expand(*args, **wildcards):
 
     try:
         return [filepattern.format(**comb)
-                for comb in map(dict, combinator(*flatten(wildcards))) for
-                filepattern in filepatterns]
+                for comb in map(dict, combinator(*flatten(wildcards)))
+                for filepattern in filepatterns]
     except KeyError as e:
         raise WildcardError("No values given for wildcard {}.".format(e))
 
@@ -507,9 +528,9 @@ def glob_wildcards(pattern, files=None):
     pattern = re.compile(regex(pattern))
 
     if files is None:
-        files = ((os.path.join(dirpath, f) if dirpath != "." else f) 
-                    for dirpath, dirnames, filenames in os.walk(dirname) 
-                    for f in chain(filenames, dirnames))
+        files = ((os.path.join(dirpath, f) if dirpath != "." else f)
+                 for dirpath, dirnames, filenames in os.walk(dirname)
+                 for f in chain(filenames, dirnames))
 
     for f in files:
         match = re.match(pattern, f)
@@ -518,6 +539,7 @@ def glob_wildcards(pattern, files=None):
                 getattr(wildcards, name).append(value)
     return wildcards
 
+
 # TODO rewrite Namedlist!
 class Namedlist(list):
     """
@@ -675,7 +697,9 @@ def _load_configfile(configpath):
             try:
                 return yaml.load(f)
             except yaml.YAMLError:
-                raise WorkflowError("Config file is not valid JSON or YAML.")
+                raise WorkflowError("Config file is not valid JSON or YAML. "
+                                    "In case of YAML, make sure to not mix "
+                                    "whitespace and tab indentation.")
     except FileNotFoundError:
         raise WorkflowError("Config file {} not found.".format(configpath))
 
diff --git a/snakemake/jobs.py b/snakemake/jobs.py
index e8f0433..4e3a835 100644
--- a/snakemake/jobs.py
+++ b/snakemake/jobs.py
@@ -6,7 +6,6 @@ __license__ = "MIT"
 import os
 import sys
 import base64
-import json
 import tempfile
 
 from collections import defaultdict
@@ -43,7 +42,8 @@ class Job:
          self.dependencies) = rule.expand_wildcards(self.wildcards_dict)
 
         self.resources_dict = {
-            name: min(self.rule.workflow.global_resources.get(name, res), res)
+            name: min(
+                self.rule.workflow.global_resources.get(name, res), res)
             for name, res in rule.resources.items()
         }
         self.threads = self.resources_dict["_cores"]
@@ -86,8 +86,8 @@ class Job:
 
     @property
     def b64id(self):
-        return base64.b64encode((self.rule.name + "".join(self.output)
-                                 ).encode("utf-8")).decode("utf-8")
+        return base64.b64encode((self.rule.name + "".join(self.output)).encode(
+            "utf-8")).decode("utf-8")
 
     @property
     def inputsize(self):
@@ -135,9 +135,7 @@ class Job:
                     yield f_
                 for f, _ in expansion:
                     file_to_yield = IOFile(f, self.rule)
-
                     file_to_yield.clone_flags(f_)
-
                     yield file_to_yield
             else:
                 yield f
@@ -149,7 +147,9 @@ class Job:
             yield from self.expanded_output
         else:
             for f in self.expanded_output:
-                yield os.path.join(self.shadow_dir, f)
+                file_to_yield = IOFile(os.path.join(self.shadow_dir, f), self.rule)
+                file_to_yield.clone_flags(f)
+                yield file_to_yield
 
     @property
     def dynamic_wildcards(self):
@@ -169,10 +169,10 @@ class Job:
     def missing_input(self):
         """ Return missing input files. """
         # omit file if it comes from a subworkflow
-        return set(f for f in self.input
+        return set(f
+                   for f in self.input
                    if not f.exists and not f in self.subworkflow_input)
 
-
     @property
     def existing_remote_input(self):
         files = set()
@@ -211,6 +211,15 @@ class Job:
         return None
 
     @property
+    def output_mintime_local(self):
+        existing = [f.mtime_local for f in self.expanded_output if f.exists]
+        if self.benchmark and self.benchmark.exists:
+            existing.append(self.benchmark.mtime_local)
+        if existing:
+            return min(existing)
+        return None
+
+    @property
     def input_maxtime(self):
         """ Return newest input file. """
         existing = [f.mtime for f in self.input if f.exists]
@@ -263,7 +272,8 @@ class Job:
     def remote_input_newer_than_local(self):
         files = set()
         for f in self.remote_input:
-            if (f.exists_remote and f.exists_local) and (f.mtime > f.mtime_local):
+            if (f.exists_remote and f.exists_local) and (
+                    f.mtime > f.mtime_local):
                 files.add(f)
         return files
 
@@ -271,7 +281,8 @@ class Job:
     def remote_input_older_than_local(self):
         files = set()
         for f in self.remote_input:
-            if (f.exists_remote and f.exists_local) and (f.mtime < f.mtime_local):
+            if (f.exists_remote and f.exists_local) and (
+                    f.mtime < f.mtime_local):
                 files.add(f)
         return files
 
@@ -279,7 +290,8 @@ class Job:
     def remote_output_newer_than_local(self):
         files = set()
         for f in self.remote_output:
-            if (f.exists_remote and f.exists_local) and (f.mtime > f.mtime_local):
+            if (f.exists_remote and f.exists_local) and (
+                    f.mtime > f.mtime_local):
                 files.add(f)
         return files
 
@@ -287,17 +299,11 @@ class Job:
     def remote_output_older_than_local(self):
         files = set()
         for f in self.remote_output:
-            if (f.exists_remote and f.exists_local) and (f.mtime < f.mtime_local):
+            if (f.exists_remote and f.exists_local) and (
+                    f.mtime < f.mtime_local):
                 files.add(f)
         return files
 
-    def transfer_updated_files(self):
-        for f in self.remote_output_older_than_local | self.remote_input_older_than_local:
-            f.upload_to_remote()
-
-        for f in self.remote_output_newer_than_local | self.remote_input_newer_than_local:
-            f.download_from_remote()
-
     @property
     def files_to_download(self):
         toDownload = set()
@@ -372,12 +378,14 @@ class Job:
             for dirpath, dirnames, filenames in os.walk(cwd):
                 # Must exclude .snakemake and its children to avoid infinite
                 # loop of symlinks.
-                if os.path.commonprefix([snakemake_dir, dirpath]) == snakemake_dir:
+                if os.path.commonprefix([snakemake_dir, dirpath
+                                         ]) == snakemake_dir:
                     continue
                 for dirname in dirnames:
                     if dirname == ".snakemake":
                         continue
-                    relative_source = os.path.relpath(os.path.join(dirpath, dirname))
+                    relative_source = os.path.relpath(os.path.join(dirpath,
+                                                                   dirname))
                     shadow = os.path.join(self.shadow_dir, relative_source)
                     os.mkdir(shadow)
 
@@ -406,7 +414,8 @@ class Job:
     def empty_remote_dirs(self):
         for f in (set(self.output) | set(self.input)):
             if f.is_remote:
-                if os.path.exists(os.path.dirname(f)) and not len( os.listdir( os.path.dirname(f))):
+                if os.path.exists(os.path.dirname(f)) and not len(os.listdir(
+                        os.path.dirname(f))):
                     yield os.path.dirname(f)
 
     def rmdir_empty_remote_dirs(self):
@@ -414,7 +423,7 @@ class Job:
             try:
                 os.removedirs(d)
             except:
-                pass # it's ok if we can't remove the leaf
+                pass  # it's ok if we can't remove the leaf
 
     def format_wildcards(self, string, **variables):
         """ Format a string with variables from the job. """
@@ -437,7 +446,9 @@ class Job:
         except IndexError as ex:
             raise RuleException("IndexError: " + str(ex), rule=self.rule)
 
-    def properties(self, omit_resources="_cores _nodes".split()):
+    def properties(self,
+                   omit_resources="_cores _nodes".split(),
+                   **aux_properties):
         resources = {
             name: res
             for name, res in self.resources.items()
@@ -451,22 +462,21 @@ class Job:
             "output": self.output,
             "params": params,
             "threads": self.threads,
-            "resources": resources
+            "resources": resources,
         }
+        properties.update(aux_properties)
         return properties
 
-    def json(self):
-        return json.dumps(self.properties())
-
     def __repr__(self):
         return self.rule.name
 
     def __eq__(self, other):
         if other is None:
             return False
-        return (self.rule == other.rule and (
-            self.dynamic_output or self.wildcards_dict == other.wildcards_dict)
-                and (self.dynamic_input or self.input == other.input))
+        return (self.rule == other.rule and
+                (self.dynamic_output or
+                 self.wildcards_dict == other.wildcards_dict) and
+                (self.dynamic_input or self.input == other.input))
 
     def __lt__(self, other):
         return self.rule.__lt__(other.rule)
@@ -508,15 +518,15 @@ class Reason:
                          "are always executed.")
             else:
                 if self.missing_output:
-                    s.append("Missing output files: {}".format(
-                        ", ".join(self.missing_output)))
+                    s.append("Missing output files: {}".format(", ".join(
+                        self.missing_output)))
                 if self.incomplete_output:
-                    s.append("Incomplete output files: {}".format(
-                        ", ".join(self.incomplete_output)))
+                    s.append("Incomplete output files: {}".format(", ".join(
+                        self.incomplete_output)))
                 updated_input = self.updated_input - self.updated_input_run
                 if updated_input:
-                    s.append("Updated input files: {}".format(
-                        ", ".join(updated_input)))
+                    s.append("Updated input files: {}".format(", ".join(
+                        updated_input)))
                 if self.updated_input_run:
                     s.append("Input files updated by another job: {}".format(
                         ", ".join(self.updated_input_run)))
diff --git a/snakemake/logging.py b/snakemake/logging.py
index c404913..9e52546 100644
--- a/snakemake/logging.py
+++ b/snakemake/logging.py
@@ -78,6 +78,8 @@ class Logger:
         self.stream_handler = None
         self.printshellcmds = False
         self.printreason = False
+        self.quiet = False
+        self.logfile = None
 
     def setup(self):
         # logfile output is done always
@@ -94,7 +96,8 @@ class Logger:
         os.remove(self.logfile)
 
     def get_logfile(self):
-        self.logfile_handler.flush()
+        if self.logfile is not None:
+            self.logfile_handler.flush()
         return self.logfile
 
     def handler(self, msg):
diff --git a/snakemake/output_index.py b/snakemake/output_index.py
index 2c9456f..0127f4b 100644
--- a/snakemake/output_index.py
+++ b/snakemake/output_index.py
@@ -43,11 +43,13 @@ class OutputIndex:
 
     def match(self, f):
         node = self.root
+        rules = set()
         for c in f:
             for rule in node.rules:
-                yield rule
+                rules.add(rule)
             node = node.children.get(c, None)
             if node is None:
-                return
+                return rules
         for rule in node.rules:
-            yield rule
+            rules.add(rule)
+        return rules
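OutputIndex.match() now collects candidate rules in a set while walking the output-prefix trie, so a rule reachable at several prefix nodes is reported only once instead of being yielded repeatedly. A self-contained toy version of the traversal (Node is a stand-in for the real trie nodes):

    class Node:
        def __init__(self, rules=(), children=None):
            self.rules = set(rules)
            self.children = children or {}

    def match(root, f):
        node, rules = root, set()
        for c in f:
            rules.update(node.rules)      # rules whose output pattern covers this prefix
            node = node.children.get(c)
            if node is None:
                return rules
        rules.update(node.rules)
        return rules

    root = Node(children={"a": Node(rules={"rule_a"},
                                    children={"b": Node(rules={"rule_a", "rule_ab"})})})
    print(match(root, "ab"))  # {'rule_a', 'rule_ab'}: rule_a is reported once despite two hits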
diff --git a/snakemake/parser.py b/snakemake/parser.py
index 7fda2be..6908f80 100644
--- a/snakemake/parser.py
+++ b/snakemake/parser.py
@@ -437,6 +437,7 @@ class Shell(Run):
     def end(self):
         # the end is detected. So we can safely reset the indent to zero here
         self.indent = 0
+        yield "\n"
         yield ")"
         yield "\n"
         for t in super().start():
@@ -632,6 +633,10 @@ class OnError(DecoratorKeywordState):
     decorator = "onerror"
     args = ["log"]
 
+
+class OnStart(DecoratorKeywordState):
+    decorator = "onstart"
+    args = ["log"]
+
 
 class Python(TokenAutomaton):
 
@@ -643,7 +648,8 @@ class Python(TokenAutomaton):
                        subworkflow=Subworkflow,
                        localrules=Localrules,
                        onsuccess=OnSuccess,
-                       onerror=OnError)
+                       onerror=OnError,
+                       onstart=OnStart)
 
     def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
         super().__init__(snakefile,
diff --git a/snakemake/persistence.py b/snakemake/persistence.py
index b33b519..0bc77e6 100644
--- a/snakemake/persistence.py
+++ b/snakemake/persistence.py
@@ -65,11 +65,13 @@ class Persistence:
             for lockfile in self._locks("input"):
                 with open(lockfile) as lock:
                     for f in lock:
+                        f = f.strip()
                         if f in outputfiles:
                             return True
             for lockfile in self._locks("output"):
                 with open(lockfile) as lock:
                     for f in lock:
+                        f = f.strip()
                         if f in outputfiles or f in inputfiles:
                             return True
         return False
diff --git a/snakemake/remote/FTP.py b/snakemake/remote/FTP.py
index da268d9..c11ff17 100644
--- a/snakemake/remote/FTP.py
+++ b/snakemake/remote/FTP.py
@@ -82,7 +82,11 @@ class RemoteObject(DomainObject):
     def mtime(self):
         if self.exists():
             with self.ftpc() as ftpc:
-                ftpc.synchronize_times()
+                try:
+                    # requires write access
+                    ftpc.synchronize_times()
+                except:
+                    pass
                 return ftpc.path.getmtime(self.remote_path)
         else:
             raise SFTPFileException("The file does not seem to exist remotely: %s" % self.file())
@@ -100,7 +104,11 @@ class RemoteObject(DomainObject):
                 # if the destination path does not exist
                 if make_dest_dirs:
                     os.makedirs(os.path.dirname(self.local_path), exist_ok=True)
-                ftpc.synchronize_times()
+                try:
+                    # requires write access
+                    ftpc.synchronize_times()
+                except:
+                    pass
                 ftpc.download(source=self.remote_path, target=self.local_path)
             else:
                 raise SFTPFileException("The file does not seem to exist remotely: %s" % self.file())
diff --git a/snakemake/remote/dropbox.py b/snakemake/remote/dropbox.py
index bf3a0de..8067ba8 100644
--- a/snakemake/remote/dropbox.py
+++ b/snakemake/remote/dropbox.py
@@ -9,14 +9,15 @@ from contextlib import contextmanager
 # module-specific
 from snakemake.remote import AbstractRemoteProvider, AbstractRemoteObject
 from snakemake.exceptions import DropboxFileException, WorkflowError
-import snakemake.io 
+import snakemake.io
 
 try:
     # third-party modules
     import dropbox # The official Dropbox API library
 except ImportError as e:
-    raise WorkflowError("The Python 3 package 'dropbox' " + 
-        "must be installed to use Dropbox remote() file functionality. %s" % e.msg)
+    raise WorkflowError("The Python 3 package 'dropbox' "
+                        "must be installed to use Dropbox remote() file "
+                        "functionality. %s" % e.msg)
 
 
 class RemoteProvider(AbstractRemoteProvider):
@@ -47,9 +48,9 @@ class RemoteObject(AbstractRemoteObject):
                 self._dropboxc.users_get_current_account()
             except dropbox.exceptions.AuthError as err:
                     DropboxFileException("ERROR: Invalid Dropbox OAuth access token; try re-generating an access token from the app console on the web.")
-        
+
     # === Implementations of abstract class members ===
-  
+
     def exists(self):
         try:
             metadata = self._dropboxc.files_get_metadata(self.remote_file())
@@ -83,8 +84,26 @@ class RemoteObject(AbstractRemoteObject):
             raise DropboxFileException("The file does not seem to exist remotely: %s" % self.remote_file())
 
     def upload(self, mode=dropbox.files.WriteMode('overwrite')):
-        with open(self.file(),'rb') as f:
-            self._dropboxc.files_upload(f, self.remote_file(), mode=mode)
+        size = os.path.getsize(self.file())
+        # Chunk the file into 10MB slices; Dropbox accepts at most 150MB per upload call
+        chunksize = 10000000
+        with open(self.file(), mode='rb') as f:
+            data = f.read(chunksize)
+            # Start upload session
+            res = self._dropboxc.files_upload_session_start(data)
+            offset = len(data)
+
+            # Upload further chunks until file is complete
+            while len(data) == chunksize:
+                data = f.read(chunksize)
+                self._dropboxc.files_upload_session_append(data, res.session_id, offset)
+                offset += len(data)
+
+            # Finish session and store in the desired path
+            self._dropboxc.files_upload_session_finish(
+                f.read(chunksize),
+                dropbox.files.UploadSessionCursor(res.session_id, offset),
+                dropbox.files.CommitInfo(path=self.remote_file(), mode=mode))
 
     def remote_file(self):
         return "/"+self.file() if not self.file().startswith("/") else self.file()
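
The rewritten upload() streams the file through a Dropbox upload session (files_upload_session_start / _append / _finish, as used above) in 10MB chunks while tracking a running byte offset. A local sketch of the offset bookkeeping with the Dropbox calls stubbed out, so the arithmetic can be checked without the SDK:

    # Offset bookkeeping of the chunked upload above, Dropbox calls stubbed.
    import io

    def chunked_offsets(f, chunksize):
        data = f.read(chunksize)
        yield ("start", len(data), 0)    # session_start sends the first chunk
        offset = len(data)
        while len(data) == chunksize:    # a short read means the file is done
            data = f.read(chunksize)
            yield ("append", len(data), offset)
            offset += len(data)
        yield ("finish", 0, offset)      # finish commits at the final offset

    f = io.BytesIO(b"x" * 25)
    for op in chunked_offsets(f, chunksize=10):
        print(op)
    # ('start', 10, 0) ('append', 10, 10) ('append', 5, 20) ('finish', 0, 25)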
diff --git a/snakemake/rules.py b/snakemake/rules.py
index cdd0d00..1f40a4e 100644
--- a/snakemake/rules.py
+++ b/snakemake/rules.py
@@ -295,19 +295,9 @@ class Rule:
             self._set_params_item(item, name=name)
 
     def _set_params_item(self, item, name=None):
-        if not_iterable(item) or callable(item):
-            self.params.append(item)
-            if name:
-                self.params.add_name(name)
-        else:
-            try:
-                start = len(self.params)
-                for i in item:
-                    self._set_params_item(i)
-                if name:
-                    self.params.set_name(name, start, end=len(self.params))
-            except TypeError:
-                raise SyntaxError("Params have to be specified as strings.")
+        self.params.append(item)
+        if name:
+            self.params.add_name(name)
 
     @property
     def log(self):
@@ -355,19 +345,17 @@ class Rule:
                 return apply_wildcards(p, wildcards)
             return p
 
-        def check_input_function(f):
-            if (not_iterable(f) and not isinstance(f, str)) or not all(isinstance(f_, str) for f_ in f):
+        def check_string_type(f):
+            if not isinstance(f, str):
                 raise RuleException(
                     "Input function did not return str or list of str.",
                     rule=self)
 
-        def check_param_function(f):
-            pass
-
         def _apply_wildcards(newitems, olditems, wildcards, wildcards_obj,
                              concretize=apply_wildcards,
-                             check_function_return=check_input_function,
-                             ruleio=None):
+                             check_return_type=check_string_type,
+                             ruleio=None,
+                             no_flattening=False):
             for name, item in olditems.allitems():
                 start = len(newitems)
                 is_iterable = True
@@ -376,13 +364,13 @@ class Rule:
                     try:
                         item = item(wildcards_obj)
                     except (Exception, BaseException) as e:
-                        raise InputFunctionException(e, rule=self)
-                    check_function_return(item)
+                        raise InputFunctionException(e, rule=self, wildcards=wildcards)
 
-                if not_iterable(item):
+                if not_iterable(item) or no_flattening:
                     item = [item]
                     is_iterable = False
                 for item_ in item:
+                    check_return_type(item_)
                     concrete = concretize(item_, wildcards)
                     newitems.append(concrete)
                     if ruleio is not None:
@@ -414,9 +402,12 @@ class Rule:
                              ruleio=ruleio)
 
             params = Params()
+            # When applying wildcards to params, the return type need not be
+            # a string, so the check is disabled.
             _apply_wildcards(params, self.params, wildcards, wildcards_obj,
                              concretize=concretize_param,
-                             check_function_return=check_param_function)
+                             check_return_type=lambda x: None,
+                             no_flattening=True)
 
             output = OutputFiles(o.apply_wildcards(wildcards)
                                  for o in self.output)
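
Moving the str check from the whole return value to each concretized item is what makes generator-returning input functions work (see the new test_input_generator test below): the old whole-value validation ran all(...) over the result, which silently exhausts a generator before the files are ever collected. For illustration:

    # Why generators used to yield zero input files: validating the whole
    # return value with all(...) consumes a generator.
    gen = ("foo.%d.in" % n for n in range(1, 4))
    ok = all(isinstance(x, str) for x in gen)
    print(ok)          # True -- but the check has exhausted the generator
    print(list(gen))   # []   -- nothing left for the rule's input list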
diff --git a/snakemake/script.py b/snakemake/script.py
index f059b66..317f128 100644
--- a/snakemake/script.py
+++ b/snakemake/script.py
@@ -6,6 +6,7 @@ __license__ = "MIT"
 import inspect
 import os
 import traceback
+import collections
 from urllib.request import urlopen
 from urllib.error import URLError
 
@@ -18,17 +19,25 @@ class REncoder:
 
     @classmethod
     def encode_value(cls, value):
-        if isinstance(value, list):
+        if isinstance(value, str):
+            return repr(value)
+        if isinstance(value, collections.Iterable):
+            # convert all iterables to vectors
             return cls.encode_list(value)
         elif isinstance(value, dict):
             return cls.encode_dict(value)
-        elif isinstance(value, str):
-            return repr(value)
         elif isinstance(value, bool):
             return "TRUE" if value else "FALSE"
         elif isinstance(value, int) or isinstance(value, float):
             return str(value)
         else:
+            # Try to convert from numpy if numpy is present
+            try:
+                import numpy as np
+                if isinstance(value, np.number):
+                    return str(value)
+            except ImportError:
+                pass
             raise ValueError(
                 "Unsupported value for conversion into R: {}".format(value))
 
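The encoder maps Python values onto R literals; the reordering matters because Python strings are themselves iterable, so the str case has to be tested before the generic Iterable branch. A standalone sketch of the mapping in the spirit of REncoder (simplified, not the snakemake implementation):

    # Standalone sketch of a Python -> R literal mapping (simplified).
    def to_r(value):
        if isinstance(value, str):
            return repr(value)            # str is iterable, so test it first
        if isinstance(value, bool):       # bool before int: bool is an int
            return "TRUE" if value else "FALSE"
        if isinstance(value, dict):
            return "list(%s)" % ", ".join(
                "%s = %s" % (k, to_r(v)) for k, v in value.items())
        if isinstance(value, (int, float)):
            return str(value)
        try:
            return "c(%s)" % ", ".join(to_r(v) for v in value)
        except TypeError:
            raise ValueError("Unsupported value: %r" % value)

    print(to_r({"xy": True, "n": [1, 2, 3]}))  # list(xy = TRUE, n = c(1, 2, 3))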
@@ -82,8 +91,11 @@ def script(basedir, path, input, output, params, wildcards, threads, resources,
     Supports Python 3 and R.
     """
     if not path.startswith("http"):
-        path = path.lstrip("file://")
-        path = "file://" + os.path.abspath(os.path.join(basedir, path))
+        if path.startswith("file://"):
+            path = path[7:]
+        if not os.path.isabs(path):
+            path = os.path.abspath(os.path.join(basedir, path))
+        path = "file://" + path
     path = format(path, stepout=1)
 
     try:
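
The path fix replaces str.lstrip("file://"), which does not strip a prefix: lstrip treats its argument as a set of characters and removes any leading run of them, so it can eat into the path itself. Slicing off the seven-character prefix is exact:

    # lstrip() strips a character set, not a prefix.
    path = "file:///home/user/test.R"
    print(path.lstrip("file://"))   # 'home/user/test.R' -- leading '/' lost
    print(path[7:])                 # '/home/user/test.R'; len('file://') == 7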
diff --git a/snakemake/shell.py b/snakemake/shell.py
index 2c2a2c2..a7f569d 100644
--- a/snakemake/shell.py
+++ b/snakemake/shell.py
@@ -55,7 +55,7 @@ class shell:
 
         proc = sp.Popen("{} {} {}".format(
                             cls._process_prefix,
-                            cmd,
+                            cmd.rstrip(),
                             cls._process_suffix),
                         bufsize=-1,
                         shell=True,
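
The rstrip() guards against the trailing newline that multi-line shell blocks carry: without it, the process suffix lands on a shell line of its own. A tiny illustration with a hypothetical suffix:

    # A trailing newline would push the suffix onto its own line.
    cmd = "echo hello\n"                   # triple-quoted blocks end in '\n'
    suffix = "2> log.txt"                  # hypothetical process suffix
    print(repr("{} {}".format(cmd, suffix)))           # 'echo hello\n 2> log.txt'
    print(repr("{} {}".format(cmd.rstrip(), suffix)))  # 'echo hello 2> log.txt'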
diff --git a/snakemake/version.py b/snakemake/version.py
index 4c5818f..b202327 100644
--- a/snakemake/version.py
+++ b/snakemake/version.py
@@ -1 +1 @@
-__version__ = "3.5.5"
+__version__ = "3.6.1"
diff --git a/snakemake/workflow.py b/snakemake/workflow.py
index 42490a3..0363dae 100644
--- a/snakemake/workflow.py
+++ b/snakemake/workflow.py
@@ -70,6 +70,7 @@ class Workflow:
         self.config_args = config_args
         self._onsuccess = lambda log: None
         self._onerror = lambda log: None
+        self._onstart = lambda log: None
         self.debug = debug
         self._rulecount = 0
 
@@ -455,6 +456,9 @@ class Workflow:
         if dryrun and not len(dag):
             logger.info("Nothing to be done.")
 
+        if not dryrun and not no_hooks:
+            self._onstart(logger.get_logfile())
+
         success = scheduler.schedule()
 
         if success:
@@ -518,6 +522,9 @@ class Workflow:
             self.first_rule = first_rule
         self.included_stack.pop()
 
+    def onstart(self, func):
+        self._onstart = func
+
     def onsuccess(self, func):
         self._onsuccess = func
 
diff --git a/tests/test01/Snakefile b/tests/test01/Snakefile
index b943a2a..e9defea 100644
--- a/tests/test01/Snakefile
+++ b/tests/test01/Snakefile
@@ -9,6 +9,11 @@ min_version("2.1")
 TEST = "abc"
 
 
+onstart:
+    print("Workflow starting")
+    print("Log:")
+    print(log)
+
 onsuccess:
     print("Workflow finished")
     print("Log:")
diff --git a/tests/test_input_generator/Snakefile b/tests/test_input_generator/Snakefile
new file mode 100644
index 0000000..bc3db43
--- /dev/null
+++ b/tests/test_input_generator/Snakefile
@@ -0,0 +1,33 @@
+# What if an input function returns a generator rather than a list?
+# Pythonically, generators are preferred to lists.
+
+# This always worked fine:
+def input_func1(wildcards):
+    return [ "%s.%d.in" % (wildcards.base, n) for n in range(1,4) ]
+
+#This fails:
+def input_func2(wildcards):
+    return ( "%s.%d.in" % (wildcards.base, n) for n in range(1,4) )
+
+# As did this:
+def input_func3(wildcards):
+    for n in range(1,4):
+        yield "%s.%d.in" % (wildcards.base, n)
+
+rule main:
+    input: "foo.out1", "foo.out2", "foo.out3"
+
+rule test1:
+    output: "{base}.out1"
+    input: input_func1
+    shell: "cat {input} > {output}"
+
+rule test2:
+    output: "{base}.out2"
+    input: input_func2
+    shell: "cat {input} > {output}"
+
+rule test3:
+    output: "{base}.out3"
+    input: input_func3
+    shell: "cat {input} > {output}"
diff --git a/tests/test_input_generator/expected-results/foo.out1 b/tests/test_input_generator/expected-results/foo.out1
new file mode 100644
index 0000000..01e79c3
--- /dev/null
+++ b/tests/test_input_generator/expected-results/foo.out1
@@ -0,0 +1,3 @@
+1
+2
+3
diff --git a/tests/test_input_generator/expected-results/foo.out2 b/tests/test_input_generator/expected-results/foo.out2
new file mode 100644
index 0000000..01e79c3
--- /dev/null
+++ b/tests/test_input_generator/expected-results/foo.out2
@@ -0,0 +1,3 @@
+1
+2
+3
diff --git a/tests/test_input_generator/expected-results/foo.out3 b/tests/test_input_generator/expected-results/foo.out3
new file mode 100644
index 0000000..01e79c3
--- /dev/null
+++ b/tests/test_input_generator/expected-results/foo.out3
@@ -0,0 +1,3 @@
+1
+2
+3
diff --git a/tests/test_input_generator/foo.1.in b/tests/test_input_generator/foo.1.in
new file mode 100644
index 0000000..d00491f
--- /dev/null
+++ b/tests/test_input_generator/foo.1.in
@@ -0,0 +1 @@
+1
diff --git a/tests/test_input_generator/foo.2.in b/tests/test_input_generator/foo.2.in
new file mode 100644
index 0000000..0cfbf08
--- /dev/null
+++ b/tests/test_input_generator/foo.2.in
@@ -0,0 +1 @@
+2
diff --git a/tests/test_input_generator/foo.3.in b/tests/test_input_generator/foo.3.in
new file mode 100644
index 0000000..00750ed
--- /dev/null
+++ b/tests/test_input_generator/foo.3.in
@@ -0,0 +1 @@
+3
diff --git a/tests/test_script/Snakefile b/tests/test_script/Snakefile
index 8486300..ff9dbae 100644
--- a/tests/test_script/Snakefile
+++ b/tests/test_script/Snakefile
@@ -1,9 +1,14 @@
 
+configfile: "config.yaml"
+
+
 rule:
     input:
         "test.in"
     output:
         txt="test.out"
+    params:
+        xy=True
     script:
         "scripts/test.R"
 
diff --git a/tests/test_script/config.yaml b/tests/test_script/config.yaml
new file mode 100644
index 0000000..7fa901f
--- /dev/null
+++ b/tests/test_script/config.yaml
@@ -0,0 +1 @@
+test: true
diff --git a/tests/test_script/scripts/test.R b/tests/test_script/scripts/test.R
index 1b146b0..db02c1d 100644
--- a/tests/test_script/scripts/test.R
+++ b/tests/test_script/scripts/test.R
@@ -3,6 +3,12 @@ print(snakemake@threads)
 print(snakemake@log)
 print(snakemake@config)
 print(snakemake@params)
+if(snakemake@params[["xy"]] != TRUE) {
+    stop("Error evaluating param.")
+}
+if(snakemake@config[["test"]] != TRUE) {
+    stop("Error evaluating config.")
+}
 
 values <- scan(snakemake@input[[1]])
 write(values, file = snakemake@output[["txt"]])
diff --git a/tests/testapi.py b/tests/testapi.py
new file mode 100644
index 0000000..1598274
--- /dev/null
+++ b/tests/testapi.py
@@ -0,0 +1,13 @@
+"""
+Tests for Snakemake’s API
+"""
+from snakemake import snakemake
+import tempfile
+import os.path
+
+def test_keep_logger():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        path = os.path.join(tmpdir, 'Snakefile')
+        with open(path, 'w') as f:
+            print("rule:\n  output: 'result.txt'\n  shell: 'touch {output}'", file=f)
+        snakemake(path, workdir=tmpdir, keep_logger=True)
diff --git a/tests/tests.py b/tests/tests.py
index e428e23..168b616 100644
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -316,6 +316,9 @@ def test_nonstr_params():
     run(dpath("test_nonstr_params"))
 
 
+def test_input_generator():
+    run(dpath("test_input_generator"))
+
 if __name__ == '__main__':
     import nose
     nose.run(defaultTest=__name__)

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/snakemake.git


