[med-svn] [Git][med-team/python-pbcommand][master] 6 commits: routine-update: New upstream version
Andreas Tille (@tille)
gitlab at salsa.debian.org
Fri Jul 29 11:06:53 BST 2022
Andreas Tille pushed to branch master at Debian Med / python-pbcommand
Commits:
a6eeb523 by Andreas Tille at 2022-07-29T11:55:08+02:00
routine-update: New upstream version
- - - - -
140ae055 by Andreas Tille at 2022-07-29T11:55:09+02:00
New upstream version 2.1.1+git20220616.3f2e6c2
- - - - -
aa5a80d2 by Andreas Tille at 2022-07-29T11:55:09+02:00
Update upstream source from tag 'upstream/2.1.1+git20220616.3f2e6c2'
Update to upstream version '2.1.1+git20220616.3f2e6c2'
with Debian dir 93c8fa8722084b11ba0d8498a0e67b34792385bb
- - - - -
57cc29fd by Andreas Tille at 2022-07-29T11:55:09+02:00
routine-update: Standards-Version: 4.6.1
- - - - -
ec6968fd by Andreas Tille at 2022-07-29T12:04:42+02:00
Force TMPDIR in autopkgtest
- - - - -
1c59ffdc by Andreas Tille at 2022-07-29T12:05:46+02:00
Upload to unstable
- - - - -
13 changed files:
- debian/changelog
- debian/control
- debian/tests/run-unit-test
- pbcommand/cli/core.py
- pbcommand/models/common.py
- pbcommand/models/report.py
- pbcommand/services/_service_access_layer.py
- pbcommand/services/job_poller.py
- pbcommand/services/resolver.py
- pbcommand/utils.py
- setup.py
- tests/test_models_report_table.py
- tests/test_utils.py
Changes:
=====================================
debian/changelog
=====================================
@@ -1,3 +1,11 @@
+python-pbcommand (2.1.1+git20220616.3f2e6c2-1) unstable; urgency=medium
+
+ * New upstream version
+ * Standards-Version: 4.6.1 (routine-update)
+ * Force TMPDIR in autopkgtest
+
+ -- Andreas Tille <tille at debian.org> Fri, 29 Jul 2022 12:04:49 +0200
+
python-pbcommand (2.1.1+git20201023.cc0ed3d-1) unstable; urgency=medium
* Team upload.
=====================================
debian/control
=====================================
@@ -19,7 +19,7 @@ Build-Depends: debhelper-compat (= 13),
Breaks: python-pbcommand
Provides: python-pbcommand
Replaces: python-pbcommand
-Standards-Version: 4.5.1
+Standards-Version: 4.6.1
Vcs-Browser: https://salsa.debian.org/med-team/python-pbcommand
Vcs-Git: https://salsa.debian.org/med-team/python-pbcommand.git
Homepage: https://pbcommand.readthedocs.org/en/latest/
=====================================
debian/tests/run-unit-test
=====================================
@@ -15,4 +15,4 @@ cd "${AUTOPKGTEST_TMP}"
gunzip -r *
-pytest-3
+TMPDIR=${AUTOPKGTEST_TMP} pytest-3
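The override works because Python's tempfile module consults TMPDIR before falling back to /tmp, so any scratch files the test suite creates now land inside ${AUTOPKGTEST_TMP}. A minimal sketch of that mechanism (the path below is a stand-in for the autopkgtest directory):

    import os
    import tempfile

    os.environ["TMPDIR"] = "/path/to/autopkgtest-tmp"  # stand-in for ${AUTOPKGTEST_TMP}
    tempfile.tempdir = None          # drop the cached default so TMPDIR is re-read
    print(tempfile.gettempdir())     # -> /path/to/autopkgtest-tmp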
=====================================
pbcommand/cli/core.py
=====================================
@@ -28,7 +28,7 @@ import pbcommand
from pbcommand.models import ResourceTypes, PacBioAlarm
from pbcommand.models.report import Report, Attribute
from pbcommand.common_options import add_base_options, add_nproc_option
-from pbcommand.utils import get_parsed_args_log_level
+from pbcommand.utils import get_parsed_args_log_level, get_peak_memory_usage
def _add_version(p, version):
@@ -90,13 +90,14 @@ def get_default_argparser_with_base_opts(
return p
-def write_task_report(run_time, nproc, exit_code):
+def write_task_report(run_time, nproc, exit_code, maxrss):
attributes = [
Attribute("host", value=os.uname()[1]),
Attribute("system", value=os.uname()[0]),
Attribute("nproc", value=nproc),
Attribute("run_time", value=run_time),
- Attribute("exit_code", value=exit_code)
+ Attribute("exit_code", value=exit_code),
+ Attribute("maxrss", value=maxrss)
]
report = Report("workflow_task",
title="Workflow Task Report",
@@ -195,11 +196,14 @@ def _pacbio_main_runner(alog, setup_log_func, exe_main_func, *args, **kwargs):
else:
return_code = 2
- _d = dict(r=return_code, s=run_time)
+ maxrss = get_peak_memory_usage()
if is_cromwell_environment:
alog.info("Writing task report to task-report.json")
- write_task_report(run_time, getattr(pargs, "nproc", 1), return_code)
+ nproc = getattr(pargs, "nproc", 1)
+ write_task_report(run_time, nproc, return_code, maxrss)
+ _d = dict(r=return_code, s=run_time)
if alog is not None:
+ alog.info(f"Max RSS (kB): {maxrss}")
alog.info("exiting with return code {r} in {s:.2f} sec.".format(**_d))
return return_code
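A rough sketch of how the new maxrss value reaches the task report named in the log message above (argument values are placeholders, not taken from this commit):

    from pbcommand.cli.core import write_task_report
    from pbcommand.utils import get_peak_memory_usage

    maxrss = get_peak_memory_usage()   # ru_maxrss of the current process, or None on failure
    # serializes host, system, nproc, run_time, exit_code and now maxrss
    # into the workflow task report (task-report.json)
    write_task_report(run_time=12.3, nproc=1, exit_code=0, maxrss=maxrss)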
=====================================
pbcommand/models/common.py
=====================================
@@ -24,7 +24,7 @@ log = logging.getLogger(__name__)
REGISTERED_FILE_TYPES = {}
# Light weight Dataset metatadata. Use pbcore for full dataset functionality
-DataSetMetaData = namedtuple("DataSetMetaData", 'uuid metatype')
+DataSetMetaData = namedtuple("DataSetMetaData", 'uuid metatype name')
class PacBioNamespaces:
@@ -368,6 +368,7 @@ class FileTypes:
SAM = FileType(to_file_ns('sam'), "alignments", "sam", MimeTypes.BINARY)
VCF = FileType(to_file_ns('vcf'), "file", "vcf", MimeTypes.TXT)
GFF = FileType(to_file_ns('gff'), "file", "gff", MimeTypes.TXT)
+ GTF = FileType(to_file_ns('gtf'), "file", "gtf", MimeTypes.TXT)
BIGWIG = FileType(
to_file_ns('bigwig'),
"annotations",
@@ -490,6 +491,38 @@ class FileTypes:
".ngm",
MimeTypes.BINARY)
+ I_TABIX = FileType(to_index_ns("Tabix"), "file", "tbi", MimeTypes.BINARY)
+
+ # GTF/BED index
+ I_PGI = FileType(to_index_ns("PigeonIndex"), "file", "pgi", MimeTypes.TXT)
+
+ # Reference transcript annotations
+ GTF_ANNOTATION = FileType(
+ "PacBio.ReferenceFile.ReferenceAnnotationFile",
+ "file",
+ "gtf",
+ MimeTypes.TXT)
+ POLYA_MOTIF = FileType(
+ "PacBio.ReferenceFile.PolyAMotifFile",
+ "file",
+ "txt",
+ MimeTypes.TXT)
+ BED_POLYA_PEAK = FileType(
+ "PacBio.ReferenceFile.PolyAPeakFile",
+ "file",
+ "bed",
+ MimeTypes.TXT)
+ BED_CAGE_PEAK = FileType(
+ "PacBio.ReferenceFile.CagePeakFile",
+ "file",
+ "bed",
+ MimeTypes.TXT)
+ INTROPOLIS = FileType(
+ "PacBio.ReferenceFile.IntropolisFile",
+ "file",
+ "tsv",
+ MimeTypes.TXT)
+
# Fasta type files
FASTA_BC = FileType(
"PacBio.BarcodeFile.BarcodeFastaFile",
=====================================
pbcommand/models/report.py
=====================================
@@ -545,7 +545,7 @@ class Table(BaseReportElement):
columns = [columns[col.id] for col in tables[0].columns]
return Table(table_id, table_title, columns=columns)
- def to_csv(self, file_name, delimiter=','):
+ def to_csv(self, file_name, delimiter=',', float_format=None):
if len(self.columns) == 0:
return ""
for column in self.columns:
@@ -553,6 +553,14 @@ class Table(BaseReportElement):
raise ValueError("Column lengths differ ({i} versus {j}".format(
i=len(column.values),
j=len(self.columns[0].values)))
+
+ def _to_str(x):
+ if not float_format or not isinstance(x, float):
+ return str(x)
+ elif float_format.startswith("%"):
+ return float_format % x
+ elif float_format.startswith("{"):
+ return float_format.format(x)
with open(file_name, "w") as csv_out:
writer = csv.writer(
csv_out,
@@ -560,7 +568,7 @@ class Table(BaseReportElement):
lineterminator="\n")
writer.writerow([c.header for c in self.columns])
for i in range(len(self.columns[0].values)):
- writer.writerow([str(c.values[i]) for c in self.columns])
+ writer.writerow([_to_str(c.values[i]) for c in self.columns])
def to_columns_d(self):
"""
=====================================
pbcommand/services/_service_access_layer.py
=====================================
@@ -2,6 +2,17 @@
Utils for Updating state/progress and results to WebServices
"""
+from .utils import to_sal_summary
+from pbcommand.pb_io import load_report_from
+from .models import (SMRTServiceBaseError,
+ JobResult, JobStates, JobExeError, JobTypes,
+ ServiceResourceTypes, ServiceJob, JobEntryPoint,
+ JobTask)
+from pbcommand.utils import get_dataset_metadata
+from pbcommand.models import (FileTypes,
+ DataSetFileType,
+ DataStore,
+ DataStoreFile)
import base64
import datetime
import json
@@ -14,21 +25,11 @@ import warnings
import pytz
import requests
from requests import RequestException
+from requests.exceptions import HTTPError, ConnectionError
+from urllib3.exceptions import ProtocolError, InsecureRequestWarning # pylint: disable=import-error
# To disable the ssl cert check warning
-from requests.packages.urllib3.exceptions import InsecureRequestWarning # pylint: disable=import-error
-requests.packages.urllib3.disable_warnings(InsecureRequestWarning) # pylint: disable=no-member
-
-from pbcommand.models import (FileTypes,
- DataSetFileType,
- DataStore,
- DataStoreFile)
-from pbcommand.utils import get_dataset_metadata
-from .models import (SMRTServiceBaseError,
- JobResult, JobStates, JobExeError, JobTypes,
- ServiceResourceTypes, ServiceJob, JobEntryPoint,
- JobTask)
-from pbcommand.pb_io import load_report_from
-from .utils import to_sal_summary
+import urllib3
+urllib3.disable_warnings(InsecureRequestWarning) # pylint: disable=no-member
log = logging.getLogger(__name__)
@@ -45,7 +46,10 @@ __all__ = [
class Constants:
- HEADERS = {'Content-type': 'application/json'}
+ HEADERS = {
+ 'Content-type': 'application/json',
+ 'Accept-Encoding': 'gzip, deflate'
+ }
def __jsonable_request(request_method, headers):
@@ -100,7 +104,7 @@ def _process_rget(total_url, ignore_errors=False, headers=None):
r = _get_requests(__get_headers(headers))(total_url)
_parse_base_service_error(r)
if not r.ok and not ignore_errors:
- log.error(
+ log.warning(
"Failed ({s}) GET to {x}".format(
x=total_url,
s=r.status_code))
@@ -109,6 +113,27 @@ def _process_rget(total_url, ignore_errors=False, headers=None):
return j
+def _process_rget_or_empty(total_url, ignore_errors=False, headers=None):
+ """
+ Process get request and return JSON response if populated, otherwise None.
+ Raise if not successful
+ """
+ r = _get_requests(__get_headers(headers))(total_url)
+ if len(r.content) > 0:
+ _parse_base_service_error(r)
+ if not r.ok and not ignore_errors:
+ log.warning(
+ "Failed ({s}) GET to {x}".format(
+ x=total_url,
+ s=r.status_code))
+ r.raise_for_status()
+ if len(r.content) > 0:
+ object_d = r.json()
+ if len(object_d) > 0:
+ return object_d
+ return None
+
+
def _process_rget_with_transform(func, ignore_errors=False):
"""Post process the JSON result (if successful) with F(json_d) -> T"""
def wrapper(total_url, headers=None):
@@ -162,12 +187,12 @@ def __process_creatable_to_json(f):
_parse_base_service_error(r)
# FIXME This should be strict to only return a 201
if r.status_code not in (200, 201, 202, 204):
- log.error(
+ log.warning(
"Failed ({s} to call {u}".format(
u=total_url,
s=r.status_code))
- log.error("payload")
- log.error("\n" + pprint.pformat(payload_d))
+ log.warning("payload")
+ log.warning("\n" + pprint.pformat(payload_d))
r.raise_for_status()
j = r.json()
return j
@@ -218,6 +243,7 @@ def _get_job_by_id_or_raise(sal, job_id, error_klass,
return job
+# FIXME this overlaps with job_poller.py, which is more fault-tolerant
def _block_for_job_to_complete(sal, job_id, time_out=1200, sleep_time=2,
abort_on_interrupt=True,
retry_on_failure=False):
@@ -271,8 +297,8 @@ def _block_for_job_to_complete(sal, job_id, time_out=1200, sleep_time=2,
sal, job_id, JobExeError, error_messge_extras=msg)
except JobExeError as e:
if retry_on_failure:
- log.error(e)
- log.warn("Polling job {i} failed".format(i=job_id))
+ log.warning(e)
+ log.warning("Polling job {i} failed".format(i=job_id))
continue
else:
raise
@@ -381,13 +407,6 @@ def _to_relative_tasks_url(job_type):
return wrapper
-def _show_deprecation_warning(msg):
- if "PB_TEST_MODE" not in os.environ:
- warnings.simplefilter('once', DeprecationWarning)
- warnings.warn(msg, DeprecationWarning)
- warnings.simplefilter('default', DeprecationWarning) # reset filte
-
-
class ServiceAccessLayer: # pragma: no cover
"""
General Client Access Layer for interfacing with the job types on
@@ -424,10 +443,6 @@ class ServiceAccessLayer: # pragma: no cover
self.debug = debug
self._sleep_time = sleep_time
- if self.__class__.__name__ == "ServiceAccessLayer":
- _show_deprecation_warning(
- "Please use the SmrtLinkAuthClient', direct localhost access is not publicly supported")
-
def _get_headers(self):
return Constants.HEADERS
@@ -493,7 +508,7 @@ class ServiceAccessLayer: # pragma: no cover
i=job_id,
r=resource_type_id,
p=ServiceAccessLayer.ROOT_JOBS)
- return _process_rget_or_none(transform_func)(
+ return _process_rget_with_transform(transform_func)(
_to_url(self.uri, "{p}/{t}/{i}/{r}".format(**_d)), headers=self._get_headers())
def _get_jobs_by_job_type(self, job_type, query=None):
@@ -528,26 +543,21 @@ class ServiceAccessLayer: # pragma: no cover
def get_analysis_jobs(self, query=None):
return self._get_jobs_by_job_type(JobTypes.ANALYSIS, query=query)
- def get_pbsmrtpipe_jobs(self, query=None):
- """:rtype: list[ServiceJob]"""
- _show_deprecation_warning("Please use get_analysis_jobs() instead")
- return self.get_analysis_jobs(query=query)
-
- def get_cromwell_jobs(self):
+ def get_cromwell_jobs(self, query=None):
""":rtype: list[ServiceJob]"""
- return self._get_jobs_by_job_type(JobTypes.CROMWELL)
+ return self._get_jobs_by_job_type(JobTypes.CROMWELL, query=query)
- def get_import_dataset_jobs(self):
+ def get_import_dataset_jobs(self, query=None):
""":rtype: list[ServiceJob]"""
- return self._get_jobs_by_job_type(JobTypes.IMPORT_DS)
+ return self._get_jobs_by_job_type(JobTypes.IMPORT_DS, query=query)
- def get_merge_dataset_jobs(self):
+ def get_merge_dataset_jobs(self, query=None):
""":rtype: list[ServiceJob]"""
- return self._get_jobs_by_job_type(JobTypes.MERGE_DS)
+ return self._get_jobs_by_job_type(JobTypes.MERGE_DS, query=query)
- def get_fasta_convert_jobs(self):
+ def get_fasta_convert_jobs(self, query=None):
""":rtype: list[ServiceJob]"""
- self._get_jobs_by_job_type(JobTypes.CONVERT_FASTA)
+ self._get_jobs_by_job_type(JobTypes.CONVERT_FASTA, query=query)
def get_analysis_job_by_id(self, job_id):
"""Get an Analysis job by id or UUID or return None
@@ -560,13 +570,13 @@ class ServiceAccessLayer: # pragma: no cover
return self.get_job_by_type_and_id(JobTypes.IMPORT_DS, job_id)
def get_analysis_job_datastore(self, job_id):
- """Get DataStore output from (pbsmrtpipe) analysis job"""
+ """Get DataStore output from analysis job"""
# this doesn't work the list is sli
return self._get_job_resource_type_with_transform(
- "pbsmrtpipe", job_id, ServiceResourceTypes.DATASTORE, _to_datastore)
+ "analysis", job_id, ServiceResourceTypes.DATASTORE, _to_datastore)
def _to_dsf_id_url(self, job_id, dsf_uuid):
- u = "/".join([ServiceAccessLayer.ROOT_JOBS, "pbsmrtpipe",
+ u = "/".join([ServiceAccessLayer.ROOT_JOBS, "analysis",
str(job_id), ServiceResourceTypes.DATASTORE, dsf_uuid])
return _to_url(self.uri, u)
@@ -621,7 +631,7 @@ class ServiceAccessLayer: # pragma: no cover
dsf_uuid, job_id))
def get_analysis_job_reports(self, job_id):
- """Get list of DataStore ReportFile types output from (pbsmrtpipe) analysis job"""
+ """Get list of DataStore ReportFile types output from analysis job"""
return self._get_job_resource_type_with_transform(
JobTypes.ANALYSIS, job_id, ServiceResourceTypes.REPORTS, _to_job_report_files)
@@ -769,27 +779,25 @@ class ServiceAccessLayer: # pragma: no cover
:rtype: JobResult
"""
- dataset_meta_type = get_dataset_metadata(path)
+ dsmd = get_dataset_metadata(path)
+ result = self.search_dataset_by_uuid(dsmd.uuid)
- result = self.get_dataset_by_uuid(dataset_meta_type.uuid,
- ignore_errors=True)
if result is None:
log.info("Importing dataset {p}".format(p=path))
job_result = self.run_import_dataset_by_type(
- dataset_meta_type.metatype, path, avoid_duplicate_import=avoid_duplicate_import)
+ dsmd.metatype,
+ path,
+ avoid_duplicate_import=avoid_duplicate_import)
log.info("Confirming database update")
# validation 1: attempt to retrieve dataset info
- result_new = self.get_dataset_by_uuid(dataset_meta_type.uuid)
+ result_new = self.get_dataset_by_uuid(dsmd.uuid)
if result_new is None:
raise JobExeError(("Dataset {u} was imported but could " +
"not be retrieved; this may indicate " +
- "XML schema errors.").format(
- u=dataset_meta_type.uuid))
+ "XML schema errors.").format(u=dsmd.uuid))
return job_result
else:
- log.info(
- "{f} already imported. Skipping importing. {r}".format(
- r=result, f=dataset_meta_type.metatype))
+ log.info(f"Already imported: {result}")
# need to clean this up
return JobResult(self.get_job_by_id(result['jobId']), 0, "")
@@ -826,6 +834,14 @@ class ServiceAccessLayer: # pragma: no cover
p=ServiceAccessLayer.ROOT_DS)),
headers=self._get_headers())
+ def search_dataset_by_uuid(self, uuid):
+ """
+ Better alternative to get_dataset_by_uuid, that does not trigger a 404
+ """
+ return _process_rget_or_empty(
+ _to_url(self.uri, f"{ServiceAccessLayer.ROOT_DS}/search/{uuid}"),
+ headers=self._get_headers())
+
def get_dataset_by_id(self, dataset_type, int_or_uuid):
"""Get a Dataset using the DataSetMetaType and (int|uuid) of the dataset"""
ds_endpoint = _get_endpoint_or_raise(dataset_type)
@@ -947,7 +963,7 @@ class ServiceAccessLayer: # pragma: no cover
task_options=(),
workflow_options=(),
tags=()):
- """Creates and runs a pbsmrtpipe pipeline by pipeline template id
+ """Creates and runs an analysis workflow by pipeline template id
:param tags: Tags should be a set of strings
@@ -1049,7 +1065,7 @@ class ServiceAccessLayer: # pragma: no cover
def terminate_job(self, job):
"""
POST a terminate request appropriate to the job type. Currently only
- supported for pbsmrtpipe, cromwell, and analysis job types.
+ supported for cromwell, and analysis job types.
"""
log.warn("Terminating job {i} ({u})".format(i=job.id, u=job.uuid))
if job.external_job_id is not None:
@@ -1144,279 +1160,14 @@ class ServiceAccessLayer: # pragma: no cover
return _process_rpost_with_transform(ServiceJob.from_d)(
u, job_options, headers=self._get_headers())
-
-def __run_and_ignore_errors(f, warn_message):
- """
- Black hole ignoring exceptions from a func with no-args and
- logging the error has a warning.
- """
- try:
- return f()
- except Exception as e:
- log.warn(warn_message + " {e}".format(e=e))
-
-
-def _run_func(f, warn_message, ignore_errors=True):
- if ignore_errors:
- return __run_and_ignore_errors(f, warn_message)
- else:
- return f()
-
-
-def log_pbsmrtpipe_progress(total_url, message, level, source_id, ignore_errors=True, headers=None): # pragma: no cover
- """Log the status of a pbsmrtpipe to SMRT Server"""
- # Keeping this as public to avoid breaking pbsmrtpipe. The
- # new public interface should be the JobServiceClient
-
- # Need to clarify the model here. Trying to pass the most minimal
- # data necessary to pbsmrtpipe.
- _d = dict(message=message, level=level, sourceId=source_id)
- warn_message = "Failed Request to {u} data: {d}".format(u=total_url, d=_d)
-
- def f():
- return _process_rpost(total_url, _d, headers=headers)
-
- return _run_func(f, warn_message, ignore_errors=ignore_errors)
-
-
-def add_datastore_file(total_url, datastore_file, ignore_errors=True, headers=None): # pragma: no cover
- """Add datastore to SMRT Server
-
- :type datastore_file: DataStoreFile
- """
- # Keeping this as public to avoid breaking pbsmrtpipe. The
- # new public interface should be the JobServiceClient
- _d = datastore_file.to_dict()
- warn_message = "Failed Request to {u} data: {d}.".format(u=total_url, d=_d)
-
- def f():
- return _process_rpost(total_url, _d, headers=headers)
-
- return _run_func(f, warn_message, ignore_errors=ignore_errors)
-
-
-def _create_job_task(job_tasks_url, create_job_task_record, ignore_errors=True, headers=None): # pragma: no cover
- """
- :type create_job_task_record: CreateJobTaskRecord
- :rtype: JobTask
- """
- warn_message = "Unable to create Task {c}".format(
- c=repr(create_job_task_record))
-
- def f():
- return _process_rpost_with_transform(JobTask.from_d)(
- job_tasks_url, create_job_task_record.to_dict(), headers=headers)
-
- return _run_func(f, warn_message, ignore_errors)
-
-
-def _update_job_task_state(task_url, update_job_task_record, ignore_errors=True, headers=None): # pragma: no cover
- """
- :type update_job_task_record: UpdateJobTaskRecord
- :rtype: JobTask
- """
- warn_message = "Unable to update Task {c}".format(
- c=repr(update_job_task_record))
-
- def f():
- return _process_rput_with_transform(JobTask.from_d)(
- task_url, update_job_task_record.to_dict(), headers=headers)
-
- return _run_func(f, warn_message, ignore_errors)
-
-
-def _update_datastore_file(datastore_url, uuid, path, file_size, set_is_active,
- ignore_errors=True, headers=None): # pragma: no cover
- warn_message = "Unable to update datastore file {u}".format(u=uuid)
- total_url = "{b}/{u}".format(b=datastore_url, u=uuid)
- d = {"fileSize": file_size, "path": path, "isActive": set_is_active}
-
- def f():
- return _process_rput(total_url, d, headers=headers)
-
- return _run_func(f, warn_message, ignore_errors)
-
-
-class CreateJobTaskRecord:
-
- def __init__(self, task_uuid, task_id, task_type_id,
- name, state, created_at=None):
- self.task_uuid = task_uuid
- self.task_id = task_id
- self.task_type_id = task_type_id
- self.name = name
- # this must be consistent with the EngineJob states in the scala code
- self.state = state
- # Note, the created_at timestamp must have the form
- # 2016-02-18T23:24:46.569Z
- # or
- # 2016-02-18T15:24:46.569-08:00
- self.created_at = datetime.datetime.now(
- pytz.utc) if created_at is None else created_at
-
- def __repr__(self):
- _d = dict(k=self.__class__.__name__,
- u=self.task_uuid,
- i=self.task_id,
- n=self.name,
- s=self.state)
- return "<{k} uuid:{u} ix:{i} state:{s} name:{n} >".format(**_d)
-
- def to_dict(self):
- return dict(uuid=self.task_uuid,
- taskId=self.task_id,
- taskTypeId=self.task_type_id,
- name=self.name,
- state=self.state,
- createdAt=self.created_at.isoformat())
-
-
-class UpdateJobTaskRecord:
-
- def __init__(self, task_uuid, state, message, error_message=None):
- """:type error_message: str | None"""
- self.task_uuid = task_uuid
- self.state = state
- self.message = message
- # detailed error message (e.g., terse stack trace)
- self.error_message = error_message
-
- @staticmethod
- def from_error(task_uuid, state, message, error_message):
- # require an detailed error message
- return UpdateJobTaskRecord(task_uuid, state,
- message,
- error_message=error_message)
-
- def __repr__(self):
- _d = dict(k=self.__class__.__name__,
- i=self.task_uuid,
- s=self.state)
- return "<{k} i:{i} state:{s} >".format(**_d)
-
- def to_dict(self):
- _d = dict(uuid=self.task_uuid,
- state=self.state,
- message=self.message)
-
- # spray API is a little odd here. it will complain about
- # Expected String as JsString, but got null
- # even though the model is Option[String]
- if self.error_message is not None:
- _d['errorMessage'] = self.error_message
-
- return _d
-
-
-class JobServiceClient: # pragma: no cover
- # Keeping this class private. It should only be used from pbsmrtpipe
-
- def __init__(self, job_root_url, ignore_errors=False):
- """
-
- :param job_root_url: Full Root URL to the job
- :type job_root_url: str
-
- :param ignore_errors: Only log errors, don't not raise if a request fails. This is intended to be used in a fire-and-forget usecase
- :type ignore_errors: bool
-
- This hides the root location of the URL and hides
- the job id (as an int or uuid)
-
- The Url has the form:
-
- http://localhost:8888/smrt-link/job-manager/jobs/pbsmrtpipe/1234
-
- or
-
- http://localhost:8888/smrt-link/job-manager/jobs/pbsmrtpipe/5d562c74-e452-11e6-8b96-3c15c2cc8f88
- """
- self.job_root_url = job_root_url
- self.ignore_errors = ignore_errors
-
- def __repr__(self):
- _d = dict(k=self.__class__.__name__, u=self.job_root_url)
- return "<{k} Job URL:{u} >".format(**_d)
-
- def _get_headers(self):
- return Constants.HEADERS
-
- def to_url(self, segment):
- return "{i}/{s}".format(i=self.job_root_url, s=segment)
-
- @property
- def log_url(self):
- return self.to_url("log")
-
- @property
- def datastore_url(self):
- return self.to_url("datastore")
-
- @property
- def tasks_url(self):
- return self.to_url("tasks")
-
- def get_task_url(self, task_uuid):
- """
- :param task_uuid: Task UUID
- :return:
- """
- return self.to_url("tasks/{t}".format(t=task_uuid))
-
- def log_workflow_progress(self, message, level,
- source_id, ignore_errors=True):
- return log_pbsmrtpipe_progress(
- self.log_url, message, level, source_id, ignore_errors=ignore_errors)
-
- def add_datastore_file(self, datastore_file, ignore_errors=True):
- return add_datastore_file(
- self.datastore_url, datastore_file, ignore_errors=ignore_errors)
-
- def update_datastore_file(
- self, uuid, file_size=None, path=None, set_is_active=True, ignore_errors=True):
- return _update_datastore_file(
- self.datastore_url, uuid, path, file_size, set_is_active, ignore_errors)
-
- def create_task(self, task_uuid, task_id,
- task_type_id, name, created_at=None):
- """
-
- :param task_uuid: Globally unique task id
- :param task_id: Unique within respect to the job
- :param task_type_id: ToolContract or task id (e.g., pbcommand.tasks.alpha)
- :param name: Display name of task
- :param created_at: time task was created at (will be set if current time if None)
- """
- r = CreateJobTaskRecord(task_uuid, task_id, task_type_id,
- name, JobStates.CREATED, created_at=created_at)
-
- return _create_job_task(self.tasks_url, r)
-
- def update_task_status(self, task_uuid, state,
- message, error_message=None):
-
- task_url = self.get_task_url(task_uuid)
-
- u = UpdateJobTaskRecord(
- task_uuid,
- state,
- message,
- error_message=error_message)
-
- return _update_job_task_state(
- task_url, u, ignore_errors=self.ignore_errors)
-
- def update_task_to_failed(self, task_uuid, message,
- detailed_error_message):
- task_url = self.get_task_url(task_uuid)
- state = JobStates.FAILED
-
- u = UpdateJobTaskRecord(task_uuid,
- state,
- message,
- error_message=detailed_error_message)
-
- return _update_job_task_state(task_url, u)
+ def get_sl_api(self, path):
+ service_url = f"{self.uri}{path}"
+ t1 = time.time()
+ r = requests.get(service_url, headers=self._get_headers(), verify=False)
+ t2 = time.time()
+ log.info("Response time: {:.1f}s".format(t2 - t1))
+ r.raise_for_status()
+ return r.json()
# -----------------------------------------------------------------------
@@ -1546,3 +1297,54 @@ def get_smrtlink_client_from_args(args):
port=args.port,
user=args.user,
password=args.password)
+
+
+def run_client_with_retry(fx, host, port, user, password,
+ sleep_time=60,
+ retry_on=(500, 503)):
+ """
+ Obtain a services client and run the specified function on it - this can
+ be essentially any services call - and return the result. If the query
+ fails due to 401 Unauthorized, the call will be retried with a new token.
+ HTTP 500 and 503 errors will be retried without re-authenticating.
+ """
+ def _get_client():
+ return get_smrtlink_client(host, port, user, password)
+ auth_errors = 0
+ started_at = time.time()
+ retry_time = sleep_time
+ client = _get_client()
+ while True:
+ try:
+ result = fx(client)
+ except HTTPError as e:
+ status = e.response.status_code
+ log.info("Got error {e} (code = {c})".format(e=str(e), c=status))
+ if status == 401:
+ auth_errors += 1
+ if auth_errors > 10:
+ raise RuntimeError(
+ "10 successive HTTP 401 errors, exiting")
+ log.warning("Authentication error, will retry with new token")
+ client = _get_client()
+ continue
+ elif status in retry_on:
+ log.warning("Got HTTP {c}, will retry in {d}s".format(
+ c=status, d=retry_time))
+ time.sleep(retry_time)
+ # if a retryable error occurs, we increment the retry time
+ # up to a max of 30 minutes
+ retry_time = max(1800, retry_time + sleep_time)
+ continue
+ else:
+ raise
+ except (ConnectionError, ProtocolError) as e:
+ log.warning("Connection error: {e}".format(e=str(e)))
+ log.info("Will retry in {d}s".format(d=retry_time))
+ time.sleep(retry_time)
+ # if a retryable error occurs, we increment the retry time
+ # up to a max of 30 minutes
+ retry_time = max(1800, retry_time + sleep_time)
+ continue
+ else:
+ return result
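A hedged usage sketch for the new run_client_with_retry helper (host, port, credentials and the job id are placeholders):

    from pbcommand.services._service_access_layer import run_client_with_retry

    def _get_job(client):
        # any call on the services client can go here
        return client.get_job_by_id(1234)

    # retries with a fresh token on HTTP 401 (up to 10 times), and sleeps and
    # retries on HTTP 500/503 or dropped connections
    job = run_client_with_retry(_get_job, "smrtlink-host", 8243,
                                user="admin", password="admin")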
=====================================
pbcommand/services/job_poller.py
=====================================
@@ -24,6 +24,8 @@ __version__ = "0.1"
log = logging.getLogger(__name__)
+# FIXME this overlaps with run_client_with_retry, can we consolidate the
+# general error handling?
def poll_for_job_completion(job_id,
host,
port,
@@ -40,6 +42,7 @@ def poll_for_job_completion(job_id,
auth_errors = 0
external_job_id = None
LOG_INTERVAL = 600
+ SLEEP_TIME_MAX = 600
i = 0
try:
client = _get_client()
@@ -51,12 +54,15 @@ def poll_for_job_completion(job_id,
job_json = _process_rget(url, headers=client._get_headers())
except HTTPError as e:
status = e.response.status_code
- log.info("Got error {e} (code = {c})".format(e=str(e), c=status))
+ log.info("Got error {e} (code = {c})".format(
+ e=str(e), c=status))
if status == 401:
auth_errors += 1
if auth_errors > 10:
- raise RuntimeError("10 successive HTTP 401 errors, exiting")
- log.warning("Authentication error, will retry with new token")
+ raise RuntimeError(
+ "10 successive HTTP 401 errors, exiting")
+ log.warning(
+ "Authentication error, will retry with new token")
client = _get_client()
continue
elif status in retry_on:
@@ -100,6 +106,9 @@ def poll_for_job_completion(job_id,
"Exceeded runtime {r} of {t}".format(
r=run_time, t=time_out))
time.sleep(sleep_time)
+ # after an hour we increase the wait
+ if run_time > 3600:
+ sleep_time = min(SLEEP_TIME_MAX, sleep_time * 5)
except KeyboardInterrupt:
if abort_on_interrupt:
client.terminate_job_id(job_id)
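The added backoff only changes the poll interval once a job has been running for over an hour; a sketch of the arithmetic in isolation:

    SLEEP_TIME_MAX = 600     # seconds, as defined above
    sleep_time = 60          # placeholder initial poll interval
    run_time = 4000          # seconds elapsed so far
    if run_time > 3600:
        sleep_time = min(SLEEP_TIME_MAX, sleep_time * 5)
    print(sleep_time)        # -> 300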
=====================================
pbcommand/services/resolver.py
=====================================
@@ -11,7 +11,7 @@ import os.path as op
import sys
from pbcommand.models.common import FileTypes
-from pbcommand.services._service_access_layer import get_smrtlink_client
+from pbcommand.services._service_access_layer import get_smrtlink_client, run_client_with_retry
from pbcommand.utils import setup_log
from pbcommand.cli import get_default_argparser_with_base_opts, pacbio_args_runner
@@ -80,16 +80,8 @@ def _find_alignments(datastore):
class Resolver:
- def __init__(self,
- host,
- port,
- user=None,
- password=None):
- self._host = host
- self._port = port
- self._user = user
- self._password = password
- self._client = get_smrtlink_client(host, port, user, password)
+ def __init__(self, client):
+ self._client = client
def _get_job_datastore_reports(self, job_id):
datastore = self._client.get_analysis_job_datastore(job_id)
@@ -130,24 +122,31 @@ class Resolver:
def run_args(args):
- resolver = Resolver(args.host, args.port, args.user, args.password)
- resource = None
- if args.resource_type == ResourceTypes.JOB_PATH:
- resource = resolver.resolve_job(args.job_id)
- elif args.resource_type == ResourceTypes.ALIGNMENTS:
- resource = resolver.resolve_alignments(args.job_id)
- elif args.resource_type == ResourceTypes.PREASSEMBLY:
- resource = resolver.resolve_preassembly_stats(args.job_id)
- elif args.resource_type == ResourceTypes.POLISHED_ASSEMBLY:
- resource = resolver.resolve_polished_assembly_stats(args.job_id)
- elif args.resource_type == ResourceTypes.MAPPING_STATS:
- resource = resolver.resolve_mapping_stats(args.job_id)
- elif args.resource_type == ResourceTypes.SUBREADS_ENTRY:
- resource = resolver.resolve_input_subreads(args.job_id)
- else:
- raise NotImplementedError(
- "Can't retrieve resource type '%s'" %
- args.resource_type)
+ def _run_resolver(client):
+ resolver = Resolver(client)
+ resource = None
+ if args.resource_type == ResourceTypes.JOB_PATH:
+ resource = resolver.resolve_job(args.job_id)
+ elif args.resource_type == ResourceTypes.ALIGNMENTS:
+ resource = resolver.resolve_alignments(args.job_id)
+ elif args.resource_type == ResourceTypes.PREASSEMBLY:
+ resource = resolver.resolve_preassembly_stats(args.job_id)
+ elif args.resource_type == ResourceTypes.POLISHED_ASSEMBLY:
+ resource = resolver.resolve_polished_assembly_stats(args.job_id)
+ elif args.resource_type == ResourceTypes.MAPPING_STATS:
+ resource = resolver.resolve_mapping_stats(args.job_id)
+ elif args.resource_type == ResourceTypes.SUBREADS_ENTRY:
+ resource = resolver.resolve_input_subreads(args.job_id)
+ else:
+ raise NotImplementedError(
+ "Can't retrieve resource type '%s'" % args.resource_type)
+ return resource
+
+ resource = run_client_with_retry(_run_resolver,
+ args.host,
+ args.port,
+ args.user,
+ args.password)
print(resource)
if args.make_symlink is not None:
if op.exists(args.make_symlink):
=====================================
pbcommand/utils.py
=====================================
@@ -7,6 +7,7 @@ import functools
import logging
import logging.config
import multiprocessing
+import resource
import os
import pprint
import subprocess
@@ -470,16 +471,17 @@ def get_dataset_metadata(path):
:raises: ValueError
:return: DataSetMetaData
"""
- uuid = mt = None
+ uuid = mt = name = None
for event, element in ET.iterparse(path, events=("start",)):
uuid = element.get("UniqueId")
mt = element.get("MetaType")
+ name = element.get("Name")
break
else:
raise ValueError(
'Did not find events=("start",) in XML path={}'.format(path))
if mt in FileTypes.ALL_DATASET_TYPES().keys():
- return DataSetMetaData(uuid, mt)
+ return DataSetMetaData(uuid, mt, name)
else:
raise ValueError("Unsupported dataset type '{t}'".format(t=mt))
@@ -536,3 +538,11 @@ def pool_map(func, args, nproc):
log.debug("computed_nproc=1, running serially")
result = list(map(func, args))
return result
+
+
+def get_peak_memory_usage():
+ try:
+ return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+ except Exception as e:
+ log.error(f"Failed to get ru_maxrss: {e}")
+ return None
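Two small usage notes on the utils changes: ru_maxrss is reported in kilobytes on Linux but in bytes on macOS, and get_dataset_metadata() now also surfaces the dataset's Name attribute. A sketch (the XML path is a placeholder):

    from pbcommand.utils import get_dataset_metadata, get_peak_memory_usage

    print(get_peak_memory_usage())              # kB on Linux, bytes on macOS, None on failure
    md = get_dataset_metadata("subreads.xml")   # placeholder dataset XML path
    print(md.uuid, md.metatype, md.name)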
=====================================
setup.py
=====================================
@@ -15,7 +15,7 @@ test_deps = [
setup(
name='pbcommand',
- version='2.2.2',
+ version='2.4.0',
author='Pacific Biosciences',
author_email='devnet at pacificbiosciences.com',
description='Library and Tools for interfacing with PacBio® data CLI tools',
@@ -45,5 +45,5 @@ setup(
'interactive': [
'prompt_toolkit',
]},
- python_requires='>=3.7',
+ python_requires='>=3.9',
)
=====================================
tests/test_models_report_table.py
=====================================
@@ -46,12 +46,14 @@ class TestBasicTable:
def setup_method(self, method):
self.columns = [Column('one', header="One"),
Column('two', header="Two"),
- Column('three', header="Three")]
+ Column('three', header="Three"),
+ Column('four', header="Four")]
self.table = Table('my_table_with_values', columns=self.columns)
datum = [
('one', list(range(3))),
('two', list('abc')),
- ('three', 'file1 file2 file3'.split())
+ ('three', 'file1 file2 file3'.split()),
+ ('four', [3.14159, 2.72, 9.87654321])
]
for k, values in datum:
for value in values:
@@ -64,7 +66,7 @@ class TestBasicTable:
def test_columns(self):
"""Test Columns"""
- assert len(self.table.columns) == 3
+ assert len(self.table.columns) == 4
def test_column_values(self):
"""Basic check for column values"""
@@ -80,7 +82,13 @@ class TestBasicTable:
f = tempfile.NamedTemporaryFile(suffix=".csv").name
self.table.to_csv(f)
with open(f) as csv_out:
- assert csv_out.read() == "One,Two,Three\n0,a,file1\n1,b,file2\n2,c,file3\n"
+ assert csv_out.read() == "One,Two,Three,Four\n0,a,file1,3.14159\n1,b,file2,2.72\n2,c,file3,9.87654321\n"
+ self.table.to_csv(f, float_format="%.4f")
+ with open(f) as csv_out:
+ assert csv_out.read() == "One,Two,Three,Four\n0,a,file1,3.1416\n1,b,file2,2.7200\n2,c,file3,9.8765\n"
+ self.table.to_csv(f, float_format="{:.2f}")
+ with open(f) as csv_out:
+ assert csv_out.read() == "One,Two,Three,Four\n0,a,file1,3.14\n1,b,file2,2.72\n2,c,file3,9.88\n"
class TestTable:
=====================================
tests/test_utils.py
=====================================
@@ -105,6 +105,7 @@ class TestUtils:
import pbtestdata
md = get_dataset_metadata(pbtestdata.get_file("subreads-xml"))
assert md.metatype == "PacBio.DataSet.SubreadSet"
+ assert md.name == "subreads-xml"
from pbcore.io import SubreadSet
ds = SubreadSet(pbtestdata.get_file("subreads-xml"))
View it on GitLab: https://salsa.debian.org/med-team/python-pbcommand/-/compare/cc19c3b174f5a84e433a55d0509abd416fb5e183...1c59ffdcdb8136044f3cab638a3f38eede49d3cf