[med-svn] [Git][med-team/python-pbcommand][upstream] 2 commits: New upstream version 1.1.1+git20201023.cc0ed3d
Steffen Möller
gitlab at salsa.debian.org
Mon Dec 7 15:49:31 GMT 2020
Steffen Möller pushed to branch upstream at Debian Med / python-pbcommand
Commits:
b72d2670 by Steffen Moeller at 2020-12-07T16:24:18+01:00
New upstream version 1.1.1+git20201023.cc0ed3d
- - - - -
3e33ced7 by Steffen Moeller at 2020-12-07T16:42:02+01:00
New upstream version 2.1.1+git20201023.cc0ed3d
- - - - -
11 changed files:
- pbcommand/models/__init__.py
- pbcommand/models/common.py
- pbcommand/models/legacy.py
- pbcommand/models/report.py
- pbcommand/pb_io/common.py
- pbcommand/services/__init__.py
- pbcommand/services/_service_access_layer.py
- pbcommand/services/job_poller.py
- setup.py
- tests/test_models_legacy.py
- tests/test_models_report.py
Changes:
=====================================
pbcommand/models/__init__.py
=====================================
@@ -6,6 +6,6 @@ from .common import (FileType, FileTypes, TaskOptionTypes,
BasePacBioOption,
PacBioIntOption, PacBioBooleanOption,
PacBioStringOption, PacBioFloatOption,
- PacBioIntChoiceOption,
+ PacBioIntChoiceOption, PacBioFileOption,
PacBioFloatChoiceOption, PacBioStringChoiceOption,
- PacBioAlarm, PipelinePreset)
+ PacBioAlarm, PipelinePreset, EntryPoint)
=====================================
pbcommand/models/common.py
=====================================
@@ -930,7 +930,7 @@ def write_dict_to_json(d, file_name, permission="w"):
RX_TASK_ID = re.compile(r'^([A-z0-9_]*)\.tasks\.([A-z0-9_]*)$')
-RX_TASK_OPTION_ID = re.compile(r'^([A-z0-9_]*)\.task_options\.([A-z0-9_\.]*)')
+RX_TASK_OPTION_ID = re.compile(r'^([A-z0-9_\.]*)')
def _validate_id(prog, idtype, tid):
@@ -1076,6 +1076,13 @@ def _strict_validate_string_or_raise(value):
raise TypeError(_type_error_msg(value, str))
+def _strict_validate_file_or_raise(value):
+ # Not supporting unicode in python2.
+ if isinstance(value, str) or value is None:
+ return value
+ raise TypeError(_type_error_msg(value, str))
+
+
class PacBioIntOption(BasePacBioOption):
OPTION_TYPE_ID = TaskOptionTypes.INT
@@ -1108,6 +1115,14 @@ class PacBioStringOption(BasePacBioOption):
return _strict_validate_string_or_raise(value)
+class PacBioFileOption(BasePacBioOption):
+ OPTION_TYPE_ID = TaskOptionTypes.FILE
+
+ @classmethod
+ def validate_core_type(cls, value):
+ return _strict_validate_file_or_raise(value)
+
+
def _strict_validate_default_and_choices(core_type_validator_func):
"""
@@ -1324,3 +1339,19 @@ class PipelinePreset:
("description", self.description),
("options", dict(self.options)),
("taskOptions", dict(self.task_options))])
+
+
+class EntryPoint(namedtuple("EntryPoint", ["entry_id", "file_type_id", "name", "optional"])):
+
+ @staticmethod
+ def from_dict(d):
+ return EntryPoint(d["entryId"], d["fileTypeId"], d["name"],
+ d.get("optional", False))
+
+ @property
+ def short_name(self):
+ return self.file_type_id.split(".")[-1] + " XML"
+
+ @property
+ def file_ext(self):
+ return FileTypes.ALL()[self.file_type_id].ext
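For illustration, a minimal sketch of the two new models in use. The (option_id, name, default, description) constructor signature is assumed from the existing BasePacBioOption subclasses, and the dataset file-type id is one already registered in FileTypes:

    # Sketch only: exercises the new PacBioFileOption and EntryPoint models.
    # Constructor arguments follow the other BasePacBioOption subclasses
    # (assumed order: option_id, name, default, description).
    from pbcommand.models import PacBioFileOption, EntryPoint

    opt = PacBioFileOption("my_tool.task_options.input_json",
                           "Input JSON", None,
                           "Optional auxiliary JSON file")
    print(opt.default)  # None is accepted by _strict_validate_file_or_raise

    ep = EntryPoint.from_dict({
        "entryId": "eid_subread",
        "fileTypeId": "PacBio.DataSet.SubreadSet",
        "name": "Entry eid_subread",
    })  # "optional" defaults to False when absent
    print(ep.short_name)  # "SubreadSet XML"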
=====================================
pbcommand/models/legacy.py
=====================================
@@ -2,24 +2,15 @@
Models carried over from pbsmrtpipe, still required for SMRT Link interface.
"""
-import itertools
+from collections import namedtuple
import json
class Pipeline:
def __init__(self, idx, display_name, version, description, bindings,
- entry_bindings, parent_pipeline_ids=None, tags=(), task_options=None):
- """
-
- Both entry_points and bindings are provided as "simple" format (e.g, [("alpha:0", "beta:1"])
-
- This really should have been abstracted away into containers to make the interface clear. This was a fundamental
- design mistake.
-
- :param bindings: List of "simple" binding format [("alpha:0:0", "beta:0:0")]
- :param entry_bindings: List of "simple" bindings [("$entry:e1", "my_task:0")]
- """
+ entry_points, parent_pipeline_ids=None, tags=(),
+ task_options=None):
self.idx = idx
self.version = version
@@ -28,7 +19,7 @@ class Pipeline:
# set of [(a, b), ...]
self.bindings = {x for x in bindings}
# set of [(a, b), ...]
- self.entry_bindings = {x for x in entry_bindings}
+ self.entry_points = entry_points
# list of strings
self.tags = tags
if parent_pipeline_ids is None:
@@ -43,56 +34,41 @@ class Pipeline:
return self.idx
@property
- def all_bindings(self):
- return self.bindings | self.entry_bindings
+ def required_inputs(self):
+ return [o for o in self.entry_points if not o.optional]
+
+ @property
+ def optional_inputs(self):
+ return [o for o in self.entry_points if o.optional]
def __repr__(self):
# Only communicate the entry id
- ek = [eid for eid, _ in self.entry_bindings]
+ ek = [e.entry_id for e in self.entry_points]
e = " ".join(ek)
_d = dict(k=self.__class__.__name__, i=self.idx,
d=self.display_name, b=len(self.bindings), e=e)
return "<{k} id={i} nbindings={b} entry bindings={e} >".format(**_d)
- def summary(self):
+ def summary(self, verbose=True):
outs = []
f = outs.append
- # out a list of tuples
- def _printer(xs):
- for a, b in xs:
- sx = ' -> '.join([str(a), str(b)])
- f(" " + sx)
+ def _format_inputs(inputs):
+ return ", ".join([ep.short_name for ep in inputs])
- f("Pipeline Summary")
- f("Pipeline Id: {}".format(self.pipeline_id))
- f("Name : {}".format(self.display_name))
- f("Description: {}".format(self.description))
- f("EntryPoints: {}".format(len(self.entry_bindings)))
- _printer(sorted(self.entry_bindings, key=lambda x: x[0]))
+ f("Workflow Summary")
+ f("Workflow Id : {}".format(self.pipeline_id))
+ f("Name : {}".format(self.display_name))
+ f("Description : {}".format(self.description))
+ f("Required Inputs: {}".format(_format_inputs(self.required_inputs)))
+ if len(self.optional_inputs) > 0:
+ f("Optional Inputs: {}".format(_format_inputs(self.optional_inputs)))
if self.tags:
- f("Tags : {} ".format(", ".join(list(set(self.tags)))))
+ f("Tags : {} ".format(", ".join(list(set(self.tags)))))
if len(self.task_options) > 0:
f("Task Options:")
- for option_id, value in self.iter_options():
- f(" {o} = {v}".format(o=option_id, v=value))
+ for opt in self.task_options:
+ f(" {o} = {v}".format(o=opt.option_id, v=opt.default))
+ if verbose:
+ f(" {n} ({t})".format(n=opt.name, t=opt.OPTION_TYPE_ID))
return "\n".join(outs)
-
- def iter_options(self):
- option_ids = sorted(self.task_options.keys())
- for option_id in option_ids:
- yield (option_id, self.task_options[option_id])
-
- @staticmethod
- def from_dict(d):
- bindings = {} # obsolete
- epoints = [(e["entryId"], e["fileTypeId"]) for e in d["entryPoints"]]
- # The pipeline instance only needs to the key-value pair
- task_options = {o["id"]: o["default"] for o in d['taskOptions']}
- return Pipeline(d['id'], d['name'], d['version'], d['description'],
- bindings, epoints, tags=d['tags'], task_options=task_options)
-
- @staticmethod
- def load_from_json(file_name):
- with open(file_name, "r") as json_in:
- return Pipeline.from_dict(json.loads(json_in.read()))
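A hedged sketch of how the reworked Pipeline is now driven by EntryPoint objects and option instances rather than binding tuples and a plain option dict; the identifiers below are invented, and the option constructor signature is assumed from pbcommand.models:

    # Hypothetical usage sketch for the reworked Pipeline model.  The ids,
    # names and values are invented; summary() only needs option objects
    # exposing .option_id, .default, .name and .OPTION_TYPE_ID.
    from pbcommand.models import EntryPoint, PacBioIntOption
    from pbcommand.models.legacy import Pipeline

    eps = [EntryPoint("eid_subread", "PacBio.DataSet.SubreadSet",
                      "Entry eid_subread", False),
           EntryPoint("eid_ref_dataset", "PacBio.DataSet.ReferenceSet",
                      "Entry eid_ref_dataset", True)]
    opts = [PacBioIntOption("demo.task_options.min_length",
                            "Minimum Length", 50, "Minimum read length")]

    p = Pipeline("cromwell.workflows.demo", "Demo Workflow", "1.0.0",
                 "Example workflow", bindings=set(), entry_points=eps,
                 tags=("dev",), task_options=opts)
    print(p.required_inputs)            # only the non-optional entry points
    print(p.summary(verbose=False))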
=====================================
pbcommand/models/report.py
=====================================
@@ -361,7 +361,7 @@ class Plot(BaseReportElement):
return self.PLOT_TYPE
def _get_attrs_simple(self):
- return ['image', 'caption', 'title', 'plotType']
+ return ['image', 'caption', 'title', 'plotType', 'thumbnail']
def _get_attrs_complex_list(self):
return []
@@ -869,10 +869,15 @@ class Report(BaseReportElement):
table = Table(table_id, title=title, columns=columns)
return table
+ def _to_sum(values):
+ if len(values) > 0 and isinstance(values[0], str):
+ return ",".join(values)
+ return sum(values)
+
def _sum_attributes(attributes_list):
d = _merge_attributes_d(attributes_list)
labels = _merge_attributes_names(attributes_list)
- return [Attribute(k, sum(values), name=labels[k])
+ return [Attribute(k, _to_sum(values), name=labels[k])
for k, values in d.items()]
def _merge_tables(tables):
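The practical effect of the new _to_sum helper is that Report.merge no longer fails when a chunked attribute carries a string: numeric values are still summed, while strings are comma-joined. A minimal sketch:

    # Sketch: merging chunked reports sums numeric attributes and
    # comma-joins string-valued ones instead of raising on sum().
    from pbcommand.models.report import Report, Attribute

    chunks = [
        Report("demo", attributes=[Attribute("n_reads", 50),
                                   Attribute("sample", "Person1")]),
        Report("demo", attributes=[Attribute("n_reads", 250),
                                   Attribute("sample", "Person2")]),
    ]
    merged = Report.merge(chunks)
    values = {a.id: a.value for a in merged.attributes}
    assert values == {"n_reads": 300, "sample": "Person1,Person2"}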
=====================================
pbcommand/pb_io/common.py
=====================================
@@ -5,10 +5,11 @@ import warnings
from pbcommand.models import (PipelineChunk, PipelineDataStoreViewRules,
TaskOptionTypes, PacBioFloatChoiceOption,
- PacBioStringChoiceOption,
+ PacBioStringChoiceOption, PacBioFileOption,
PacBioIntChoiceOption, PacBioStringOption,
PacBioFloatOption, PacBioBooleanOption,
- PacBioIntOption, PipelinePreset)
+ PacBioIntOption, PipelinePreset, EntryPoint)
+from pbcommand.models.legacy import Pipeline
from pbcommand.schemas import validate_datastore_view_rules, validate_presets
from pbcommand import to_ascii, to_utf8
@@ -121,7 +122,8 @@ def __simple_option_by_type(
klass_map = {TaskOptionTypes.INT: PacBioIntOption,
TaskOptionTypes.FLOAT: PacBioFloatOption,
TaskOptionTypes.STR: PacBioStringOption,
- TaskOptionTypes.BOOL: PacBioBooleanOption}
+ TaskOptionTypes.BOOL: PacBioBooleanOption,
+ TaskOptionTypes.FILE: PacBioFileOption}
k = klass_map[option_type]
@@ -207,3 +209,18 @@ def load_pipeline_presets_from(d):
name=d.get('name', None),
description=d.get('description', None))
return presets
+
+
+@_json_path_or_d
+def load_pipeline_interface_from(d):
+ bindings = {} # obsolete
+ epts = [EntryPoint.from_dict(d) for d in d["entryPoints"]]
+ opts = [pacbio_option_from_dict(o) for o in d["taskOptions"]]
+ return Pipeline(d['id'],
+ d['name'],
+ d['version'],
+ d['description'],
+ bindings,
+ epts,
+ tags=d['tags'],
+ task_options=opts)
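A hedged sketch of the new loader; the expected JSON layout (entryPoints, taskOptions, tags, ...) is inferred from the code above, and the @_json_path_or_d decorator is assumed to accept either a file path or an already-parsed dict, as for the other loaders in this module:

    # Sketch: load a SMRT Link pipeline/workflow interface description.
    from pbcommand.pb_io.common import load_pipeline_interface_from

    d = {
        "id": "cromwell.workflows.demo",
        "name": "Demo Workflow",
        "version": "1.0.0",
        "description": "Example workflow",
        "tags": ["dev"],
        "entryPoints": [
            {"entryId": "eid_subread",
             "fileTypeId": "PacBio.DataSet.SubreadSet",
             "name": "Entry eid_subread",
             "optional": False}
        ],
        "taskOptions": []
    }
    pipeline = load_pipeline_interface_from(d)
    print(pipeline.summary())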
=====================================
pbcommand/services/__init__.py
=====================================
@@ -1,8 +1,12 @@
from .models import (JobExeError, JobResult, LogLevels,
ServiceResourceTypes, JobTypes, JobStates,
- ServiceJob, ServiceEntryPoint)
+ ServiceJob, ServiceEntryPoint,
+ add_smrtlink_server_args)
# this module imports the models, so the model loading
# must be called first to avoid cyclic-dependencies
-from ._service_access_layer import ServiceAccessLayer, SmrtLinkAuthClient
+from ._service_access_layer import (ServiceAccessLayer,
+ SmrtLinkAuthClient,
+ get_smrtlink_client,
+ get_smrtlink_client_from_args)
# There's some crufty legacy naming. Using a cleaner model here
SmrtLinkClient = ServiceAccessLayer
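The newly re-exported helpers are meant to be used together from CLI tools; a hedged sketch, assuming add_smrtlink_server_args registers host/port/user/password options whose destinations match the attributes read by get_smrtlink_client_from_args:

    # Sketch: wire up a SMRT Link client from standard CLI arguments.
    # The exact flag names added by add_smrtlink_server_args are assumed.
    import argparse
    from pbcommand.services import (add_smrtlink_server_args,
                                    get_smrtlink_client_from_args)

    parser = argparse.ArgumentParser()
    add_smrtlink_server_args(parser)
    args = parser.parse_args()  # e.g. --host/--port/--user/--password
    client = get_smrtlink_client_from_args(args)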
=====================================
pbcommand/services/_service_access_layer.py
=====================================
@@ -205,23 +205,6 @@ def _transform_job_tasks(j):
return [JobTask.from_d(d) for d in j]
-def _import_dataset_by_type(dataset_type_or_id):
-
- if isinstance(dataset_type_or_id, DataSetFileType):
- ds_type_id = dataset_type_or_id.file_type_id
- else:
- ds_type_id = dataset_type_or_id
-
- def wrapper(total_url, path, headers, avoid_duplicate_import=False):
- _d = dict(datasetType=ds_type_id,
- path=path,
- avoidDuplicateImport=avoid_duplicate_import)
- return _process_rpost_with_transform(
- ServiceJob.from_d)(total_url, _d, headers)
-
- return wrapper
-
-
def _get_job_by_id_or_raise(sal, job_id, error_klass,
error_messge_extras=None):
job = sal.get_job_by_id(job_id)
@@ -711,25 +694,34 @@ class ServiceAccessLayer: # pragma: no cover
return self._get_job_resource_type(
JobTypes.MERGE_DS, job_id, ServiceResourceTypes.DATASTORE)
- def _import_dataset(self, dataset_type, path,
- avoid_duplicate_import=False):
+ def import_dataset(self,
+ path,
+ avoid_duplicate_import=False):
# This returns a job resource
url = self._to_url(
"{p}/{x}".format(x=JobTypes.IMPORT_DS, p=ServiceAccessLayer.ROOT_JOBS))
- return _import_dataset_by_type(dataset_type)(
- url, path, headers=self._get_headers(), avoid_duplicate_import=avoid_duplicate_import)
+ d = {
+ "path": path,
+ "avoidDuplicateImport": avoid_duplicate_import
+ }
+ return _process_rpost_with_transform(
+ ServiceJob.from_d)(url, d, headers=self._get_headers())
- def run_import_dataset_by_type(self, dataset_type, path_to_xml,
- avoid_duplicate_import=False):
- job_or_error = self._import_dataset(
- dataset_type,
+ def run_import_dataset(self,
+ path_to_xml,
+ avoid_duplicate_import=False):
+ job_or_error = self.import_dataset(
path_to_xml,
avoid_duplicate_import=avoid_duplicate_import)
- custom_err_msg = "Import {d} {p}".format(p=path_to_xml, d=dataset_type)
+ custom_err_msg = "Import {p}".format(p=path_to_xml)
job_id = _job_id_or_error(job_or_error, custom_err_msg=custom_err_msg)
return _block_for_job_to_complete(
self, job_id, sleep_time=self._sleep_time)
+ def run_import_dataset_by_type(self, dataset_type, path_to_xml,
+ avoid_duplicate_import=False):
+ return self.run_import_dataset(path_to_xml, avoid_duplicate_import)
+
def _run_import_and_block(self, func, path, time_out=None):
# func will be self.import_dataset_X
job_or_error = func(path)
@@ -739,28 +731,32 @@ class ServiceAccessLayer: # pragma: no cover
sleep_time=self._sleep_time)
def import_dataset_subread(self, path):
- return self._import_dataset(FileTypes.DS_SUBREADS, path)
+ log.warn("DEPRECATED METHOD")
+ return self.import_dataset(path)
def run_import_dataset_subread(self, path, time_out=10):
return self._run_import_and_block(
self.import_dataset_subread, path, time_out=time_out)
def import_dataset_hdfsubread(self, path):
- return self._import_dataset(FileTypes.DS_SUBREADS_H5, path)
+ log.warn("DEPRECATED METHOD")
+ return self.import_dataset(path)
def run_import_dataset_hdfsubread(self, path, time_out=10):
return self._run_import_and_block(
self.import_dataset_hdfsubread, path, time_out=time_out)
def import_dataset_reference(self, path):
- return self._import_dataset(FileTypes.DS_REF, path)
+ log.warn("DEPRECATED METHOD")
+ return self.import_dataset(path)
def run_import_dataset_reference(self, path, time_out=10):
return self._run_import_and_block(
self.import_dataset_reference, path, time_out=time_out)
def import_dataset_barcode(self, path):
- return self._import_dataset(FileTypes.DS_BARCODE, path)
+ log.warn("DEPRECATED METHOD")
+ return self.import_dataset(path)
def run_import_dataset_barcode(self, path, time_out=10):
return self._run_import_and_block(
@@ -1542,3 +1538,11 @@ def get_smrtlink_client(host, port, user=None, password=None, sleep_time=5): #
return SmrtLinkAuthClient(host, user, password, sleep_time=sleep_time)
else:
return ServiceAccessLayer(host, port, sleep_time=sleep_time)
+
+
+def get_smrtlink_client_from_args(args):
+ return get_smrtlink_client(
+ host=args.host,
+ port=args.port,
+ user=args.user,
+ password=args.password)
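With the per-type import wrapper gone, every dataset import now goes through a single type-agnostic endpoint; run_import_dataset_by_type survives only as a backwards-compatible shim. A hedged sketch of the new call path (host and credentials are placeholders):

    # Sketch: the server infers the dataset type from the XML, so one call
    # replaces the old per-type helpers.  run_import_dataset blocks until
    # the import job completes.
    from pbcommand.services import get_smrtlink_client

    client = get_smrtlink_client("smrtlink.example.com", 8243,
                                 user="admin", password="admin")
    result = client.run_import_dataset("/path/to/movie.subreadset.xml",
                                       avoid_duplicate_import=True)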
=====================================
pbcommand/services/job_poller.py
=====================================
@@ -9,7 +9,9 @@ import time
import json
import sys
-from requests.exceptions import HTTPError
+from urllib3.exceptions import ProtocolError
+
+from requests.exceptions import HTTPError, ConnectionError
from pbcommand.cli.core import get_default_argparser_with_base_opts, pacbio_args_runner
from pbcommand.services._service_access_layer import (get_smrtlink_client,
@@ -67,6 +69,14 @@ def poll_for_job_completion(job_id,
continue
else:
raise
+ except (ConnectionError, ProtocolError) as e:
+ log.warning("Connection error: {e}".format(e=str(e)))
+ log.info("Will retry in {d}s".format(d=retry_time))
+ time.sleep(retry_time)
+ # if a retryable error occurs, we increment the retry time
+ # up to a max of 30 minutes
+ retry_time = min(1800, retry_time + sleep_time)
+ continue
else:
# if request succeeded, reset the retry_time
auth_errors = 0
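A standalone sketch of the backoff pattern the poller now applies to transient connection errors; poll_once here is a hypothetical callable that returns None while the job is still running, and the intent stated in the comment above is that the retry interval grows per failure but is capped at 30 minutes:

    # Standalone sketch of the capped retry/backoff behaviour: each
    # connection failure lengthens the sleep by sleep_time, capped at
    # max_retry_time; a successful poll resets the interval.
    import time

    def poll_with_backoff(poll_once, sleep_time=60, max_retry_time=1800):
        retry_time = sleep_time
        while True:
            try:
                result = poll_once()
            except ConnectionError as e:
                print("Connection error: {e}; retry in {t}s".format(e=e, t=retry_time))
                time.sleep(retry_time)
                retry_time = min(max_retry_time, retry_time + sleep_time)
                continue
            retry_time = sleep_time  # reset after a successful request
            if result is not None:
                return result
            time.sleep(sleep_time)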
=====================================
setup.py
=====================================
@@ -10,12 +10,12 @@ test_deps = [
'pylint',
'pytest',
'pytest-cov',
- 'pytest-xdist',
+ 'pytest-xdist == 1.34.0',
]
setup(
name='pbcommand',
- version='2.1.1',
+ version='2.2.2',
author='Pacific Biosciences',
author_email='devnet@pacificbiosciences.com',
description='Library and Tools for interfacing with PacBio® data CLI tools',
=====================================
tests/test_models_legacy.py
=====================================
@@ -1,6 +1,7 @@
import tempfile
from pbcommand.models.legacy import Pipeline
+from pbcommand.pb_io.common import load_pipeline_interface_from
class TestLegacyModels:
@@ -60,6 +61,6 @@ class TestLegacyModels:
json_file = tempfile.NamedTemporaryFile(suffix=".json").name
with open(json_file, "w") as json_out:
json_out.write(pipeline_json)
- p = Pipeline.load_from_json(json_file)
+ p = load_pipeline_interface_from(json_file)
assert p.pipeline_id == "cromwell.workflows.dev_diagnostic_subreads"
s = p.summary()
=====================================
tests/test_models_report.py
=====================================
@@ -296,26 +296,30 @@ class TestReportModel:
EXPECTED_VALUES = {
"n_reads": 300,
"n_zmws": 60,
+ "sample": "Person1,Person2"
}
NAMES = {
"n_reads": "Number of reads",
- "n_zmws": "Number of ZMWs"
+ "n_zmws": "Number of ZMWs",
+ "sample": "Sample"
}
chunks = [
Report("pbcommand_test",
attributes=[
Attribute(id_="n_reads", value=50,
name="Number of reads"),
- Attribute(id_="n_zmws", value=10, name="Number of ZMWs")],
+ Attribute(id_="n_zmws", value=10, name="Number of ZMWs"),
+ Attribute(id_="sample", value="Person1", name="Sample")],
dataset_uuids=["12345"]),
Report("pbcommand_test",
attributes=[
Attribute(id_="n_reads", value=250,
name="Number of reads"),
- Attribute(id_="n_zmws", value=50, name="Number of ZMWs")]),
+ Attribute(id_="n_zmws", value=50, name="Number of ZMWs"),
+ Attribute(id_="sample", value="Person2", name="Sample")]),
]
r = Report.merge(chunks)
- assert [a.id for a in r.attributes] == ["n_reads", "n_zmws"]
+ assert [a.id for a in r.attributes] == ["n_reads", "n_zmws", "sample"]
assert r._dataset_uuids == ["12345"]
for attr in r.attributes:
assert attr.value == EXPECTED_VALUES[attr.id]
View it on GitLab: https://salsa.debian.org/med-team/python-pbcommand/-/compare/8fca3229adf5cfc005c9373348f7b42529b96cd4...3e33ced73e5c5b53f57ac2c87faa0fa1278462a0