[med-svn] [Git][med-team/python-pbcommand][master] 6 commits: routine-update: New upstream version

Andreas Tille (@tille) gitlab at salsa.debian.org
Fri Jul 29 11:06:53 BST 2022



Andreas Tille pushed to branch master at Debian Med / python-pbcommand


Commits:
a6eeb523 by Andreas Tille at 2022-07-29T11:55:08+02:00
routine-update: New upstream version

- - - - -
140ae055 by Andreas Tille at 2022-07-29T11:55:09+02:00
New upstream version 2.1.1+git20220616.3f2e6c2
- - - - -
aa5a80d2 by Andreas Tille at 2022-07-29T11:55:09+02:00
Update upstream source from tag 'upstream/2.1.1+git20220616.3f2e6c2'

Update to upstream version '2.1.1+git20220616.3f2e6c2'
with Debian dir 93c8fa8722084b11ba0d8498a0e67b34792385bb
- - - - -
57cc29fd by Andreas Tille at 2022-07-29T11:55:09+02:00
routine-update: Standards-Version: 4.6.1

- - - - -
ec6968fd by Andreas Tille at 2022-07-29T12:04:42+02:00
Force TMPDIR in autopkgtest

- - - - -
1c59ffdc by Andreas Tille at 2022-07-29T12:05:46+02:00
Upload to unstable

- - - - -


13 changed files:

- debian/changelog
- debian/control
- debian/tests/run-unit-test
- pbcommand/cli/core.py
- pbcommand/models/common.py
- pbcommand/models/report.py
- pbcommand/services/_service_access_layer.py
- pbcommand/services/job_poller.py
- pbcommand/services/resolver.py
- pbcommand/utils.py
- setup.py
- tests/test_models_report_table.py
- tests/test_utils.py


Changes:

=====================================
debian/changelog
=====================================
@@ -1,3 +1,11 @@
+python-pbcommand (2.1.1+git20220616.3f2e6c2-1) unstable; urgency=medium
+
+  * New upstream version
+  * Standards-Version: 4.6.1 (routine-update)
+  * Force TMPDIR in autopkgtest
+
+ -- Andreas Tille <tille at debian.org>  Fri, 29 Jul 2022 12:04:49 +0200
+
 python-pbcommand (2.1.1+git20201023.cc0ed3d-1) unstable; urgency=medium
 
   * Team upload.


=====================================
debian/control
=====================================
@@ -19,7 +19,7 @@ Build-Depends: debhelper-compat (= 13),
 Breaks: python-pbcommand
 Provides: python-pbcommand
 Replaces: python-pbcommand
-Standards-Version: 4.5.1
+Standards-Version: 4.6.1
 Vcs-Browser: https://salsa.debian.org/med-team/python-pbcommand
 Vcs-Git: https://salsa.debian.org/med-team/python-pbcommand.git
 Homepage: https://pbcommand.readthedocs.org/en/latest/


=====================================
debian/tests/run-unit-test
=====================================
@@ -15,4 +15,4 @@ cd "${AUTOPKGTEST_TMP}"
 
 gunzip -r *
 
-pytest-3
+TMPDIR=${AUTOPKGTEST_TMP} pytest-3
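
The TMPDIR override keeps scratch files created through Python's tempfile module inside the writable AUTOPKGTEST_TMP area instead of the default /tmp. A minimal sketch of the mechanism (the path below is hypothetical):

    import os
    import tempfile

    # tempfile caches its choice of directory; resetting the cache makes
    # gettempdir() re-read TMPDIR, which is what the exported variable
    # does for the pytest-3 run above.
    os.environ["TMPDIR"] = "/tmp/autopkgtest.abc123/tmp"   # hypothetical path
    tempfile.tempdir = None
    print(tempfile.gettempdir())    # -> /tmp/autopkgtest.abc123/tmp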


=====================================
pbcommand/cli/core.py
=====================================
@@ -28,7 +28,7 @@ import pbcommand
 from pbcommand.models import ResourceTypes, PacBioAlarm
 from pbcommand.models.report import Report, Attribute
 from pbcommand.common_options import add_base_options, add_nproc_option
-from pbcommand.utils import get_parsed_args_log_level
+from pbcommand.utils import get_parsed_args_log_level, get_peak_memory_usage
 
 
 def _add_version(p, version):
@@ -90,13 +90,14 @@ def get_default_argparser_with_base_opts(
     return p
 
 
-def write_task_report(run_time, nproc, exit_code):
+def write_task_report(run_time, nproc, exit_code, maxrss):
     attributes = [
         Attribute("host", value=os.uname()[1]),
         Attribute("system", value=os.uname()[0]),
         Attribute("nproc", value=nproc),
         Attribute("run_time", value=run_time),
-        Attribute("exit_code", value=exit_code)
+        Attribute("exit_code", value=exit_code),
+        Attribute("maxrss", value=maxrss)
     ]
     report = Report("workflow_task",
                     title="Workflow Task Report",
@@ -195,11 +196,14 @@ def _pacbio_main_runner(alog, setup_log_func, exe_main_func, *args, **kwargs):
         else:
             return_code = 2
 
-    _d = dict(r=return_code, s=run_time)
+    maxrss = get_peak_memory_usage()
     if is_cromwell_environment:
         alog.info("Writing task report to task-report.json")
-        write_task_report(run_time, getattr(pargs, "nproc", 1), return_code)
+        nproc = getattr(pargs, "nproc", 1)
+        write_task_report(run_time, nproc, return_code, maxrss)
+    _d = dict(r=return_code, s=run_time)
     if alog is not None:
+        alog.info(f"Max RSS (kB): {maxrss}")
         alog.info("exiting with return code {r} in {s:.2f} sec.".format(**_d))
     return return_code
 


=====================================
pbcommand/models/common.py
=====================================
@@ -24,7 +24,7 @@ log = logging.getLogger(__name__)
 REGISTERED_FILE_TYPES = {}
 
 # Light weight Dataset metatadata. Use pbcore for full dataset functionality
-DataSetMetaData = namedtuple("DataSetMetaData", 'uuid metatype')
+DataSetMetaData = namedtuple("DataSetMetaData", 'uuid metatype name')
 
 
 class PacBioNamespaces:
@@ -368,6 +368,7 @@ class FileTypes:
     SAM = FileType(to_file_ns('sam'), "alignments", "sam", MimeTypes.BINARY)
     VCF = FileType(to_file_ns('vcf'), "file", "vcf", MimeTypes.TXT)
     GFF = FileType(to_file_ns('gff'), "file", "gff", MimeTypes.TXT)
+    GTF = FileType(to_file_ns('gtf'), "file", "gtf", MimeTypes.TXT)
     BIGWIG = FileType(
         to_file_ns('bigwig'),
         "annotations",
@@ -490,6 +491,38 @@ class FileTypes:
         ".ngm",
         MimeTypes.BINARY)
 
+    I_TABIX = FileType(to_index_ns("Tabix"), "file", "tbi", MimeTypes.BINARY)
+
+    # GTF/BED index
+    I_PGI = FileType(to_index_ns("PigeonIndex"), "file", "pgi", MimeTypes.TXT)
+
+    # Reference transcript annotations
+    GTF_ANNOTATION = FileType(
+        "PacBio.ReferenceFile.ReferenceAnnotationFile",
+        "file",
+        "gtf",
+        MimeTypes.TXT)
+    POLYA_MOTIF = FileType(
+        "PacBio.ReferenceFile.PolyAMotifFile",
+        "file",
+        "txt",
+        MimeTypes.TXT)
+    BED_POLYA_PEAK = FileType(
+        "PacBio.ReferenceFile.PolyAPeakFile",
+        "file",
+        "bed",
+        MimeTypes.TXT)
+    BED_CAGE_PEAK = FileType(
+        "PacBio.ReferenceFile.CagePeakFile",
+        "file",
+        "bed",
+        MimeTypes.TXT)
+    INTROPOLIS = FileType(
+        "PacBio.ReferenceFile.IntropolisFile",
+        "file",
+        "tsv",
+        MimeTypes.TXT)
+
     # Fasta type files
     FASTA_BC = FileType(
         "PacBio.BarcodeFile.BarcodeFastaFile",

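The DataSetMetaData tuple above now carries the dataset's Name attribute alongside uuid and metatype, and the new FileTypes entries register GTF, Tabix/Pigeon index, and reference-annotation formats. A small sketch of the extended tuple, using a hypothetical dataset path:

    from pbcommand.utils import get_dataset_metadata

    md = get_dataset_metadata("movie.subreadset.xml")   # hypothetical file
    print(md.uuid, md.metatype, md.name)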

=====================================
pbcommand/models/report.py
=====================================
@@ -545,7 +545,7 @@ class Table(BaseReportElement):
         columns = [columns[col.id] for col in tables[0].columns]
         return Table(table_id, table_title, columns=columns)
 
-    def to_csv(self, file_name, delimiter=','):
+    def to_csv(self, file_name, delimiter=',', float_format=None):
         if len(self.columns) == 0:
             return ""
         for column in self.columns:
@@ -553,6 +553,14 @@ class Table(BaseReportElement):
                 raise ValueError("Column lengths differ ({i} versus {j}".format(
                                  i=len(column.values),
                                  j=len(self.columns[0].values)))
+
+        def _to_str(x):
+            if not float_format or not isinstance(x, float):
+                return str(x)
+            elif float_format.startswith("%"):
+                return float_format % x
+            elif float_format.startswith("{"):
+                return float_format.format(x)
         with open(file_name, "w") as csv_out:
             writer = csv.writer(
                 csv_out,
@@ -560,7 +568,7 @@ class Table(BaseReportElement):
                 lineterminator="\n")
             writer.writerow([c.header for c in self.columns])
             for i in range(len(self.columns[0].values)):
-                writer.writerow([str(c.values[i]) for c in self.columns])
+                writer.writerow([_to_str(c.values[i]) for c in self.columns])
 
     def to_columns_d(self):
         """

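The new float_format argument accepts either a printf-style pattern ("%.4f") or a str.format pattern ("{:.2f}") and is applied only to float cells; everything else still goes through str(). A short sketch, assuming Column accepts an initial values list:

    from pbcommand.models.report import Column, Table

    table = Table("metrics", columns=[
        Column("name", header="Name", values=["file1", "file2"]),
        Column("score", header="Score", values=[3.14159, 2.72]),
    ])
    table.to_csv("metrics.csv", float_format="%.3f")     # printf-style
    table.to_csv("metrics.csv", float_format="{:.3f}")   # str.format-style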

=====================================
pbcommand/services/_service_access_layer.py
=====================================
@@ -2,6 +2,17 @@
 Utils for Updating state/progress and results to WebServices
 """
 
+from .utils import to_sal_summary
+from pbcommand.pb_io import load_report_from
+from .models import (SMRTServiceBaseError,
+                     JobResult, JobStates, JobExeError, JobTypes,
+                     ServiceResourceTypes, ServiceJob, JobEntryPoint,
+                     JobTask)
+from pbcommand.utils import get_dataset_metadata
+from pbcommand.models import (FileTypes,
+                              DataSetFileType,
+                              DataStore,
+                              DataStoreFile)
 import base64
 import datetime
 import json
@@ -14,21 +25,11 @@ import warnings
 import pytz
 import requests
 from requests import RequestException
+from requests.exceptions import HTTPError, ConnectionError
+from urllib3.exceptions import ProtocolError, InsecureRequestWarning  # pylint: disable=import-error
 # To disable the ssl cert check warning
-from requests.packages.urllib3.exceptions import InsecureRequestWarning  # pylint: disable=import-error
-requests.packages.urllib3.disable_warnings(InsecureRequestWarning)  # pylint: disable=no-member
-
-from pbcommand.models import (FileTypes,
-                              DataSetFileType,
-                              DataStore,
-                              DataStoreFile)
-from pbcommand.utils import get_dataset_metadata
-from .models import (SMRTServiceBaseError,
-                     JobResult, JobStates, JobExeError, JobTypes,
-                     ServiceResourceTypes, ServiceJob, JobEntryPoint,
-                     JobTask)
-from pbcommand.pb_io import load_report_from
-from .utils import to_sal_summary
+import urllib3
+urllib3.disable_warnings(InsecureRequestWarning)  # pylint: disable=no-member
 
 
 log = logging.getLogger(__name__)
@@ -45,7 +46,10 @@ __all__ = [
 
 
 class Constants:
-    HEADERS = {'Content-type': 'application/json'}
+    HEADERS = {
+        'Content-type': 'application/json',
+        'Accept-Encoding': 'gzip, deflate'
+    }
 
 
 def __jsonable_request(request_method, headers):
@@ -100,7 +104,7 @@ def _process_rget(total_url, ignore_errors=False, headers=None):
     r = _get_requests(__get_headers(headers))(total_url)
     _parse_base_service_error(r)
     if not r.ok and not ignore_errors:
-        log.error(
+        log.warning(
             "Failed ({s}) GET to {x}".format(
                 x=total_url,
                 s=r.status_code))
@@ -109,6 +113,27 @@ def _process_rget(total_url, ignore_errors=False, headers=None):
     return j
 
 
+def _process_rget_or_empty(total_url, ignore_errors=False, headers=None):
+    """
+    Process get request and return JSON response if populated, otherwise None.
+    Raise if not successful
+    """
+    r = _get_requests(__get_headers(headers))(total_url)
+    if len(r.content) > 0:
+        _parse_base_service_error(r)
+    if not r.ok and not ignore_errors:
+        log.warning(
+            "Failed ({s}) GET to {x}".format(
+                x=total_url,
+                s=r.status_code))
+    r.raise_for_status()
+    if len(r.content) > 0:
+        object_d = r.json()
+        if len(object_d) > 0:
+            return object_d
+    return None
+
+
 def _process_rget_with_transform(func, ignore_errors=False):
     """Post process the JSON result (if successful) with F(json_d) -> T"""
     def wrapper(total_url, headers=None):
@@ -162,12 +187,12 @@ def __process_creatable_to_json(f):
         _parse_base_service_error(r)
         # FIXME This should be strict to only return a 201
         if r.status_code not in (200, 201, 202, 204):
-            log.error(
+            log.warning(
                 "Failed ({s} to call {u}".format(
                     u=total_url,
                     s=r.status_code))
-            log.error("payload")
-            log.error("\n" + pprint.pformat(payload_d))
+            log.warning("payload")
+            log.warning("\n" + pprint.pformat(payload_d))
         r.raise_for_status()
         j = r.json()
         return j
@@ -218,6 +243,7 @@ def _get_job_by_id_or_raise(sal, job_id, error_klass,
     return job
 
 
+# FIXME this overlaps with job_poller.py, which is more fault-tolerant
 def _block_for_job_to_complete(sal, job_id, time_out=1200, sleep_time=2,
                                abort_on_interrupt=True,
                                retry_on_failure=False):
@@ -271,8 +297,8 @@ def _block_for_job_to_complete(sal, job_id, time_out=1200, sleep_time=2,
                     sal, job_id, JobExeError, error_messge_extras=msg)
             except JobExeError as e:
                 if retry_on_failure:
-                    log.error(e)
-                    log.warn("Polling job {i} failed".format(i=job_id))
+                    log.warning(e)
+                    log.warning("Polling job {i} failed".format(i=job_id))
                     continue
                 else:
                     raise
@@ -381,13 +407,6 @@ def _to_relative_tasks_url(job_type):
     return wrapper
 
 
-def _show_deprecation_warning(msg):
-    if "PB_TEST_MODE" not in os.environ:
-        warnings.simplefilter('once', DeprecationWarning)
-        warnings.warn(msg, DeprecationWarning)
-        warnings.simplefilter('default', DeprecationWarning)  # reset filte
-
-
 class ServiceAccessLayer:  # pragma: no cover
     """
     General Client Access Layer for interfacing with the job types on
@@ -424,10 +443,6 @@ class ServiceAccessLayer:  # pragma: no cover
         self.debug = debug
         self._sleep_time = sleep_time
 
-        if self.__class__.__name__ == "ServiceAccessLayer":
-            _show_deprecation_warning(
-                "Please use the SmrtLinkAuthClient', direct localhost access is not publicly supported")
-
     def _get_headers(self):
         return Constants.HEADERS
 
@@ -493,7 +508,7 @@ class ServiceAccessLayer:  # pragma: no cover
             i=job_id,
             r=resource_type_id,
             p=ServiceAccessLayer.ROOT_JOBS)
-        return _process_rget_or_none(transform_func)(
+        return _process_rget_with_transform(transform_func)(
             _to_url(self.uri, "{p}/{t}/{i}/{r}".format(**_d)), headers=self._get_headers())
 
     def _get_jobs_by_job_type(self, job_type, query=None):
@@ -528,26 +543,21 @@ class ServiceAccessLayer:  # pragma: no cover
     def get_analysis_jobs(self, query=None):
         return self._get_jobs_by_job_type(JobTypes.ANALYSIS, query=query)
 
-    def get_pbsmrtpipe_jobs(self, query=None):
-        """:rtype: list[ServiceJob]"""
-        _show_deprecation_warning("Please use get_analysis_jobs() instead")
-        return self.get_analysis_jobs(query=query)
-
-    def get_cromwell_jobs(self):
+    def get_cromwell_jobs(self, query=None):
         """:rtype: list[ServiceJob]"""
-        return self._get_jobs_by_job_type(JobTypes.CROMWELL)
+        return self._get_jobs_by_job_type(JobTypes.CROMWELL, query=query)
 
-    def get_import_dataset_jobs(self):
+    def get_import_dataset_jobs(self, query=None):
         """:rtype: list[ServiceJob]"""
-        return self._get_jobs_by_job_type(JobTypes.IMPORT_DS)
+        return self._get_jobs_by_job_type(JobTypes.IMPORT_DS, query=query)
 
-    def get_merge_dataset_jobs(self):
+    def get_merge_dataset_jobs(self, query=None):
         """:rtype: list[ServiceJob]"""
-        return self._get_jobs_by_job_type(JobTypes.MERGE_DS)
+        return self._get_jobs_by_job_type(JobTypes.MERGE_DS, query=query)
 
-    def get_fasta_convert_jobs(self):
+    def get_fasta_convert_jobs(self, query=None):
         """:rtype: list[ServiceJob]"""
-        self._get_jobs_by_job_type(JobTypes.CONVERT_FASTA)
+        self._get_jobs_by_job_type(JobTypes.CONVERT_FASTA, query=query)
 
     def get_analysis_job_by_id(self, job_id):
         """Get an Analysis job by id or UUID or return None
@@ -560,13 +570,13 @@ class ServiceAccessLayer:  # pragma: no cover
         return self.get_job_by_type_and_id(JobTypes.IMPORT_DS, job_id)
 
     def get_analysis_job_datastore(self, job_id):
-        """Get DataStore output from (pbsmrtpipe) analysis job"""
+        """Get DataStore output from analysis job"""
         # this doesn't work the list is sli
         return self._get_job_resource_type_with_transform(
-            "pbsmrtpipe", job_id, ServiceResourceTypes.DATASTORE, _to_datastore)
+            "analysis", job_id, ServiceResourceTypes.DATASTORE, _to_datastore)
 
     def _to_dsf_id_url(self, job_id, dsf_uuid):
-        u = "/".join([ServiceAccessLayer.ROOT_JOBS, "pbsmrtpipe",
+        u = "/".join([ServiceAccessLayer.ROOT_JOBS, "analysis",
                       str(job_id), ServiceResourceTypes.DATASTORE, dsf_uuid])
         return _to_url(self.uri, u)
 
@@ -621,7 +631,7 @@ class ServiceAccessLayer:  # pragma: no cover
                     dsf_uuid, job_id))
 
     def get_analysis_job_reports(self, job_id):
-        """Get list of DataStore ReportFile types output from (pbsmrtpipe) analysis job"""
+        """Get list of DataStore ReportFile types output from analysis job"""
         return self._get_job_resource_type_with_transform(
             JobTypes.ANALYSIS, job_id, ServiceResourceTypes.REPORTS, _to_job_report_files)
 
@@ -769,27 +779,25 @@ class ServiceAccessLayer:  # pragma: no cover
 
         :rtype: JobResult
         """
-        dataset_meta_type = get_dataset_metadata(path)
+        dsmd = get_dataset_metadata(path)
+        result = self.search_dataset_by_uuid(dsmd.uuid)
 
-        result = self.get_dataset_by_uuid(dataset_meta_type.uuid,
-                                          ignore_errors=True)
         if result is None:
             log.info("Importing dataset {p}".format(p=path))
             job_result = self.run_import_dataset_by_type(
-                dataset_meta_type.metatype, path, avoid_duplicate_import=avoid_duplicate_import)
+                dsmd.metatype,
+                path,
+                avoid_duplicate_import=avoid_duplicate_import)
             log.info("Confirming database update")
             # validation 1: attempt to retrieve dataset info
-            result_new = self.get_dataset_by_uuid(dataset_meta_type.uuid)
+            result_new = self.get_dataset_by_uuid(dsmd.uuid)
             if result_new is None:
                 raise JobExeError(("Dataset {u} was imported but could " +
                                    "not be retrieved; this may indicate " +
-                                   "XML schema errors.").format(
-                    u=dataset_meta_type.uuid))
+                                   "XML schema errors.").format(u=dsmd.uuid))
             return job_result
         else:
-            log.info(
-                "{f} already imported. Skipping importing. {r}".format(
-                    r=result, f=dataset_meta_type.metatype))
+            log.info(f"Already imported: {result}")
             # need to clean this up
             return JobResult(self.get_job_by_id(result['jobId']), 0, "")
 
@@ -826,6 +834,14 @@ class ServiceAccessLayer:  # pragma: no cover
                                                p=ServiceAccessLayer.ROOT_DS)),
             headers=self._get_headers())
 
+    def search_dataset_by_uuid(self, uuid):
+        """
+        Better alternative to get_dataset_by_uuid, that does not trigger a 404
+        """
+        return _process_rget_or_empty(
+            _to_url(self.uri, f"{ServiceAccessLayer.ROOT_DS}/search/{uuid}"),
+            headers=self._get_headers())
+
     def get_dataset_by_id(self, dataset_type, int_or_uuid):
         """Get a Dataset using the DataSetMetaType and (int|uuid) of the dataset"""
         ds_endpoint = _get_endpoint_or_raise(dataset_type)
@@ -947,7 +963,7 @@ class ServiceAccessLayer:  # pragma: no cover
                                        task_options=(),
                                        workflow_options=(),
                                        tags=()):
-        """Creates and runs a pbsmrtpipe pipeline by pipeline template id
+        """Creates and runs an analysis workflow by pipeline template id
 
 
         :param tags: Tags should be a set of strings
@@ -1049,7 +1065,7 @@ class ServiceAccessLayer:  # pragma: no cover
     def terminate_job(self, job):
         """
         POST a terminate request appropriate to the job type.  Currently only
-        supported for pbsmrtpipe, cromwell, and analysis job types.
+        supported for cromwell, and analysis job types.
         """
         log.warn("Terminating job {i} ({u})".format(i=job.id, u=job.uuid))
         if job.external_job_id is not None:
@@ -1144,279 +1160,14 @@ class ServiceAccessLayer:  # pragma: no cover
         return _process_rpost_with_transform(ServiceJob.from_d)(
             u, job_options, headers=self._get_headers())
 
-
-def __run_and_ignore_errors(f, warn_message):
-    """
-    Black hole ignoring exceptions from a func with no-args and
-    logging the error has a warning.
-    """
-    try:
-        return f()
-    except Exception as e:
-        log.warn(warn_message + " {e}".format(e=e))
-
-
-def _run_func(f, warn_message, ignore_errors=True):
-    if ignore_errors:
-        return __run_and_ignore_errors(f, warn_message)
-    else:
-        return f()
-
-
-def log_pbsmrtpipe_progress(total_url, message, level, source_id, ignore_errors=True, headers=None):  # pragma: no cover
-    """Log the status of a pbsmrtpipe to SMRT Server"""
-    # Keeping this as public to avoid breaking pbsmrtpipe. The
-    # new public interface should be the JobServiceClient
-
-    # Need to clarify the model here. Trying to pass the most minimal
-    # data necessary to pbsmrtpipe.
-    _d = dict(message=message, level=level, sourceId=source_id)
-    warn_message = "Failed Request to {u} data: {d}".format(u=total_url, d=_d)
-
-    def f():
-        return _process_rpost(total_url, _d, headers=headers)
-
-    return _run_func(f, warn_message, ignore_errors=ignore_errors)
-
-
-def add_datastore_file(total_url, datastore_file, ignore_errors=True, headers=None):  # pragma: no cover
-    """Add datastore to SMRT Server
-
-    :type datastore_file: DataStoreFile
-    """
-    # Keeping this as public to avoid breaking pbsmrtpipe. The
-    # new public interface should be the JobServiceClient
-    _d = datastore_file.to_dict()
-    warn_message = "Failed Request to {u} data: {d}.".format(u=total_url, d=_d)
-
-    def f():
-        return _process_rpost(total_url, _d, headers=headers)
-
-    return _run_func(f, warn_message, ignore_errors=ignore_errors)
-
-
-def _create_job_task(job_tasks_url, create_job_task_record, ignore_errors=True, headers=None):  # pragma: no cover
-    """
-    :type create_job_task_record: CreateJobTaskRecord
-    :rtype: JobTask
-    """
-    warn_message = "Unable to create Task {c}".format(
-        c=repr(create_job_task_record))
-
-    def f():
-        return _process_rpost_with_transform(JobTask.from_d)(
-            job_tasks_url, create_job_task_record.to_dict(), headers=headers)
-
-    return _run_func(f, warn_message, ignore_errors)
-
-
-def _update_job_task_state(task_url, update_job_task_record, ignore_errors=True, headers=None):  # pragma: no cover
-    """
-    :type update_job_task_record: UpdateJobTaskRecord
-    :rtype: JobTask
-    """
-    warn_message = "Unable to update Task {c}".format(
-        c=repr(update_job_task_record))
-
-    def f():
-        return _process_rput_with_transform(JobTask.from_d)(
-            task_url, update_job_task_record.to_dict(), headers=headers)
-
-    return _run_func(f, warn_message, ignore_errors)
-
-
-def _update_datastore_file(datastore_url, uuid, path, file_size, set_is_active,
-                           ignore_errors=True, headers=None):  # pragma: no cover
-    warn_message = "Unable to update datastore file {u}".format(u=uuid)
-    total_url = "{b}/{u}".format(b=datastore_url, u=uuid)
-    d = {"fileSize": file_size, "path": path, "isActive": set_is_active}
-
-    def f():
-        return _process_rput(total_url, d, headers=headers)
-
-    return _run_func(f, warn_message, ignore_errors)
-
-
-class CreateJobTaskRecord:
-
-    def __init__(self, task_uuid, task_id, task_type_id,
-                 name, state, created_at=None):
-        self.task_uuid = task_uuid
-        self.task_id = task_id
-        self.task_type_id = task_type_id
-        self.name = name
-        # this must be consistent with the EngineJob states in the scala code
-        self.state = state
-        # Note, the created_at timestamp must have the form
-        # 2016-02-18T23:24:46.569Z
-        # or
-        # 2016-02-18T15:24:46.569-08:00
-        self.created_at = datetime.datetime.now(
-            pytz.utc) if created_at is None else created_at
-
-    def __repr__(self):
-        _d = dict(k=self.__class__.__name__,
-                  u=self.task_uuid,
-                  i=self.task_id,
-                  n=self.name,
-                  s=self.state)
-        return "<{k} uuid:{u} ix:{i} state:{s} name:{n} >".format(**_d)
-
-    def to_dict(self):
-        return dict(uuid=self.task_uuid,
-                    taskId=self.task_id,
-                    taskTypeId=self.task_type_id,
-                    name=self.name,
-                    state=self.state,
-                    createdAt=self.created_at.isoformat())
-
-
-class UpdateJobTaskRecord:
-
-    def __init__(self, task_uuid, state, message, error_message=None):
-        """:type error_message: str | None"""
-        self.task_uuid = task_uuid
-        self.state = state
-        self.message = message
-        # detailed error message (e.g., terse stack trace)
-        self.error_message = error_message
-
-    @staticmethod
-    def from_error(task_uuid, state, message, error_message):
-        # require an detailed error message
-        return UpdateJobTaskRecord(task_uuid, state,
-                                   message,
-                                   error_message=error_message)
-
-    def __repr__(self):
-        _d = dict(k=self.__class__.__name__,
-                  i=self.task_uuid,
-                  s=self.state)
-        return "<{k} i:{i} state:{s} >".format(**_d)
-
-    def to_dict(self):
-        _d = dict(uuid=self.task_uuid,
-                  state=self.state,
-                  message=self.message)
-
-        # spray API is a little odd here. it will complain about
-        # Expected String as JsString, but got null
-        # even though the model is Option[String]
-        if self.error_message is not None:
-            _d['errorMessage'] = self.error_message
-
-        return _d
-
-
-class JobServiceClient:  # pragma: no cover
-    # Keeping this class private. It should only be used from pbsmrtpipe
-
-    def __init__(self, job_root_url, ignore_errors=False):
-        """
-
-        :param job_root_url: Full Root URL to the job
-        :type job_root_url: str
-
-        :param ignore_errors: Only log errors, don't not raise if a request fails. This is intended to be used in a fire-and-forget usecase
-        :type ignore_errors: bool
-
-        This hides the root location of the URL and hides
-        the job id (as an int or uuid)
-
-        The Url has the form:
-
-        http://localhost:8888/smrt-link/job-manager/jobs/pbsmrtpipe/1234
-
-        or
-
-        http://localhost:8888/smrt-link/job-manager/jobs/pbsmrtpipe/5d562c74-e452-11e6-8b96-3c15c2cc8f88
-        """
-        self.job_root_url = job_root_url
-        self.ignore_errors = ignore_errors
-
-    def __repr__(self):
-        _d = dict(k=self.__class__.__name__, u=self.job_root_url)
-        return "<{k} Job URL:{u} >".format(**_d)
-
-    def _get_headers(self):
-        return Constants.HEADERS
-
-    def to_url(self, segment):
-        return "{i}/{s}".format(i=self.job_root_url, s=segment)
-
-    @property
-    def log_url(self):
-        return self.to_url("log")
-
-    @property
-    def datastore_url(self):
-        return self.to_url("datastore")
-
-    @property
-    def tasks_url(self):
-        return self.to_url("tasks")
-
-    def get_task_url(self, task_uuid):
-        """
-        :param task_uuid: Task UUID
-        :return:
-        """
-        return self.to_url("tasks/{t}".format(t=task_uuid))
-
-    def log_workflow_progress(self, message, level,
-                              source_id, ignore_errors=True):
-        return log_pbsmrtpipe_progress(
-            self.log_url, message, level, source_id, ignore_errors=ignore_errors)
-
-    def add_datastore_file(self, datastore_file, ignore_errors=True):
-        return add_datastore_file(
-            self.datastore_url, datastore_file, ignore_errors=ignore_errors)
-
-    def update_datastore_file(
-            self, uuid, file_size=None, path=None, set_is_active=True, ignore_errors=True):
-        return _update_datastore_file(
-            self.datastore_url, uuid, path, file_size, set_is_active, ignore_errors)
-
-    def create_task(self, task_uuid, task_id,
-                    task_type_id, name, created_at=None):
-        """
-
-        :param task_uuid: Globally unique task id
-        :param task_id: Unique within respect to the job
-        :param task_type_id: ToolContract or task id (e.g., pbcommand.tasks.alpha)
-        :param name: Display name of task
-        :param created_at: time task was created at (will be set if current time if None)
-        """
-        r = CreateJobTaskRecord(task_uuid, task_id, task_type_id,
-                                name, JobStates.CREATED, created_at=created_at)
-
-        return _create_job_task(self.tasks_url, r)
-
-    def update_task_status(self, task_uuid, state,
-                           message, error_message=None):
-
-        task_url = self.get_task_url(task_uuid)
-
-        u = UpdateJobTaskRecord(
-            task_uuid,
-            state,
-            message,
-            error_message=error_message)
-
-        return _update_job_task_state(
-            task_url, u, ignore_errors=self.ignore_errors)
-
-    def update_task_to_failed(self, task_uuid, message,
-                              detailed_error_message):
-        task_url = self.get_task_url(task_uuid)
-        state = JobStates.FAILED
-
-        u = UpdateJobTaskRecord(task_uuid,
-                                state,
-                                message,
-                                error_message=detailed_error_message)
-
-        return _update_job_task_state(task_url, u)
+    def get_sl_api(self, path):
+        service_url = f"{self.uri}{path}"
+        t1 = time.time()
+        r = requests.get(service_url, headers=self._get_headers(), verify=False)
+        t2 = time.time()
+        log.info("Response time: {:.1f}s".format(t2 - t1))
+        r.raise_for_status()
+        return r.json()
 
 
 # -----------------------------------------------------------------------
@@ -1546,3 +1297,54 @@ def get_smrtlink_client_from_args(args):
         port=args.port,
         user=args.user,
         password=args.password)
+
+
+def run_client_with_retry(fx, host, port, user, password,
+                          sleep_time=60,
+                          retry_on=(500, 503)):
+    """
+    Obtain a services client and run the specified function on it - this can
+    be essentially any services call - and return the result.  If the query
+    fails due to 401 Unauthorized, the call will be retried with a new token.
+    HTTP 500 and 503 errors will be retried without re-authenticating.
+    """
+    def _get_client():
+        return get_smrtlink_client(host, port, user, password)
+    auth_errors = 0
+    started_at = time.time()
+    retry_time = sleep_time
+    client = _get_client()
+    while True:
+        try:
+            result = fx(client)
+        except HTTPError as e:
+            status = e.response.status_code
+            log.info("Got error {e} (code = {c})".format(e=str(e), c=status))
+            if status == 401:
+                auth_errors += 1
+                if auth_errors > 10:
+                    raise RuntimeError(
+                        "10 successive HTTP 401 errors, exiting")
+                log.warning("Authentication error, will retry with new token")
+                client = _get_client()
+                continue
+            elif status in retry_on:
+                log.warning("Got HTTP {c}, will retry in {d}s".format(
+                    c=status, d=retry_time))
+                time.sleep(retry_time)
+                # if a retryable error occurs, we increment the retry time
+                # up to a max of 30 minutes
+                retry_time = max(1800, retry_time + sleep_time)
+                continue
+            else:
+                raise
+        except (ConnectionError, ProtocolError) as e:
+            log.warning("Connection error: {e}".format(e=str(e)))
+            log.info("Will retry in {d}s".format(d=retry_time))
+            time.sleep(retry_time)
+            # if a retryable error occurs, we increment the retry time
+            # up to a max of 30 minutes
+            retry_time = max(1800, retry_time + sleep_time)
+            continue
+        else:
+            return result
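
run_client_with_retry wraps an arbitrary services call so that HTTP 401 responses are retried with a freshly authenticated client (up to ten times) and HTTP 500/503 or dropped connections are retried after a delay. A minimal usage sketch; host, port, and credentials are placeholders:

    from pbcommand.services._service_access_layer import run_client_with_retry

    def fetch_job(client):
        # any method of the authenticated client can be called here
        return client.get_analysis_job_by_id(1234)       # hypothetical job id

    job = run_client_with_retry(fetch_job, "smrtlink-host", 8243,
                                "user", "password", sleep_time=60)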


=====================================
pbcommand/services/job_poller.py
=====================================
@@ -24,6 +24,8 @@ __version__ = "0.1"
 log = logging.getLogger(__name__)
 
 
+# FIXME this overlaps with run_client_with_retry, can we consolidate the
+# general error handling?
 def poll_for_job_completion(job_id,
                             host,
                             port,
@@ -40,6 +42,7 @@ def poll_for_job_completion(job_id,
     auth_errors = 0
     external_job_id = None
     LOG_INTERVAL = 600
+    SLEEP_TIME_MAX = 600
     i = 0
     try:
         client = _get_client()
@@ -51,12 +54,15 @@ def poll_for_job_completion(job_id,
                 job_json = _process_rget(url, headers=client._get_headers())
             except HTTPError as e:
                 status = e.response.status_code
-                log.info("Got error {e} (code = {c})".format(e=str(e), c=status))
+                log.info("Got error {e} (code = {c})".format(
+                    e=str(e), c=status))
                 if status == 401:
                     auth_errors += 1
                     if auth_errors > 10:
-                        raise RuntimeError("10 successive HTTP 401 errors, exiting")
-                    log.warning("Authentication error, will retry with new token")
+                        raise RuntimeError(
+                            "10 successive HTTP 401 errors, exiting")
+                    log.warning(
+                        "Authentication error, will retry with new token")
                     client = _get_client()
                     continue
                 elif status in retry_on:
@@ -100,6 +106,9 @@ def poll_for_job_completion(job_id,
                             "Exceeded runtime {r} of {t}".format(
                                 r=run_time, t=time_out))
                 time.sleep(sleep_time)
+                # after an hour we increase the wait
+                if run_time > 3600:
+                    sleep_time = min(SLEEP_TIME_MAX, sleep_time * 5)
     except KeyboardInterrupt:
         if abort_on_interrupt:
             client.terminate_job_id(job_id)
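
With this change the poller keeps the caller-supplied sleep_time for the first hour of a job's runtime and then stretches it by a factor of five, capped at SLEEP_TIME_MAX (600 s), so long-running jobs are polled less often. Roughly, with example values:

    SLEEP_TIME_MAX = 600            # seconds
    sleep_time = 20                 # caller-supplied polling interval
    run_time = 2 * 3600             # job has been running for two hours
    if run_time > 3600:
        sleep_time = min(SLEEP_TIME_MAX, sleep_time * 5)
    print(sleep_time)               # 100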


=====================================
pbcommand/services/resolver.py
=====================================
@@ -11,7 +11,7 @@ import os.path as op
 import sys
 
 from pbcommand.models.common import FileTypes
-from pbcommand.services._service_access_layer import get_smrtlink_client
+from pbcommand.services._service_access_layer import get_smrtlink_client, run_client_with_retry
 from pbcommand.utils import setup_log
 from pbcommand.cli import get_default_argparser_with_base_opts, pacbio_args_runner
 
@@ -80,16 +80,8 @@ def _find_alignments(datastore):
 
 
 class Resolver:
-    def __init__(self,
-                 host,
-                 port,
-                 user=None,
-                 password=None):
-        self._host = host
-        self._port = port
-        self._user = user
-        self._password = password
-        self._client = get_smrtlink_client(host, port, user, password)
+    def __init__(self, client):
+        self._client = client
 
     def _get_job_datastore_reports(self, job_id):
         datastore = self._client.get_analysis_job_datastore(job_id)
@@ -130,24 +122,31 @@ class Resolver:
 
 
 def run_args(args):
-    resolver = Resolver(args.host, args.port, args.user, args.password)
-    resource = None
-    if args.resource_type == ResourceTypes.JOB_PATH:
-        resource = resolver.resolve_job(args.job_id)
-    elif args.resource_type == ResourceTypes.ALIGNMENTS:
-        resource = resolver.resolve_alignments(args.job_id)
-    elif args.resource_type == ResourceTypes.PREASSEMBLY:
-        resource = resolver.resolve_preassembly_stats(args.job_id)
-    elif args.resource_type == ResourceTypes.POLISHED_ASSEMBLY:
-        resource = resolver.resolve_polished_assembly_stats(args.job_id)
-    elif args.resource_type == ResourceTypes.MAPPING_STATS:
-        resource = resolver.resolve_mapping_stats(args.job_id)
-    elif args.resource_type == ResourceTypes.SUBREADS_ENTRY:
-        resource = resolver.resolve_input_subreads(args.job_id)
-    else:
-        raise NotImplementedError(
-            "Can't retrieve resource type '%s'" %
-            args.resource_type)
+    def _run_resolver(client):
+        resolver = Resolver(client)
+        resource = None
+        if args.resource_type == ResourceTypes.JOB_PATH:
+            resource = resolver.resolve_job(args.job_id)
+        elif args.resource_type == ResourceTypes.ALIGNMENTS:
+            resource = resolver.resolve_alignments(args.job_id)
+        elif args.resource_type == ResourceTypes.PREASSEMBLY:
+            resource = resolver.resolve_preassembly_stats(args.job_id)
+        elif args.resource_type == ResourceTypes.POLISHED_ASSEMBLY:
+            resource = resolver.resolve_polished_assembly_stats(args.job_id)
+        elif args.resource_type == ResourceTypes.MAPPING_STATS:
+            resource = resolver.resolve_mapping_stats(args.job_id)
+        elif args.resource_type == ResourceTypes.SUBREADS_ENTRY:
+            resource = resolver.resolve_input_subreads(args.job_id)
+        else:
+            raise NotImplementedError(
+                "Can't retrieve resource type '%s'" % args.resource_type)
+        return resource
+
+    resource = run_client_with_retry(_run_resolver,
+                                     args.host,
+                                     args.port,
+                                     args.user,
+                                     args.password)
     print(resource)
     if args.make_symlink is not None:
         if op.exists(args.make_symlink):


=====================================
pbcommand/utils.py
=====================================
@@ -7,6 +7,7 @@ import functools
 import logging
 import logging.config
 import multiprocessing
+import resource
 import os
 import pprint
 import subprocess
@@ -470,16 +471,17 @@ def get_dataset_metadata(path):
     :raises: ValueError
     :return: DataSetMetaData
     """
-    uuid = mt = None
+    uuid = mt = name = None
     for event, element in ET.iterparse(path, events=("start",)):
         uuid = element.get("UniqueId")
         mt = element.get("MetaType")
+        name = element.get("Name")
         break
     else:
         raise ValueError(
             'Did not find events=("start",) in XML path={}'.format(path))
     if mt in FileTypes.ALL_DATASET_TYPES().keys():
-        return DataSetMetaData(uuid, mt)
+        return DataSetMetaData(uuid, mt, name)
     else:
         raise ValueError("Unsupported dataset type '{t}'".format(t=mt))
 
@@ -536,3 +538,11 @@ def pool_map(func, args, nproc):
         log.debug("computed_nproc=1, running serially")
         result = list(map(func, args))
     return result
+
+
+def get_peak_memory_usage():
+    try:
+        return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+    except Exception as e:
+        log.error(f"Failed to get ru_maxrss: {e}")
+        return None
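
get_peak_memory_usage() simply returns ru_maxrss for the current process; on Linux this value is reported in kilobytes (on macOS it is in bytes), which is why the CLI runner above logs it as "Max RSS (kB)". The equivalent standalone call:

    import resource

    # Peak resident set size of the calling process: kilobytes on Linux,
    # bytes on macOS.
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    print(f"Max RSS (kB): {peak}")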


=====================================
setup.py
=====================================
@@ -15,7 +15,7 @@ test_deps = [
 
 setup(
     name='pbcommand',
-    version='2.2.2',
+    version='2.4.0',
     author='Pacific Biosciences',
     author_email='devnet at pacificbiosciences.com',
     description='Library and Tools for interfacing with PacBio® data CLI tools',
@@ -45,5 +45,5 @@ setup(
         'interactive': [
             'prompt_toolkit',
         ]},
-    python_requires='>=3.7',
+    python_requires='>=3.9',
 )


=====================================
tests/test_models_report_table.py
=====================================
@@ -46,12 +46,14 @@ class TestBasicTable:
     def setup_method(self, method):
         self.columns = [Column('one', header="One"),
                         Column('two', header="Two"),
-                        Column('three', header="Three")]
+                        Column('three', header="Three"),
+                        Column('four', header="Four")]
         self.table = Table('my_table_with_values', columns=self.columns)
         datum = [
             ('one', list(range(3))),
             ('two', list('abc')),
-            ('three', 'file1 file2 file3'.split())
+            ('three', 'file1 file2 file3'.split()),
+            ('four', [3.14159, 2.72, 9.87654321])
         ]
         for k, values in datum:
             for value in values:
@@ -64,7 +66,7 @@ class TestBasicTable:
 
     def test_columns(self):
         """Test Columns"""
-        assert len(self.table.columns) == 3
+        assert len(self.table.columns) == 4
 
     def test_column_values(self):
         """Basic check for column values"""
@@ -80,7 +82,13 @@ class TestBasicTable:
         f = tempfile.NamedTemporaryFile(suffix=".csv").name
         self.table.to_csv(f)
         with open(f) as csv_out:
-            assert csv_out.read() == "One,Two,Three\n0,a,file1\n1,b,file2\n2,c,file3\n"
+            assert csv_out.read() == "One,Two,Three,Four\n0,a,file1,3.14159\n1,b,file2,2.72\n2,c,file3,9.87654321\n"
+        self.table.to_csv(f, float_format="%.4f")
+        with open(f) as csv_out:
+            assert csv_out.read() == "One,Two,Three,Four\n0,a,file1,3.1416\n1,b,file2,2.7200\n2,c,file3,9.8765\n"
+        self.table.to_csv(f, float_format="{:.2f}")
+        with open(f) as csv_out:
+            assert csv_out.read() == "One,Two,Three,Four\n0,a,file1,3.14\n1,b,file2,2.72\n2,c,file3,9.88\n"
 
 
 class TestTable:


=====================================
tests/test_utils.py
=====================================
@@ -105,6 +105,7 @@ class TestUtils:
         import pbtestdata
         md = get_dataset_metadata(pbtestdata.get_file("subreads-xml"))
         assert md.metatype == "PacBio.DataSet.SubreadSet"
+        assert md.name == "subreads-xml"
 
         from pbcore.io import SubreadSet
         ds = SubreadSet(pbtestdata.get_file("subreads-xml"))



View it on GitLab: https://salsa.debian.org/med-team/python-pbcommand/-/compare/cc19c3b174f5a84e433a55d0509abd416fb5e183...1c59ffdcdb8136044f3cab638a3f38eede49d3cf

-- 
You're receiving this email because of your account on salsa.debian.org.

