[med-svn] [Git][med-team/python-pbcommand][upstream] 2 commits: New upstream version 1.1.1+git20201023.cc0ed3d

Steffen Möller gitlab at salsa.debian.org
Mon Dec 7 15:49:31 GMT 2020



Steffen Möller pushed to branch upstream at Debian Med / python-pbcommand


Commits:
b72d2670 by Steffen Moeller at 2020-12-07T16:24:18+01:00
New upstream version 1.1.1+git20201023.cc0ed3d
- - - - -
3e33ced7 by Steffen Moeller at 2020-12-07T16:42:02+01:00
New upstream version 2.1.1+git20201023.cc0ed3d
- - - - -


11 changed files:

- pbcommand/models/__init__.py
- pbcommand/models/common.py
- pbcommand/models/legacy.py
- pbcommand/models/report.py
- pbcommand/pb_io/common.py
- pbcommand/services/__init__.py
- pbcommand/services/_service_access_layer.py
- pbcommand/services/job_poller.py
- setup.py
- tests/test_models_legacy.py
- tests/test_models_report.py


Changes:

=====================================
pbcommand/models/__init__.py
=====================================
@@ -6,6 +6,6 @@ from .common import (FileType, FileTypes, TaskOptionTypes,
                      BasePacBioOption,
                      PacBioIntOption, PacBioBooleanOption,
                      PacBioStringOption, PacBioFloatOption,
-                     PacBioIntChoiceOption,
+                     PacBioIntChoiceOption, PacBioFileOption,
                      PacBioFloatChoiceOption, PacBioStringChoiceOption,
-                     PacBioAlarm, PipelinePreset)
+                     PacBioAlarm, PipelinePreset, EntryPoint)


=====================================
pbcommand/models/common.py
=====================================
@@ -930,7 +930,7 @@ def write_dict_to_json(d, file_name, permission="w"):
 
 
 RX_TASK_ID = re.compile(r'^([A-z0-9_]*)\.tasks\.([A-z0-9_]*)$')
-RX_TASK_OPTION_ID = re.compile(r'^([A-z0-9_]*)\.task_options\.([A-z0-9_\.]*)')
+RX_TASK_OPTION_ID = re.compile(r'^([A-z0-9_\.]*)')
 
 
 def _validate_id(prog, idtype, tid):
@@ -1076,6 +1076,13 @@ def _strict_validate_string_or_raise(value):
     raise TypeError(_type_error_msg(value, str))
 
 
+def _strict_validate_file_or_raise(value):
+    # Not supporting unicode in python2.
+    if isinstance(value, str) or value is None:
+        return value
+    raise TypeError(_type_error_msg(value, str))
+
+
 class PacBioIntOption(BasePacBioOption):
     OPTION_TYPE_ID = TaskOptionTypes.INT
 
@@ -1108,6 +1115,14 @@ class PacBioStringOption(BasePacBioOption):
         return _strict_validate_string_or_raise(value)
 
 
+class PacBioFileOption(BasePacBioOption):
+    OPTION_TYPE_ID = TaskOptionTypes.FILE
+
+    @classmethod
+    def validate_core_type(cls, value):
+        return _strict_validate_file_or_raise(value)
+
+
 def _strict_validate_default_and_choices(core_type_validator_func):
     """
 
@@ -1324,3 +1339,19 @@ class PipelinePreset:
             ("description", self.description),
             ("options", dict(self.options)),
             ("taskOptions", dict(self.task_options))])
+
+
+class EntryPoint(namedtuple("EntryPoint", ["entry_id", "file_type_id", "name", "optional"])):
+
+    @staticmethod
+    def from_dict(d):
+        return EntryPoint(d["entryId"], d["fileTypeId"], d["name"],
+                          d.get("optional", False))
+
+    @property
+    def short_name(self):
+        return self.file_type_id.split(".")[-1] + " XML"
+
+    @property
+    def file_ext(self):
+        return FileTypes.ALL()[self.file_type_id].ext

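For context, a minimal sketch of how the new EntryPoint container added above might be used, assuming a pipeline-interface dict of the shape read by EntryPoint.from_dict (the field values here are illustrative):

    from pbcommand.models import EntryPoint

    # illustrative entry-point dict; the keys follow EntryPoint.from_dict above
    d = {"entryId": "eid_subread",
         "fileTypeId": "PacBio.DataSet.SubreadSet",
         "name": "Subread DataSet",
         "optional": False}
    ep = EntryPoint.from_dict(d)
    print(ep.entry_id)    # "eid_subread"
    print(ep.short_name)  # "SubreadSet XML"
    # ep.file_ext looks up the extension from the FileTypes registry entry
    # for this file type id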

=====================================
pbcommand/models/legacy.py
=====================================
@@ -2,24 +2,15 @@
 Models carried over from pbsmrtpipe, still required for SMRT Link interface.
 """
 
-import itertools
+from collections import namedtuple
 import json
 
 
 class Pipeline:
 
     def __init__(self, idx, display_name, version, description, bindings,
-                 entry_bindings, parent_pipeline_ids=None, tags=(), task_options=None):
-        """
-
-        Both entry_points and bindings are provided as "simple" format (e.g, [("alpha:0", "beta:1"])
-
-        This really should have been abstracted away into containers to make the interface clear. This was a fundamental
-        design mistake.
-
-        :param bindings: List of "simple" binding format [("alpha:0:0", "beta:0:0")]
-        :param entry_bindings: List of "simple" bindings [("$entry:e1", "my_task:0")]
-        """
+                 entry_points, parent_pipeline_ids=None, tags=(),
+                 task_options=None):
 
         self.idx = idx
         self.version = version
@@ -28,7 +19,7 @@ class Pipeline:
         # set of [(a, b), ...]
         self.bindings = {x for x in bindings}
         # set of [(a, b), ...]
-        self.entry_bindings = {x for x in entry_bindings}
+        self.entry_points = entry_points
         # list of strings
         self.tags = tags
         if parent_pipeline_ids is None:
@@ -43,56 +34,41 @@ class Pipeline:
         return self.idx
 
     @property
-    def all_bindings(self):
-        return self.bindings | self.entry_bindings
+    def required_inputs(self):
+        return [o for o in self.entry_points if not o.optional]
+
+    @property
+    def optional_inputs(self):
+        return [o for o in self.entry_points if o.optional]
 
     def __repr__(self):
         # Only communicate the entry id
-        ek = [eid for eid, _ in self.entry_bindings]
+        ek = [e["entryId"] for e in self.entry_points]
         e = " ".join(ek)
         _d = dict(k=self.__class__.__name__, i=self.idx,
                   d=self.display_name, b=len(self.bindings), e=e)
         return "<{k} id={i} nbindings={b} entry bindings={e} >".format(**_d)
 
-    def summary(self):
+    def summary(self, verbose=True):
         outs = []
         f = outs.append
 
-        # out a list of tuples
-        def _printer(xs):
-            for a, b in xs:
-                sx = ' -> '.join([str(a), str(b)])
-                f("  " + sx)
+        def _format_inputs(inputs):
+            return ", ".join([ep.short_name for ep in inputs])
 
-        f("Pipeline Summary")
-        f("Pipeline Id: {}".format(self.pipeline_id))
-        f("Name       : {}".format(self.display_name))
-        f("Description: {}".format(self.description))
-        f("EntryPoints: {}".format(len(self.entry_bindings)))
-        _printer(sorted(self.entry_bindings, key=lambda x: x[0]))
+        f("Workflow Summary")
+        f("Workflow Id    : {}".format(self.pipeline_id))
+        f("Name           : {}".format(self.display_name))
+        f("Description    : {}".format(self.description))
+        f("Required Inputs: {}".format(_format_inputs(self.required_inputs)))
+        if len(self.optional_inputs) > 0:
+            f("Optional Inputs: {}".format(_format_inputs(self.optional_inputs)))
         if self.tags:
-            f("Tags       : {} ".format(", ".join(list(set(self.tags)))))
+            f("Tags           : {} ".format(", ".join(list(set(self.tags)))))
         if len(self.task_options) > 0:
             f("Task Options:")
-            for option_id, value in self.iter_options():
-                f("  {o} = {v}".format(o=option_id, v=value))
+            for opt in self.task_options:
+                f("  {o} = {v}".format(o=opt.option_id, v=opt.default))
+                if verbose:
+                    f("    {n} ({t})".format(n=opt.name, t=opt.OPTION_TYPE_ID))
         return "\n".join(outs)
-
-    def iter_options(self):
-        option_ids = sorted(self.task_options.keys())
-        for option_id in option_ids:
-            yield (option_id, self.task_options[option_id])
-
-    @staticmethod
-    def from_dict(d):
-        bindings = {}  # obsolete
-        epoints = [(e["entryId"], e["fileTypeId"]) for e in d["entryPoints"]]
-        # The pipeline instance only needs to the key-value pair
-        task_options = {o["id"]: o["default"] for o in d['taskOptions']}
-        return Pipeline(d['id'], d['name'], d['version'], d['description'],
-                        bindings, epoints, tags=d['tags'], task_options=task_options)
-
-    @staticmethod
-    def load_from_json(file_name):
-        with open(file_name, "r") as json_in:
-            return Pipeline.from_dict(json.loads(json_in.read()))

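A rough, self-contained sketch of the reworked Pipeline interface, constructing a Pipeline directly with EntryPoint objects (all names and values are illustrative; task options are left empty for brevity):

    from pbcommand.models import EntryPoint
    from pbcommand.models.legacy import Pipeline

    ep = EntryPoint("eid_subread", "PacBio.DataSet.SubreadSet", "Subreads", False)
    p = Pipeline("cromwell.workflows.example", "Example workflow", "1.0.0",
                 "An illustrative workflow", bindings=set(), entry_points=[ep],
                 tags=("dev",), task_options=[])
    print(p.required_inputs)  # entry points with optional=False -> [ep]
    print(p.summary())        # "Workflow Summary" text block shown above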

=====================================
pbcommand/models/report.py
=====================================
@@ -361,7 +361,7 @@ class Plot(BaseReportElement):
         return self.PLOT_TYPE
 
     def _get_attrs_simple(self):
-        return ['image', 'caption', 'title', 'plotType']
+        return ['image', 'caption', 'title', 'plotType', 'thumbnail']
 
     def _get_attrs_complex_list(self):
         return []
@@ -869,10 +869,15 @@ class Report(BaseReportElement):
             table = Table(table_id, title=title, columns=columns)
             return table
 
+        def _to_sum(values):
+            if len(values) > 0 and isinstance(values[0], str):
+                return ",".join(values)
+            return sum(values)
+
         def _sum_attributes(attributes_list):
             d = _merge_attributes_d(attributes_list)
             labels = _merge_attributes_names(attributes_list)
-            return [Attribute(k, sum(values), name=labels[k])
+            return [Attribute(k, _to_sum(values), name=labels[k])
                     for k, values in d.items()]
 
         def _merge_tables(tables):

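A short sketch of the merge behaviour this enables, assuming that all values sharing an attribute id are either all numeric or all strings (values are illustrative, mirroring the updated test below):

    from pbcommand.models.report import Report, Attribute

    chunks = [
        Report("example_report",
               attributes=[Attribute(id_="n_reads", value=50, name="Number of reads"),
                           Attribute(id_="sample", value="Person1", name="Sample")]),
        Report("example_report",
               attributes=[Attribute(id_="n_reads", value=250, name="Number of reads"),
                           Attribute(id_="sample", value="Person2", name="Sample")]),
    ]
    merged = Report.merge(chunks)
    # numeric attributes are summed (n_reads -> 300); string attributes are
    # joined with commas (sample -> "Person1,Person2")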

=====================================
pbcommand/pb_io/common.py
=====================================
@@ -5,10 +5,11 @@ import warnings
 
 from pbcommand.models import (PipelineChunk, PipelineDataStoreViewRules,
                               TaskOptionTypes, PacBioFloatChoiceOption,
-                              PacBioStringChoiceOption,
+                              PacBioStringChoiceOption, PacBioFileOption,
                               PacBioIntChoiceOption, PacBioStringOption,
                               PacBioFloatOption, PacBioBooleanOption,
-                              PacBioIntOption, PipelinePreset)
+                              PacBioIntOption, PipelinePreset, EntryPoint)
+from pbcommand.models.legacy import Pipeline
 from pbcommand.schemas import validate_datastore_view_rules, validate_presets
 from pbcommand import to_ascii, to_utf8
 
@@ -121,7 +122,8 @@ def __simple_option_by_type(
     klass_map = {TaskOptionTypes.INT: PacBioIntOption,
                  TaskOptionTypes.FLOAT: PacBioFloatOption,
                  TaskOptionTypes.STR: PacBioStringOption,
-                 TaskOptionTypes.BOOL: PacBioBooleanOption}
+                 TaskOptionTypes.BOOL: PacBioBooleanOption,
+                 TaskOptionTypes.FILE: PacBioFileOption}
 
     k = klass_map[option_type]
 
@@ -207,3 +209,18 @@ def load_pipeline_presets_from(d):
         name=d.get('name', None),
         description=d.get('description', None))
     return presets
+
+
+@_json_path_or_d
+def load_pipeline_interface_from(d):
+    bindings = {}  # obsolete
+    epts = [EntryPoint.from_dict(d) for d in d["entryPoints"]]
+    opts = [pacbio_option_from_dict(o) for o in d["taskOptions"]]
+    return Pipeline(d['id'],
+                    d['name'],
+                    d['version'],
+                    d['description'],
+                    bindings,
+                    epts,
+                    tags=d['tags'],
+                    task_options=opts)

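A minimal sketch of the new loader; given the _json_path_or_d decorator it presumably accepts either a path to a pipeline-interface JSON or an already-parsed dict (the file name here is illustrative):

    from pbcommand.pb_io.common import load_pipeline_interface_from

    # path to a SMRT Link pipeline-interface JSON file (illustrative name)
    p = load_pipeline_interface_from("dev_diagnostic_subreads.json")
    print(p.pipeline_id)
    print(p.summary())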

=====================================
pbcommand/services/__init__.py
=====================================
@@ -1,8 +1,12 @@
 from .models import (JobExeError, JobResult, LogLevels,
                      ServiceResourceTypes, JobTypes, JobStates,
-                     ServiceJob, ServiceEntryPoint)
+                     ServiceJob, ServiceEntryPoint,
+                     add_smrtlink_server_args)
 # this module imports the models, so the model loading
 # must be called first to avoid cyclic-dependencies
-from ._service_access_layer import ServiceAccessLayer, SmrtLinkAuthClient
+from ._service_access_layer import (ServiceAccessLayer,
+                                    SmrtLinkAuthClient,
+                                    get_smrtlink_client,
+                                    get_smrtlink_client_from_args)
 # There's some crufty legacy naming. Using a cleaner model here
 SmrtLinkClient = ServiceAccessLayer

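With these re-exports, a client script can pull the helper straight from pbcommand.services (a sketch; the host and port are illustrative):

    from pbcommand.services import get_smrtlink_client

    # no credentials -> plain ServiceAccessLayer; with user and password the
    # helper returns a SmrtLinkAuthClient instead
    client = get_smrtlink_client("smrtlink-host", 8081)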

=====================================
pbcommand/services/_service_access_layer.py
=====================================
@@ -205,23 +205,6 @@ def _transform_job_tasks(j):
     return [JobTask.from_d(d) for d in j]
 
 
-def _import_dataset_by_type(dataset_type_or_id):
-
-    if isinstance(dataset_type_or_id, DataSetFileType):
-        ds_type_id = dataset_type_or_id.file_type_id
-    else:
-        ds_type_id = dataset_type_or_id
-
-    def wrapper(total_url, path, headers, avoid_duplicate_import=False):
-        _d = dict(datasetType=ds_type_id,
-                  path=path,
-                  avoidDuplicateImport=avoid_duplicate_import)
-        return _process_rpost_with_transform(
-            ServiceJob.from_d)(total_url, _d, headers)
-
-    return wrapper
-
-
 def _get_job_by_id_or_raise(sal, job_id, error_klass,
                             error_messge_extras=None):
     job = sal.get_job_by_id(job_id)
@@ -711,25 +694,34 @@ class ServiceAccessLayer:  # pragma: no cover
         return self._get_job_resource_type(
             JobTypes.MERGE_DS, job_id, ServiceResourceTypes.DATASTORE)
 
-    def _import_dataset(self, dataset_type, path,
-                        avoid_duplicate_import=False):
+    def import_dataset(self,
+                       path,
+                       avoid_duplicate_import=False):
         # This returns a job resource
         url = self._to_url(
             "{p}/{x}".format(x=JobTypes.IMPORT_DS, p=ServiceAccessLayer.ROOT_JOBS))
-        return _import_dataset_by_type(dataset_type)(
-            url, path, headers=self._get_headers(), avoid_duplicate_import=avoid_duplicate_import)
+        d = {
+            "path": path,
+            "avoidDuplicateImport": avoid_duplicate_import
+        }
+        return _process_rpost_with_transform(
+            ServiceJob.from_d)(url, d, headers=self._get_headers())
 
-    def run_import_dataset_by_type(self, dataset_type, path_to_xml,
-                                   avoid_duplicate_import=False):
-        job_or_error = self._import_dataset(
-            dataset_type,
+    def run_import_dataset(self,
+                           path_to_xml,
+                           avoid_duplicate_import=False):
+        job_or_error = self.import_dataset(
             path_to_xml,
             avoid_duplicate_import=avoid_duplicate_import)
-        custom_err_msg = "Import {d} {p}".format(p=path_to_xml, d=dataset_type)
+        custom_err_msg = "Import {p}".format(p=path_to_xml)
         job_id = _job_id_or_error(job_or_error, custom_err_msg=custom_err_msg)
         return _block_for_job_to_complete(
             self, job_id, sleep_time=self._sleep_time)
 
+    def run_import_dataset_by_type(self, dataset_type, path_to_xml,
+                                   avoid_duplicate_import=False):
+        return self.run_import_dataset(path_to_xml, avoid_duplicate_import)
+
     def _run_import_and_block(self, func, path, time_out=None):
         # func while be self.import_dataset_X
         job_or_error = func(path)
@@ -739,28 +731,32 @@ class ServiceAccessLayer:  # pragma: no cover
                                           sleep_time=self._sleep_time)
 
     def import_dataset_subread(self, path):
-        return self._import_dataset(FileTypes.DS_SUBREADS, path)
+        log.warn("DEPRECATED METHOD")
+        return self.import_dataset(path)
 
     def run_import_dataset_subread(self, path, time_out=10):
         return self._run_import_and_block(
             self.import_dataset_subread, path, time_out=time_out)
 
     def import_dataset_hdfsubread(self, path):
-        return self._import_dataset(FileTypes.DS_SUBREADS_H5, path)
+        log.warn("DEPRECATED METHOD")
+        return self.import_dataset(path)
 
     def run_import_dataset_hdfsubread(self, path, time_out=10):
         return self._run_import_and_block(
             self.import_dataset_hdfsubread, path, time_out=time_out)
 
     def import_dataset_reference(self, path):
-        return self._import_dataset(FileTypes.DS_REF, path)
+        log.warn("DEPRECATED METHOD")
+        return self.import_dataset(path)
 
     def run_import_dataset_reference(self, path, time_out=10):
         return self._run_import_and_block(
             self.import_dataset_reference, path, time_out=time_out)
 
     def import_dataset_barcode(self, path):
-        return self._import_dataset(FileTypes.DS_BARCODE, path)
+        log.warn("DEPRECATED METHOD")
+        return self.import_dataset(path)
 
     def run_import_dataset_barcode(self, path, time_out=10):
         return self._run_import_and_block(
@@ -1542,3 +1538,11 @@ def get_smrtlink_client(host, port, user=None, password=None, sleep_time=5):  #
         return SmrtLinkAuthClient(host, user, password, sleep_time=sleep_time)
     else:
         return ServiceAccessLayer(host, port, sleep_time=sleep_time)
+
+
+def get_smrtlink_client_from_args(args):
+    return get_smrtlink_client(
+        host=args.host,
+        port=args.port,
+        user=args.user,
+        password=args.password)

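A hedged sketch of how the simplified import API fits together: the namespace fields mirror what get_smrtlink_client_from_args reads above, and the dataset type no longer needs to be passed explicitly (host, port, and the dataset path are illustrative):

    import argparse
    from pbcommand.services import get_smrtlink_client_from_args

    # real scripts would populate these via add_smrtlink_server_args
    args = argparse.Namespace(host="smrtlink-host", port=8081,
                              user=None, password=None)
    client = get_smrtlink_client_from_args(args)

    # run_import_dataset_by_type is now only a thin wrapper around this call
    job_result = client.run_import_dataset("/path/to/movie.subreadset.xml")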

=====================================
pbcommand/services/job_poller.py
=====================================
@@ -9,7 +9,9 @@ import time
 import json
 import sys
 
-from requests.exceptions import HTTPError
+from urllib3.exceptions import ProtocolError
+
+from requests.exceptions import HTTPError, ConnectionError
 
 from pbcommand.cli.core import get_default_argparser_with_base_opts, pacbio_args_runner
 from pbcommand.services._service_access_layer import (get_smrtlink_client,
@@ -67,6 +69,14 @@ def poll_for_job_completion(job_id,
                     continue
                 else:
                     raise
+            except (ConnectionError, ProtocolError) as e:
+                log.warning("Connection error: {e}".format(e=str(e)))
+                log.info("Will retry in {d}s".format(d=retry_time))
+                time.sleep(retry_time)
+                # if a retryable error occurs, we increment the retry time
+                # up to a max of 30 minutes
+                retry_time = max(1800, retry_time + sleep_time)
+                continue
             else:
                 # if request succeeded, reset the retry_time
                 auth_errors = 0

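For context, the general shape of this kind of capped-backoff retry around a polling call, as a standalone sketch rather than the poller's exact code (min() keeps the delay at or below the cap):

    import time

    def poll_with_retry(fetch_state, sleep_time=10, max_retry_time=1800):
        """Sketch: call fetch_state() until it yields a value, backing off on connection errors."""
        retry_time = sleep_time
        while True:
            try:
                state = fetch_state()
            except ConnectionError:
                # transient failure: wait, then grow the delay up to the cap
                time.sleep(retry_time)
                retry_time = min(max_retry_time, retry_time + sleep_time)
                continue
            retry_time = sleep_time  # a successful request resets the backoff
            if state is not None:
                return state
            time.sleep(sleep_time)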

=====================================
setup.py
=====================================
@@ -10,12 +10,12 @@ test_deps = [
     'pylint',
     'pytest',
     'pytest-cov',
-    'pytest-xdist',
+    'pytest-xdist == 1.34.0',
 ]
 
 setup(
     name='pbcommand',
-    version='2.1.1',
+    version='2.2.2',
     author='Pacific Biosciences',
     author_email='devnet@pacificbiosciences.com',
     description='Library and Tools for interfacing with PacBio® data CLI tools',


=====================================
tests/test_models_legacy.py
=====================================
@@ -1,6 +1,7 @@
 import tempfile
 
 from pbcommand.models.legacy import Pipeline
+from pbcommand.pb_io.common import load_pipeline_interface_from
 
 
 class TestLegacyModels:
@@ -60,6 +61,6 @@ class TestLegacyModels:
         json_file = tempfile.NamedTemporaryFile(suffix=".json").name
         with open(json_file, "w") as json_out:
             json_out.write(pipeline_json)
-        p = Pipeline.load_from_json(json_file)
+        p = load_pipeline_interface_from(json_file)
         assert p.pipeline_id == "cromwell.workflows.dev_diagnostic_subreads"
         s = p.summary()


=====================================
tests/test_models_report.py
=====================================
@@ -296,26 +296,30 @@ class TestReportModel:
         EXPECTED_VALUES = {
             "n_reads": 300,
             "n_zmws": 60,
+            "sample": "Person1,Person2"
         }
         NAMES = {
             "n_reads": "Number of reads",
-            "n_zmws": "Number of ZMWs"
+            "n_zmws": "Number of ZMWs",
+            "sample": "Sample"
         }
         chunks = [
             Report("pbcommand_test",
                    attributes=[
                        Attribute(id_="n_reads", value=50,
                                  name="Number of reads"),
-                       Attribute(id_="n_zmws", value=10, name="Number of ZMWs")],
+                       Attribute(id_="n_zmws", value=10, name="Number of ZMWs"),
+                       Attribute(id_="sample", value="Person1", name="Sample")],
                    dataset_uuids=["12345"]),
             Report("pbcommand_test",
                    attributes=[
                        Attribute(id_="n_reads", value=250,
                                  name="Number of reads"),
-                       Attribute(id_="n_zmws", value=50, name="Number of ZMWs")]),
+                       Attribute(id_="n_zmws", value=50, name="Number of ZMWs"),
+                       Attribute(id_="sample", value="Person2", name="Sample")]),
         ]
         r = Report.merge(chunks)
-        assert [a.id for a in r.attributes] == ["n_reads", "n_zmws"]
+        assert [a.id for a in r.attributes] == ["n_reads", "n_zmws", "sample"]
         assert r._dataset_uuids == ["12345"]
         for attr in r.attributes:
             assert attr.value == EXPECTED_VALUES[attr.id]



View it on GitLab: https://salsa.debian.org/med-team/python-pbcommand/-/compare/8fca3229adf5cfc005c9373348f7b42529b96cd4...3e33ced73e5c5b53f57ac2c87faa0fa1278462a0
