[med-svn] [Git][med-team/python-pbcommand][upstream] New upstream version 1.1.1+git20200313.2a40099

Steffen Möller gitlab at salsa.debian.org
Sat Mar 21 18:47:36 GMT 2020



Steffen Möller pushed to branch upstream at Debian Med / python-pbcommand


Commits:
4889d35f by Steffen Moeller at 2020-03-21T19:40:02+01:00
New upstream version 1.1.1+git20200313.2a40099
- - - - -


5 changed files:

- pbcommand/create_bundle_manifest.py
- pbcommand/services/_service_access_layer.py
- + pbcommand/services/job_poller.py
- pbcommand/services/models.py
- setup.py


Changes:

=====================================
pbcommand/create_bundle_manifest.py
=====================================
@@ -5,6 +5,7 @@
 import argparse
 import datetime
 import json
+import os.path as op
 import os
 import subprocess
 import sys
@@ -25,7 +26,8 @@ def get_parser():
     p = argparse.ArgumentParser(description=desc,
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
     p.add_argument("bundle_id", help="ID of the software bundle")
-    p.add_argument("version_txt", help="Path to Manifest.xml")
+    p.add_argument("version",
+                   help="Version string or path to filename that contains it")
     p.add_argument("-o", dest="output_manifest_xml", default="manifest.xml",
                    help="Path to the version.txt file. Must have a single line with Major.Minor.Tiny format")
     p.add_argument(
@@ -47,12 +49,12 @@ def get_parser():
 
 def git_short_sha():
     args = "git rev-parse --short HEAD".split()
-    return subprocess.check_output(args).strip()
+    return subprocess.check_output(args, encoding="utf-8").strip()
 
 
 def git_branch():
     args = "git rev-parse --abbrev-ref HEAD".split()
-    return subprocess.check_output(args).strip()
+    return subprocess.check_output(args, encoding="utf-8").strip()
 
 
 def get_bamboo_buildnumber(default=0):
@@ -81,11 +83,13 @@ def get_version(major, minor, tiny):
     return to_semver(major, minor, tiny, git_sha, build_number=None)
 
 
-def read_version_txt(path):
-    with open(path, 'r') as f:
-        x = f.readline()
-
-    major, minor, patch = [int(i) for i in x.split(".")][:3]
+def read_version_txt(version):
+    if op.isfile(version):
+        with open(version, 'r') as f:
+            x = f.readline()
+    else:
+        x = version
+    major, minor, patch = [int(i) for i in x.split(".")[:3]]
     return major, minor, patch
 
 
@@ -137,10 +141,10 @@ def write_pacbio_manifest_json(bundle_id, version, name, desc, output_json):
                 indent=True))
 
 
-def runner(bundle_id, version_txt, output_manifest_xml,
+def runner(bundle_id, version, output_manifest_xml,
            pacbio_manifest_json, author, name, desc):
 
-    major, minor, patch = read_version_txt(version_txt)
+    major, minor, patch = read_version_txt(version)
     sem_ver = get_version(major, minor, patch)
     branch = git_branch()
     other = get_bamboo_buildnumber()
@@ -177,7 +181,7 @@ def main(argv_):
     p = get_parser()
     args = p.parse_args(argv_)
     return runner(args.bundle_id,
-                  args.version_txt,
+                  args.version,
                   args.output_manifest_xml,
                   args.pacbio_manifest_json,
                   args.author,


=====================================
pbcommand/services/_service_access_layer.py
=====================================
@@ -236,7 +236,8 @@ def _get_job_by_id_or_raise(sal, job_id, error_klass,
 
 
 def _block_for_job_to_complete(sal, job_id, time_out=1200, sleep_time=2,
-                               abort_on_interrupt=True):
+                               abort_on_interrupt=True,
+                               retry_on_failure=False):
     """
     Waits for job to complete
 
@@ -280,8 +281,18 @@ def _block_for_job_to_complete(sal, job_id, time_out=1200, sleep_time=2,
             log.debug(msg)
             # making the exceptions different to distinguish between an initial
             # error and a "polling" error. Adding some msg details
-            job = _get_job_by_id_or_raise(
-                sal, job_id, JobExeError, error_messge_extras=msg)
+            # FIXME this should distinguish between failure modes - an HTTP 503
+            # or 401 is different from a 404 in this context
+            try:
+                job = _get_job_by_id_or_raise(
+                    sal, job_id, JobExeError, error_messge_extras=msg)
+            except JobExeError as e:
+                if retry_on_failure:
+                    log.error(e)
+                    log.warn("Polling job {i} failed".format(i=job_id))
+                    continue
+                else:
+                    raise
 
             # FIXME, there's currently not a good way to get errors for jobs
             job_result = JobResult(job, run_time, "")
@@ -988,7 +999,8 @@ class ServiceAccessLayer:  # pragma: no cover
                                     workflow_options=(),
                                     time_out=JOB_DEFAULT_TIMEOUT,
                                     tags=(),
-                                    abort_on_interrupt=True):
+                                    abort_on_interrupt=True,
+                                    retry_on_failure=False):
         """Blocks and runs a job with a timeout"""
 
         job_or_error = self.create_by_pipeline_template_id(
@@ -1003,9 +1015,12 @@ class ServiceAccessLayer:  # pragma: no cover
         custom_err_msg = "Job {n} args: {a}".format(n=name, a=_d)
 
         job_id = _job_id_or_error(job_or_error, custom_err_msg=custom_err_msg)
-        return _block_for_job_to_complete(self, job_id, time_out=time_out,
+        return _block_for_job_to_complete(self,
+                                          job_id,
+                                          time_out=time_out,
                                           sleep_time=self._sleep_time,
-                                          abort_on_interrupt=abort_on_interrupt)
+                                          abort_on_interrupt=abort_on_interrupt,
+                                          retry_on_failure=retry_on_failure)
 
     def run_cromwell_workflow(self,
                               name,


=====================================
pbcommand/services/job_poller.py
=====================================
@@ -0,0 +1,160 @@
+#!/usr/bin/env python3
+
+"""
+SMRT Link job poller with retry mechanism
+"""
+
+import logging
+import time
+import json
+import sys
+
+from requests.exceptions import HTTPError
+
+from pbcommand.cli.core import get_default_argparser_with_base_opts, pacbio_args_runner
+from pbcommand.services._service_access_layer import (get_smrtlink_client,
+                                                      _to_url,
+                                                      _process_rget)
+from pbcommand.services.models import add_smrtlink_server_args, JobExeError, JobStates, ServiceJob
+from pbcommand.utils import setup_log
+
+__version__ = "0.1"
+log = logging.getLogger(__name__)
+
+
+def poll_for_job_completion(job_id,
+                            host,
+                            port,
+                            user,
+                            password,
+                            time_out=None,
+                            sleep_time=60,
+                            retry_on=(),
+                            abort_on_interrupt=False):
+    def _get_client():
+        return get_smrtlink_client(host, port, user, password)
+    started_at = time.time()
+    retry_time = sleep_time
+    auth_errors = 0
+    external_job_id = None
+    LOG_INTERVAL = 600
+    i = 0
+    try:
+        client = _get_client()
+        while True:
+            i += 1
+            job_uri = "/smrt-link/job-manager/jobs/analysis/{}".format(job_id)
+            url = _to_url(client.uri, job_uri)
+            try:
+                job_json = _process_rget(url, headers=client._get_headers())
+            except HTTPError as e:
+                status = e.response.status_code
+                log.info("Got error {e} (code = {c})".format(e=str(e), c=status))
+                if status == 401:
+                    auth_errors += 1
+                    if auth_errors > 10:
+                        raise RuntimeError("10 successive HTTP 401 errors, exiting")
+                    log.warning("Authentication error, will retry with new token")
+                    client = _get_client()
+                    continue
+                elif status in retry_on:
+                    log.warning("Got HTTP {c}, will retry in {d}s".format(
+                        c=status, d=retry_time))
+                    time.sleep(retry_time)
+                    # if a retryable error occurs, we increment the retry time
+                    # up to a max of 30 minutes
+                    retry_time = max(1800, retry_time + sleep_time)
+                    continue
+                else:
+                    raise
+            else:
+                # if request succeeded, reset the retry_time
+                auth_errors = 0
+                retry_time = sleep_time
+                run_time = time.time() - started_at
+                job = ServiceJob.from_d(job_json)
+                if external_job_id is None and job.external_job_id is not None:
+                    external_job_id = job.external_job_id
+                    log.info("Cromwell workflow ID is %s", external_job_id)
+                if job.state in JobStates.ALL_COMPLETED or sleep_time == 0:
+                    return job_json
+                msg = "Running pipeline {n} (job {j}) state: {s} runtime:{r:.2f} sec {i} iteration".format(
+                    n=job.name, j=job.id, s=job.state, r=run_time, i=i)
+                if run_time % LOG_INTERVAL < sleep_time:
+                    log.info(msg)
+                else:
+                    log.debug(msg)
+                if time_out is not None:
+                    if run_time > time_out:
+                        raise JobExeError(
+                            "Exceeded runtime {r} of {t}".format(
+                                r=run_time, t=time_out))
+                time.sleep(sleep_time)
+    except KeyboardInterrupt:
+        if abort_on_interrupt:
+            client.terminate_job_id(job_id)
+        raise
+
+
+def run_args(args):
+    d = poll_for_job_completion(
+        args.job_id,
+        args.host,
+        args.port,
+        args.user,
+        args.password,
+        time_out=args.max_time,
+        sleep_time=args.poll_interval,
+        retry_on=args.retry_on,
+        abort_on_interrupt=False)
+    job = ServiceJob.from_d(d)
+    log.info(str(job))
+    if args.json is not None:
+        with open(args.json, "wt") as json_out:
+            json_out.write(json.dumps(d))
+            log.info("Wrote job JSON to {f}".format(f=args.json))
+    if job.state in JobStates.ALL_FAILED:
+        return 1
+    return 0
+
+
+def _get_parser():
+    p = get_default_argparser_with_base_opts(
+        description=__doc__,
+        version=__version__,
+        default_level="INFO")
+    p.add_argument("job_id", help="SMRT Link Job ID (or UUID)")
+    add_smrtlink_server_args(p)
+    p.add_argument("--retry-on",
+                   action="store",
+                   type=lambda arg: [int(x) for x in arg.split(",")],
+                   default=[],
+                   help="HTTP error codes to retry")
+    p.add_argument("--json",
+                   action="store",
+                   default=None,
+                   help="Name of output file to write")
+    p.add_argument("--max-time",
+                   action="store",
+                   type=int,
+                   default=None,
+                   help="Max time to wait before aborting")
+    p.add_argument("--poll-interval",
+                   action="store",
+                   type=int,
+                   default=60,
+                   help="Time to sleep between polling for job state.  If set to zero, the program will exit immediately after getting job status, regardless of state.")
+    return p
+
+
+def main(argv=sys.argv):
+    return pacbio_args_runner(
+        argv=argv[1:],
+        parser=_get_parser(),
+        args_runner_func=run_args,
+        alog=log,
+        setup_log_func=setup_log)
+
+
+if __name__ == "__main__":
+    sys.exit(main(sys.argv))


=====================================
pbcommand/services/models.py
=====================================
@@ -5,6 +5,7 @@ Services Specific Data Models
 import json
 import uuid
 from collections import namedtuple
+import os
 
 import iso8601
 from requests.exceptions import RequestException
@@ -404,11 +405,14 @@ class JobStates:
     RUNNING = "RUNNING"
     FAILED = "FAILED"
     SUCCESSFUL = "SUCCESSFUL"
+    TERMINATED = "TERMINATED"
+    ABORTED = "ABORTED"
 
     ALL = (RUNNING, CREATED, FAILED, SUCCESSFUL, SUBMITTED)
 
     # End points
-    ALL_COMPLETED = (FAILED, SUCCESSFUL)
+    ALL_COMPLETED = (FAILED, SUCCESSFUL, TERMINATED, ABORTED)
+    ALL_FAILED = (FAILED, TERMINATED, ABORTED)
 
 
 class JobTypes:
@@ -432,3 +436,28 @@ class ServiceResourceTypes:
     REPORTS = "reports"
     DATASTORE = "datastore"
     ENTRY_POINTS = "entry-points"
+
+
+def add_smrtlink_server_args(p):
+    DEFAULT_HOST = os.environ.get("PB_SERVICE_HOST", "localhost")
+    DEFAULT_PORT = int(os.environ.get("PB_SERVICE_PORT", 8000))
+    DEFAULT_USER = os.environ.get("PB_SERVICE_AUTH_USER", None)
+    DEFAULT_PASSWORD = os.environ.get("PB_SERVICE_AUTH_PASSWORD", None)
+    p.add_argument("--host",
+                   action="store",
+                   default=DEFAULT_HOST,
+                   help="SL Server Hostname")
+    p.add_argument("--port",
+                   action="store",
+                   type=int,
+                   default=DEFAULT_PORT,
+                   help="SL Server Port")
+    p.add_argument("--user",
+                   action="store",
+                   default=DEFAULT_USER,
+                   help="SL Server User Name")
+    p.add_argument("--password",
+                   action="store",
+                   default=DEFAULT_PASSWORD,
+                   help="SL Server Password")
+    return p


=====================================
setup.py
=====================================
@@ -15,7 +15,7 @@ test_deps = [
 
 setup(
     name='pbcommand',
-    version='2.0.0',
+    version='2.0.1',
     author='Pacific Biosciences',
     author_email='devnet at pacificbiosciences.com',
     description='Library and Tools for interfacing with PacBio® data CLI tools',



View it on GitLab: https://salsa.debian.org/med-team/python-pbcommand/-/commit/4889d35f4466be41ace14ba4aca3a1814b62d61d

-- 
View it on GitLab: https://salsa.debian.org/med-team/python-pbcommand/-/commit/4889d35f4466be41ace14ba4aca3a1814b62d61d
You're receiving this email because of your account on salsa.debian.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/debian-med-commit/attachments/20200321/4c647c23/attachment-0001.html>


More information about the debian-med-commit mailing list