[med-svn] [cwltool] 02/06: New upstream version 1.0.20170111193653

Michael Crusoe misterc-guest at moszumanska.debian.org
Fri Jan 13 05:32:32 UTC 2017


This is an automated email from the git hooks/post-receive script.

misterc-guest pushed a commit to branch master
in repository cwltool.

commit 0489a77125a330eb7e5e781cea2593f8a219aca0
Author: Michael R. Crusoe <michael.crusoe at gmail.com>
Date:   Wed Jan 11 22:35:11 2017 -0800

    New upstream version 1.0.20170111193653
---
 PKG-INFO                      |  2 +-
 cwltool.egg-info/PKG-INFO     |  2 +-
 cwltool.egg-info/pbr.json     |  2 +-
 cwltool.egg-info/requires.txt |  6 ++--
 cwltool/draft2tool.py         |  5 +--
 cwltool/factory.py            | 17 +++++++--
 cwltool/job.py                |  2 +-
 cwltool/load_tool.py          |  4 +--
 cwltool/main.py               | 65 +++++++++++++++++++---------------
 cwltool/pathmapper.py         |  6 ++--
 cwltool/process.py            |  4 +--
 cwltool/resolver.py           |  8 +++--
 cwltool/stdfsaccess.py        |  3 +-
 cwltool/workflow.py           | 82 +++++++++++++++++++++++++++----------------
 setup.cfg                     |  2 +-
 setup.py                      |  7 ++--
 16 files changed, 132 insertions(+), 85 deletions(-)

diff --git a/PKG-INFO b/PKG-INFO
index 1681910..5cae014 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: cwltool
-Version: 1.0.20161216212910
+Version: 1.0.20170111193653
 Summary: Common workflow language reference implementation
 Home-page: https://github.com/common-workflow-language/cwltool
 Author: Common workflow language working group
diff --git a/cwltool.egg-info/PKG-INFO b/cwltool.egg-info/PKG-INFO
index 1681910..5cae014 100644
--- a/cwltool.egg-info/PKG-INFO
+++ b/cwltool.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: cwltool
-Version: 1.0.20161216212910
+Version: 1.0.20170111193653
 Summary: Common workflow language reference implementation
 Home-page: https://github.com/common-workflow-language/cwltool
 Author: Common workflow language working group
diff --git a/cwltool.egg-info/pbr.json b/cwltool.egg-info/pbr.json
index 8f35d5c..2a5bfd6 100644
--- a/cwltool.egg-info/pbr.json
+++ b/cwltool.egg-info/pbr.json
@@ -1 +1 @@
-{"is_release": false, "git_version": "ab9e86e"}
\ No newline at end of file
+{"is_release": false, "git_version": "92d59e6"}
\ No newline at end of file
diff --git a/cwltool.egg-info/requires.txt b/cwltool.egg-info/requires.txt
index 961c503..0ff00f7 100644
--- a/cwltool.egg-info/requires.txt
+++ b/cwltool.egg-info/requires.txt
@@ -1,8 +1,8 @@
 setuptools
 requests >= 1.0
-ruamel.yaml >= 0.12.4, < 0.12.5
+ruamel.yaml >= 0.12.4
 rdflib >= 4.2.0, < 4.3.0
 shellescape >= 3.4.1, < 3.5
-schema-salad >= 2.1.20161216210732, < 3
+schema-salad >= 2.2.20170111180227, < 3
 typing >= 3.5.2, < 3.6
-cwltest >= 1.0.20160907111242
+cwltest >= 1.0.20161227194859
diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py
index 0a5b5ad..9ca34d6 100644
--- a/cwltool/draft2tool.py
+++ b/cwltool/draft2tool.py
@@ -13,6 +13,7 @@ import errno
 
 import avro.schema
 import schema_salad.validate as validate
+from schema_salad.ref_resolver import file_uri, uri_file_path
 import shellescape
 from typing import Any, Callable, cast, Generator, Text, Union
 
@@ -94,11 +95,11 @@ def revmap_file(builder, outdir, f):
 
     split = urlparse.urlsplit(outdir)
     if not split.scheme:
-        outdir = "file://" + outdir
+        outdir = file_uri(str(outdir))
 
     if "location" in f:
         if f["location"].startswith("file://"):
-            path = f["location"][7:]
+            path = uri_file_path(f["location"])
             revmap_f = builder.pathmapper.reversemap(path)
             if revmap_f:
                 f["location"] = revmap_f[1]
diff --git a/cwltool/factory.py b/cwltool/factory.py
index 1b25a0b..be2468b 100644
--- a/cwltool/factory.py
+++ b/cwltool/factory.py
@@ -3,10 +3,17 @@ from . import load_tool
 from . import workflow
 import os
 from .process import Process
-from typing import Any, Text, Union
+from typing import Any, Text, Union, Tuple
 from typing import Callable as tCallable
 import argparse
 
+class WorkflowStatus(Exception):
+    def __init__(self, out, status):
+        # type: (Dict[Text,Any], Text) -> None
+        super(WorkflowStatus, self).__init__("Completed %s" % status)
+        self.out = out
+        self.status = status
+
 class Callable(object):
     def __init__(self, t, factory):  # type: (Process, Factory) -> None
         self.t = t
@@ -16,13 +23,17 @@ class Callable(object):
         # type: (**Any) -> Union[Text, Dict[Text, Text]]
         execkwargs = self.factory.execkwargs.copy()
         execkwargs["basedir"] = os.getcwd()
-        return self.factory.executor(self.t, kwargs, **execkwargs)
+        out, status = self.factory.executor(self.t, kwargs, **execkwargs)
+        if status != "success":
+            raise WorkflowStatus(out, status)
+        else:
+            return out
 
 class Factory(object):
     def __init__(self, makeTool=workflow.defaultMakeTool,
                  executor=main.single_job_executor,
                  **execkwargs):
-        # type: (tCallable[[Dict[Text, Any], Any], Process],tCallable[...,Union[Text,Dict[Text,Text]]], **Any) -> None
+        # type: (tCallable[[Dict[Text, Any], Any], Process],tCallable[...,Tuple[Dict[Text,Any], Text]], **Any) -> None
         self.makeTool = makeTool
         self.executor = executor
         self.execkwargs = execkwargs
diff --git a/cwltool/job.py b/cwltool/job.py
index 4aca3d5..8b331a8 100644
--- a/cwltool/job.py
+++ b/cwltool/job.py
@@ -314,7 +314,7 @@ class CommandLineJob(object):
         if processStatus != "success":
             _logger.warn(u"[job %s] completed %s", self.name, processStatus)
         else:
-            _logger.debug(u"[job %s] completed %s", self.name, processStatus)
+            _logger.info(u"[job %s] completed %s", self.name, processStatus)
 
         if _logger.isEnabledFor(logging.DEBUG):
             _logger.debug(u"[job %s] %s", self.name, json.dumps(outputs, indent=4))
diff --git a/cwltool/load_tool.py b/cwltool/load_tool.py
index f2b3dbb..3070f58 100644
--- a/cwltool/load_tool.py
+++ b/cwltool/load_tool.py
@@ -12,7 +12,7 @@ from ruamel.yaml.comments import CommentedSeq, CommentedMap
 from avro.schema import Names
 import requests.sessions
 
-from schema_salad.ref_resolver import Loader, Fetcher
+from schema_salad.ref_resolver import Loader, Fetcher, file_uri
 import schema_salad.validate as validate
 from schema_salad.validate import ValidationException
 import schema_salad.schema as schema
@@ -42,7 +42,7 @@ def fetch_document(argsworkflow,   # type: Union[Text, dict[Text, Any]]
         if split.scheme:
             uri = argsworkflow
         elif os.path.exists(os.path.abspath(argsworkflow)):
-            uri = "file://" + os.path.abspath(argsworkflow)
+            uri = file_uri(str(os.path.abspath(argsworkflow)))
         elif resolver:
             uri = resolver(document_loader, argsworkflow)
 
diff --git a/cwltool/main.py b/cwltool/main.py
index a8279bb..75e2611 100755
--- a/cwltool/main.py
+++ b/cwltool/main.py
@@ -18,7 +18,7 @@ import requests
 from typing import (Union, Any, AnyStr, cast, Callable, Dict, Sequence, Text,
     Tuple, Type, IO)
 
-from schema_salad.ref_resolver import Loader, Fetcher
+from schema_salad.ref_resolver import Loader, Fetcher, file_uri, uri_file_path
 import schema_salad.validate as validate
 import schema_salad.jsonld_context
 import schema_salad.makedoc
@@ -165,9 +165,9 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Will be passed to `docker run` as the '--net' "
                         "parameter. Implies '--enable-net'.")
 
-    parser.add_argument("--on-error", type=Text,
+    parser.add_argument("--on-error", type=str,
                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
-                        "Default is 'stop.", default="stop")
+                        "Default is 'stop'.", default="stop", choices=("stop", "continue"))
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--compute-checksum", action="store_true", default=True,
@@ -187,16 +187,12 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
 
 def single_job_executor(t, job_order_object, **kwargs):
-    # type: (Process, Dict[Text, Any], **Any) -> Union[Text, Dict[Text, Text]]
+    # type: (Process, Dict[Text, Any], **Any) -> Tuple[Dict[Text, Any], Text]
     final_output = []
     final_status = []
 
     def output_callback(out, processStatus):
         final_status.append(processStatus)
-        if processStatus == "success":
-            _logger.info(u"Final process status is %s", processStatus)
-        else:
-            _logger.warn(u"Final process status is %s", processStatus)
         final_output.append(out)
 
     if "basedir" not in kwargs:
@@ -223,30 +219,30 @@ def single_job_executor(t, job_order_object, **kwargs):
 
     try:
         for r in jobiter:
-            if r.outdir:
-                output_dirs.add(r.outdir)
-
             if r:
+                if r.outdir:
+                    output_dirs.add(r.outdir)
                 r.run(**kwargs)
             else:
-                raise WorkflowException("Workflow cannot make any more progress.")
+                _logger.error("Workflow cannot make any more progress.")
+                break
     except WorkflowException:
         raise
     except Exception as e:
         _logger.exception("Got workflow error")
         raise WorkflowException(Text(e))
 
-    if final_status[0] != "success":
-        raise WorkflowException(u"Process status is %s" % (final_status))
-
-    if final_output[0] and finaloutdir:
+    if final_output and final_output[0] and finaloutdir:
         final_output[0] = relocateOutputs(final_output[0], finaloutdir,
                                           output_dirs, kwargs.get("move_outputs"))
 
     if kwargs.get("rm_tmpdir"):
         cleanIntermediate(output_dirs)
 
-    return final_output[0]
+    if final_output and final_status:
+        return (final_output[0], final_status[0])
+    else:
+        return (None, "permanentFail")
 
 class FSAction(argparse.Action):
     objclass = None  # type: Text
@@ -262,7 +258,7 @@ class FSAction(argparse.Action):
         setattr(namespace,
             self.dest,  # type: ignore
             {"class": self.objclass,
-             "location": "file://%s" % os.path.abspath(cast(AnyStr, values))})
+             "location": file_uri(str(os.path.abspath(cast(AnyStr, values))))})
 
 class FSAppendAction(argparse.Action):
     objclass = None  # type: Text
@@ -285,7 +281,7 @@ class FSAppendAction(argparse.Action):
                     g)
         g.append(
             {"class": self.objclass,
-             "location": "file://%s" % os.path.abspath(cast(AnyStr, values))})
+             "location": file_uri(str(os.path.abspath(cast(AnyStr, values))))})
 
 class FileAction(FSAction):
     objclass = "File"
@@ -475,7 +471,7 @@ def load_job_order(args, t, stdin, print_input_deps=False, relative_deps=False,
 
     if print_input_deps:
         printdeps(job_order_object, loader, stdout, relative_deps, "",
-                  basedir=u"file://%s/" % input_basedir)
+                  basedir=file_uri(input_basedir+"/"))
         return 0
 
     def pathToLoc(p):
@@ -502,7 +498,7 @@ def makeRelative(base, ob):
         pass
     else:
         if u.startswith("file://"):
-            u = u[7:]
+            u = uri_file_path(u)
         ob["location"] = os.path.relpath(u, base)
 
 def printdeps(obj, document_loader, stdout, relative_deps, uri, basedir=None):
@@ -523,7 +519,7 @@ def printdeps(obj, document_loader, stdout, relative_deps, uri, basedir=None):
         if relative_deps == "primary":
             base = basedir if basedir else os.path.dirname(uri)
         elif relative_deps == "cwd":
-            base = "file://" + os.getcwd()
+            base = file_uri(os.getcwd())
         else:
             raise Exception(u"Unknown relative_deps %s" % relative_deps)
 
@@ -551,7 +547,7 @@ def versionstring():
 
 def main(argsl=None,  # type: List[str]
          args=None,   # type: argparse.Namespace
-         executor=single_job_executor,  # type: Callable[..., Union[Text, Dict[Text, Text]]]
+         executor=single_job_executor,  # type: Callable[..., Tuple[Dict[Text, Any], Text]]
          makeTool=workflow.defaultMakeTool,  # type: Callable[..., Process]
          selectResources=None,  # type: Callable[[Dict[Text, int]], Dict[Text, int]]
          stdin=sys.stdin,  # type: IO[Any]
@@ -561,12 +557,16 @@ def main(argsl=None,  # type: List[str]
          job_order_object=None,  # type: Union[Tuple[Dict[Text, Any], Text], int]
          make_fs_access=StdFsAccess,  # type: Callable[[Text], StdFsAccess]
          fetcher_constructor=None,  # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher]
-         resolver=tool_resolver
+         resolver=tool_resolver,
+         logger_handler=None
          ):
     # type: (...) -> int
 
     _logger.removeHandler(defaultStreamHandler)
-    stderr_handler = logging.StreamHandler(stderr)
+    if logger_handler:
+        stderr_handler = logger_handler
+    else:
+        stderr_handler = logging.StreamHandler(stderr)
     _logger.addHandler(stderr_handler)
     try:
         if args is None:
@@ -598,7 +598,8 @@ def main(argsl=None,  # type: List[str]
                     'job_order': None,
                     'pack': False,
                     'on_error': 'continue',
-                    'relax_path_checks': False}.iteritems():
+                    'relax_path_checks': False,
+                    'validate': False}.iteritems():
             if not hasattr(args, k):
                 setattr(args, k, v)
 
@@ -709,7 +710,7 @@ def main(argsl=None,  # type: List[str]
             setattr(args, 'basedir', job_order_object[1])
             del args.workflow
             del args.job_order
-            out = executor(tool, job_order_object[0],
+            (out, status) = executor(tool, job_order_object[0],
                            makeTool=makeTool,
                            select_resources=selectResources,
                            make_fs_access=make_fs_access,
@@ -719,7 +720,7 @@ def main(argsl=None,  # type: List[str]
             if out is not None:
                 def locToPath(p):
                     if p["location"].startswith("file://"):
-                        p["path"] = p["location"][7:]
+                        p["path"] = uri_file_path(p["location"])
 
                 adjustDirObjs(out, locToPath)
                 adjustFileObjs(out, locToPath)
@@ -730,8 +731,14 @@ def main(argsl=None,  # type: List[str]
                     stdout.write(json.dumps(out, indent=4))
                 stdout.write("\n")
                 stdout.flush()
-            else:
+
+            if status != "success":
+                _logger.warn(u"Final process status is %s", status)
                 return 1
+            else:
+                _logger.info(u"Final process status is %s", status)
+                return 0
+
         except (validate.ValidationException) as exc:
             _logger.error(u"Input object failed validation:\n%s", exc,
                     exc_info=args.debug)
diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py
index b50538a..3e1aa2e 100644
--- a/cwltool/pathmapper.py
+++ b/cwltool/pathmapper.py
@@ -3,11 +3,13 @@ import logging
 import stat
 import collections
 import uuid
+import urllib
 import urlparse
 from functools import partial
 from typing import Any, Callable, Set, Text, Tuple, Union
 import schema_salad.validate as validate
 from schema_salad.sourceline import SourceLine
+from schema_salad.ref_resolver import uri_file_path
 
 _logger = logging.getLogger("cwltool")
 
@@ -64,7 +66,7 @@ def normalizeFilesDirs(job):
 
         if "basename" not in d:
             parse = urlparse.urlparse(d["location"])
-            d["basename"] = os.path.basename(parse.path)
+            d["basename"] = os.path.basename(urllib.url2pathname(parse.path))
 
     adjustFileObjs(job, addLocation)
     adjustDirObjs(job, addLocation)
@@ -72,7 +74,7 @@ def normalizeFilesDirs(job):
 
 def abspath(src, basedir):  # type: (Text, Text) -> Text
     if src.startswith(u"file://"):
-        ab = src[7:]
+        ab = unicode(uri_file_path(str(src)))
     else:
         ab = src if os.path.isabs(src) else os.path.join(basedir, src)
     return ab
diff --git a/cwltool/process.py b/cwltool/process.py
index 38547c8..8e4a20d 100644
--- a/cwltool/process.py
+++ b/cwltool/process.py
@@ -16,7 +16,7 @@ import hashlib
 import abc
 import schema_salad.validate as validate
 import schema_salad.schema
-from schema_salad.ref_resolver import Loader
+from schema_salad.ref_resolver import Loader, file_uri
 from schema_salad.sourceline import SourceLine
 import avro.schema
 from typing import (Any, AnyStr, Callable, cast, Dict, List, Generator, IO, Text,
@@ -220,7 +220,7 @@ def relocateOutputs(outputObj, outdir, output_dirs, action):
     stageFiles(pm, moveIt)
 
     def _check_adjust(f):
-        f["location"] = "file://" + pm.mapper(f["location"])[1]
+        f["location"] = file_uri(pm.mapper(f["location"])[1])
         if "contents" in f:
             del f["contents"]
         if f["class"] == "File":
diff --git a/cwltool/resolver.py b/cwltool/resolver.py
index 73d4941..cf753f3 100644
--- a/cwltool/resolver.py
+++ b/cwltool/resolver.py
@@ -3,6 +3,8 @@ import logging
 import urllib
 import urlparse
 
+from schema_salad.ref_resolver import file_uri
+
 _logger = logging.getLogger("cwltool")
 
 def resolve_local(document_loader, uri):
@@ -17,9 +19,9 @@ def resolve_local(document_loader, uri):
 
     for s in shares:
         if os.path.exists(s):
-            return ("file://%s" % s)
+            return file_uri(s)
         if os.path.exists("%s.cwl" % s):
-            return ("file://%s.cwl" % s)
+            return file_uri(s)
     return None
 
 def tool_resolver(document_loader, uri):
@@ -27,4 +29,4 @@ def tool_resolver(document_loader, uri):
         ret = r(document_loader, uri)
         if ret is not None:
             return ret
-    return "file://" + os.path.abspath(uri)
+    return file_uri(os.path.abspath(uri))
diff --git a/cwltool/stdfsaccess.py b/cwltool/stdfsaccess.py
index 5a67ce0..2db2a5d 100644
--- a/cwltool/stdfsaccess.py
+++ b/cwltool/stdfsaccess.py
@@ -2,6 +2,7 @@ from typing import Any, BinaryIO, Text
 from .pathmapper import abspath
 import glob
 import os
+from schema_salad.ref_resolver import file_uri
 
 class StdFsAccess(object):
 
@@ -12,7 +13,7 @@ class StdFsAccess(object):
         return abspath(p, self.basedir)
 
     def glob(self, pattern):  # type: (Text) -> List[Text]
-        return ["file://%s" % self._abs(l) for l in glob.glob(self._abs(pattern))]
+        return [file_uri(str(self._abs(l))) for l in glob.glob(self._abs(pattern))]
 
     def open(self, fn, mode):  # type: (Text, Text) -> BinaryIO
         return open(self._abs(fn), mode)
diff --git a/cwltool/workflow.py b/cwltool/workflow.py
index 9f93b9d..68d9bd5 100644
--- a/cwltool/workflow.py
+++ b/cwltool/workflow.py
@@ -141,8 +141,8 @@ def _compare_records(src, sink):
             return False
     return True
 
-def object_from_state(state, parms, frag_only, supportsMultipleInput, sourceField):
-    # type: (Dict[Text, WorkflowStateItem], List[Dict[Text, Any]], bool, bool, Text) -> Dict[Text, Any]
+def object_from_state(state, parms, frag_only, supportsMultipleInput, sourceField, incomplete=False):
+    # type: (Dict[Text, WorkflowStateItem], List[Dict[Text, Any]], bool, bool, Text, bool) -> Dict[Text, Any]
     inputobj = {}  # type: Dict[Text, Any]
     for inp in parms:
         iid = inp["id"]
@@ -172,7 +172,7 @@ def object_from_state(state, parms, frag_only, supportsMultipleInput, sourceFiel
                     raise WorkflowException(
                         u"Connect source '%s' on parameter '%s' does not "
                         "exist" % (src, inp["id"]))
-                else:
+                elif not incomplete:
                     return None
         elif "default" in inp:
             inputobj[iid] = inp["default"]
@@ -225,12 +225,13 @@ class WorkflowJob(object):
 
     def receive_output(self, step, outputparms, jobout, processStatus):
         # type: (WorkflowJobStep, List[Dict[Text,Text]], Dict[Text,Text], Text) -> None
+
         for i in outputparms:
             if "id" in i:
                 if i["id"] in jobout:
                     self.state[i["id"]] = WorkflowStateItem(i, jobout[i["id"]])
                 else:
-                    _logger.error(u"Output is missing expected field %s" % i["id"])
+                    _logger.error(u"[%s] Output is missing expected field %s", step.name, i["id"])
                     processStatus = "permanentFail"
 
         if _logger.isEnabledFor(logging.DEBUG):
@@ -240,9 +241,9 @@ class WorkflowJob(object):
             if self.processStatus != "permanentFail":
                 self.processStatus = processStatus
 
-            _logger.warn(u"[%s] completion status is %s", step.name, processStatus)
+            _logger.warn(u"[%s] completed %s", step.name, processStatus)
         else:
-            _logger.info(u"[%s] completion status is %s", step.name, processStatus)
+            _logger.info(u"[%s] completed %s", step.name, processStatus)
 
         step.completed = True
 
@@ -363,7 +364,7 @@ class WorkflowJob(object):
                 self.state[out["id"]] = None
 
         completed = 0
-        while completed < len(self.steps) and self.processStatus == "success":
+        while completed < len(self.steps):
             made_progress = False
 
             for step in self.steps:
@@ -371,29 +372,44 @@ class WorkflowJob(object):
                     break
 
                 if not step.submitted:
-                    step.iterable = self.try_make_job(step, **kwargs)
+                    try:
+                        step.iterable = self.try_make_job(step, **kwargs)
+                    except WorkflowException as e:
+                        _logger.error(u"[%s] Cannot make job: %s", step.name, e)
+                        _logger.debug("", exc_info=True)
+                        self.processStatus = "permanentFail"
 
                 if step.iterable:
-                    for newjob in step.iterable:
-                        if kwargs.get("on_error", "stop") == "stop" and self.processStatus != "success":
-                            break
-                        if newjob:
-                            made_progress = True
-                            yield newjob
-                        else:
-                            break
+                    try:
+                        for newjob in step.iterable:
+                            if kwargs.get("on_error", "stop") == "stop" and self.processStatus != "success":
+                                break
+                            if newjob:
+                                made_progress = True
+                                yield newjob
+                            else:
+                                break
+                    except WorkflowException as e:
+                        _logger.error(u"[%s] Cannot make job: %s", step.name, e)
+                        _logger.debug("", exc_info=True)
+                        self.processStatus = "permanentFail"
 
             completed = sum(1 for s in self.steps if s.completed)
 
             if not made_progress and completed < len(self.steps):
-                yield None
+                if self.processStatus != "success":
+                    break
+                else:
+                    yield None
 
         supportsMultipleInput = bool(self.workflow.get_requirement("MultipleInputFeatureRequirement")[0])
 
-        wo = object_from_state(self.state, self.tool["outputs"], True, supportsMultipleInput, "outputSource")
-
-        if wo is None:
-            raise WorkflowException("Output for workflow not available")
+        try:
+            wo = object_from_state(self.state, self.tool["outputs"], True, supportsMultipleInput, "outputSource", incomplete=True)
+        except WorkflowException as e:
+            _logger.error(u"[%s] Cannot collect workflow output: %s", self.name, e)
+            wo = {}
+            self.processStatus = "permanentFail"
 
         _logger.info(u"[%s] outdir is %s", self.name, self.outdir)
 
@@ -591,17 +607,23 @@ class ReceiveScatterOutput(object):
 def parallel_steps(steps, rc, kwargs):  # type: (List[Generator], ReceiveScatterOutput, Dict[str, Any]) -> Generator
     while rc.completed < rc.total:
         made_progress = False
-        for step in steps:
+        for index in xrange(len(steps)):
+            step = steps[index]
             if kwargs.get("on_error", "stop") == "stop" and rc.processStatus != "success":
                 break
-            for j in step:
-                if kwargs.get("on_error", "stop") == "stop" and rc.processStatus != "success":
-                    break
-                if j:
-                    made_progress = True
-                    yield j
-                else:
-                    break
+            try:
+                for j in step:
+                    if kwargs.get("on_error", "stop") == "stop" and rc.processStatus != "success":
+                        break
+                    if j:
+                        made_progress = True
+                        yield j
+                    else:
+                        break
+            except WorkflowException as e:
+                _logger.error(u"Cannot make scatter job: %s", e)
+                _logger.debug("", exc_info=True)
+                rc.receive_scatter_output(index, {}, "permanentFail")
         if not made_progress and rc.completed < rc.total:
             yield None
 
diff --git a/setup.cfg b/setup.cfg
index da4d62a..d46ef4b 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -5,7 +5,7 @@ exclude = cwltool/schemas
 [easy_install]
 
 [egg_info]
-tag_build = .20161216212910
+tag_build = .20170111193653
 tag_date = 0
 tag_svn_revision = 0
 
diff --git a/setup.py b/setup.py
index 9d5c14c..2c637ed 100755
--- a/setup.py
+++ b/setup.py
@@ -44,12 +44,13 @@ setup(name='cwltool',
       install_requires=[
           'setuptools',
           'requests >= 1.0',
-          'ruamel.yaml >= 0.12.4, < 0.12.5',
+          'ruamel.yaml >= 0.12.4',
           'rdflib >= 4.2.0, < 4.3.0',
           'shellescape >= 3.4.1, < 3.5',
-          'schema-salad >= 2.1.20161216210732, < 3',
+          'schema-salad >= 2.2.20170111180227, < 3',
           'typing >= 3.5.2, < 3.6',
-          'cwltest >= 1.0.20160907111242'],
+          'cwltest >= 1.0.20161227194859'
+      ],
       test_suite='tests',
       tests_require=[],
       entry_points={

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/cwltool.git
