[med-svn] [Git][med-team/snakemake][upstream] New upstream version 5.19.3

Rebecca N. Palmer gitlab at salsa.debian.org
Sun Jun 28 23:05:03 BST 2020



Rebecca N. Palmer pushed to branch upstream at Debian Med / snakemake


Commits:
f52d699e by Rebecca N. Palmer at 2020-06-28T22:07:37+01:00
New upstream version 5.19.3
- - - - -


13 changed files:

- CHANGELOG.rst
- docs/executor_tutorial/google_lifesciences.rst
- snakemake/_version.py
- snakemake/dag.py
- snakemake/executors/google_lifesciences.py
- snakemake/io.py
- snakemake/jobs.py
- snakemake/remote/GS.py
- snakemake/remote/__init__.py
- snakemake/script.py
- snakemake/workflow.py
- tests/test_directory/Snakefile
- + tests/test_directory/input_dir/child


Changes:

=====================================
CHANGELOG.rst
=====================================
@@ -1,3 +1,12 @@
+[5.19.3] - 2020-06-16
+=====================
+Changed
+-------
+- Performance improvements for DAG generation (up to 7x in the google cloud, anything from a little to massive in a cluster, depending on the overall filesystem performance).
+- Made harcoded bucket in google cloud executor configurable.
+- Improved speed of --unlock command.
+
+
 [5.19.2] - 2020-06-04
 =====================
 Changed


=====================================
docs/executor_tutorial/google_lifesciences.rst
=====================================
@@ -15,6 +15,7 @@ To go through this tutorial, you need the following software installed:
 
 * Python_ ≥3.5
 * Snakemake_ ≥5.16
+* git
 
 You should install conda as outlined in the :ref:`tutorial <tutorial-setup>`,
 and then install full snakemake with:


=====================================
snakemake/_version.py
=====================================
@@ -22,9 +22,9 @@ def get_keywords():
     # setup.py/versioneer.py will grep for the variable names, so they must
     # each be defined on a line of their own. _version.py will just call
     # get_keywords().
-    git_refnames = " (HEAD -> master, tag: v5.19.2)"
-    git_full = "3df86e764bc50c069163267ad2d95d02564c2c65"
-    git_date = "2020-06-04 16:34:17 +0200"
+    git_refnames = " (tag: v5.19.3)"
+    git_full = "741edadfe9f6a380a395e7b10723104a4453b0f3"
+    git_date = "2020-06-16 20:40:22 +0200"
     keywords = {"refnames": git_refnames, "full": git_full, "date": git_date}
     return keywords
 


=====================================
snakemake/dag.py
=====================================
@@ -17,7 +17,7 @@ import uuid
 import math
 
 from snakemake.io import PeriodicityDetector, wait_for_files, is_flagged
-from snakemake.jobs import Job, Reason, GroupJob
+from snakemake.jobs import Reason, JobFactory, GroupJobFactory, Job
 from snakemake.exceptions import MissingInputException
 from snakemake.exceptions import MissingRuleException, AmbiguousRuleException
 from snakemake.exceptions import CyclicGraphException, MissingOutputException
@@ -124,6 +124,9 @@ class DAG:
         self._progress = 0
         self._group = dict()
 
+        self.job_factory = JobFactory()
+        self.group_job_factory = GroupJobFactory()
+
         self.forcerules = set()
         self.forcefiles = set()
         self.untilrules = set()
@@ -146,6 +149,8 @@ class DAG:
         if omitfiles:
             self.omitfiles.update(omitfiles)
 
+        self.has_dynamic_rules = any(rule.dynamic_output for rule in self.rules)
+
         self.omitforce = set()
 
         self.batch = batch
@@ -318,11 +323,12 @@ class DAG:
 
     def check_dynamic(self):
         """Check dynamic output and update downstream rules if necessary."""
-        for job in filter(
-            lambda job: (job.dynamic_output and not self.needrun(job)), self.jobs
-        ):
-            self.update_dynamic(job)
-        self.postprocess()
+        if self.has_dynamic_rules:
+            for job in filter(
+                lambda job: (job.dynamic_output and not self.needrun(job)), self.jobs
+            ):
+                self.update_dynamic(job)
+            self.postprocess()
 
     def is_edit_notebook_job(self, job):
         return self.workflow.edit_notebook and job.targetfile in self.targetfiles
@@ -763,6 +769,13 @@ class DAG:
         producer = dict()
         exceptions = dict()
         for file, jobs in potential_dependencies.items():
+            # If possible, obtain inventory information starting from
+            # given file and store it in the IOCache.
+            # This should provide faster access to existence and mtime information
+            # than querying file by file. If the file type does not support inventory
+            # information, this call is a no-op.
+            file.inventory()
+
             if not jobs:
                 # no producing job found
                 if not file.exists:
@@ -816,13 +829,23 @@ class DAG:
     def update_needrun(self):
         """ Update the information whether a job needs to be executed. """
 
-        def output_mintime(job):
-            for job_ in self.bfs(self.depending, job):
-                t = job_.output_mintime
-                if t:
-                    return t
+        output_mintime = dict()
 
-        def needrun(job):
+        def update_output_mintime(job):
+            try:
+                return output_mintime[job]
+            except KeyError:
+                for job_ in chain([job], self.depending[job]):
+                    try:
+                        t = output_mintime[job_]
+                    except KeyError:
+                        t = job_.output_mintime
+                    if t is not None:
+                        output_mintime[job] = t
+                        return
+                output_mintime[job] = None
+
+        def update_needrun(job):
             reason = self.reason(job)
             noinitreason = not reason
             updated_subworkflow_input = self.updated_subworkflow_files.intersection(
@@ -859,7 +882,7 @@ class DAG:
                         )
                     reason.missing_output.update(missing_output)
             if not reason:
-                output_mintime_ = output_mintime(job)
+                output_mintime_ = output_mintime.get(job)
                 if output_mintime_:
                     updated_input = [
                         f for f in job.input if f.exists and f.is_newer(output_mintime_)
@@ -867,7 +890,6 @@ class DAG:
                     reason.updated_input.update(updated_input)
             if noinitreason and reason:
                 reason.derived = False
-            return job
 
         reason = self.reason
         _needrun = self._needrun
@@ -875,9 +897,21 @@ class DAG:
         depending = self.depending
 
         _needrun.clear()
-        candidates = set(self.jobs)
+        candidates = list(self.jobs)
+
+        # Update the output mintime of all jobs.
+        # We traverse them in BFS (level order) starting from target jobs.
+        # Then, we check output mintime of job itself and all direct descendants,
+        # which have already been visited in the level before.
+        # This way, we achieve a linear runtime.
+        for job in candidates:
+            update_output_mintime(job)
+
+        # update prior reason for all candidate jobs
+        for job in candidates:
+            update_needrun(job)
 
-        queue = list(filter(reason, map(needrun, candidates)))
+        queue = list(filter(reason, candidates))
         visited = set(queue)
         while queue:
             job = queue.pop(0)
@@ -964,7 +998,7 @@ class DAG:
             # BFS into depending needrun jobs if in same group
             # Note: never go up here (into depending), because it may contain
             # jobs that have been sorted out due to e.g. ruleorder.
-            group = GroupJob(
+            group = self.group_job_factory.new(
                 job.group,
                 (
                     job
@@ -1214,7 +1248,7 @@ class DAG:
             assert targetfile is not None
             return self.job_cache[key]
         wildcards_dict = rule.get_wildcards(targetfile)
-        job = Job(
+        job = self.job_factory.new(
             rule,
             self,
             wildcards_dict=wildcards_dict,


=====================================
snakemake/executors/google_lifesciences.py
=====================================
@@ -86,6 +86,11 @@ class GoogleLifeSciencesExecutor(ClusterExecutor):
         self.container_image = container_image or get_container_image()
         self.regions = regions or ["us-east1", "us-west1", "us-central1"]
 
+        # The project name is required, either from client or environment
+        self.project = (
+            os.environ.get("GOOGLE_CLOUD_PROJECT") or self._bucket_service.project
+        )
+
         # Determine API location based on user preference, and then regions
         self._set_location(location)
 
@@ -94,11 +99,6 @@ class GoogleLifeSciencesExecutor(ClusterExecutor):
         logger.debug("location=%s" % self.location)
         logger.debug("container=%s" % self.container_image)
 
-        # The project name is required, either from client or environment
-        self.project = (
-            os.environ.get("GOOGLE_CLOUD_PROJECT") or self._bucket_service.project
-        )
-
         # Keep track of build packages to clean up shutdown, and generate
         self._build_packages = set()
         targz = self._generate_build_source_package()
@@ -202,7 +202,7 @@ class GoogleLifeSciencesExecutor(ClusterExecutor):
         locations = (
             self._api.projects()
             .locations()
-            .list(name="projects/snakemake-testing")
+            .list(name="projects/{}".format(self.project))
             .execute()
         )
 


=====================================
snakemake/io.py
=====================================
@@ -77,19 +77,58 @@ else:
         os.chmod(f, mode)
 
 
+class ExistsDict(dict):
+    def __init__(self, cache):
+        super().__init__()
+        self.cache = cache
+
+    def __getitem__(self, path):
+        # Always return False if not dict.
+        # The reason is that this is only called if the method contains below has returned True.
+        # Hence, we already know that either path is in dict, or inventory has never
+        # seen it, and hence it does not exist.
+        return self.get(path, False)
+
+    def __contains__(self, path):
+        # if already in inventory, always return True.
+        return self.cache.in_inventory(path) or super().__contains__(path)
+
+
 class IOCache:
     def __init__(self):
         self.mtime = dict()
-        self.exists_local = dict()
-        self.exists_remote = dict()
+        self.exists_local = ExistsDict(self)
+        self.exists_remote = ExistsDict(self)
         self.size = dict()
+        # Indicator whether an inventory has been created for the root of a given IOFile.
+        # In case of remote objects the root is the bucket or server host.
+        self.has_inventory = set()
         self.active = True
 
+    def get_inventory_root(self, path):
+        """If eligible for inventory, get the root of a given path.
+
+        This code does not work on local Windows paths,
+        but inventory is disabled on Windows.
+        """
+        root = path.split("/", maxsplit=1)[0]
+        if root and root != "..":
+            return root
+
+    def needs_inventory(self, path):
+        root = self.get_inventory_root(path)
+        return root and root not in self.has_inventory
+
+    def in_inventory(self, path):
+        root = self.get_inventory_root(path)
+        return root and root in self.has_inventory
+
     def clear(self):
         self.mtime.clear()
         self.size.clear()
         self.exists_local.clear()
         self.exists_remote.clear()
+        self.has_inventory.clear()
 
     def deactivate(self):
         self.clear()
@@ -155,6 +194,39 @@ class _IOFile(str):
 
         return wrapper
 
+    def inventory(self):
+        """Starting from the given file, try to cache as much existence and 
+        modification date information of this and other files as possible.
+        """
+        cache = self.rule.workflow.iocache
+        if cache.active and cache.needs_inventory(self):
+            if self.is_remote:
+                # info not yet in inventory, let's discover as much as we can
+                self.remote_object.inventory(cache)
+            elif not ON_WINDOWS:
+                # we don't want to mess with different path representations on windows
+                self._local_inventory(cache)
+
+    def _local_inventory(self, cache):
+        # for local files, perform BFS via os.scandir to determine existence of files
+        root = cache.get_inventory_root(self)
+        if root == self:
+            # there is no root directory that could be used
+            return
+        if os.path.exists(root):
+            queue = [root]
+            while queue:
+                path = queue.pop(0)
+                cache.exists_local[path] = True
+                with os.scandir(path) as scan:
+                    for entry in scan:
+                        if entry.is_dir():
+                            queue.append(entry.path)
+                        else:
+                            cache.exists_local[entry.path] = True
+
+        cache.has_inventory.add(root)
+
     @contextmanager
     def open(self, mode="r", buffering=-1, encoding=None, errors=None, newline=None):
         """Open this file. If necessary, download it from remote first. 
@@ -273,20 +345,6 @@ class _IOFile(str):
     @property
     @iocache
     def exists_local(self):
-        if self.rule.workflow.iocache.active:
-            # The idea is to first check existence of parent directories and
-            # cache the results.
-            # We omit the last ancestor, because this is always "." or "/" or a
-            # drive letter.
-            for p in self.parents(omit=1):
-                try:
-                    if not p.exists_local:
-                        return False
-                except:
-                    # In case of an error, we continue, because it can be that
-                    # we simply don't have the permissions to access a parent
-                    # directory.
-                    continue
         return os.path.exists(self.file)
 
     @property
@@ -294,23 +352,6 @@ class _IOFile(str):
     def exists_remote(self):
         if not self.is_remote:
             return False
-        if (
-            self.rule.workflow.iocache.active
-            and self.remote_object.provider.allows_directories
-        ):
-            # The idea is to first check existence of parent directories and
-            # cache the results.
-            # We omit the last 2 ancestors, because these are "." and the host
-            # name of the remote location.
-            for p in self.parents(omit=2):
-                try:
-                    if not p.exists_remote:
-                        return False
-                except:
-                    # In case of an error, we continue, because it can be that
-                    # we simply don't have the permissions to access a parent
-                    # directory in the remote.
-                    continue
         return self.remote_object.exists()
 
     @property


=====================================
snakemake/jobs.py
=====================================
@@ -75,9 +75,48 @@ class AbstractJob:
         raise NotImplementedError()
 
 
+class JobFactory:
+    def __init__(self):
+        self.cache = dict()
+
+    def new(
+        self,
+        rule,
+        dag,
+        wildcards_dict=None,
+        format_wildcards=None,
+        targetfile=None,
+        update=False,
+    ):
+        if rule.is_branched:
+            # for distinguishing branched rules, we need input and output in addition
+            key = (
+                rule.name,
+                *rule.output,
+                *rule.input,
+                *sorted(wildcards_dict.items()),
+            )
+        else:
+            key = (rule.name, *sorted(wildcards_dict.items()))
+        if update:
+            # cache entry has to be replaced because job shall be constructed from scratch
+            obj = Job(rule, dag, wildcards_dict, format_wildcards, targetfile)
+            self.cache[key] = obj
+        else:
+            try:
+                # try to get job from cache
+                obj = self.cache[key]
+            except KeyError:
+                obj = Job(rule, dag, wildcards_dict, format_wildcards, targetfile)
+                self.cache[key] = obj
+        return obj
+
+
 class Job(AbstractJob):
     HIGHEST_PRIORITY = sys.maxsize
 
+    obj_cache = dict()
+
     __slots__ = [
         "rule",
         "dag",
@@ -180,16 +219,14 @@ class Job(AbstractJob):
                             rule=self.rule,
                         )
                 self.subworkflow_input[f] = sub
-        self._hash = self.rule.__hash__()
-        for wildcard_value in self.wildcards_dict.values():
-            self._hash ^= wildcard_value.__hash__()
 
     def updated(self):
-        job = Job(
+        job = self.dag.job_factory.new(
             self.rule,
             self.dag,
             wildcards_dict=self.wildcards_dict,
             targetfile=self.targetfile,
+            update=True,
         )
         job.is_updated = True
         return job
@@ -494,6 +531,7 @@ class Job(AbstractJob):
     @property
     def output_mintime(self):
         """ Return oldest output file. """
+
         existing = [f.mtime for f in self.expanded_output if f.exists]
         if self.benchmark and self.benchmark.exists:
             existing.append(self.benchmark.mtime)
@@ -859,24 +897,12 @@ class Job(AbstractJob):
     def __repr__(self):
         return self.rule.name
 
-    def __eq__(self, other):
-        if other is None:
-            return False
-        return (
-            self.rule == other.rule
-            and (self.wildcards_dict == other.wildcards_dict)
-            and (self.input == other.input)
-        )
-
     def __lt__(self, other):
         return self.rule.__lt__(other.rule)
 
     def __gt__(self, other):
         return self.rule.__gt__(other.rule)
 
-    def __hash__(self):
-        return self._hash
-
     def expand_dynamic(self, pattern):
         """ Expand dynamic files. """
         return list(
@@ -1037,8 +1063,25 @@ class Job(AbstractJob):
         return 1
 
 
+class GroupJobFactory:
+    def __init__(self):
+        self.cache = dict()
+
+    def new(self, id, jobs):
+        jobs = frozenset(jobs)
+        key = (id, jobs)
+        try:
+            obj = self.cache[key]
+        except KeyError:
+            obj = GroupJob(id, jobs)
+            self.cache[key] = obj
+        return obj
+
+
 class GroupJob(AbstractJob):
 
+    obj_cache = dict()
+
     __slots__ = [
         "groupid",
         "jobs",
@@ -1054,7 +1097,7 @@ class GroupJob(AbstractJob):
 
     def __init__(self, id, jobs):
         self.groupid = id
-        self.jobs = frozenset(jobs)
+        self.jobs = jobs
         self.toposorted = None
         self._resources = None
         self._input = None


=====================================
snakemake/remote/GS.py
=====================================
@@ -11,6 +11,7 @@ import struct
 from snakemake.remote import AbstractRemoteObject, AbstractRemoteProvider
 from snakemake.exceptions import WorkflowError, CheckSumMismatchException
 from snakemake.common import lazy_property
+import snakemake.io
 from snakemake.utils import os_sync
 
 try:
@@ -126,6 +127,29 @@ class RemoteObject(AbstractRemoteObject):
         self._bucket = None
         self._blob = None
 
+    def inventory(self, cache: snakemake.io.IOCache):
+        """Using client.list_blobs(), we want to iterate over the objects in
+           the "folder" of a bucket and store information about the IOFiles in the
+           provided cache (snakemake.io.IOCache) indexed by bucket/blob name.
+           This will be called by the first mention of a remote object, and
+           iterate over the entire bucket once (and then not need to again). 
+           This includes:
+            - cache.exist_remote
+            - cache_mtime
+            - cache.size
+        """
+        for blob in self.client.list_blobs(
+            self.bucket_name, prefix=os.path.dirname(self.blob.name)
+        ):
+            # By way of being listed, it exists. mtime is a datetime object
+            name = "{}/{}".format(blob.bucket.name, blob.name)
+            cache.exists_remote[name] = True
+            cache.mtime[name] = blob.updated
+            cache.size[name] = blob.size
+        # Mark bucket as having an inventory, such that this method is
+        # only called once for this bucket.
+        cache.has_inventory.add(self.bucket_name)
+
     # === Implementations of abstract class members ===
 
     @retry.Retry(predicate=google_cloud_retry_predicate)


=====================================
snakemake/remote/__init__.py
=====================================
@@ -167,6 +167,14 @@ class AbstractRemoteObject:
         self.provider = provider
         self.protocol = protocol
 
+    def inventory(self, cache: snakemake.io.IOCache):
+        """From this file, try to find as much existence and modification date 
+        information as possible.
+        """
+        # If this is implemented in a remote object, results have to be stored in
+        # the given IOCache object.
+        pass
+
     @property
     def _file(self):
         if self._iofile is None:


=====================================
snakemake/script.py
=====================================
@@ -822,7 +822,7 @@ def get_source(path, basedir="."):
     if path.startswith("file://"):
         sourceurl = "file:" + pathname2url(path[7:])
     elif path.startswith("git+file"):
-        source = git_content(path)
+        source = git_content(path).encode()
         (root_path, file_path, version) = split_git_path(path)
         path = path.rstrip("@" + version)
     else:


=====================================
snakemake/workflow.py
=====================================
@@ -651,13 +651,6 @@ class Workflow:
                 self.persistence.cleanup_metadata(f)
             return True
 
-        logger.info("Building DAG of jobs...")
-        dag.init()
-        dag.update_checkpoint_dependencies()
-        # check incomplete has to run BEFORE any call to postprocess
-        dag.check_incomplete()
-        dag.check_dynamic()
-
         if unlock:
             try:
                 self.persistence.cleanup_locks()
@@ -669,6 +662,14 @@ class Workflow:
                     "you don't have the permissions?"
                 )
                 return False
+
+        logger.info("Building DAG of jobs...")
+        dag.init()
+        dag.update_checkpoint_dependencies()
+        # check incomplete has to run BEFORE any call to postprocess
+        dag.check_incomplete()
+        dag.check_dynamic()
+
         try:
             self.persistence.lock()
         except IOError:
@@ -730,13 +731,14 @@ class Workflow:
             self.globals.update(globals_backup)
 
         dag.postprocess()
-        # deactivate IOCache such that from now on we always get updated
-        # size, existence and mtime information
-        # ATTENTION: this may never be removed without really good reason.
-        # Otherwise weird things may happen.
-        self.iocache.deactivate()
-        # clear and deactivate persistence cache, from now on we want to see updates
-        self.persistence.deactivate_cache()
+        if not dryrun:
+            # deactivate IOCache such that from now on we always get updated
+            # size, existence and mtime information
+            # ATTENTION: this may never be removed without really good reason.
+            # Otherwise weird things may happen.
+            self.iocache.deactivate()
+            # clear and deactivate persistence cache, from now on we want to see updates
+            self.persistence.deactivate_cache()
 
         if nodeps:
             missing_input = [


=====================================
tests/test_directory/Snakefile
=====================================
@@ -1,6 +1,5 @@
 shell.executable("bash")
 
-shell("mkdir -p input_dir && touch input_dir/child")
 
 rule downstream:
     input: "some/dir"


=====================================
tests/test_directory/input_dir/child
=====================================



View it on GitLab: https://salsa.debian.org/med-team/snakemake/-/commit/f52d699ee437805edf9e009a3e58242a810fd838

-- 
View it on GitLab: https://salsa.debian.org/med-team/snakemake/-/commit/f52d699ee437805edf9e009a3e58242a810fd838
You're receiving this email because of your account on salsa.debian.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/debian-med-commit/attachments/20200628/91980dcd/attachment-0001.html>


More information about the debian-med-commit mailing list