[med-svn] [Git][med-team/snakemake][upstream] New upstream version 5.19.3
Rebecca N. Palmer
gitlab at salsa.debian.org
Sun Jun 28 23:05:03 BST 2020
Rebecca N. Palmer pushed to branch upstream at Debian Med / snakemake
Commits:
f52d699e by Rebecca N. Palmer at 2020-06-28T22:07:37+01:00
New upstream version 5.19.3
- - - - -
13 changed files:
- CHANGELOG.rst
- docs/executor_tutorial/google_lifesciences.rst
- snakemake/_version.py
- snakemake/dag.py
- snakemake/executors/google_lifesciences.py
- snakemake/io.py
- snakemake/jobs.py
- snakemake/remote/GS.py
- snakemake/remote/__init__.py
- snakemake/script.py
- snakemake/workflow.py
- tests/test_directory/Snakefile
- + tests/test_directory/input_dir/child
Changes:
=====================================
CHANGELOG.rst
=====================================
@@ -1,3 +1,12 @@
+[5.19.3] - 2020-06-16
+=====================
+Changed
+-------
- Performance improvements for DAG generation (up to 7x on Google Cloud; on cluster filesystems the gain ranges from small to substantial, depending on overall filesystem performance).
- Made hardcoded bucket in the Google Cloud executor configurable.
+- Improved speed of --unlock command.
+
+
[5.19.2] - 2020-06-04
=====================
Changed
=====================================
docs/executor_tutorial/google_lifesciences.rst
=====================================
@@ -15,6 +15,7 @@ To go through this tutorial, you need the following software installed:
* Python_ ≥3.5
* Snakemake_ ≥5.16
+* git
You should install conda as outlined in the :ref:`tutorial <tutorial-setup>`,
and then install full snakemake with:
=====================================
snakemake/_version.py
=====================================
@@ -22,9 +22,9 @@ def get_keywords():
# setup.py/versioneer.py will grep for the variable names, so they must
# each be defined on a line of their own. _version.py will just call
# get_keywords().
- git_refnames = " (HEAD -> master, tag: v5.19.2)"
- git_full = "3df86e764bc50c069163267ad2d95d02564c2c65"
- git_date = "2020-06-04 16:34:17 +0200"
+ git_refnames = " (tag: v5.19.3)"
+ git_full = "741edadfe9f6a380a395e7b10723104a4453b0f3"
+ git_date = "2020-06-16 20:40:22 +0200"
keywords = {"refnames": git_refnames, "full": git_full, "date": git_date}
return keywords
=====================================
snakemake/dag.py
=====================================
@@ -17,7 +17,7 @@ import uuid
import math
from snakemake.io import PeriodicityDetector, wait_for_files, is_flagged
-from snakemake.jobs import Job, Reason, GroupJob
+from snakemake.jobs import Reason, JobFactory, GroupJobFactory, Job
from snakemake.exceptions import MissingInputException
from snakemake.exceptions import MissingRuleException, AmbiguousRuleException
from snakemake.exceptions import CyclicGraphException, MissingOutputException
@@ -124,6 +124,9 @@ class DAG:
self._progress = 0
self._group = dict()
+ self.job_factory = JobFactory()
+ self.group_job_factory = GroupJobFactory()
+
self.forcerules = set()
self.forcefiles = set()
self.untilrules = set()
@@ -146,6 +149,8 @@ class DAG:
if omitfiles:
self.omitfiles.update(omitfiles)
+ self.has_dynamic_rules = any(rule.dynamic_output for rule in self.rules)
+
self.omitforce = set()
self.batch = batch
@@ -318,11 +323,12 @@ class DAG:
def check_dynamic(self):
"""Check dynamic output and update downstream rules if necessary."""
- for job in filter(
- lambda job: (job.dynamic_output and not self.needrun(job)), self.jobs
- ):
- self.update_dynamic(job)
- self.postprocess()
+ if self.has_dynamic_rules:
+ for job in filter(
+ lambda job: (job.dynamic_output and not self.needrun(job)), self.jobs
+ ):
+ self.update_dynamic(job)
+ self.postprocess()
def is_edit_notebook_job(self, job):
return self.workflow.edit_notebook and job.targetfile in self.targetfiles
@@ -763,6 +769,13 @@ class DAG:
producer = dict()
exceptions = dict()
for file, jobs in potential_dependencies.items():
+ # If possible, obtain inventory information starting from the
+ # given file and store it in the IOCache.
+ # This should provide faster access to existence and mtime information
+ # than querying file by file. If the file type does not support inventory
+ # information, this call is a no-op.
+ file.inventory()
+
if not jobs:
# no producing job found
if not file.exists:
@@ -816,13 +829,23 @@ class DAG:
def update_needrun(self):
""" Update the information whether a job needs to be executed. """
- def output_mintime(job):
- for job_ in self.bfs(self.depending, job):
- t = job_.output_mintime
- if t:
- return t
+ output_mintime = dict()
- def needrun(job):
+ def update_output_mintime(job):
+ try:
+ return output_mintime[job]
+ except KeyError:
+ for job_ in chain([job], self.depending[job]):
+ try:
+ t = output_mintime[job_]
+ except KeyError:
+ t = job_.output_mintime
+ if t is not None:
+ output_mintime[job] = t
+ return
+ output_mintime[job] = None
+
+ def update_needrun(job):
reason = self.reason(job)
noinitreason = not reason
updated_subworkflow_input = self.updated_subworkflow_files.intersection(
@@ -859,7 +882,7 @@ class DAG:
)
reason.missing_output.update(missing_output)
if not reason:
- output_mintime_ = output_mintime(job)
+ output_mintime_ = output_mintime.get(job)
if output_mintime_:
updated_input = [
f for f in job.input if f.exists and f.is_newer(output_mintime_)
@@ -867,7 +890,6 @@ class DAG:
reason.updated_input.update(updated_input)
if noinitreason and reason:
reason.derived = False
- return job
reason = self.reason
_needrun = self._needrun
@@ -875,9 +897,21 @@ class DAG:
depending = self.depending
_needrun.clear()
- candidates = set(self.jobs)
+ candidates = list(self.jobs)
+
+ # Update the output mintime of all jobs.
+ # We traverse them in BFS (level order) starting from target jobs.
+ # Then, we check the output mintime of the job itself and of all direct
+ # descendants, which have already been visited in the previous level.
+ # This way, we achieve a linear runtime.
+ for job in candidates:
+ update_output_mintime(job)
+
+ # update prior reason for all candidate jobs
+ for job in candidates:
+ update_needrun(job)
- queue = list(filter(reason, map(needrun, candidates)))
+ queue = list(filter(reason, candidates))
visited = set(queue)
while queue:
job = queue.pop(0)
@@ -964,7 +998,7 @@ class DAG:
# BFS into depending needrun jobs if in same group
# Note: never go up here (into depending), because it may contain
# jobs that have been sorted out due to e.g. ruleorder.
- group = GroupJob(
+ group = self.group_job_factory.new(
job.group,
(
job
@@ -1214,7 +1248,7 @@ class DAG:
assert targetfile is not None
return self.job_cache[key]
wildcards_dict = rule.get_wildcards(targetfile)
- job = Job(
+ job = self.job_factory.new(
rule,
self,
wildcards_dict=wildcards_dict,
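The comments in the update_needrun hunk above describe a level-order propagation of output mtimes. A minimal, self-contained sketch of that idea, using made-up dict-based inputs rather than Snakemake's actual API:

from itertools import chain

def propagate_output_mintime(jobs_in_level_order, depending, own_mintime):
    """Hypothetical illustration of the memoized mintime propagation.

    jobs_in_level_order: targets first, then their dependencies, level by level.
    depending: maps a job to the jobs that directly depend on it (its descendants).
    own_mintime: maps a job to the mtime of its oldest own output, or None.
    """
    output_mintime = {}
    for job in jobs_in_level_order:
        # Check the job itself first, then its direct descendants, which were
        # already processed in the previous level, so a single pass suffices.
        for candidate in chain([job], depending.get(job, ())):
            t = output_mintime.get(candidate, own_mintime.get(candidate))
            if t is not None:
                output_mintime[job] = t
                break
        else:
            output_mintime[job] = None
    return output_mintime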
=====================================
snakemake/executors/google_lifesciences.py
=====================================
@@ -86,6 +86,11 @@ class GoogleLifeSciencesExecutor(ClusterExecutor):
self.container_image = container_image or get_container_image()
self.regions = regions or ["us-east1", "us-west1", "us-central1"]
+ # The project name is required, either from client or environment
+ self.project = (
+ os.environ.get("GOOGLE_CLOUD_PROJECT") or self._bucket_service.project
+ )
+
# Determine API location based on user preference, and then regions
self._set_location(location)
@@ -94,11 +99,6 @@ class GoogleLifeSciencesExecutor(ClusterExecutor):
logger.debug("location=%s" % self.location)
logger.debug("container=%s" % self.container_image)
- # The project name is required, either from client or environment
- self.project = (
- os.environ.get("GOOGLE_CLOUD_PROJECT") or self._bucket_service.project
- )
-
# Keep track of build packages to clean up shutdown, and generate
self._build_packages = set()
targz = self._generate_build_source_package()
@@ -202,7 +202,7 @@ class GoogleLifeSciencesExecutor(ClusterExecutor):
locations = (
self._api.projects()
.locations()
- .list(name="projects/snakemake-testing")
+ .list(name="projects/{}".format(self.project))
.execute()
)
=====================================
snakemake/io.py
=====================================
@@ -77,19 +77,58 @@ else:
os.chmod(f, mode)
+class ExistsDict(dict):
+ def __init__(self, cache):
+ super().__init__()
+ self.cache = cache
+
+ def __getitem__(self, path):
+ # Always return False if the path is not in the dict.
+ # The reason is that this is only called if __contains__ below has returned True.
+ # Hence, we already know that either the path is in the dict, or the inventory
+ # has never seen it, and hence it does not exist.
+ return self.get(path, False)
+
+ def __contains__(self, path):
+ # if already in inventory, always return True.
+ return self.cache.in_inventory(path) or super().__contains__(path)
+
+
class IOCache:
def __init__(self):
self.mtime = dict()
- self.exists_local = dict()
- self.exists_remote = dict()
+ self.exists_local = ExistsDict(self)
+ self.exists_remote = ExistsDict(self)
self.size = dict()
+ # Indicator whether an inventory has been created for the root of a given IOFile.
+ # In case of remote objects the root is the bucket or server host.
+ self.has_inventory = set()
self.active = True
+ def get_inventory_root(self, path):
+ """If eligible for inventory, get the root of a given path.
+
+ This code does not work on local Windows paths,
+ but inventory is disabled on Windows.
+ """
+ root = path.split("/", maxsplit=1)[0]
+ if root and root != "..":
+ return root
+
+ def needs_inventory(self, path):
+ root = self.get_inventory_root(path)
+ return root and root not in self.has_inventory
+
+ def in_inventory(self, path):
+ root = self.get_inventory_root(path)
+ return root and root in self.has_inventory
+
def clear(self):
self.mtime.clear()
self.size.clear()
self.exists_local.clear()
self.exists_remote.clear()
+ self.has_inventory.clear()
def deactivate(self):
self.clear()
@@ -155,6 +194,39 @@ class _IOFile(str):
return wrapper
+ def inventory(self):
+ """Starting from the given file, try to cache as much existence and
+ modification date information of this and other files as possible.
+ """
+ cache = self.rule.workflow.iocache
+ if cache.active and cache.needs_inventory(self):
+ if self.is_remote:
+ # info not yet in inventory, let's discover as much as we can
+ self.remote_object.inventory(cache)
+ elif not ON_WINDOWS:
+ # we don't want to mess with different path representations on windows
+ self._local_inventory(cache)
+
+ def _local_inventory(self, cache):
+ # for local files, perform BFS via os.scandir to determine existence of files
+ root = cache.get_inventory_root(self)
+ if root == self:
+ # there is no root directory that could be used
+ return
+ if os.path.exists(root):
+ queue = [root]
+ while queue:
+ path = queue.pop(0)
+ cache.exists_local[path] = True
+ with os.scandir(path) as scan:
+ for entry in scan:
+ if entry.is_dir():
+ queue.append(entry.path)
+ else:
+ cache.exists_local[entry.path] = True
+
+ cache.has_inventory.add(root)
+
@contextmanager
def open(self, mode="r", buffering=-1, encoding=None, errors=None, newline=None):
"""Open this file. If necessary, download it from remote first.
@@ -273,20 +345,6 @@ class _IOFile(str):
@property
@iocache
def exists_local(self):
- if self.rule.workflow.iocache.active:
- # The idea is to first check existence of parent directories and
- # cache the results.
- # We omit the last ancestor, because this is always "." or "/" or a
- # drive letter.
- for p in self.parents(omit=1):
- try:
- if not p.exists_local:
- return False
- except:
- # In case of an error, we continue, because it can be that
- # we simply don't have the permissions to access a parent
- # directory.
- continue
return os.path.exists(self.file)
@property
@@ -294,23 +352,6 @@ class _IOFile(str):
def exists_remote(self):
if not self.is_remote:
return False
- if (
- self.rule.workflow.iocache.active
- and self.remote_object.provider.allows_directories
- ):
- # The idea is to first check existence of parent directories and
- # cache the results.
- # We omit the last 2 ancestors, because these are "." and the host
- # name of the remote location.
- for p in self.parents(omit=2):
- try:
- if not p.exists_remote:
- return False
- except:
- # In case of an error, we continue, because it can be that
- # we simply don't have the permissions to access a parent
- # directory in the remote.
- continue
return self.remote_object.exists()
@property
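As a rough illustration of how the new ExistsDict and the has_inventory set interact (only IOCache itself comes from the diff; the paths are made up):

from snakemake.io import IOCache

cache = IOCache()
# No inventory has been taken for the root "data" yet, so lookups fall through
# and callers stat the file themselves:
assert "data/a.txt" not in cache.exists_local
# After an inventory scan of "data" (as _local_inventory above would do):
cache.exists_local["data/a.txt"] = True
cache.has_inventory.add("data")
assert "data/a.txt" in cache.exists_local                # recorded during the scan
assert cache.exists_local["data/missing.txt"] is False   # root scanned, file absent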
=====================================
snakemake/jobs.py
=====================================
@@ -75,9 +75,48 @@ class AbstractJob:
raise NotImplementedError()
+class JobFactory:
+ def __init__(self):
+ self.cache = dict()
+
+ def new(
+ self,
+ rule,
+ dag,
+ wildcards_dict=None,
+ format_wildcards=None,
+ targetfile=None,
+ update=False,
+ ):
+ if rule.is_branched:
+ # for distinguishing branched rules, we need input and output in addition to the wildcards
+ key = (
+ rule.name,
+ *rule.output,
+ *rule.input,
+ *sorted(wildcards_dict.items()),
+ )
+ else:
+ key = (rule.name, *sorted(wildcards_dict.items()))
+ if update:
+ # cache entry has to be replaced because job shall be constructed from scratch
+ obj = Job(rule, dag, wildcards_dict, format_wildcards, targetfile)
+ self.cache[key] = obj
+ else:
+ try:
+ # try to get job from cache
+ obj = self.cache[key]
+ except KeyError:
+ obj = Job(rule, dag, wildcards_dict, format_wildcards, targetfile)
+ self.cache[key] = obj
+ return obj
+
+
class Job(AbstractJob):
HIGHEST_PRIORITY = sys.maxsize
+ obj_cache = dict()
+
__slots__ = [
"rule",
"dag",
@@ -180,16 +219,14 @@ class Job(AbstractJob):
rule=self.rule,
)
self.subworkflow_input[f] = sub
- self._hash = self.rule.__hash__()
- for wildcard_value in self.wildcards_dict.values():
- self._hash ^= wildcard_value.__hash__()
def updated(self):
- job = Job(
+ job = self.dag.job_factory.new(
self.rule,
self.dag,
wildcards_dict=self.wildcards_dict,
targetfile=self.targetfile,
+ update=True,
)
job.is_updated = True
return job
@@ -494,6 +531,7 @@ class Job(AbstractJob):
@property
def output_mintime(self):
""" Return oldest output file. """
+
existing = [f.mtime for f in self.expanded_output if f.exists]
if self.benchmark and self.benchmark.exists:
existing.append(self.benchmark.mtime)
@@ -859,24 +897,12 @@ class Job(AbstractJob):
def __repr__(self):
return self.rule.name
- def __eq__(self, other):
- if other is None:
- return False
- return (
- self.rule == other.rule
- and (self.wildcards_dict == other.wildcards_dict)
- and (self.input == other.input)
- )
-
def __lt__(self, other):
return self.rule.__lt__(other.rule)
def __gt__(self, other):
return self.rule.__gt__(other.rule)
- def __hash__(self):
- return self._hash
-
def expand_dynamic(self, pattern):
""" Expand dynamic files. """
return list(
@@ -1037,8 +1063,25 @@ class Job(AbstractJob):
return 1
+class GroupJobFactory:
+ def __init__(self):
+ self.cache = dict()
+
+ def new(self, id, jobs):
+ jobs = frozenset(jobs)
+ key = (id, jobs)
+ try:
+ obj = self.cache[key]
+ except KeyError:
+ obj = GroupJob(id, jobs)
+ self.cache[key] = obj
+ return obj
+
+
class GroupJob(AbstractJob):
+ obj_cache = dict()
+
__slots__ = [
"groupid",
"jobs",
@@ -1054,7 +1097,7 @@ class GroupJob(AbstractJob):
def __init__(self, id, jobs):
self.groupid = id
- self.jobs = frozenset(jobs)
+ self.jobs = jobs
self.toposorted = None
self._resources = None
self._input = None
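JobFactory and GroupJobFactory follow the same memoizing-factory pattern, which is what lets the custom __eq__/__hash__ on Job be dropped in favour of default identity semantics. A generic sketch of that pattern (illustrative only, not Snakemake API):

class MemoizingFactory:
    """Return the identical object for equal keys, so callers can rely on
    default identity-based hashing and equality of the produced objects."""

    def __init__(self, cls):
        self.cls = cls
        self.cache = {}

    def new(self, *key, update=False, **kwargs):
        if update or key not in self.cache:
            # construct from scratch and (re)place the cache entry
            self.cache[key] = self.cls(*key, **kwargs)
        return self.cache[key]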
=====================================
snakemake/remote/GS.py
=====================================
@@ -11,6 +11,7 @@ import struct
from snakemake.remote import AbstractRemoteObject, AbstractRemoteProvider
from snakemake.exceptions import WorkflowError, CheckSumMismatchException
from snakemake.common import lazy_property
+import snakemake.io
from snakemake.utils import os_sync
try:
@@ -126,6 +127,29 @@ class RemoteObject(AbstractRemoteObject):
self._bucket = None
self._blob = None
+ def inventory(self, cache: snakemake.io.IOCache):
+ """Using client.list_blobs(), we want to iterate over the objects in
+ the "folder" of a bucket and store information about the IOFiles in the
+ provided cache (snakemake.io.IOCache) indexed by bucket/blob name.
+ This will be called on the first mention of a remote object, and will
+ iterate over the entire bucket once (so it does not need to run again).
+ This includes:
+ - cache.exists_remote
+ - cache.mtime
+ - cache.size
+ """
+ for blob in self.client.list_blobs(
+ self.bucket_name, prefix=os.path.dirname(self.blob.name)
+ ):
+ # By way of being listed, it exists. mtime is a datetime object
+ name = "{}/{}".format(blob.bucket.name, blob.name)
+ cache.exists_remote[name] = True
+ cache.mtime[name] = blob.updated
+ cache.size[name] = blob.size
+ # Mark bucket as having an inventory, such that this method is
+ # only called once for this bucket.
+ cache.has_inventory.add(self.bucket_name)
+
# === Implementations of abstract class members ===
@retry.Retry(predicate=google_cloud_retry_predicate)
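The inventory above replaces one metadata request per file with a single bulk listing. A hedged usage sketch of that listing with the google-cloud-storage client (bucket and prefix names are made up):

from google.cloud import storage

client = storage.Client()
# One list request yields existence, mtime and size for every object under the
# prefix, so subsequent per-file exists()/mtime()/size() checks can hit the cache.
for blob in client.list_blobs("my-bucket", prefix="results/"):
    print(blob.name, blob.updated, blob.size)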
=====================================
snakemake/remote/__init__.py
=====================================
@@ -167,6 +167,14 @@ class AbstractRemoteObject:
self.provider = provider
self.protocol = protocol
+ def inventory(self, cache: snakemake.io.IOCache):
+ """From this file, try to find as much existence and modification date
+ information as possible.
+ """
+ # If this is implemented in a remote object, results have to be stored in
+ # the given IOCache object.
+ pass
+
@property
def _file(self):
if self._iofile is None:
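For other remote providers, inventory() remains a no-op unless overridden. A hypothetical override might look like the following; list_entries() and self.host are invented for illustration, only the cache fields come from the diff:

from snakemake.remote import AbstractRemoteObject

class MyRemoteObject(AbstractRemoteObject):
    def inventory(self, cache):
        # Fill the cache from a single bulk listing of the remote host.
        for entry in self.list_entries():    # hypothetical bulk-listing helper
            cache.exists_remote[entry.path] = True
            cache.mtime[entry.path] = entry.mtime
            cache.size[entry.path] = entry.size
        # Mark the root as inventoried so this runs only once per host.
        cache.has_inventory.add(self.host)   # hypothetical root attribute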
=====================================
snakemake/script.py
=====================================
@@ -822,7 +822,7 @@ def get_source(path, basedir="."):
if path.startswith("file://"):
sourceurl = "file:" + pathname2url(path[7:])
elif path.startswith("git+file"):
- source = git_content(path)
+ source = git_content(path).encode()
(root_path, file_path, version) = split_git_path(path)
path = path.rstrip("@" + version)
else:
=====================================
snakemake/workflow.py
=====================================
@@ -651,13 +651,6 @@ class Workflow:
self.persistence.cleanup_metadata(f)
return True
- logger.info("Building DAG of jobs...")
- dag.init()
- dag.update_checkpoint_dependencies()
- # check incomplete has to run BEFORE any call to postprocess
- dag.check_incomplete()
- dag.check_dynamic()
-
if unlock:
try:
self.persistence.cleanup_locks()
@@ -669,6 +662,14 @@ class Workflow:
"you don't have the permissions?"
)
return False
+
+ logger.info("Building DAG of jobs...")
+ dag.init()
+ dag.update_checkpoint_dependencies()
+ # check incomplete has to run BEFORE any call to postprocess
+ dag.check_incomplete()
+ dag.check_dynamic()
+
try:
self.persistence.lock()
except IOError:
@@ -730,13 +731,14 @@ class Workflow:
self.globals.update(globals_backup)
dag.postprocess()
- # deactivate IOCache such that from now on we always get updated
- # size, existence and mtime information
- # ATTENTION: this may never be removed without really good reason.
- # Otherwise weird things may happen.
- self.iocache.deactivate()
- # clear and deactivate persistence cache, from now on we want to see updates
- self.persistence.deactivate_cache()
+ if not dryrun:
+ # deactivate IOCache such that from now on we always get updated
+ # size, existence and mtime information
+ # ATTENTION: this may never be removed without really good reason.
+ # Otherwise weird things may happen.
+ self.iocache.deactivate()
+ # clear and deactivate persistence cache, from now on we want to see updates
+ self.persistence.deactivate_cache()
if nodeps:
missing_input = [
=====================================
tests/test_directory/Snakefile
=====================================
@@ -1,6 +1,5 @@
shell.executable("bash")
-shell("mkdir -p input_dir && touch input_dir/child")
rule downstream:
input: "some/dir"
=====================================
tests/test_directory/input_dir/child
=====================================
View it on GitLab: https://salsa.debian.org/med-team/snakemake/-/commit/f52d699ee437805edf9e009a3e58242a810fd838