[med-svn] [snakemake] 01/05: Imported Upstream version 3.7.1+dfsg
Kevin Murray
daube-guest at moszumanska.debian.org
Tue Jun 28 05:07:39 UTC 2016
This is an automated email from the git hooks/post-receive script.
daube-guest pushed a commit to branch master
in repository snakemake.
commit 944514417a5c6f0c73994869e17e9f798e31c277
Author: Kevin Murray <spam at kdmurray.id.au>
Date: Tue Jun 28 14:53:12 2016 +1000
Imported Upstream version 3.7.1+dfsg
---
CHANGELOG.md | 14 ++++++++++++++
environment.yml | 15 +++++++++++++++
setup.py | 2 +-
snakemake/__init__.py | 39 ++++++++++++++++++++++++++++++---------
snakemake/common.py | 1 +
snakemake/dag.py | 3 ++-
snakemake/executors.py | 40 +++++++++++++++++++++++++++++++++-------
snakemake/io.py | 11 +++++------
snakemake/jobs.py | 15 ++++++++++-----
snakemake/logging.py | 27 +++++++++++++++++++++------
snakemake/remote/HTTP.py | 14 ++++++++++----
snakemake/rules.py | 16 ++++++----------
snakemake/scheduler.py | 7 +++++--
snakemake/utils.py | 24 ++++++++++++++++++++++++
snakemake/version.py | 2 +-
snakemake/workflow.py | 12 +++++++-----
16 files changed, 185 insertions(+), 57 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 581916d..43dddda 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,19 @@
# Change Log
+## [3.7.1] - 2016-05-16
+### Changed
+- Fixed a missing import of the multiprocessing module.
+
+## [3.7.0] - 2016-05-05
+### Added
+- The entries in `resources` and the `threads` job attribute can now be callables that must return `int` values.
+- Multiple `--cluster-config` arguments can be given to the Snakemake command line. Later one override earlier ones.
+- In the API, multiple `cluster_config` paths can be given as a list, alternatively to the previous behaviour of expecting one string for this parameter.
+- When submitting cluster jobs (either through `--cluster` or `--drmaa`), you can now use `--max-jobs-per-second` to limit the number of jobs being submitted (also available through Snakemake API). Some cluster installations have problems with too many jobs per second.
+- Wildcard values are now printed upon job execution in addition to input and output files.
+### Changed
+- Fixed a bug with HTTP remote providers.
+
## [3.6.1] - 2016-04-08
### Changed
- Work around missing RecursionError in Python < 3.5
diff --git a/environment.yml b/environment.yml
new file mode 100644
index 0000000..828922d
--- /dev/null
+++ b/environment.yml
@@ -0,0 +1,15 @@
+channels:
+ - bioconda
+ - r
+dependencies:
+ - rpy2 >=0.7.6
+ - boto
+ - moto
+ - httpretty ==0.8.10
+ - filechunkio
+ - pyyaml
+ - nose
+ - ftputil
+ - pysftp
+ - requests
+ - dropbox
diff --git a/setup.py b/setup.py
index 6e8abeb..3536c17 100644
--- a/setup.py
+++ b/setup.py
@@ -53,7 +53,7 @@ setup(
"snakemake-bash-completion = snakemake:bash_completion"]
},
package_data={'': ['*.css', '*.sh', '*.html']},
- tests_require=['rpy2', 'docutils', 'nose>=1.3', 'boto>=2.38.0', 'filechunkio>=1.6',
+ tests_require=['rpy2', 'httpretty==0.8.10', 'docutils', 'nose>=1.3', 'boto>=2.38.0', 'filechunkio>=1.6',
'moto>=0.4.14', 'ftputil>=3.2', 'pysftp>=0.2.8', 'requests>=2.8.1', 'dropbox>=5.2'],
cmdclass={'test': NoseTestCommand},
classifiers=
diff --git a/snakemake/__init__.py b/snakemake/__init__.py
index 2e8a2ad..a5f040a 100644
--- a/snakemake/__init__.py
+++ b/snakemake/__init__.py
@@ -9,7 +9,6 @@ import glob
import argparse
from argparse import ArgumentError
import logging as _logging
-import multiprocessing
import re
import sys
import inspect
@@ -23,6 +22,7 @@ from snakemake.logging import setup_logger, logger
from snakemake.version import __version__
from snakemake.io import load_configfile
from snakemake.shell import shell
+from snakemake.utils import update_config, available_cpu_count
def snakemake(snakefile,
@@ -93,6 +93,7 @@ def snakemake(snakefile,
updated_files=None,
log_handler=None,
keep_logger=False,
+ max_jobs_per_second=None,
verbose=False):
"""Run snakemake on a given snakefile.
@@ -125,7 +126,7 @@ def snakemake(snakefile,
quiet (bool): do not print any default job information (default False)
keepgoing (bool): keep goind upon errors (default False)
cluster (str): submission command of a cluster or batch system to use, e.g. qsub (default None)
- cluster_config (str): configuration file for cluster options (default None)
+ cluster_config (str,list): configuration file for cluster options, or list thereof (default None)
cluster_sync (str): blocking cluster submission command (like SGE 'qsub -sync y') (default None)
drmaa (str): if not None use DRMAA for cluster support, str specifies native args passed to the cluster when submitting a job
jobname (str): naming scheme for cluster job scripts (default "snakejob.{rulename}.{jobid}.sh")
@@ -163,6 +164,7 @@ def snakemake(snakefile,
updated_files(list): a list that will be filled with the files that are updated or created during the workflow execution
verbose(bool): show additional debug output (default False)
log_handler (function): redirect snakemake output to this custom log handler, a function that takes a log message dictionary (see below) as its only argument (default None). The log message dictionary for the log handler has to following entries:
+ max_jobs_per_second: maximal number of cluster/drmaa jobs per second, None to impose no limit (default None)
:level:
the log level ("info", "error", "debug", "progress", "job_info")
@@ -216,8 +218,18 @@ def snakemake(snakefile,
else:
nodes = sys.maxsize
+ if isinstance(cluster_config, str):
+ # Loading configuration from one file is still supported for
+ # backward compatibility
+ cluster_config = [cluster_config]
if cluster_config:
- cluster_config = load_configfile(cluster_config)
+ # Load all configuration files
+ configs = [load_configfile(f) for f in cluster_config]
+ # Merge in the order as specified, overriding earlier values with
+ # later ones
+ cluster_config = configs[0]
+ for other in configs[1:]:
+ update_config(cluster_config, other)
else:
cluster_config = dict()
@@ -356,6 +368,7 @@ def snakemake(snakefile,
cluster_sync=cluster_sync,
jobname=jobname,
drmaa=drmaa,
+ max_jobs_per_second=max_jobs_per_second,
printd3dag=printd3dag,
immediate_submit=immediate_submit,
ignore_ambiguity=ignore_ambiguity,
@@ -478,7 +491,7 @@ def get_argument_parser():
parser.add_argument(
"--cores", "--jobs", "-j",
action="store",
- const=multiprocessing.cpu_count(),
+ const=available_cpu_count(),
nargs="?",
metavar="N",
type=int,
@@ -488,7 +501,7 @@ def get_argument_parser():
parser.add_argument(
"--local-cores",
action="store",
- default=multiprocessing.cpu_count(),
+ default=available_cpu_count(),
metavar="N",
type=int,
help=
@@ -679,12 +692,15 @@ def get_argument_parser():
parser.add_argument(
"--cluster-config", "-u",
metavar="FILE",
+ default=[],
+ action="append",
help=
("A JSON or YAML file that defines the wildcards used in 'cluster'"
- "for specific rules, instead of having them specified in the Snakefile."
+ "for specific rules, instead of having them specified in the Snakefile. "
"For example, for rule 'job' you may define: "
- "{ 'job' : { 'time' : '24:00:00' } } "
- "to specify the time for rule 'job'.\n")),
+ "{ 'job' : { 'time' : '24:00:00' } } to specify the time for rule 'job'. "
+ "You can specify more than one file. The configuration files are merged "
+ "with later values overriding earlier ones.")),
parser.add_argument(
"--immediate-submit", "--is",
action="store_true",
@@ -811,6 +827,10 @@ def get_argument_parser():
nargs="+",
help=
"Only use given rules. If omitted, all rules in Snakefile are used.")
+ parser.add_argument(
+ "--max-jobs-per-second", default=None, type=float,
+ help=
+ "Maximal number of cluster/drmaa jobs per second, default is no limit")
parser.add_argument('--timestamp', '-T',
action='store_true',
help='Add a timestamp to all logging output')
@@ -993,7 +1013,8 @@ def main():
wait_for_files=args.wait_for_files,
keep_target_files=args.keep_target_files,
keep_shadow=args.keep_shadow,
- allowed_rules=args.allowed_rules)
+ allowed_rules=args.allowed_rules,
+ max_jobs_per_second=args.max_jobs_per_second)
if args.profile:
with open(args.profile, "w") as out:
diff --git a/snakemake/common.py b/snakemake/common.py
new file mode 100644
index 0000000..916866c
--- /dev/null
+++ b/snakemake/common.py
@@ -0,0 +1 @@
+DYNAMIC_FILL = "__snakemake_dynamic__"
diff --git a/snakemake/dag.py b/snakemake/dag.py
index 1b33df1..cdc57a0 100644
--- a/snakemake/dag.py
+++ b/snakemake/dag.py
@@ -24,6 +24,7 @@ from snakemake.exceptions import RemoteFileException, WorkflowError
from snakemake.exceptions import UnexpectedOutputException, InputFunctionException
from snakemake.logging import logger
from snakemake.output_index import OutputIndex
+from snakemake.common import DYNAMIC_FILL
# Workaround for Py <3.5 prior to existence of RecursionError
try:
@@ -936,7 +937,7 @@ class DAG:
def format_wildcard(wildcard):
name, value = wildcard
- if _IOFile.dynamic_fill in value:
+ if DYNAMIC_FILL in value:
value = "..."
return "{}: {}".format(name, value)
diff --git a/snakemake/executors.py b/snakemake/executors.py
index 27f1061..310cf50 100644
--- a/snakemake/executors.py
+++ b/snakemake/executors.py
@@ -93,6 +93,7 @@ class AbstractExecutor:
job.dynamic_output)),
log=list(job.log),
benchmark=job.benchmark,
+ wildcards=job.wildcards_dict,
reason=str(self.dag.reason(job)),
resources=job.resources_dict,
priority="highest"
@@ -269,7 +270,9 @@ class ClusterExecutor(RealExecutor):
printshellcmds=False,
latency_wait=3,
benchmark_repeats=1,
- cluster_config=None, local_input=None):
+ cluster_config=None,
+ local_input=None,
+ max_jobs_per_second=None):
local_input = local_input or []
super().__init__(workflow, dag,
printreason=printreason,
@@ -315,6 +318,12 @@ class ClusterExecutor(RealExecutor):
self.cores = cores if cores else ""
self.cluster_config = cluster_config if cluster_config else dict()
+ self.max_jobs_per_second = max_jobs_per_second
+ if self.max_jobs_per_second:
+ self.rate_lock = threading.RLock()
+ self.rate_interval = 1 / self.max_jobs_per_second
+ self.rate_last_called = 0
+
self.active_jobs = list()
self.lock = threading.Lock()
self.wait = True
@@ -331,7 +340,18 @@ class ClusterExecutor(RealExecutor):
def cancel(self):
self.shutdown()
+ def _limit_rate(self):
+ """Called in ``_run()`` for rate-limiting"""
+ with self.rate_lock:
+ elapsed = time.clock() - self.rate_last_called
+ wait = self.rate_interval - elapsed
+ if wait > 0:
+ time.sleep(wait)
+ self.rate_last_called = time.clock()
+
def _run(self, job, callback=None, error_callback=None):
+ if self.max_jobs_per_second:
+ self._limit_rate()
super()._run(job, callback=callback, error_callback=error_callback)
logger.shellcmd(job.shellcmd)
@@ -420,7 +440,8 @@ class GenericClusterExecutor(ClusterExecutor):
quiet=False,
printshellcmds=False,
latency_wait=3,
- benchmark_repeats=1):
+ benchmark_repeats=1,
+ max_jobs_per_second=None):
super().__init__(workflow, dag, cores,
jobname=jobname,
printreason=printreason,
@@ -428,7 +449,8 @@ class GenericClusterExecutor(ClusterExecutor):
printshellcmds=printshellcmds,
latency_wait=latency_wait,
benchmark_repeats=benchmark_repeats,
- cluster_config=cluster_config)
+ cluster_config=cluster_config,
+ max_jobs_per_second=max_jobs_per_second)
self.submitcmd = submitcmd
self.external_jobid = dict()
self.exec_job += ' && touch "{jobfinished}" || touch "{jobfailed}"'
@@ -525,7 +547,8 @@ class SynchronousClusterExecutor(ClusterExecutor):
quiet=False,
printshellcmds=False,
latency_wait=3,
- benchmark_repeats=1):
+ benchmark_repeats=1,
+ max_jobs_per_second=None):
super().__init__(workflow, dag, cores,
jobname=jobname,
printreason=printreason,
@@ -533,7 +556,8 @@ class SynchronousClusterExecutor(ClusterExecutor):
printshellcmds=printshellcmds,
latency_wait=latency_wait,
benchmark_repeats=benchmark_repeats,
- cluster_config=cluster_config, )
+ cluster_config=cluster_config,
+ max_jobs_per_second=max_jobs_per_second)
self.submitcmd = submitcmd
self.external_jobid = dict()
@@ -609,7 +633,8 @@ class DRMAAExecutor(ClusterExecutor):
drmaa_args="",
latency_wait=3,
benchmark_repeats=1,
- cluster_config=None, ):
+ cluster_config=None,
+ max_jobs_per_second=None):
super().__init__(workflow, dag, cores,
jobname=jobname,
printreason=printreason,
@@ -617,7 +642,8 @@ class DRMAAExecutor(ClusterExecutor):
printshellcmds=printshellcmds,
latency_wait=latency_wait,
benchmark_repeats=benchmark_repeats,
- cluster_config=cluster_config, )
+ cluster_config=cluster_config,
+ max_jobs_per_second=max_jobs_per_second)
try:
import drmaa
except ImportError:
diff --git a/snakemake/io.py b/snakemake/io.py
index 16ee45c..384e300 100644
--- a/snakemake/io.py
+++ b/snakemake/io.py
@@ -15,6 +15,7 @@ from collections import Iterable, namedtuple
from snakemake.exceptions import MissingOutputException, WorkflowError, WildcardError, RemoteFileException
from snakemake.logging import logger
from inspect import isfunction, ismethod
+from snakemake.common import DYNAMIC_FILL
def lstat(f):
@@ -47,8 +48,6 @@ class _IOFile(str):
A file that is either input or output of a rule.
"""
- dynamic_fill = "__snakemake_dynamic__"
-
def __new__(cls, file):
obj = str.__new__(cls, file)
obj._is_function = isfunction(file) or ismethod(file)
@@ -168,7 +167,7 @@ class _IOFile(str):
self.remote_object.upload()
def prepare(self):
- path_until_wildcard = re.split(self.dynamic_fill, self.file)[0]
+ path_until_wildcard = re.split(DYNAMIC_FILL, self.file)[0]
dir = os.path.dirname(path_until_wildcard)
if len(dir) > 0 and not os.path.exists(dir):
try:
@@ -191,7 +190,7 @@ class _IOFile(str):
lchmod(self.file, mode)
def remove(self, remove_non_empty_dir=False):
- remove(self.file, remove_non_empty_dir=False)
+ remove(self.file, remove_non_empty_dir)
def touch(self, times=None):
""" times must be 2-tuple: (atime, mtime) """
@@ -231,7 +230,7 @@ class _IOFile(str):
wildcards,
fill_missing=fill_missing,
fail_dynamic=fail_dynamic,
- dynamic_fill=self.dynamic_fill),
+ dynamic_fill=DYNAMIC_FILL),
rule=self.rule)
file_with_wildcards_applied.clone_flags(self)
@@ -260,7 +259,7 @@ class _IOFile(str):
return self.regex().match(target) or None
def format_dynamic(self):
- return self.replace(self.dynamic_fill, "{*}")
+ return self.replace(DYNAMIC_FILL, "{*}")
def clone_flags(self, other):
if isinstance(self._file, str):
diff --git a/snakemake/jobs.py b/snakemake/jobs.py
index 4e3a835..53bab86 100644
--- a/snakemake/jobs.py
+++ b/snakemake/jobs.py
@@ -18,6 +18,7 @@ from snakemake.utils import format, listfiles
from snakemake.exceptions import RuleException, ProtectedOutputException
from snakemake.exceptions import UnexpectedOutputException
from snakemake.logging import logger
+from snakemake.common import DYNAMIC_FILL
def jobfiles(jobs, type):
@@ -41,11 +42,15 @@ class Job:
self.ruleio,
self.dependencies) = rule.expand_wildcards(self.wildcards_dict)
- self.resources_dict = {
- name: min(
+ self.resources_dict = {}
+ for name, res in rule.resources.items():
+ if callable(res):
+ res = res(self.wildcards)
+ if not isinstance(res, int):
+ raise ValueError("Callable for resources must return int")
+ self.resources_dict[name] = min(
self.rule.workflow.global_resources.get(name, res), res)
- for name, res in rule.resources.items()
- }
+
self.threads = self.resources_dict["_cores"]
self.resources = Resources(fromdict=self.resources_dict)
self.shadow_dir = None
@@ -491,7 +496,7 @@ class Job:
""" Expand dynamic files. """
return list(listfiles(pattern,
restriction=self.wildcards,
- omit_value=_IOFile.dynamic_fill))
+ omit_value=DYNAMIC_FILL))
class Reason:
diff --git a/snakemake/logging.py b/snakemake/logging.py
index 9e52546..b604cca 100644
--- a/snakemake/logging.py
+++ b/snakemake/logging.py
@@ -11,6 +11,9 @@ import os
import json
from multiprocessing import Lock
import tempfile
+from functools import partial
+
+from snakemake.common import DYNAMIC_FILL
class ColorizingStreamHandler(_logging.StreamHandler):
@@ -171,25 +174,33 @@ class Logger:
yield "{}rule {}:".format("local" if msg["local"] else "",
msg["name"])
- for item in "input output log".split():
+ for item in ["input", "output", "log"]:
fmt = format_item(item, omit=[], valueformat=", ".join)
if fmt != None:
yield fmt
+
singleitems = ["benchmark"]
- if self.printreason:
- singleitems.append("reason")
for item in singleitems:
fmt = format_item(item, omit=None)
if fmt != None:
yield fmt
+
+ wildcards = format_wildcards(msg["wildcards"])
+ if wildcards:
+ yield "\twildcards: " + wildcards
+
for item, omit in zip("priority threads".split(), [0, 1]):
fmt = format_item(item, omit=omit)
if fmt != None:
yield fmt
+
resources = format_resources(msg["resources"])
if resources:
yield "\tresources: " + resources
+ if self.printreason:
+ singleitems.append("reason")
+
level = msg["level"]
if level == "info":
self.logger.warning(msg["msg"])
@@ -228,10 +239,14 @@ class Logger:
print(json.dumps({"nodes": msg["nodes"], "links": msg["edges"]}))
-def format_resources(resources, omit_resources="_cores _nodes".split()):
+def format_dict(dict, omit_keys=[], omit_values=[]):
return ", ".join("{}={}".format(name, value)
- for name, value in resources.items()
- if name not in omit_resources)
+ for name, value in dict.items()
+ if name not in omit_keys and value not in omit_values)
+
+
+format_resources = partial(format_dict, omit_keys={"_cores", "_nodes"})
+format_wildcards = partial(format_dict, omit_values={DYNAMIC_FILL})
def format_resource_names(resources, omit_resources="_cores _nodes".split()):
diff --git a/snakemake/remote/HTTP.py b/snakemake/remote/HTTP.py
index 11959f6..af035ee 100644
--- a/snakemake/remote/HTTP.py
+++ b/snakemake/remote/HTTP.py
@@ -56,11 +56,17 @@ class RemoteObject(DomainObject):
for k,v in self.kwargs.items():
kwargs_to_use[k] = v
+ # Check that in case authentication kwargs are provided, they are either ("username", "password") combination
+ # or "auth", but not both.
+ if kwargs_to_use["username"] and kwargs_to_use["password"] and kwargs_to_use["auth"]:
+ raise TypeError("Authentication accepts either username and password or requests.auth object")
+ # If "username" and "password" kwargs are provided, use those to construct a tuple for "auth". Neither
+ # requests.head() nor requests.get() accept them as-is.
if kwargs_to_use["username"] and kwargs_to_use["password"]:
- kwargs_to_use["auth"] = ('user', 'pass')
- else:
- del kwargs_to_use["username"]
- del kwargs_to_use["password"]
+ kwargs_to_use["auth"] = (kwargs_to_use["username"], kwargs_to_use["password"])
+ # Delete "username" and "password" from kwargs
+ del kwargs_to_use["username"]
+ del kwargs_to_use["password"]
url = self._iofile._file + self.additional_request_string
# default to HTTPS
diff --git a/snakemake/rules.py b/snakemake/rules.py
index 1f40a4e..f4ad9b1 100644
--- a/snakemake/rules.py
+++ b/snakemake/rules.py
@@ -238,18 +238,14 @@ class Rule:
self.dependencies[item] = item.rule
_item = IOFile(item, rule=self)
if is_flagged(item, "temp"):
- if not output:
- raise SyntaxError("Only output files may be temporary")
- self.temp_output.add(_item)
+ if output:
+ self.temp_output.add(_item)
if is_flagged(item, "protected"):
- if not output:
- raise SyntaxError("Only output files may be protected")
- self.protected_output.add(_item)
+ if output:
+ self.protected_output.add(_item)
if is_flagged(item, "touch"):
- if not output:
- raise SyntaxError(
- "Only output files may be marked for touching.")
- self.touch_output.add(_item)
+ if output:
+ self.touch_output.add(_item)
if is_flagged(item, "dynamic"):
if output:
self.dynamic_output.add(_item)
diff --git a/snakemake/scheduler.py b/snakemake/scheduler.py
index ca7a1ce..fa7f58f 100644
--- a/snakemake/scheduler.py
+++ b/snakemake/scheduler.py
@@ -40,6 +40,7 @@ class JobScheduler:
printreason=False,
printshellcmds=False,
keepgoing=False,
+ max_jobs_per_second=None,
latency_wait=3,
benchmark_repeats=1,
greediness=1.0):
@@ -111,7 +112,8 @@ class JobScheduler:
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
- benchmark_repeats=benchmark_repeats, )
+ benchmark_repeats=benchmark_repeats,
+ max_jobs_per_second=max_jobs_per_second)
if immediate_submit:
self.job_reward = self.dryrun_job_reward
self._submit_callback = partial(self._proceed,
@@ -128,7 +130,8 @@ class JobScheduler:
printshellcmds=printshellcmds,
latency_wait=latency_wait,
benchmark_repeats=benchmark_repeats,
- cluster_config=cluster_config, )
+ cluster_config=cluster_config,
+ max_jobs_per_second=max_jobs_per_second)
else:
# local execution or execution of cluster job
# calculate how many parallel workers the executor shall spawn
diff --git a/snakemake/utils.py b/snakemake/utils.py
index fbfa9ed..daf742e 100644
--- a/snakemake/utils.py
+++ b/snakemake/utils.py
@@ -11,6 +11,7 @@ import inspect
import textwrap
from itertools import chain
from collections import Mapping
+import multiprocessing
from snakemake.io import regex, Namedlist
from snakemake.logging import logger
@@ -259,3 +260,26 @@ def set_protected_output(*rules):
logger.debug(
"setting output of rule '{rule}' to protected".format(rule=rule))
rule.protected_output = set(rule.output)
+
+
+def available_cpu_count():
+ """
+ Return the number of available virtual or physical CPUs on this system.
+ The number of available CPUs can be smaller than the total number of CPUs
+ when the cpuset(7) mechanism is in use, as is the case on some cluster
+ systems.
+
+ Adapted from http://stackoverflow.com/a/1006301/715090
+ """
+ try:
+ with open('/proc/self/status') as f:
+ status = f.read()
+ m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', status)
+ if m:
+ res = bin(int(m.group(1).replace(',', ''), 16)).count('1')
+ if res > 0:
+ return res
+ except IOError:
+ pass
+
+ return multiprocessing.cpu_count()
diff --git a/snakemake/version.py b/snakemake/version.py
index b202327..975f691 100644
--- a/snakemake/version.py
+++ b/snakemake/version.py
@@ -1 +1 @@
-__version__ = "3.6.1"
+__version__ = "3.7.1"
diff --git a/snakemake/workflow.py b/snakemake/workflow.py
index 0363dae..3cee989 100644
--- a/snakemake/workflow.py
+++ b/snakemake/workflow.py
@@ -212,6 +212,7 @@ class Workflow:
keep_target_files=False,
keep_shadow=False,
allowed_rules=None,
+ max_jobs_per_second=None,
greediness=1.0,
no_hooks=False):
@@ -234,7 +235,7 @@ class Workflow:
if not targets:
targets = [self.first_rule
] if self.first_rule is not None else list()
-
+
if prioritytargets is None:
prioritytargets = list()
if forcerun is None:
@@ -422,6 +423,7 @@ class Workflow:
cluster_config=cluster_config,
cluster_sync=cluster_sync,
jobname=jobname,
+ max_jobs_per_second=max_jobs_per_second,
immediate_submit=immediate_submit,
quiet=quiet,
keepgoing=keepgoing,
@@ -567,8 +569,8 @@ class Workflow:
if ruleinfo.params:
rule.set_params(*ruleinfo.params[0], **ruleinfo.params[1])
if ruleinfo.threads:
- if not isinstance(ruleinfo.threads, int):
- raise RuleException("Threads value has to be an integer.",
+ if not isinstance(ruleinfo.threads, int) and not callable(ruleinfo.threads):
+ raise RuleException("Threads value has to be an integer or a callable.",
rule=rule)
rule.resources["_cores"] = ruleinfo.threads
if ruleinfo.shadow_depth:
@@ -584,10 +586,10 @@ class Workflow:
args, resources = ruleinfo.resources
if args:
raise RuleException("Resources have to be named.")
- if not all(map(lambda r: isinstance(r, int),
+ if not all(map(lambda r: isinstance(r, int) or callable(r),
resources.values())):
raise RuleException(
- "Resources values have to be integers.",
+ "Resources values have to be integers or callables",
rule=rule)
rule.resources.update(resources)
if ruleinfo.priority:
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/snakemake.git
More information about the debian-med-commit
mailing list