[med-svn] [cwltool] 04/05: Imported Upstream version 1.0.20160504183010
Michael Crusoe
misterc-guest at moszumanska.debian.org
Fri Dec 9 09:14:51 UTC 2016
This is an automated email from the git hooks/post-receive script.
misterc-guest pushed a commit to branch master
in repository cwltool.
commit 57e9b8a28ff3fe3e2a673bf21a74648c5cc980bd
Author: Michael R. Crusoe <crusoe at ucdavis.edu>
Date: Thu May 5 07:46:45 2016 -0700
Imported Upstream version 1.0.20160504183010
---
PKG-INFO | 7 +-
README.rst | 5 +
cwltool.egg-info/PKG-INFO | 7 +-
cwltool.egg-info/SOURCES.txt | 4 +-
cwltool.egg-info/requires.txt | 3 +-
cwltool/__main__.py | 2 +-
cwltool/aslist.py | 5 -
cwltool/builder.py | 71 +++++++--
cwltool/cwlrdf.py | 75 +++++-----
cwltool/cwltest.py | 54 ++++---
cwltool/docker.py | 17 ++-
cwltool/docker_uid.py | 23 +--
cwltool/draft2tool.py | 246 +++++++++++++++++++++----------
cwltool/expression.py | 61 ++++----
cwltool/factory.py | 13 +-
cwltool/flatten.py | 7 +-
cwltool/job.py | 112 +++++++++-----
cwltool/main.py | 331 +++++++++++++++++++++++++-----------------
cwltool/pathmapper.py | 61 +++++---
cwltool/process.py | 240 ++++++++++++++++++------------
cwltool/sandboxjs.py | 61 ++++++--
cwltool/stdfsaccess.py | 22 +++
cwltool/update.py | 98 +++++++++----
cwltool/utils.py | 18 +++
cwltool/workflow.py | 183 ++++++++++++++---------
gittaggers.py | 3 +
setup.cfg | 5 +-
setup.py | 8 +-
28 files changed, 1138 insertions(+), 604 deletions(-)
diff --git a/PKG-INFO b/PKG-INFO
index 626843b..a4cb71a 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: cwltool
-Version: 1.0.20160316204054
+Version: 1.0.20160504183010
Summary: Common workflow language reference implementation
Home-page: https://github.com/common-workflow-language/common-workflow-language
Author: Common workflow language working group
@@ -11,6 +11,8 @@ Description: ==================================================================
Common workflow language tool description reference implementation
==================================================================
+ CWL Conformance test: |Build Status|
+
This is the reference implementation of the Common Workflow Language. It is
intended to be feature complete and provide comprehensive validation of CWL
files as well as provide other tools related to working with CWL.
@@ -65,4 +67,7 @@ Description: ==================================================================
$ cwl-runner --tmp-outdir-prefix=/Users/username/project --tmpdir-prefix=/Users/username/project wc-tool.cwl wc-job.json
+ .. |Build Status| image:: https://ci.commonwl.org/buildStatus/icon?job=cwltool-conformance
+ :target: https://ci.commonwl.org/job/cwltool-conformance/
+
Platform: UNKNOWN
diff --git a/README.rst b/README.rst
index b811f73..31f8cab 100644
--- a/README.rst
+++ b/README.rst
@@ -2,6 +2,8 @@
Common workflow language tool description reference implementation
==================================================================
+CWL Conformance test: |Build Status|
+
This is the reference implementation of the Common Workflow Language. It is
intended to be feature complete and provide comprehensive validation of CWL
files as well as provide other tools related to working with CWL.
@@ -55,3 +57,6 @@ To run CWL successfully with boot2docker you need to set the ``--tmpdir-prefix``
and ``--tmp-outdir-prefix`` to somewhere under ``/Users``::
$ cwl-runner --tmp-outdir-prefix=/Users/username/project --tmpdir-prefix=/Users/username/project wc-tool.cwl wc-job.json
+
+.. |Build Status| image:: https://ci.commonwl.org/buildStatus/icon?job=cwltool-conformance
+ :target: https://ci.commonwl.org/job/cwltool-conformance/
diff --git a/cwltool.egg-info/PKG-INFO b/cwltool.egg-info/PKG-INFO
index cdf478d..a4cb71a 100644
--- a/cwltool.egg-info/PKG-INFO
+++ b/cwltool.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: cwltool
-Version: 1.0.20160209222805
+Version: 1.0.20160504183010
Summary: Common workflow language reference implementation
Home-page: https://github.com/common-workflow-language/common-workflow-language
Author: Common workflow language working group
@@ -11,6 +11,8 @@ Description: ==================================================================
Common workflow language tool description reference implementation
==================================================================
+ CWL Conformance test: |Build Status|
+
This is the reference implementation of the Common Workflow Language. It is
intended to be feature complete and provide comprehensive validation of CWL
files as well as provide other tools related to working with CWL.
@@ -65,4 +67,7 @@ Description: ==================================================================
$ cwl-runner --tmp-outdir-prefix=/Users/username/project --tmpdir-prefix=/Users/username/project wc-tool.cwl wc-job.json
+ .. |Build Status| image:: https://ci.commonwl.org/buildStatus/icon?job=cwltool-conformance
+ :target: https://ci.commonwl.org/job/cwltool-conformance/
+
Platform: UNKNOWN
diff --git a/cwltool.egg-info/SOURCES.txt b/cwltool.egg-info/SOURCES.txt
index be6d829..55d0211 100644
--- a/cwltool.egg-info/SOURCES.txt
+++ b/cwltool.egg-info/SOURCES.txt
@@ -2,10 +2,10 @@ MANIFEST.in
README.rst
ez_setup.py
gittaggers.py
+setup.cfg
setup.py
cwltool/__init__.py
cwltool/__main__.py
-cwltool/aslist.py
cwltool/builder.py
cwltool/cwlrdf.py
cwltool/cwltest.py
@@ -21,7 +21,9 @@ cwltool/main.py
cwltool/pathmapper.py
cwltool/process.py
cwltool/sandboxjs.py
+cwltool/stdfsaccess.py
cwltool/update.py
+cwltool/utils.py
cwltool/workflow.py
cwltool.egg-info/PKG-INFO
cwltool.egg-info/SOURCES.txt
diff --git a/cwltool.egg-info/requires.txt b/cwltool.egg-info/requires.txt
index 4e6c69d..f1b2098 100644
--- a/cwltool.egg-info/requires.txt
+++ b/cwltool.egg-info/requires.txt
@@ -1,6 +1,7 @@
requests
PyYAML
-rdflib >= 4.2.0
+rdflib >= 4.1.0
rdflib-jsonld >= 0.3.0
shellescape
schema_salad == 1.7.20160316203940
+typing
diff --git a/cwltool/__main__.py b/cwltool/__main__.py
index ae4ff8a..2b15c84 100644
--- a/cwltool/__main__.py
+++ b/cwltool/__main__.py
@@ -1,4 +1,4 @@
-import main
+from . import main
import sys
sys.exit(main.main())
diff --git a/cwltool/aslist.py b/cwltool/aslist.py
deleted file mode 100644
index f34a048..0000000
--- a/cwltool/aslist.py
+++ /dev/null
@@ -1,5 +0,0 @@
-def aslist(l):
- if isinstance(l, list):
- return l
- else:
- return [l]
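aslist itself survives this commit: per the SOURCES.txt hunk above and the builder.py hunk below, it moves to cwltool/utils.py and callers switch to "from .utils import aslist". For reference, its behavior (a standalone copy of the deleted body, plus two illustrative checks):

    def aslist(l):
        # Return l unchanged if it is already a list, else wrap it in one.
        if isinstance(l, list):
            return l
        else:
            return [l]

    assert aslist(["a", "b"]) == ["a", "b"]
    assert aslist("a") == ["a"]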
diff --git a/cwltool/builder.py b/cwltool/builder.py
index 1394616..36d4663 100644
--- a/cwltool/builder.py
+++ b/cwltool/builder.py
@@ -1,22 +1,54 @@
import copy
-from aslist import aslist
-import expression
+from .utils import aslist
+from . import expression
import avro
import schema_salad.validate as validate
+from typing import Any, Union, AnyStr, Callable
+from .errors import WorkflowException
+from .stdfsaccess import StdFsAccess
+from .pathmapper import PathMapper
CONTENT_LIMIT = 64 * 1024
-def substitute(value, replace):
+
+def substitute(value, replace): # type: (str, str) -> str
if replace[0] == "^":
return substitute(value[0:value.rindex('.')], replace[1:])
else:
return value + replace
+def adjustFileObjs(rec, op): # type: (Any, Callable[[Any], Any]) -> None
+ """Apply an update function to each File object in the object `rec`."""
+
+ if isinstance(rec, dict):
+ if rec.get("class") == "File":
+ op(rec)
+ for d in rec:
+ adjustFileObjs(rec[d], op)
+ if isinstance(rec, list):
+ for d in rec:
+ adjustFileObjs(d, op)
+
class Builder(object):
+ def __init__(self): # type: () -> None
+ self.names = None # type: avro.schema.Names
+ self.schemaDefs = None # type: Dict[str,Dict[unicode, Any]]
+ self.files = None # type: List[Dict[str, str]]
+ self.fs_access = None # type: StdFsAccess
+ self.job = None # type: Dict[str, Any]
+ self.requirements = None # type: List[Dict[str,Any]]
+ self.outdir = None # type: str
+ self.tmpdir = None # type: str
+ self.resources = None # type: Dict[str, Union[int, str]]
+ self.bindings = [] # type: List[Dict[str, Any]]
+ self.timeout = None # type: int
+ self.pathmapper = None # type: PathMapper
+
def bind_input(self, schema, datum, lead_pos=[], tail_pos=[]):
- bindings = []
- binding = None
+ # type: (Dict[unicode, Any], Any, List[int], List[int]) -> List[Dict[str, Any]]
+ bindings = [] # type: List[Dict[str,str]]
+ binding = None # type: Dict[str,Any]
if "inputBinding" in schema and isinstance(schema["inputBinding"], dict):
binding = copy.copy(schema["inputBinding"])
@@ -32,7 +64,7 @@ class Builder(object):
# Handle union types
if isinstance(schema["type"], list):
for t in schema["type"]:
- if isinstance(t, basestring) and self.names.has_name(t, ""):
+ if isinstance(t, (str, unicode)) and self.names.has_name(t, ""):
avsc = self.names.get_name(t, "")
elif isinstance(t, dict) and "name" in t and self.names.has_name(t["name"], ""):
avsc = self.names.get_name(t["name"], "")
@@ -42,7 +74,7 @@ class Builder(object):
schema = copy.deepcopy(schema)
schema["type"] = t
return self.bind_input(schema, datum, lead_pos=lead_pos, tail_pos=tail_pos)
- raise validate.ValidationException("'%s' is not a valid union %s" % (datum, schema["type"]))
+ raise validate.ValidationException(u"'%s' is not a valid union %s" % (datum, schema["type"]))
elif isinstance(schema["type"], dict):
st = copy.deepcopy(schema["type"])
if binding and "inputBinding" not in st and "itemSeparator" not in binding and st["type"] in ("array", "map"):
@@ -90,17 +122,23 @@ class Builder(object):
datum["secondaryFiles"] = []
for sf in aslist(schema["secondaryFiles"]):
if isinstance(sf, dict) or "$(" in sf or "${" in sf:
- sfpath = self.do_eval(sf, context=datum)
- if isinstance(sfpath, basestring):
- sfpath = {"path": sfpath, "class": "File"}
+ secondary_eval = self.do_eval(sf, context=datum)
+ if isinstance(secondary_eval, basestring):
+ sfpath = {"path": secondary_eval, "class": "File"}
+ else:
+ sfpath = secondary_eval
else:
sfpath = {"path": substitute(datum["path"], sf), "class": "File"}
if isinstance(sfpath, list):
datum["secondaryFiles"].extend(sfpath)
else:
datum["secondaryFiles"].append(sfpath)
- for sf in datum.get("secondaryFiles", []):
- self.files.append(sf)
+
+ def _capture_files(f):
+ self.files.append(f)
+ return f
+
+ adjustFileObjs(datum.get("secondaryFiles", []), _capture_files)
# Position to front of the sort key
if binding:
@@ -110,15 +148,15 @@ class Builder(object):
return bindings
- def tostr(self, value):
+ def tostr(self, value): # type: (Any) -> str
if isinstance(value, dict) and value.get("class") == "File":
if "path" not in value:
- raise WorkflowException("File object must have \"path\": %s" % (value))
+ raise WorkflowException(u"File object must have \"path\": %s" % (value))
return value["path"]
else:
return str(value)
- def generate_arg(self, binding):
+ def generate_arg(self, binding): # type: (Dict[str,Any]) -> List[str]
value = binding["valueFrom"]
if "do_eval" in binding:
value = self.do_eval(binding["do_eval"], context=value)
@@ -126,7 +164,7 @@ class Builder(object):
prefix = binding.get("prefix")
sep = binding.get("separate", True)
- l = []
+ l = [] # type: List[Dict[str,str]]
if isinstance(value, list):
if binding.get("itemSeparator"):
l = [binding["itemSeparator"].join([self.tostr(v) for v in value])]
@@ -158,6 +196,7 @@ class Builder(object):
return [a for a in args if a is not None]
def do_eval(self, ex, context=None, pull_image=True):
+ # type: (Dict[str,str], Any, bool) -> Any
return expression.do_eval(ex, self.job, self.requirements,
self.outdir, self.tmpdir,
self.resources,
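A note on the secondaryFiles handling in bind_input above: the substitute helper applies a CWL secondary-file suffix, where each leading "^" strips one extension from the primary path before appending. A standalone sketch (same logic as the function at the top of this hunk; the example paths are illustrative):

    def substitute(value, replace):
        # Each leading "^" removes one extension before appending the rest.
        if replace[0] == "^":
            return substitute(value[0:value.rindex('.')], replace[1:])
        else:
            return value + replace

    assert substitute("reads.fastq.gz", ".md5") == "reads.fastq.gz.md5"
    assert substitute("reads.fastq.gz", "^.md5") == "reads.fastq.md5"
    assert substitute("reads.fastq.gz", "^^.bai") == "reads.bai"

The new adjustFileObjs helper then walks any nested dict/list structure and applies a callback to every {"class": "File"} object, which is how the resolved secondary files get registered on self.files.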
diff --git a/cwltool/cwlrdf.py b/cwltool/cwlrdf.py
index f8a6e41..8e603b7 100644
--- a/cwltool/cwlrdf.py
+++ b/cwltool/cwlrdf.py
@@ -1,17 +1,22 @@
import json
import urlparse
+from schema_salad.ref_resolver import Loader
from rdflib import Graph, plugin, URIRef
from rdflib.serializer import Serializer
+from typing import Any, Union, Dict, IO
def makerdf(workflow, wf, ctx):
+ # type: (str, Dict[str,Any], Loader.ContextType) -> Graph
prefixes = {}
for k,v in ctx.iteritems():
if isinstance(v, dict):
- v = v["@id"]
- doc_url, frg = urlparse.urldefrag(v)
+ url = v["@id"]
+ else:
+ url = v
+ doc_url, frg = urlparse.urldefrag(url)
if "/" in frg:
p, _ = frg.split("/")
- prefixes[p] = "%s#%s/" % (doc_url, p)
+ prefixes[p] = u"%s#%s/" % (doc_url, p)
wf["@context"] = ctx
g = Graph().parse(data=json.dumps(wf), format='json-ld', location=workflow)
@@ -20,15 +25,16 @@ def makerdf(workflow, wf, ctx):
for s,p,o in g.triples((None, URIRef("@id"), None)):
g.remove((s, p, o))
- for k,v in prefixes.iteritems():
- g.namespace_manager.bind(k, v)
+ for k2,v2 in prefixes.iteritems():
+ g.namespace_manager.bind(k2, v2)
return g
-def printrdf(workflow, wf, ctx, sr):
- print(makerdf(workflow, wf, ctx).serialize(format=sr))
+def printrdf(workflow, wf, ctx, sr, stdout):
+ # type: (str, Dict[str,Any], Loader.ContextType, str, IO[Any]) -> None
+ stdout.write(makerdf(workflow, wf, ctx).serialize(format=sr))
-def lastpart(uri):
+def lastpart(uri): # type: (Any) -> str
uri = str(uri)
if "/" in uri:
return uri[uri.rindex("/")+1:]
@@ -36,7 +42,7 @@ def lastpart(uri):
return uri
-def dot_with_parameters(g):
+def dot_with_parameters(g, stdout): # type: (Graph, IO[Any]) -> None
qres = g.query(
"""SELECT ?step ?run ?runtype
WHERE {
@@ -45,7 +51,7 @@ def dot_with_parameters(g):
}""")
for step, run, runtype in qres:
- print '"%s" [label="%s"]' % (lastpart(step), "%s (%s)" % (lastpart(step), lastpart(run)))
+ stdout.write(u'"%s" [label="%s"]\n' % (lastpart(step), "%s (%s)" % (lastpart(step), lastpart(run))))
qres = g.query(
"""SELECT ?step ?inp ?source
@@ -56,9 +62,9 @@ def dot_with_parameters(g):
}""")
for step, inp, source in qres:
- print '"%s" [shape=box]' % (lastpart(inp))
- print '"%s" -> "%s" [label="%s"]' % (lastpart(source), lastpart(inp), "")
- print '"%s" -> "%s" [label="%s"]' % (lastpart(inp), lastpart(step), "")
+ stdout.write(u'"%s" [shape=box]\n' % (lastpart(inp)))
+ stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(source), lastpart(inp), ""))
+ stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(inp), lastpart(step), ""))
qres = g.query(
"""SELECT ?step ?out
@@ -68,8 +74,8 @@ def dot_with_parameters(g):
}""")
for step, out in qres:
- print '"%s" [shape=box]' % (lastpart(out))
- print '"%s" -> "%s" [label="%s"]' % (lastpart(step), lastpart(out), "")
+ stdout.write(u'"%s" [shape=box]\n' % (lastpart(out)))
+ stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(step), lastpart(out), ""))
qres = g.query(
"""SELECT ?out ?source
@@ -79,8 +85,8 @@ def dot_with_parameters(g):
}""")
for out, source in qres:
- print '"%s" [shape=octagon]' % (lastpart(out))
- print '"%s" -> "%s" [label="%s"]' % (lastpart(source), lastpart(out), "")
+ stdout.write(u'"%s" [shape=octagon]\n' % (lastpart(out)))
+ stdout.write(u'"%s" -> "%s" [label="%s"]\n' % (lastpart(source), lastpart(out), ""))
qres = g.query(
"""SELECT ?inp
@@ -90,13 +96,13 @@ def dot_with_parameters(g):
}""")
for (inp,) in qres:
- print '"%s" [shape=octagon]' % (lastpart(inp))
+ stdout.write(u'"%s" [shape=octagon]\n' % (lastpart(inp)))
-def dot_without_parameters(g):
- dotname = {}
+def dot_without_parameters(g, stdout): # type: (Graph, IO[Any]) -> None
+ dotname = {} # type: Dict[str,str]
clusternode = {}
- print "compound=true"
+ stdout.write("compound=true\n")
subworkflows = set()
qres = g.query(
@@ -126,21 +132,21 @@ def dot_without_parameters(g):
if wf != currentwf:
if currentwf is not None:
- print "}"
+ stdout.write("}\n")
if wf in subworkflows:
if wf not in dotname:
dotname[wf] = "cluster_" + lastpart(wf)
- print 'subgraph "%s" { label="%s"' % (dotname[wf], lastpart(wf))
+ stdout.write(u'subgraph "%s" { label="%s"\n' % (dotname[wf], lastpart(wf)))
currentwf = wf
clusternode[wf] = step
else:
currentwf = None
if str(runtype) != "https://w3id.org/cwl/cwl#Workflow":
- print '"%s" [label="%s"]' % (dotname[step], urlparse.urldefrag(str(step))[1])
+ stdout.write(u'"%s" [label="%s"]\n' % (dotname[step], urlparse.urldefrag(str(step))[1]))
if currentwf is not None:
- print "}\n"
+ stdout.write("}\n")
qres = g.query(
"""SELECT DISTINCT ?src ?sink ?srcrun ?sinkrun
@@ -155,26 +161,27 @@ def dot_without_parameters(g):
}""")
for src, sink, srcrun, sinkrun in qres:
- attr = ""
+ attr = u""
if srcrun in clusternode:
- attr += 'ltail="%s"' % dotname[srcrun]
+ attr += u'ltail="%s"' % dotname[srcrun]
src = clusternode[srcrun]
if sinkrun in clusternode:
- attr += ' lhead="%s"' % dotname[sinkrun]
+ attr += u' lhead="%s"' % dotname[sinkrun]
sink = clusternode[sinkrun]
- print '"%s" -> "%s" [%s]' % (dotname[src], dotname[sink], attr)
+ stdout.write(u'"%s" -> "%s" [%s]\n' % (dotname[src], dotname[sink], attr))
-def printdot(workflow, wf, ctx, include_parameters=False):
+def printdot(workflow, wf, ctx, stdout, include_parameters=False):
+ # type: (str, Dict[str,Any], Loader.ContextType, Any, bool) -> None
g = makerdf(workflow, wf, ctx)
- print "digraph {"
+ stdout.write("digraph {")
#g.namespace_manager.qname(predicate)
if include_parameters:
- dot_with_parmeters(g)
+ dot_with_parameters(g, stdout)
else:
- dot_without_parameters(g)
+ dot_without_parameters(g, stdout)
- print "}"
+ stdout.write("}")
diff --git a/cwltool/cwltest.py b/cwltool/cwltest.py
index 31afadb..cb6b6f7 100755
--- a/cwltool/cwltest.py
+++ b/cwltool/cwltest.py
@@ -8,9 +8,11 @@ import sys
import shutil
import tempfile
import yaml
+import yaml.scanner
import pipes
import logging
import schema_salad.ref_resolver
+from typing import Any, Union
_logger = logging.getLogger("cwltest")
_logger.addHandler(logging.StreamHandler())
@@ -21,41 +23,43 @@ UNSUPPORTED_FEATURE = 33
class CompareFail(Exception):
pass
-def compare(a, b):
+
+def compare(a, b): # type: (Any, Any) -> bool
try:
if isinstance(a, dict):
if a.get("class") == "File":
- if not b["path"].endswith("/" + a["path"]):
- raise CompareFail("%s does not end with %s" %(b["path"], a["path"]))
+ if not (b["path"].endswith("/" + a["path"]) or ("/" not in b["path"] and a["path"] == b["path"])):
+ raise CompareFail(u"%s does not end with %s" %(b["path"], a["path"]))
# ignore empty collections
b = {k: v for k, v in b.iteritems()
if not isinstance(v, (list, dict)) or len(v) > 0}
if len(a) != len(b):
- raise CompareFail("expected %s\ngot %s" % (json.dumps(a, indent=4, sort_keys=True), json.dumps(b, indent=4, sort_keys=True)))
+ raise CompareFail(u"expected %s\ngot %s" % (json.dumps(a, indent=4, sort_keys=True), json.dumps(b, indent=4, sort_keys=True)))
for c in a:
if a.get("class") != "File" or c != "path":
if c not in b:
- raise CompareFail("%s not in %s" % (c, b))
+ raise CompareFail(u"%s not in %s" % (c, b))
if not compare(a[c], b[c]):
return False
return True
elif isinstance(a, list):
if len(a) != len(b):
- raise CompareFail("expected %s\ngot %s" % (json.dumps(a, indent=4, sort_keys=True), json.dumps(b, indent=4, sort_keys=True)))
+ raise CompareFail(u"expected %s\ngot %s" % (json.dumps(a, indent=4, sort_keys=True), json.dumps(b, indent=4, sort_keys=True)))
for c in xrange(0, len(a)):
if not compare(a[c], b[c]):
return False
return True
else:
if a != b:
- raise CompareFail("%s != %s" % (a, b))
+ raise CompareFail(u"%s != %s" % (a, b))
else:
return True
except Exception as e:
raise CompareFail(str(e))
-def run_test(args, i, t):
- out = {}
+
+def run_test(args, i, t): # type: (argparse.Namespace, Any, Dict[str,str]) -> int
+ out = {} # type: Dict[str,Any]
outdir = None
try:
if "output" in t:
@@ -83,21 +87,23 @@ def run_test(args, i, t):
outstr = subprocess.check_output(test_command)
out = yaml.load(outstr)
+ if not isinstance(out, dict):
+ raise ValueError("Non-dict value parsed from output string.")
except ValueError as v:
- _logger.error(v)
+ _logger.error(str(v))
_logger.error(outstr)
except subprocess.CalledProcessError as err:
if err.returncode == UNSUPPORTED_FEATURE:
return UNSUPPORTED_FEATURE
else:
- _logger.error("""Test failed: %s""", " ".join([pipes.quote(tc) for tc in test_command]))
+ _logger.error(u"""Test failed: %s""", " ".join([pipes.quote(tc) for tc in test_command]))
_logger.error(t.get("doc"))
_logger.error("Returned non-zero")
return 1
except yaml.scanner.ScannerError as e:
- _logger.error("""Test failed: %s""", " ".join([pipes.quote(tc) for tc in test_command]))
+ _logger.error(u"""Test failed: %s""", " ".join([pipes.quote(tc) for tc in test_command]))
_logger.error(outstr)
- _logger.error("Parse error %s", str(e))
+ _logger.error(u"Parse error %s", str(e))
pwd = os.path.abspath(os.path.dirname(t["job"]))
# t["args"] = map(lambda x: x.replace("$PWD", pwd), t["args"])
@@ -114,16 +120,17 @@ def run_test(args, i, t):
try:
compare(t.get(key), out.get(key))
except CompareFail as ex:
- _logger.warn("""Test failed: %s""", " ".join([pipes.quote(tc) for tc in test_command]))
+ _logger.warn(u"""Test failed: %s""", " ".join([pipes.quote(tc) for tc in test_command]))
_logger.warn(t.get("doc"))
- _logger.warn("%s expected %s\n got %s", key,
+ _logger.warn(u"%s expected %s\n got %s", key,
json.dumps(t.get(key), indent=4, sort_keys=True),
json.dumps(out.get(key), indent=4, sort_keys=True))
- _logger.warn("Compare failure %s", ex)
+ _logger.warn(u"Compare failure %s", ex)
failed = True
if outdir:
- shutil.rmtree(outdir, True)
+ shutil.rmtree(outdir, True) # type: ignore
+ # Weird AnyStr != basestring issue
if failed:
return 1
@@ -131,7 +138,7 @@ def run_test(args, i, t):
return 0
-def main():
+def main(): # type: () -> int
parser = argparse.ArgumentParser(description='Compliance tests for cwltool')
parser.add_argument("--test", type=str, help="YAML file describing test cases", required=True)
parser.add_argument("--basedir", type=str, help="Basedir to use for tests", default=".")
@@ -158,13 +165,16 @@ def main():
tests = []
for t in alltests:
loader = schema_salad.ref_resolver.Loader({"id": "@id"})
- cwl, _ = loader.resolve_ref(t["tool"])
- if cwl["class"] == "CommandLineTool":
- tests.append(t)
+ cwl = loader.resolve_ref(t["tool"])[0]
+ if isinstance(cwl, dict):
+ if cwl["class"] == "CommandLineTool":
+ tests.append(t)
+ else:
+ raise Exception("Unexpected code path.")
if args.l:
for i, t in enumerate(tests):
- print "[%i] %s" % (i+1, t["doc"].strip())
+ print u"[%i] %s" % (i+1, t["doc"].strip())
return 0
if args.n is not None:
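The relaxed File comparison near the top of this hunk now also accepts a bare filename when the actual output path has no directory component. The rule in isolation (helper name is illustrative):

    def path_matches(expected, actual):
        # Accept ".../<expected>", or an exact match when the actual path
        # has no directory part.
        return (actual.endswith("/" + expected)
                or ("/" not in actual and expected == actual))

    assert path_matches("out.txt", "/tmp/job1/out.txt")
    assert path_matches("out.txt", "out.txt")
    assert not path_matches("out.txt", "/tmp/stdout.txt")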
diff --git a/cwltool/docker.py b/cwltool/docker.py
index e9a56f4..1a5da7e 100644
--- a/cwltool/docker.py
+++ b/cwltool/docker.py
@@ -3,13 +3,15 @@ import logging
import sys
import requests
import os
-import process
+from .errors import WorkflowException
import re
import tempfile
+from typing import Any, Union
_logger = logging.getLogger("cwltool")
def get_image(dockerRequirement, pull_image, dry_run=False):
+ # type: (Dict[str,str], bool, bool) -> bool
found = False
if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
@@ -36,7 +38,7 @@ def get_image(dockerRequirement, pull_image, dry_run=False):
subprocess.check_call(cmd, stdout=sys.stderr)
found = True
elif "dockerFile" in dockerRequirement:
- dockerfile_dir = tempfile.mkdtemp()
+ dockerfile_dir = str(tempfile.mkdtemp())
with open(os.path.join(dockerfile_dir, "Dockerfile"), "w") as df:
df.write(dockerRequirement["dockerFile"])
cmd = ["docker", "build", "--tag=%s" % dockerRequirement["dockerImageId"], dockerfile_dir]
@@ -49,12 +51,12 @@ def get_image(dockerRequirement, pull_image, dry_run=False):
_logger.info(str(cmd))
if not dry_run:
if os.path.exists(dockerRequirement["dockerLoad"]):
- _logger.info("Loading docker image from %s", dockerRequirement["dockerLoad"])
+ _logger.info(u"Loading docker image from %s", dockerRequirement["dockerLoad"])
with open(dockerRequirement["dockerLoad"], "rb") as f:
loadproc = subprocess.Popen(cmd, stdin=f, stdout=sys.stderr)
else:
loadproc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=sys.stderr)
- _logger.info("Sending GET request to %s", dockerRequirement["dockerLoad"])
+ _logger.info(u"Sending GET request to %s", dockerRequirement["dockerLoad"])
req = requests.get(dockerRequirement["dockerLoad"], stream=True)
n = 0
for chunk in req.iter_content(1024*1024):
@@ -64,7 +66,7 @@ def get_image(dockerRequirement, pull_image, dry_run=False):
loadproc.stdin.close()
rcode = loadproc.wait()
if rcode != 0:
- raise process.WorkflowException("Docker load returned non-zero exit status %i" % (rcode))
+ raise WorkflowException("Docker load returned non-zero exit status %i" % (rcode))
found = True
elif "dockerImport" in dockerRequirement:
cmd = ["docker", "import", dockerRequirement["dockerImport"], dockerRequirement["dockerImageId"]]
@@ -77,6 +79,7 @@ def get_image(dockerRequirement, pull_image, dry_run=False):
def get_from_requirements(r, req, pull_image, dry_run=False):
+ # type: (Dict[str,str], bool, bool, bool) -> Union[None,str]
if r:
errmsg = None
try:
@@ -88,7 +91,7 @@ def get_from_requirements(r, req, pull_image, dry_run=False):
if errmsg:
if req:
- raise process.WorkflowException(errmsg)
+ raise WorkflowException(errmsg)
else:
return None
@@ -96,6 +99,6 @@ def get_from_requirements(r, req, pull_image, dry_run=False):
return r["dockerImageId"]
else:
if req:
- raise process.WorkflowException("Docker image %s not found" % r["dockerImageId"])
+ raise WorkflowException(u"Docker image %s not found" % r["dockerImageId"])
return None
diff --git a/cwltool/docker_uid.py b/cwltool/docker_uid.py
index e8fbbe2..4482f77 100644
--- a/cwltool/docker_uid.py
+++ b/cwltool/docker_uid.py
@@ -1,7 +1,8 @@
import subprocess
+from typing import Union
-def docker_vm_uid():
+def docker_vm_uid(): # type: () -> Union[int,None]
"""
Returns the UID of the default docker user inside the VM
@@ -19,7 +20,7 @@ def docker_vm_uid():
return None
-def check_output_and_strip(cmd):
+def check_output_and_strip(cmd): # type: (List[str]) -> Union[str,None]
"""
Passes a command list to subprocess.check_output, returning None
if an expected exception is raised
@@ -36,7 +37,7 @@ def check_output_and_strip(cmd):
return None
-def docker_machine_name():
+def docker_machine_name(): # type: () -> Union[str,None]
"""
Get the machine name of the active docker-machine machine
:return: Name of the active machine or None if error
@@ -45,6 +46,7 @@ def docker_machine_name():
def cmd_output_matches(check_cmd, expected_status):
+ # type: (List[str], str) -> bool
"""
Runs a command and compares output to expected
:param check_cmd: Command list to execute
@@ -57,7 +59,7 @@ def cmd_output_matches(check_cmd, expected_status):
return False
-def boot2docker_running():
+def boot2docker_running(): # type: () -> bool
"""
Checks if boot2docker CLI reports that boot2docker vm is running
:return: True if vm is running, False otherwise
@@ -65,7 +67,7 @@ def boot2docker_running():
return cmd_output_matches(['boot2docker', 'status'], 'running')
-def docker_machine_running():
+def docker_machine_running(): # type: () -> bool
"""
Asks docker-machine for active machine and checks if its VM is running
:return: True if vm is running, False otherwise
@@ -74,7 +76,7 @@ def docker_machine_running():
return cmd_output_matches(['docker-machine', 'status', machine_name], 'Running')
-def cmd_output_to_int(cmd):
+def cmd_output_to_int(cmd): # type: (List[str]) -> Union[int,None]
"""
Runs the provided command and returns the integer value of the result
:param cmd: The command to run
@@ -83,14 +85,13 @@ def cmd_output_to_int(cmd):
result = check_output_and_strip(cmd) # may return None
if result is not None:
try:
- result = int(result)
+ return int(result)
except ValueError:
# ValueError is raised if int conversion fails
- result = None
- return result
+ return None
-def boot2docker_uid():
+def boot2docker_uid(): # type: () -> Union[int,None]
"""
Gets the UID of the docker user inside a running boot2docker vm
:return: the UID, or None if error (e.g. boot2docker not present or stopped)
@@ -98,7 +99,7 @@ def boot2docker_uid():
return cmd_output_to_int(['boot2docker', 'ssh', 'id', '-u'])
-def docker_machine_uid():
+def docker_machine_uid(): # type: () -> Union[int,None]
"""
Asks docker-machine for active machine and gets the UID of the docker user
inside the vm
diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py
index b506b5f..47d6c04 100644
--- a/cwltool/draft2tool.py
+++ b/cwltool/draft2tool.py
@@ -1,43 +1,59 @@
import avro.schema
import json
import copy
-from flatten import flatten
-import functools
+from .flatten import flatten
+from functools import partial
import os
-from pathmapper import PathMapper, DockerPathMapper
-from job import CommandLineJob
+from .pathmapper import PathMapper, DockerPathMapper
+from .job import CommandLineJob
import yaml
import glob
import logging
import hashlib
import random
-from process import Process, shortname, uniquename, adjustFileObjs
-from errors import WorkflowException
+from .process import Process, shortname, uniquename, adjustFileObjs
+from .errors import WorkflowException
import schema_salad.validate as validate
-from aslist import aslist
-import expression
+from .utils import aslist
+from . import expression
import re
import urlparse
import tempfile
-from builder import CONTENT_LIMIT, substitute
+from .builder import CONTENT_LIMIT, substitute, Builder
import shellescape
import errno
+from typing import Callable, Any, Union, Generator, cast
+import hashlib
+import shutil
_logger = logging.getLogger("cwltool")
class ExpressionTool(Process):
def __init__(self, toolpath_object, **kwargs):
+ # type: (Dict[str,List[None]], **Any) -> None
super(ExpressionTool, self).__init__(toolpath_object, **kwargs)
class ExpressionJob(object):
- def run(self, **kwargs):
+
+ def __init__(self): # type: () -> None
+ self.builder = None # type: Builder
+ self.requirements = None # type: Dict[str,str]
+ self.hints = None # type: Dict[str,str]
+ self.collect_outputs = None # type: Callable[[Any], Any]
+ self.output_callback = None # type: Callable[[Any, Any], Any]
+ self.outdir = None # type: str
+ self.tmpdir = None # type: str
+ self.script = None # type: Dict[str,str]
+
+ def run(self, **kwargs): # type: (**Any) -> None
try:
self.output_callback(self.builder.do_eval(self.script), "success")
except Exception as e:
- _logger.warn("Failed to evaluate expression:\n%s", e, exc_info=(e if kwargs.get('debug') else False))
+ _logger.warn(u"Failed to evaluate expression:\n%s", e, exc_info=(e if kwargs.get('debug') else False))
self.output_callback({}, "permanentFail")
def job(self, joborder, input_basedir, output_callback, **kwargs):
+ # type: (Dict[str,str], str, Callable[[Any, Any], Any], **Any) -> Generator[ExpressionTool.ExpressionJob, None, None]
builder = self._init_job(joborder, input_basedir, **kwargs)
j = ExpressionTool.ExpressionJob()
@@ -51,11 +67,14 @@ class ExpressionTool(Process):
yield j
-def remove_hostfs(f):
+
+def remove_hostfs(f): # type: (Dict[str, Any]) -> None
if "hostfs" in f:
del f["hostfs"]
+
def revmap_file(builder, outdir, f):
+ # type: (Builder,str,Dict[str,Any]) -> Union[Dict[str,Any],None]
"""Remap a file back to original path. For Docker, this is outside the container.
Uses either files in the pathmapper or remaps internal output directories
@@ -63,7 +82,7 @@ def revmap_file(builder, outdir, f):
"""
if f.get("hostfs"):
- return
+ return None
revmap_f = builder.pathmapper.reversemap(f["path"])
if revmap_f:
@@ -75,17 +94,33 @@ def revmap_file(builder, outdir, f):
f["hostfs"] = True
return f
else:
- raise WorkflowException("Output file path %s must be within designated output directory (%s) or an input file pass through." % (f["path"], builder.outdir))
+ raise WorkflowException(u"Output file path %s must be within designated output directory (%s) or an input file pass through." % (f["path"], builder.outdir))
+
+class CallbackJob(object):
+ def __init__(self, job, output_callback, cachebuilder, jobcache):
+ # type: (CommandLineTool, Callable[[Any, Any], Any], Builder, str) -> None
+ self.job = job
+ self.output_callback = output_callback
+ self.cachebuilder = cachebuilder
+ self.outdir = jobcache
+
+ def run(self, **kwargs):
+ # type: (**Any) -> None
+ self.output_callback(self.job.collect_output_ports(self.job.tool["outputs"],
+ self.cachebuilder, self.outdir),
+ "success")
class CommandLineTool(Process):
def __init__(self, toolpath_object, **kwargs):
+ # type: (Dict[str,Any], **Any) -> None
super(CommandLineTool, self).__init__(toolpath_object, **kwargs)
- def makeJobRunner(self):
+ def makeJobRunner(self): # type: () -> CommandLineJob
return CommandLineJob()
def makePathMapper(self, reffiles, input_basedir, **kwargs):
+ # type: (Set[str], str, **Any) -> PathMapper
dockerReq, _ = self.get_requirement("DockerRequirement")
try:
if dockerReq and kwargs.get("use_container"):
@@ -94,36 +129,76 @@ class CommandLineTool(Process):
return PathMapper(reffiles, input_basedir)
except OSError as e:
if e.errno == errno.ENOENT:
- raise WorkflowException("Missing input file %s" % e)
+ raise WorkflowException(u"Missing input file %s" % e)
def job(self, joborder, input_basedir, output_callback, **kwargs):
- builder = self._init_job(joborder, input_basedir, **kwargs)
-
- if self.tool["baseCommand"]:
- for n, b in enumerate(aslist(self.tool["baseCommand"])):
- builder.bindings.append({
- "position": [-1000000, n],
- "valueFrom": b
- })
-
- if self.tool.get("arguments"):
- for i, a in enumerate(self.tool["arguments"]):
- if isinstance(a, dict):
- a = copy.copy(a)
- if a.get("position"):
- a["position"] = [a["position"], i]
- else:
- a["position"] = [0, i]
- a["do_eval"] = a["valueFrom"]
- a["valueFrom"] = None
- builder.bindings.append(a)
+ # type: (Dict[str,str], str, Callable[..., Any], **Any) -> Generator[Union[CommandLineJob, CallbackJob], None, None]
+
+ jobname = uniquename(kwargs.get("name", shortname(self.tool.get("id", "job"))))
+
+ if kwargs.get("cachedir"):
+ cacheargs = kwargs.copy()
+ cacheargs["outdir"] = "/out"
+ cacheargs["tmpdir"] = "/tmp"
+ cachebuilder = self._init_job(joborder, input_basedir, **cacheargs)
+ cachebuilder.pathmapper = PathMapper(set((f["path"] for f in cachebuilder.files)),
+ input_basedir)
+
+ cmdline = flatten(map(cachebuilder.generate_arg, cachebuilder.bindings))
+ (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
+ if docker_req and kwargs.get("use_container") is not False:
+ dockerimg = docker_req.get("dockerImageId") or docker_req.get("dockerPull")
+ cmdline = ["docker", "run", dockerimg] + cmdline
+ keydict = {"cmdline": cmdline}
+
+ for _,f in cachebuilder.pathmapper.items():
+ st = os.stat(f[0])
+ keydict[f[0]] = [st.st_size, int(st.st_mtime * 1000)]
+
+ interesting = {"DockerRequirement",
+ "EnvVarRequirement",
+ "CreateFileRequirement",
+ "ShellCommandRequirement"}
+ for rh in (self.requirements, self.hints):
+ for r in reversed(rh):
+ if r["class"] in interesting and r["class"] not in keydict:
+ keydict[r["class"]] = r
+
+ keydictstr = json.dumps(keydict, separators=(',',':'), sort_keys=True)
+ cachekey = hashlib.md5(keydictstr).hexdigest()
+
+ _logger.debug("[job %s] keydictstr is %s -> %s", jobname, keydictstr, cachekey)
+
+ jobcache = os.path.join(kwargs["cachedir"], cachekey)
+ jobcachepending = jobcache + ".pending"
+
+ if os.path.isdir(jobcache) and not os.path.isfile(jobcachepending):
+ if docker_req and kwargs.get("use_container") is not False:
+ cachebuilder.outdir = kwargs.get("docker_outdir") or "/var/spool/cwl"
else:
- builder.bindings.append({
- "position": [0, i],
- "valueFrom": a
- })
+ cachebuilder.outdir = jobcache
- builder.bindings.sort(key=lambda a: a["position"])
+ _logger.info("[job %s] Using cached output in %s", jobname, jobcache)
+ yield CallbackJob(self, output_callback, cachebuilder, jobcache)
+ return
+ else:
+ _logger.info("[job %s] Output of job will be cached in %s", jobname, jobcache)
+ shutil.rmtree(jobcache, True)
+ os.makedirs(jobcache)
+ kwargs["outdir"] = jobcache
+ open(jobcachepending, "w").close()
+ def rm_pending_output_callback(output_callback, jobcachepending,
+ outputs, processStatus):
+ if processStatus == "success":
+ os.remove(jobcachepending)
+ output_callback(outputs, processStatus)
+ output_callback = cast(
+ Callable[..., Any], # known bug in mypy
+ # https://github.com/python/mypy/issues/797
+ partial(rm_pending_output_callback, output_callback,
+ jobcachepending))
+
+ builder = self._init_job(joborder, input_basedir, **kwargs)
reffiles = set((f["path"] for f in builder.files))
@@ -137,21 +212,19 @@ class CommandLineTool(Process):
j.permanentFailCodes = self.tool.get("permanentFailCodes")
j.requirements = self.requirements
j.hints = self.hints
- j.name = uniquename(kwargs.get("name", str(id(j))))
+ j.name = jobname
- _logger.debug("[job %s] initializing from %s%s",
+ _logger.debug(u"[job %s] initializing from %s%s",
j.name,
self.tool.get("id", ""),
- " as part of %s" % kwargs["part_of"] if "part_of" in kwargs else "")
- _logger.debug("[job %s] %s", j.name, json.dumps(joborder, indent=4))
+ u" as part of %s" % kwargs["part_of"] if "part_of" in kwargs else "")
+ _logger.debug(u"[job %s] %s", j.name, json.dumps(joborder, indent=4))
builder.pathmapper = None
if self.tool.get("stdin"):
j.stdin = builder.do_eval(self.tool["stdin"])
- if isinstance(j.stdin, dict) and "ref" in j.stdin:
- j.stdin = builder.job[j.stdin["ref"][1:]]["path"]
reffiles.add(j.stdin)
if self.tool.get("stdout"):
@@ -164,19 +237,20 @@ class CommandLineTool(Process):
# map files to assigned path inside a container. We need to also explicitly
# walk over input as implicit reassignment doesn't reach everything in builder.bindings
- def _check_adjust(f):
+ def _check_adjust(f): # type: (Dict[str,Any]) -> Dict[str,Any]
if not f.get("containerfs"):
f["path"] = builder.pathmapper.mapper(f["path"])[1]
f["containerfs"] = True
return f
+ _logger.debug(u"[job %s] path mappings is %s", j.name, json.dumps({p: builder.pathmapper.mapper(p) for p in builder.pathmapper.files()}, indent=4))
+
adjustFileObjs(builder.files, _check_adjust)
adjustFileObjs(builder.bindings, _check_adjust)
- _logger.debug("[job %s] command line bindings is %s", j.name, json.dumps(builder.bindings, indent=4))
- _logger.debug("[job %s] path mappings is %s", j.name, json.dumps({p: builder.pathmapper.mapper(p) for p in builder.pathmapper.files()}, indent=4))
+ _logger.debug(u"[job %s] command line bindings is %s", j.name, json.dumps(builder.bindings, indent=4))
- dockerReq, _ = self.get_requirement("DockerRequirement")
+ dockerReq = self.get_requirement("DockerRequirement")[0]
if dockerReq and kwargs.get("use_container"):
out_prefix = kwargs.get("tmp_outdir_prefix")
j.outdir = kwargs.get("outdir") or tempfile.mkdtemp(prefix=out_prefix)
@@ -186,21 +260,21 @@ class CommandLineTool(Process):
j.outdir = builder.outdir
j.tmpdir = builder.tmpdir
- createFiles, _ = self.get_requirement("CreateFileRequirement")
+ createFiles = self.get_requirement("CreateFileRequirement")[0]
j.generatefiles = {}
if createFiles:
for t in createFiles["fileDef"]:
j.generatefiles[builder.do_eval(t["filename"])] = copy.deepcopy(builder.do_eval(t["fileContent"]))
j.environment = {}
- evr, _ = self.get_requirement("EnvVarRequirement")
+ evr = self.get_requirement("EnvVarRequirement")[0]
if evr:
for t in evr["envDef"]:
j.environment[t["envName"]] = builder.do_eval(t["envValue"])
- shellcmd, _ = self.get_requirement("ShellCommandRequirement")
+ shellcmd = self.get_requirement("ShellCommandRequirement")[0]
if shellcmd:
- cmd = []
+ cmd = [] # type: List[str]
for b in builder.bindings:
arg = builder.generate_arg(b)
if b.get("shellQuote", True):
@@ -211,28 +285,36 @@ class CommandLineTool(Process):
j.command_line = flatten(map(builder.generate_arg, builder.bindings))
j.pathmapper = builder.pathmapper
- j.collect_outputs = functools.partial(self.collect_output_ports, self.tool["outputs"], builder)
+ j.collect_outputs = partial(
+ self.collect_output_ports, self.tool["outputs"], builder)
j.output_callback = output_callback
yield j
def collect_output_ports(self, ports, builder, outdir):
+ # type: (Set[Dict[str,Any]], Builder, str) -> Dict[str,Union[str,List[Any],Dict[str,Any]]]
try:
- ret = {}
+ ret = {} # type: Dict[str,Union[str,List[Any],Dict[str,Any]]]
custom_output = os.path.join(outdir, "cwl.output.json")
if builder.fs_access.exists(custom_output):
with builder.fs_access.open(custom_output, "r") as f:
ret = yaml.load(f)
- _logger.debug("Raw output from %s: %s", custom_output, json.dumps(ret, indent=4))
+ _logger.debug(u"Raw output from %s: %s", custom_output, json.dumps(ret, indent=4))
adjustFileObjs(ret, remove_hostfs)
- adjustFileObjs(ret, functools.partial(revmap_file, builder, outdir))
+ adjustFileObjs(ret,
+ cast(Callable[[Any], Any], # known bug in mypy
+ # https://github.com/python/mypy/issues/797
+ partial(revmap_file, builder, outdir)))
adjustFileObjs(ret, remove_hostfs)
validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret)
return ret
for port in ports:
fragment = shortname(port["id"])
- ret[fragment] = self.collect_output(port, builder, outdir)
+ try:
+ ret[fragment] = self.collect_output(port, builder, outdir)
+ except Exception as e:
+ raise WorkflowException(u"Error collecting output for parameter '%s': %s" % (shortname(port["id"]), e))
if ret:
adjustFileObjs(ret, remove_hostfs)
validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret)
@@ -241,15 +323,15 @@ class CommandLineTool(Process):
raise WorkflowException("Error validating output record, " + str(e) + "\n in " + json.dumps(ret, indent=4))
def collect_output(self, schema, builder, outdir):
- r = None
+ # type: (Dict[str,Any], Builder, str) -> Union[Dict[str, Any], List[Union[Dict[str, Any], str]]]
+ r = [] # type: List[Any]
if "outputBinding" in schema:
binding = schema["outputBinding"]
- globpatterns = []
+ globpatterns = [] # type: List[str]
- revmap = functools.partial(revmap_file, builder, outdir)
+ revmap = partial(revmap_file, builder, outdir)
if "glob" in binding:
- r = []
for gb in aslist(binding["glob"]):
gb = builder.do_eval(gb)
if gb:
@@ -257,7 +339,7 @@ class CommandLineTool(Process):
for gb in globpatterns:
if gb.startswith("/"):
- raise WorkflowError("glob patterns must not start with '/'")
+ raise WorkflowException("glob patterns must not start with '/'")
try:
r.extend([{"path": g, "class": "File", "hostfs": True}
for g in builder.fs_access.glob(os.path.join(outdir, gb))])
@@ -291,15 +373,23 @@ class CommandLineTool(Process):
singlefile = True
if "outputEval" in binding:
- r = builder.do_eval(binding["outputEval"], context=r)
+ eout = builder.do_eval(binding["outputEval"], context=r)
if singlefile:
# Handle single file outputs not wrapped in a list
- if r is not None and not isinstance(r, (list, tuple)):
- r = [r]
- if optional and r is None:
+ if eout is not None and not isinstance(eout, (list, tuple)):
+ r = [eout]
+ elif optional and eout is None:
pass
- elif (r is None or len(r) != 1 or not isinstance(r[0], dict) or "path" not in r[0]):
- raise WorkflowException("Expression must return a file object for %s." % schema["id"])
+ elif (eout is None or len(eout) != 1 or
+ not isinstance(eout[0], dict)
+ or "path" not in eout[0]):
+ raise WorkflowException(
+ u"Expression must return a file object for %s."
+ % schema["id"])
+ else:
+ r = [eout]
+ else:
+ r = eout
if singlefile:
if not r and not optional:
@@ -313,7 +403,9 @@ class CommandLineTool(Process):
r = r[0]
# Ensure files point to local references outside of the run environment
- adjustFileObjs(r, revmap)
+ adjustFileObjs(r, cast( # known bug in mypy
+ # https://github.com/python/mypy/issues/797
+ Callable[[Any], Any], revmap))
if "secondaryFiles" in schema:
for primary in aslist(r):
@@ -334,9 +426,11 @@ class CommandLineTool(Process):
if not r and optional:
r = None
- if not r and isinstance(schema["type"], dict) and schema["type"]["type"] == "record":
- r = {}
+ if (not r and isinstance(schema["type"], dict) and
+ schema["type"]["type"] == "record"):
+ out = {}
for f in schema["type"]["fields"]:
- r[shortname(f["name"])] = self.collect_output(f, builder, outdir)
-
+ out[shortname(f["name"])] = self.collect_output( # type: ignore
+ f, builder, outdir)
+ return out
return r
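The new --cachedir support in CommandLineTool.job above derives a deterministic key per job: an md5 over the evaluated command line (with the docker invocation prepended when a DockerRequirement applies), each input file's size and mtime, and the execution-relevant requirements. A minimal standalone sketch of the keying (function name is illustrative; the real code pulls input paths from the builder's pathmapper):

    import hashlib
    import json
    import os

    def job_cache_key(cmdline, input_paths, requirements):
        # Any change to the command line, to an input's size or mtime, or
        # to an execution-relevant requirement yields a different key.
        keydict = {"cmdline": cmdline}
        for p in input_paths:
            st = os.stat(p)
            keydict[p] = [st.st_size, int(st.st_mtime * 1000)]
        interesting = {"DockerRequirement", "EnvVarRequirement",
                       "CreateFileRequirement", "ShellCommandRequirement"}
        for r in requirements:
            if r["class"] in interesting and r["class"] not in keydict:
                keydict[r["class"]] = r
        keydictstr = json.dumps(keydict, separators=(',', ':'), sort_keys=True)
        return hashlib.md5(keydictstr.encode("utf-8")).hexdigest()

Cached output lives in a directory named by this key under cachedir; a "<key>.pending" marker, removed by the wrapped output callback only on success, keeps half-finished jobs from being reused.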
diff --git a/cwltool/expression.py b/cwltool/expression.py
index 3e9f62c..bea6758 100644
--- a/cwltool/expression.py
+++ b/cwltool/expression.py
@@ -1,49 +1,55 @@
-import docker
+from . import docker
import subprocess
import json
-from aslist import aslist
+from .utils import aslist, get_feature
import logging
import os
-from errors import WorkflowException
-import process
+from .errors import WorkflowException
import yaml
import schema_salad.validate as validate
import schema_salad.ref_resolver
-import sandboxjs
+from . import sandboxjs
import re
+from typing import Any, AnyStr, Union
_logger = logging.getLogger("cwltool")
def jshead(engineConfig, rootvars):
- return "\n".join(engineConfig + ["var %s = %s;" % (k, json.dumps(v)) for k, v in rootvars.items()])
+ # type: (List[unicode],Dict[str,str]) -> unicode
+ return u"\n".join(engineConfig + [u"var %s = %s;" % (k, json.dumps(v, indent=4)) for k, v in rootvars.items()])
def exeval(ex, jobinput, requirements, outdir, tmpdir, context, pull_image):
- if ex["engine"] == "https://w3id.org/cwl/cwl#JsonPointer":
- try:
- obj = {"job": jobinput, "context": context, "outdir": outdir, "tmpdir": tmpdir}
- return schema_salad.ref_resolver.resolve_json_pointer(obj, ex["script"])
- except ValueError as v:
- raise WorkflowException("%s in %s" % (v, obj))
+ # type: (Dict[str,Any], Dict[str,str], List[Dict[str, Any]], str, str, Any, bool) -> sandboxjs.JSON
if ex["engine"] == "https://w3id.org/cwl/cwl#JavascriptEngine":
- engineConfig = []
+ engineConfig = [] # type: List[unicode]
for r in reversed(requirements):
if r["class"] == "ExpressionEngineRequirement" and r["id"] == "https://w3id.org/cwl/cwl#JavascriptEngine":
engineConfig = r.get("engineConfig", [])
break
- return sandboxjs.execjs(ex["script"], jshead(engineConfig, jobinput, context, tmpdir, outdir))
+ rootvars = {
+ "inputs": jobinput,
+ "self": context,
+ "runtime": {
+ "tmpdir": tmpdir,
+ "outdir": outdir
+ }
+ }
+ return sandboxjs.execjs(ex["script"], jshead(engineConfig, rootvars))
for r in reversed(requirements):
if r["class"] == "ExpressionEngineRequirement" and r["id"] == ex["engine"]:
- runtime = []
+ runtime = [] # type: List[str]
class DR(object):
- pass
+ def __init__(self): # type: ()->None
+ self.requirements = None # type: List[None]
+ self.hints = None # type: List[None]
dr = DR()
dr.requirements = r.get("requirements", [])
dr.hints = r.get("hints", [])
- (docker_req, docker_is_req) = process.get_feature(dr, "DockerRequirement")
+ (docker_req, docker_is_req) = get_feature(dr, "DockerRequirement")
img_id = None
if docker_req:
img_id = docker.get_from_requirements(docker_req, docker_is_req, pull_image)
@@ -59,7 +65,7 @@ def exeval(ex, jobinput, requirements, outdir, tmpdir, context, pull_image):
"tmpdir": tmpdir,
}
- _logger.debug("Invoking expression engine %s with %s",
+ _logger.debug(u"Invoking expression engine %s with %s",
runtime + aslist(r["engineCommand"]),
json.dumps(inp, indent=4))
@@ -71,11 +77,11 @@ def exeval(ex, jobinput, requirements, outdir, tmpdir, context, pull_image):
(stdoutdata, stderrdata) = sp.communicate(json.dumps(inp) + "\n\n")
if sp.returncode != 0:
- raise WorkflowException("Expression engine returned non-zero exit code on evaluation of\n%s" % json.dumps(inp, indent=4))
+ raise WorkflowException(u"Expression engine returned non-zero exit code on evaluation of\n%s" % json.dumps(inp, indent=4))
return json.loads(stdoutdata)
- raise WorkflowException("Unknown expression engine '%s'" % ex["engine"])
+ raise WorkflowException(u"Unknown expression engine '%s'" % ex["engine"])
seg_symbol = r"""\w+"""
seg_single = r"""\['([^']|\\')+'\]"""
@@ -85,7 +91,7 @@ segments = r"(\.%s|%s|%s|%s)" % (seg_symbol, seg_single, seg_double, seg_index)
segment_re = re.compile(segments, flags=re.UNICODE)
param_re = re.compile(r"\$\((%s)%s*\)" % (seg_symbol, segments), flags=re.UNICODE)
-def next_seg(remain, obj):
+def next_seg(remain, obj): # type: (str,Any)->str
if remain:
m = segment_re.match(remain)
if m.group(0)[0] == '.':
@@ -99,7 +105,9 @@ def next_seg(remain, obj):
else:
return obj
+
def param_interpolate(ex, obj, strip=True):
+ # type: (str, Dict[Any,Any], bool) -> Union[str, unicode]
m = param_re.search(ex)
if m:
leaf = next_seg(m.group(0)[m.end(1) - m.start(0):-1], obj[m.group(1)])
@@ -112,12 +120,14 @@ def param_interpolate(ex, obj, strip=True):
return ex[0:m.start(0)] + leaf + param_interpolate(ex[m.end(0):], obj, False)
else:
if "$(" in ex or "${" in ex:
- _logger.warn("Warning possible workflow bug: found '$(' or '${' in '%s' but did not match valid parameter reference and InlineJavascriptRequirement not specified.", ex)
+ _logger.warn(u"Warning possible workflow bug: found '$(' or '${' in '%s' but did not match valid parameter reference and InlineJavascriptRequirement not specified.", ex)
return ex
def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources,
context=None, pull_image=True, timeout=None):
+ # type: (Any, Dict[str,str], List[Dict[str,Any]], str, str, Dict[str, Union[int, str]], Any, bool, int) -> Any
+
runtime = resources.copy()
runtime["tmpdir"] = tmpdir
runtime["outdir"] = outdir
@@ -133,7 +143,8 @@ def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources,
if isinstance(ex, basestring):
for r in requirements:
if r["class"] == "InlineJavascriptRequirement":
- return sandboxjs.interpolate(ex, jshead(r.get("expressionLib", []), rootvars),
+ return sandboxjs.interpolate(str(ex), jshead(r.get("expressionLib", []), rootvars),
timeout=timeout)
- return param_interpolate(ex, rootvars)
- return ex
+ return param_interpolate(str(ex), rootvars)
+ else:
+ return ex
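With the exeval change above, JavaScript expressions now see the standard CWL root variables inputs, self and runtime rather than engine-specific positional arguments. jshead simply renders that dict into a preamble of var declarations; a standalone copy (type comments dropped, example values illustrative):

    import json

    def jshead(engine_config, rootvars):
        # One "var NAME = <json>;" line per root variable, preceded by any
        # engine-supplied configuration lines.
        return "\n".join(engine_config +
                         ["var %s = %s;" % (k, json.dumps(v, indent=4))
                          for k, v in rootvars.items()])

    print(jshead([], {"inputs": {"infile": {"class": "File", "path": "whale.txt"}},
                      "self": None,
                      "runtime": {"tmpdir": "/tmp", "outdir": "/out"}}))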
diff --git a/cwltool/factory.py b/cwltool/factory.py
index 53d96e0..312732a 100644
--- a/cwltool/factory.py
+++ b/cwltool/factory.py
@@ -1,19 +1,24 @@
-import main
-import workflow
+from . import main
+from . import workflow
import os
+from .process import Process
+from typing import Any, Union
+from typing import Callable as tCallable
+import argparse
class Callable(object):
- def __init__(self, t, factory):
+ def __init__(self, t, factory): # type: (Process, Factory) -> None
self.t = t
self.factory = factory
- def __call__(self, **kwargs):
+ def __call__(self, **kwargs): # type: (**Any) -> Union[str,Dict[str,str]]
return self.factory.executor(self.t, kwargs, os.getcwd(), None, **self.factory.execkwargs)
class Factory(object):
def __init__(self, makeTool=workflow.defaultMakeTool,
executor=main.single_job_executor,
**execkwargs):
+ # type: (tCallable[[Dict[str, Any], Any], Process],tCallable[...,Union[str,Dict[str,str]]], **Any) -> None
self.makeTool = makeTool
self.executor = executor
self.execkwargs = execkwargs
diff --git a/cwltool/flatten.py b/cwltool/flatten.py
index 54e918a..477444a 100644
--- a/cwltool/flatten.py
+++ b/cwltool/flatten.py
@@ -1,5 +1,10 @@
+from typing import Any, Callable, List, cast
+
# http://rightfootin.blogspot.com/2006/09/more-on-python-flatten.html
+
+
def flatten(l, ltypes=(list, tuple)):
+ # type: (Any, Any) -> List[Any]
if l is None:
return []
if not isinstance(l, ltypes):
@@ -17,4 +22,4 @@ def flatten(l, ltypes=(list, tuple)):
else:
l[i:i + 1] = l[i]
i += 1
- return ltype(l)
+ return cast(Callable[[Any], List], ltype)(l)
diff --git a/cwltool/job.py b/cwltool/job.py
index 6e9b0dc..9a3f78b 100644
--- a/cwltool/job.py
+++ b/cwltool/job.py
@@ -1,4 +1,5 @@
import subprocess
+import io
import os
import tempfile
import glob
@@ -7,20 +8,24 @@ import yaml
import logging
import sys
import requests
-import docker
-from process import get_feature, empty_subtree
-from errors import WorkflowException
+from . import docker
+from .process import get_feature, empty_subtree
+from .errors import WorkflowException
import shutil
import stat
import re
import shellescape
-from docker_uid import docker_vm_uid
+from .docker_uid import docker_vm_uid
+from .builder import Builder
+from typing import Union, Iterable, Callable, Any, Mapping, IO, cast, Tuple
+from .pathmapper import PathMapper
+import functools
_logger = logging.getLogger("cwltool")
needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
-def deref_links(outputs):
+def deref_links(outputs): # type: (Any) -> None
if isinstance(outputs, dict):
if outputs.get("class") == "File":
st = os.lstat(outputs["path"])
@@ -34,21 +39,44 @@ def deref_links(outputs):
deref_links(v)
class CommandLineJob(object):
- def run(self, dry_run=False, pull_image=True, rm_container=True, rm_tmpdir=True, move_outputs=True, **kwargs):
+
+ def __init__(self): # type: () -> None
+ self.builder = None # type: Builder
+ self.joborder = None # type: Dict[str,str]
+ self.stdin = None # type: str
+ self.stdout = None # type: str
+ self.successCodes = None # type: Iterable[int]
+ self.temporaryFailCodes = None # type: Iterable[int]
+ self.permanentFailCodes = None # type: Iterable[int]
+ self.requirements = None # type: List[Dict[str, str]]
+ self.hints = None # type: Dict[str,str]
+ self.name = None # type: unicode
+ self.command_line = None # type: List[unicode]
+ self.pathmapper = None # type: PathMapper
+ self.collect_outputs = None # type: Union[Callable[[Any], Any],functools.partial[Any]]
+ self.output_callback = None # type: Callable[[Any, Any], Any]
+ self.outdir = None # type: str
+ self.tmpdir = None # type: str
+ self.environment = None # type: Dict[str,str]
+ self.generatefiles = None # type: Dict[str,Union[Dict[str,str],str]]
+
+ def run(self, dry_run=False, pull_image=True, rm_container=True,
+ rm_tmpdir=True, move_outputs=True, **kwargs):
+ # type: (bool, bool, bool, bool, bool, **Any) -> Union[Tuple[str,Dict[None,None]],None]
if not os.path.exists(self.outdir):
os.makedirs(self.outdir)
#with open(os.path.join(outdir, "cwl.input.json"), "w") as fp:
# json.dump(self.joborder, fp)
- runtime = []
- env = {"TMPDIR": self.tmpdir}
+ runtime = [] # type: List[unicode]
+ env = {"TMPDIR": self.tmpdir} # type: Mapping[str,str]
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
for f in self.pathmapper.files():
if not os.path.isfile(self.pathmapper.mapper(f)[0]):
- raise WorkflowException("Required input file %s not found or is not a regular file." % self.pathmapper.mapper(f)[0])
+ raise WorkflowException(u"Required input file %s not found or is not a regular file." % self.pathmapper.mapper(f)[0])
img_id = None
if docker_req and kwargs.get("use_container") is not False:
@@ -62,19 +90,22 @@ class CommandLineJob(object):
runtime = ["docker", "run", "-i"]
for src in self.pathmapper.files():
vol = self.pathmapper.mapper(src)
- runtime.append("--volume=%s:%s:ro" % vol)
- runtime.append("--volume=%s:%s:rw" % (os.path.abspath(self.outdir), "/var/spool/cwl"))
- runtime.append("--volume=%s:%s:rw" % (os.path.abspath(self.tmpdir), "/tmp"))
- runtime.append("--workdir=%s" % ("/var/spool/cwl"))
+ runtime.append(u"--volume=%s:%s:ro" % vol)
+ runtime.append(u"--volume=%s:%s:rw" % (os.path.abspath(self.outdir), "/var/spool/cwl"))
+ runtime.append(u"--volume=%s:%s:rw" % (os.path.abspath(self.tmpdir), "/tmp"))
+ runtime.append(u"--workdir=%s" % ("/var/spool/cwl"))
runtime.append("--read-only=true")
- if kwargs.get("enable_net") is not True:
+ if (kwargs.get("enable_net", None) is None and
+ kwargs.get("custom_net", None) is not None):
runtime.append("--net=none")
+ elif kwargs.get("custom_net", None) is not None:
+ runtime.append("--net={0}".format(kwargs.get("custom_net")))
if self.stdout:
runtime.append("--log-driver=none")
euid = docker_vm_uid() or os.geteuid()
- runtime.append("--user=%s" % (euid))
+ runtime.append(u"--user=%s" % (euid))
if rm_container:
runtime.append("--rm")
@@ -82,7 +113,7 @@ class CommandLineJob(object):
runtime.append("--env=TMPDIR=/tmp")
for t,v in self.environment.items():
- runtime.append("--env=%s=%s" % (t, v))
+ runtime.append(u"--env=%s=%s" % (t, v))
runtime.append(img_id)
else:
@@ -96,39 +127,42 @@ class CommandLineJob(object):
if key in vars_to_preserve and key not in env:
env[key] = value
- stdin = None
- stdout = None
+ stdin = None # type: Union[IO[Any],int]
+ stdout = None # type: IO[Any]
- scr, _ = get_feature(self, "ShellCommandRequirement")
+ scr, _ = get_feature(self, "ShellCommandRequirement")
if scr:
shouldquote = lambda x: False
else:
shouldquote = needs_shell_quoting_re.search
- _logger.info("[job %s] %s$ %s%s%s",
+ _logger.info(u"[job %s] %s$ %s%s%s",
self.name,
self.outdir,
" ".join([shellescape.quote(str(arg)) if shouldquote(str(arg)) else str(arg) for arg in (runtime + self.command_line)]),
- ' < %s' % (self.stdin) if self.stdin else '',
- ' > %s' % os.path.join(self.outdir, self.stdout) if self.stdout else '')
+ u' < %s' % (self.stdin) if self.stdin else '',
+ u' > %s' % os.path.join(self.outdir, self.stdout) if self.stdout else '')
if dry_run:
return (self.outdir, {})
- outputs = {}
+ outputs = {} # type: Dict[str,str]
try:
for t in self.generatefiles:
- if isinstance(self.generatefiles[t], dict):
- src = self.generatefiles[t]["path"]
+ entry = self.generatefiles[t]
+ if isinstance(entry, dict):
+ src = entry["path"]
dst = os.path.join(self.outdir, t)
if os.path.dirname(self.pathmapper.reversemap(src)[1]) != self.outdir:
- _logger.debug("symlinking %s to %s", dst, src)
+ _logger.debug(u"symlinking %s to %s", dst, src)
os.symlink(src, dst)
+ elif isinstance(entry, str):
+ with open(os.path.join(self.outdir, t), "w") as fout:
+ fout.write(entry)
else:
- with open(os.path.join(self.outdir, t), "w") as f:
- f.write(self.generatefiles[t])
+ raise Exception("Unhandled type")
if self.stdin:
stdin = open(self.pathmapper.mapper(self.stdin)[0], "rb")
@@ -152,12 +186,12 @@ class CommandLineJob(object):
env=env,
cwd=self.outdir)
- if stdin == subprocess.PIPE:
+ if sp.stdin:
sp.stdin.close()
rcode = sp.wait()
- if stdin != subprocess.PIPE:
+ if isinstance(stdin, file):
stdin.close()
if stdout is not sys.stderr:
@@ -176,7 +210,7 @@ class CommandLineJob(object):
for t in self.generatefiles:
if isinstance(self.generatefiles[t], dict):
- src = self.generatefiles[t]["path"]
+ src = cast(dict, self.generatefiles[t])["path"]
dst = os.path.join(self.outdir, t)
if os.path.dirname(self.pathmapper.reversemap(src)[1]) != self.outdir:
os.remove(dst)
@@ -187,31 +221,31 @@ class CommandLineJob(object):
except OSError as e:
if e.errno == 2:
if runtime:
- _logger.error("'%s' not found", runtime[0])
+ _logger.error(u"'%s' not found", runtime[0])
else:
- _logger.error("'%s' not found", self.command_line[0])
+ _logger.error(u"'%s' not found", self.command_line[0])
else:
_logger.exception("Exception while running job")
processStatus = "permanentFail"
except WorkflowException as e:
- _logger.error("Error while running job: %s" % e)
+ _logger.error(u"Error while running job: %s" % e)
processStatus = "permanentFail"
except Exception as e:
_logger.exception("Exception while running job")
processStatus = "permanentFail"
if processStatus != "success":
- _logger.warn("[job %s] completed %s", self.name, processStatus)
+ _logger.warn(u"[job %s] completed %s", self.name, processStatus)
else:
- _logger.debug("[job %s] completed %s", self.name, processStatus)
- _logger.debug("[job %s] %s", self.name, json.dumps(outputs, indent=4))
+ _logger.debug(u"[job %s] completed %s", self.name, processStatus)
+ _logger.debug(u"[job %s] %s", self.name, json.dumps(outputs, indent=4))
self.output_callback(outputs, processStatus)
if rm_tmpdir:
- _logger.debug("[job %s] Removing temporary directory %s", self.name, self.tmpdir)
+ _logger.debug(u"[job %s] Removing temporary directory %s", self.name, self.tmpdir)
shutil.rmtree(self.tmpdir, True)
if move_outputs and empty_subtree(self.outdir):
- _logger.debug("[job %s] Removing empty output directory %s", self.name, self.outdir)
+ _logger.debug(u"[job %s] Removing empty output directory %s", self.name, self.outdir)
shutil.rmtree(self.outdir, True)
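
The networking hunk above gives containers three states: with no flag, networking is disabled via --net=none; --enable-net restores Docker's default network; --custom-net passes a named network through. A minimal sketch of assembling the same `docker run` argv outside cwltool's classes (paths and defaults are illustrative):

    import os

    def docker_runtime(outdir, tmpdir, enable_net=None, custom_net=None):
        # Mount output and temp dirs read-write, keep the root fs
        # read-only, and leave networking off unless asked for.
        runtime = ["docker", "run", "-i",
                   u"--volume=%s:/var/spool/cwl:rw" % os.path.abspath(outdir),
                   u"--volume=%s:/tmp:rw" % os.path.abspath(tmpdir),
                   u"--workdir=/var/spool/cwl",
                   "--read-only=true"]
        if enable_net is None and custom_net is None:
            runtime.append("--net=none")
        elif custom_net is not None:
            runtime.append(u"--net=%s" % custom_net)
        return runtime

For example, docker_runtime("out", "tmp", custom_net="host") ends in --net=host, while docker_runtime("out", "tmp") ends in --net=none.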
diff --git a/cwltool/main.py b/cwltool/main.py
index 4108557..1c502c5 100755
--- a/cwltool/main.py
+++ b/cwltool/main.py
@@ -1,27 +1,30 @@
#!/usr/bin/env python
-import draft2tool
+from . import draft2tool
import argparse
from schema_salad.ref_resolver import Loader
+import string
import json
import os
import sys
import logging
-import workflow
+from . import workflow
import schema_salad.validate as validate
import tempfile
import schema_salad.jsonld_context
import schema_salad.makedoc
import yaml
import urlparse
-import process
-import job
-from cwlrdf import printrdf, printdot
+from . import process
+from . import job
+from .cwlrdf import printrdf, printdot
import pkg_resources # part of setuptools
-import update
-from process import shortname
+from . import update
+from .process import shortname, Process
import rdflib
-from aslist import aslist
+import hashlib
+from .utils import aslist
+from typing import Union, Any, cast, Callable, Dict, List, Tuple, IO
_logger = logging.getLogger("cwltool")
@@ -29,7 +32,8 @@ defaultStreamHandler = logging.StreamHandler()
_logger.addHandler(defaultStreamHandler)
_logger.setLevel(logging.INFO)
-def arg_parser():
+
+def arg_parser(): # type: () -> argparse.ArgumentParser
parser = argparse.ArgumentParser(description='Reference executor for Common Workflow Language')
parser.add_argument("--conformance-test", action="store_true")
parser.add_argument("--basedir", type=str)
@@ -42,7 +46,7 @@ def arg_parser():
parser.add_argument("--preserve-environment", type=str, nargs='+',
help="Preserve specified environment variables when running CommandLineTools",
- metavar=("VAR1","VAR2"),
+ metavar=("VAR1,VAR2"),
default=("PATH",),
dest="preserve_environment")
@@ -59,10 +63,14 @@ def arg_parser():
help="Path prefix for temporary directories",
default="tmp")
- parser.add_argument("--tmp-outdir-prefix", type=str,
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--tmp-outdir-prefix", type=str,
help="Path prefix for intermediate output directories",
default="tmp")
+ exgroup.add_argument("--cachedir", type=str, default="",
+ help="Directory to cache intermediate workflow outputs to avoid recomputing steps.")
+
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--rm-tmpdir", action="store_true", default=True,
help="Delete intermediate temporary directories (default)",
@@ -104,7 +112,8 @@ def arg_parser():
help="Print corresponding RDF graph for workflow and exit")
exgroup.add_argument("--print-dot", action="store_true", help="Print workflow visualization in graphviz format and exit")
exgroup.add_argument("--print-pre", action="store_true", help="Print CWL document after preprocessing.")
- exgroup.add_argument("--print-deps", action="store_true", help="Print CWL document dependencies from $import, $include, $schemas")
+ exgroup.add_argument("--print-deps", action="store_true", help="Print CWL document dependencies.")
+ exgroup.add_argument("--print-input-deps", action="store_true", help="Print input object document dependencies.")
exgroup.add_argument("--version", action="store_true", help="Print version and exit")
exgroup.add_argument("--update", action="store_true", help="Update to latest CWL version, print and exit")
@@ -121,23 +130,32 @@ def arg_parser():
parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
- parser.add_argument("--enable-net", action="store_true", help="Use docker's default network for container, default to disable network")
+ parser.add_argument("--relative-deps", choices=['primary', 'cwd'], default="primary",
+ help="When using --print-deps, print paths relative to primary file or current working directory.")
+ parser.add_argument("--enable-net", action="store_true",
+ help="Use docker's default networking for containers; the default is "
+ "to disable networking.")
+ parser.add_argument("--custom-net", type=str,
+ help="Will be passed to `docker run` as the '--net' parameter. "
+ "Implies '--enable-net'.")
parser.add_argument("workflow", type=str, nargs="?", default=None)
parser.add_argument("job_order", nargs=argparse.REMAINDER)
return parser
+
def single_job_executor(t, job_order, input_basedir, args, **kwargs):
+ # type: (Process, Dict[str,Any], str, argparse.Namespace,**Any) -> Union[str,Dict[str,str]]
final_output = []
final_status = []
def output_callback(out, processStatus):
final_status.append(processStatus)
if processStatus == "success":
- _logger.info("Final process status is %s", processStatus)
+ _logger.info(u"Final process status is %s", processStatus)
else:
- _logger.warn("Final process status is %s", processStatus)
+ _logger.warn(u"Final process status is %s", processStatus)
final_output.append(out)
if kwargs.get("outdir"):
@@ -173,34 +191,46 @@ def single_job_executor(t, job_order, input_basedir, args, **kwargs):
raise
except Exception as e:
_logger.exception("Got workflow error")
- raise workflow.WorkflowException("%s" % e, )
+ raise workflow.WorkflowException(unicode(e))
if final_status[0] != "success":
- raise workflow.WorkflowException("Process status is %s" % (final_status))
+ raise workflow.WorkflowException(u"Process status is %s" % (final_status))
return final_output[0]
+
class FileAction(argparse.Action):
+
def __init__(self, option_strings, dest, nargs=None, **kwargs):
+ # type: (List[str], str, Any, **Any) -> None
if nargs is not None:
raise ValueError("nargs not allowed")
super(FileAction, self).__init__(option_strings, dest, **kwargs)
+
def __call__(self, parser, namespace, values, option_string=None):
+ # type: (argparse.ArgumentParser, argparse.Namespace, str, Any) -> None
setattr(namespace, self.dest, {"class": "File", "path": values})
+
class FileAppendAction(argparse.Action):
+
def __init__(self, option_strings, dest, nargs=None, **kwargs):
+ # type: (List[str], str, Any, **Any) -> None
if nargs is not None:
raise ValueError("nargs not allowed")
super(FileAppendAction, self).__init__(option_strings, dest, **kwargs)
+
def __call__(self, parser, namespace, values, option_string=None):
+ # type: (argparse.ArgumentParser, argparse.Namespace, str, Any) -> None
g = getattr(namespace, self.dest)
if not g:
g = []
setattr(namespace, self.dest, g)
g.append({"class": "File", "path": values})
+
def generate_parser(toolparser, tool, namemap):
+ # type: (argparse.ArgumentParser, Process,Dict[str,str]) -> argparse.ArgumentParser
toolparser.add_argument("job_order", nargs="?", help="Job input json file")
namemap["job_order"] = "job_order"
@@ -222,38 +252,41 @@ def generate_parser(toolparser, tool, namemap):
if len(inptype) == 2:
inptype = inptype[1]
else:
- _logger.debug("Can't make command line argument from %s", inptype)
+ _logger.debug(u"Can't make command line argument from %s", inptype)
return None
- help = inp.get("description", "").replace("%", "%%")
- kwargs = {}
+ ahelp = inp.get("description", "").replace("%", "%%")
+ action = None # type: Union[argparse.Action,str]
+ atype = None # type: Any
+ default = None # type: Any
if inptype == "File":
- kwargs["action"] = FileAction
+ action = cast(argparse.Action, FileAction)
elif isinstance(inptype, dict) and inptype["type"] == "array":
if inptype["items"] == "File":
- kwargs["action"] = FileAppendAction
+ action = cast(argparse.Action, FileAppendAction)
else:
- kwargs["action"] = "append"
+ action = "append"
if inptype == "string":
- kwargs["type"] = str
+ atype = str
elif inptype == "int":
- kwargs["type"] = int
+ atype = int
elif inptype == "float":
- kwargs["type"] = float
+ atype = float
elif inptype == "boolean":
- kwargs["action"] = "store_true"
+ action = "store_true"
if "default" in inp:
- kwargs["default"] = inp["default"]
+ default = inp["default"]
required = False
- if "type" not in kwargs and "action" not in kwargs:
- _logger.debug("Can't make command line argument from %s", inptype)
+ if not atype and not action:
+ _logger.debug(u"Can't make command line argument from %s", inptype)
return None
- toolparser.add_argument(flag + name, required=required, help=help, **kwargs)
+ toolparser.add_argument(flag + name, required=required,
+ help=ahelp, action=action, type=atype, default=default)
return toolparser
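
generate_parser turns each tool input into a command line flag. The rewrite above tracks action, type and default in named variables instead of an untyped **kwargs dict so they can be type-checked. A reduced sketch of the scalar type mapping (the helper and flag names are illustrative, not cwltool's API):

    import argparse

    def add_tool_arg(parser, name, inptype, default=None):
        # Map a CWL scalar type to argparse settings; skip anything else.
        if inptype == "boolean":
            return parser.add_argument("--" + name, action="store_true",
                                       default=default)
        atype = {"string": str, "int": int, "float": float}.get(inptype)
        if atype is None:
            return None
        return parser.add_argument("--" + name, type=atype, default=default)

    parser = argparse.ArgumentParser()
    add_tool_arg(parser, "threads", "int", default=1)
    add_tool_arg(parser, "verbose", "boolean")
    print parser.parse_args(["--threads", "4", "--verbose"])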
@@ -262,61 +295,77 @@ def load_tool(argsworkflow, updateonly, strict, makeTool, debug,
print_pre=False,
print_rdf=False,
print_dot=False,
+ print_deps=False,
+ relative_deps=False,
rdf_serializer=None,
+ stdout=sys.stdout,
urifrag=None):
+ # type: (Union[str,unicode,dict[unicode,Any]], bool, bool, Callable[...,Process], bool, bool, bool, bool, bool, bool, Any, Any, Any) -> Any
(document_loader, avsc_names, schema_metadata) = process.get_schema()
if isinstance(avsc_names, Exception):
raise avsc_names
jobobj = None
- if isinstance(argsworkflow, basestring):
- split = urlparse.urlsplit(argsworkflow)
+ uri = None # type: str
+ workflowobj = None # type: Dict[unicode, Any]
+ if isinstance(argsworkflow, (basestring)):
+ split = urlparse.urlsplit(cast(str, argsworkflow))
if split.scheme:
- uri = argsworkflow
+ uri = cast(str, argsworkflow)
else:
- uri = "file://" + os.path.abspath(argsworkflow)
+ uri = "file://" + os.path.abspath(cast(str, argsworkflow))
fileuri, urifrag = urlparse.urldefrag(uri)
workflowobj = document_loader.fetch(fileuri)
- if isinstance(workflowobj, list):
- # bare list without a version must be treated as draft-2
- workflowobj = {"cwlVersion": "https://w3id.org/cwl/cwl#draft-2",
- "id": fileuri,
- "@graph": workflowobj}
elif isinstance(argsworkflow, dict):
workflowobj = argsworkflow
uri = urifrag
- fileuri = ""
+ fileuri = "#"
else:
raise schema_salad.validate.ValidationException("Must be URI or dict")
if "cwl:tool" in workflowobj:
jobobj = workflowobj
- workflowobj = document_loader.fetch(urlparse.urljoin(uri, workflowobj["cwl:tool"]))
+ uri = urlparse.urljoin(uri, jobobj["cwl:tool"])
+ fileuri, urifrag = urlparse.urldefrag(uri)
+ workflowobj = document_loader.fetch(fileuri)
+ del jobobj["cwl:tool"]
+
+ if isinstance(workflowobj, list):
+ # bare list without a version must be treated as draft-2
+ workflowobj = {"cwlVersion": "https://w3id.org/cwl/cwl#draft-2",
+ "id": fileuri,
+ "@graph": workflowobj}
workflowobj = update.update(workflowobj, document_loader, fileuri)
document_loader.idx.clear()
if updateonly:
- print json.dumps(workflowobj, indent=4)
+ stdout.write(json.dumps(workflowobj, indent=4))
+ return 0
+
+ if print_deps:
+ printdeps(workflowobj, document_loader, stdout, relative_deps)
return 0
try:
- processobj, metadata = schema_salad.schema.load_and_validate(document_loader, avsc_names, workflowobj, strict)
+ processobj, metadata = schema_salad.schema.load_and_validate(
+ document_loader, avsc_names, workflowobj, strict)
except (schema_salad.validate.ValidationException, RuntimeError) as e:
- _logger.error("Tool definition failed validation:\n%s", e, exc_info=(e if debug else False))
+ _logger.error(u"Tool definition failed validation:\n%s", e,
+ exc_info=(e if debug else False))
return 1
if print_pre:
- print json.dumps(processobj, indent=4)
+ stdout.write(json.dumps(processobj, indent=4))
return 0
if print_rdf:
- printrdf(argsworkflow, processobj, document_loader.ctx, rdf_serializer)
+ printrdf(str(argsworkflow), processobj, document_loader.ctx, rdf_serializer, stdout)
return 0
if print_dot:
- printdot(argsworkflow, processobj, document_loader.ctx)
+ printdot(str(argsworkflow), processobj, document_loader.ctx, stdout)
return 0
if urifrag:
@@ -325,7 +374,7 @@ def load_tool(argsworkflow, updateonly, strict, makeTool, debug,
if 1 == len(processobj):
processobj = processobj[0]
else:
- _logger.error("Tool file contains graph of multiple objects, must specify one of #%s",
+ _logger.error(u"Tool file contains graph of multiple objects, must specify one of #%s",
", #".join(urlparse.urldefrag(i["id"])[1]
for i in processobj if "id" in i))
return 1
@@ -333,10 +382,10 @@ def load_tool(argsworkflow, updateonly, strict, makeTool, debug,
try:
t = makeTool(processobj, strict=strict, makeTool=makeTool, loader=document_loader, avsc_names=avsc_names)
except (schema_salad.validate.ValidationException) as e:
- _logger.error("Tool definition failed validation:\n%s", e, exc_info=(e if debug else False))
+ _logger.error(u"Tool definition failed validation:\n%s", e, exc_info=(e if debug else False))
return 1
except (RuntimeError, workflow.WorkflowException) as e:
- _logger.error("Tool definition failed initialization:\n%s", e, exc_info=(e if debug else False))
+ _logger.error(u"Tool definition failed initialization:\n%s", e, exc_info=(e if debug else False))
return 1
if jobobj:
@@ -351,14 +400,19 @@ def load_tool(argsworkflow, updateonly, strict, makeTool, debug,
return t
-def load_job_order(args, t, parser, stdin):
+
+def load_job_order(args, t, parser, stdin, print_input_deps=False, relative_deps=False, stdout=sys.stdout):
+ # type: (argparse.Namespace, Process, argparse.ArgumentParser, IO[Any], bool, bool, IO[Any]) -> Union[int,Tuple[Dict[str,Any],str]]
job_order_object = None
if args.conformance_test:
loader = Loader({})
else:
- jobloaderctx = {"path": {"@type": "@id"}, "format": {"@type": "@id"}}
+ jobloaderctx = {
+ "path": {"@type": "@id"},
+ "format": {"@type": "@id"},
+ "id": "@id"}
jobloaderctx.update(t.metadata.get("$namespaces", {}))
loader = Loader(jobloaderctx)
@@ -377,12 +431,12 @@ def load_job_order(args, t, parser, stdin):
try:
job_order_object, _ = loader.resolve_ref(job_order_file)
except Exception as e:
- _logger.error(e, exc_info=(e if args.debug else False))
+ _logger.error(str(e), exc_info=(e if args.debug else False))
return 1
toolparser = None
else:
input_basedir = args.basedir if args.basedir else os.getcwd()
- namemap = {}
+ namemap = {} # type: Dict[str,str]
toolparser = generate_parser(argparse.ArgumentParser(prog=args.workflow), t, namemap)
if toolparser:
if args.tool_help:
@@ -395,14 +449,14 @@ def load_job_order(args, t, parser, stdin):
input_basedir = args.basedir if args.basedir else os.path.abspath(os.path.dirname(cmd_line["job_order"]))
job_order_object = loader.resolve_ref(cmd_line["job_order"])
except Exception as e:
- _logger.error(e, exc_info=(e if args.debug else False))
+ _logger.error(str(e), exc_info=(e if args.debug else False))
return 1
else:
- job_order_object = {}
+ job_order_object = {"id": args.workflow}
job_order_object.update({namemap[k]: v for k,v in cmd_line.items()})
- _logger.debug("Parsed job order from command line: %s", job_order_object)
+ _logger.debug(u"Parsed job order from command line: %s", json.dumps(job_order_object, indent=4))
else:
job_order_object = None
@@ -415,93 +469,95 @@ def load_job_order(args, t, parser, stdin):
if not job_order_object and len(t.tool["inputs"]) > 0:
parser.print_help()
if toolparser:
- print "\nOptions for %s " % args.workflow
+ print u"\nOptions for %s " % args.workflow
toolparser.print_help()
_logger.error("")
_logger.error("Input object required")
return 1
+ if print_input_deps:
+ printdeps(job_order_object, loader, stdout, relative_deps,
+ basedir=u"file://%s/" % input_basedir)
+ return 0
+
+ if "cwl:tool" in job_order_object:
+ del job_order_object["cwl:tool"]
+ if "id" in job_order_object:
+ del job_order_object["id"]
+
return (job_order_object, input_basedir)
-def print_deps(fn):
- with open(fn) as f:
- deps = {"class": "File",
- "path": fn}
- sf = process.scandeps(os.path.dirname(fn), yaml.load(f),
- set(("$import", "run")),
- set(("$include", "$schemas", "path")))
- if sf:
- deps["secondaryFiles"] = sf
- print json.dumps(deps, indent=4)
-
-def scandeps(base, doc):
- r = []
- if isinstance(doc, dict):
- if "$import" in doc:
- p = os.path.join(base, doc["$import"])
- with open(p) as f:
- r.append({
- "class": "File",
- "path": p,
- "secondaryFiles": scandeps(os.path.dirname(p), yaml.load(f))
- })
- elif "$include" in doc:
- p = os.path.join(base, doc["$include"])
- r.append({
- "class": "File",
- "path": p
- })
- elif "$schemas" in doc:
- for s in doc["$schemas"]:
- p = os.path.join(base, s)
- r.append({
- "class": "File",
- "path": p
- })
+
+def printdeps(obj, document_loader, stdout, relative_deps, basedir=None):
+ # type: (Dict[unicode, Any], Loader, IO[Any], bool, str) -> None
+ deps = {"class": "File",
+ "path": obj.get("id", "#")}
+
+ def loadref(b, u):
+ return document_loader.resolve_ref(u, base_url=b)[0]
+
+ sf = process.scandeps(basedir if basedir else obj["id"], obj,
+ set(("$import", "run")),
+ set(("$include", "$schemas", "path")), loadref)
+ if sf:
+ deps["secondaryFiles"] = sf
+
+ if relative_deps:
+ if relative_deps == "primary":
+ base = basedir if basedir else os.path.dirname(obj["id"])
+ elif relative_deps == "cwd":
+ base = "file://" + os.getcwd()
else:
- for d in doc.itervalues():
- r.extend(scandeps(base, d))
- elif isinstance(doc, list):
- for d in doc:
- r.extend(scandeps(base, d))
- return r
-
-def print_deps(fn):
- with open(fn) as f:
- print json.dumps(scandeps(os.path.dirname(fn), yaml.load(f)), indent=4)
-
-def main(args=None,
+ raise Exception(u"Unknown relative_deps %s" % relative_deps)
+ def makeRelative(u):
+ if ":" in u.split("/")[0] and not u.startswith("file://"):
+ return u
+ return os.path.relpath(u, base)
+ process.adjustFiles(deps, makeRelative)
+
+ stdout.write(json.dumps(deps, indent=4))
+
+def versionstring():
+ # type: () -> unicode
+ pkg = pkg_resources.require("cwltool")
+ if pkg:
+ return u"%s %s" % (sys.argv[0], pkg[0].version)
+ else:
+ return u"%s %s" % (sys.argv[0], "unknown version")
+
+
+def main(argsl=None,
executor=single_job_executor,
makeTool=workflow.defaultMakeTool,
selectResources=None,
parser=None,
stdin=sys.stdin,
stdout=sys.stdout,
- stderr=sys.stderr):
+ stderr=sys.stderr,
+ versionfunc=versionstring):
+ # type: (List[str],Callable[...,Union[str,Dict[str,str]]],Callable[...,Process],Callable[[Dict[str,int]],Dict[str,int]],argparse.ArgumentParser,IO[Any],IO[Any],IO[Any],Callable[[],unicode]) -> int
_logger.removeHandler(defaultStreamHandler)
_logger.addHandler(logging.StreamHandler(stderr))
- if args is None:
- args = sys.argv[1:]
+ if argsl is None:
+ argsl = sys.argv[1:]
if parser is None:
parser = arg_parser()
- args = parser.parse_args(args)
+ args = parser.parse_args(argsl)
if args.quiet:
_logger.setLevel(logging.WARN)
if args.debug:
_logger.setLevel(logging.DEBUG)
- pkg = pkg_resources.require("cwltool")
- if pkg:
- if args.version:
- print "%s %s" % (sys.argv[0], pkg[0].version)
- return 0
- else:
- _logger.info("%s %s", sys.argv[0], pkg[0].version)
+ if args.version:
+ print versionfunc()
+ return 0
+ else:
+ _logger.info(versionfunc())
if not args.workflow:
parser.print_help()
@@ -509,21 +565,20 @@ def main(args=None,
_logger.error("CWL document required")
return 1
- if args.print_deps:
- print_deps(args.workflow)
- return 0
-
try:
t = load_tool(args.workflow, args.update, args.strict, makeTool, args.debug,
print_pre=args.print_pre,
print_rdf=args.print_rdf,
print_dot=args.print_dot,
- rdf_serializer=args.rdf_serializer)
+ print_deps=args.print_deps,
+ relative_deps=args.relative_deps,
+ rdf_serializer=args.rdf_serializer,
+ stdout=stdout)
except Exception as e:
- _logger.error("I'm sorry, I couldn't load this CWL file, try again with --debug for more information.\n%s\n", e, exc_info=(e if args.debug else False))
+ _logger.error(u"I'm sorry, I couldn't load this CWL file, try again with --debug for more information.\n%s\n", e, exc_info=(e if args.debug else False))
return 1
- if type(t) == int:
+ if isinstance(t, int):
return t
if args.tmp_outdir_prefix != 'tmp':
@@ -540,18 +595,25 @@ def main(args=None,
_logger.error("Temporary directory prefix doesn't exist.")
return 1
- job_order_object = load_job_order(args, t, parser, stdin)
+ job_order_object = load_job_order(args, t, parser, stdin,
+ print_input_deps=args.print_input_deps,
+ relative_deps=args.relative_deps,
+ stdout=stdout)
- if type(job_order_object) == int:
+ if isinstance(job_order_object, int):
return job_order_object
+ if args.cachedir:
+ args.cachedir = os.path.abspath(args.cachedir)
+ args.move_outputs = False
+
try:
out = executor(t, job_order_object[0],
job_order_object[1], args,
conformance_test=args.conformance_test,
dry_run=args.dry_run,
outdir=args.outdir,
- tmp_outdir_prefix=args.tmp_outdir_prefix,
+ tmp_outdir_prefix=args.cachedir if args.cachedir else args.tmp_outdir_prefix,
use_container=args.use_container,
preserve_environment=args.preserve_environment,
pull_image=args.enable_pull,
@@ -562,22 +624,27 @@ def main(args=None,
makeTool=makeTool,
move_outputs=args.move_outputs,
select_resources=selectResources,
- eval_timeout=args.eval_timeout
+ eval_timeout=args.eval_timeout,
+ cachedir=args.cachedir
)
# This is the workflow output, it needs to be written
if out is not None:
- stdout.write(json.dumps(out, indent=4))
+ if isinstance(out, basestring):
+ stdout.write(out)
+ else:
+ stdout.write(json.dumps(out, indent=4))
+ stdout.write("\n")
stdout.flush()
else:
return 1
except (validate.ValidationException) as e:
- _logger.error("Input object failed validation:\n%s", e, exc_info=(e if args.debug else False))
+ _logger.error(u"Input object failed validation:\n%s", e, exc_info=(e if args.debug else False))
return 1
except workflow.WorkflowException as e:
- _logger.error("Workflow error, try again with --debug for more information:\n %s", e, exc_info=(e if args.debug else False))
+ _logger.error(u"Workflow error, try again with --debug for more information:\n %s", e, exc_info=(e if args.debug else False))
return 1
except Exception as e:
- _logger.error("Unhandled error, try again with --debug for more information:\n %s", e, exc_info=(e if args.debug else False))
+ _logger.error(u"Unhandled error, try again with --debug for more information:\n %s", e, exc_info=(e if args.debug else False))
return 1
return 0
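
The new printdeps() collects dependencies with process.scandeps and, for --relative-deps, rewrites each discovered path with a makeRelative closure. A standalone sketch of that rewrite rule: URIs with a non-file scheme pass through untouched, everything else is rebased against the chosen base directory:

    import os

    def make_relative(path, base):
        # Leave e.g. "http://..." alone; rebase file paths against base.
        if ":" in path.split("/")[0] and not path.startswith("file://"):
            return path
        return os.path.relpath(path, base)

    print make_relative("http://example.com/schema.owl", "/work")  # unchanged
    print make_relative("/work/tools/wc-tool.cwl", "/work")        # tools/wc-tool.cwl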
diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py
index 68bcb56..33e6f59 100644
--- a/cwltool/pathmapper.py
+++ b/cwltool/pathmapper.py
@@ -2,68 +2,91 @@ import os
import random
import logging
import stat
+from typing import Tuple, Set, Union, Any, Dict, List
_logger = logging.getLogger("cwltool")
-def abspath(src, basedir):
+
+def abspath(src, basedir): # type: (str,str) -> str
if src.startswith("file://"):
ab = src[7:]
else:
ab = src if os.path.isabs(src) else os.path.join(basedir, src)
return ab
+
class PathMapper(object):
+
"""Mapping of files from relative path provided in the file to a tuple of
(absolute local path, absolute container path)"""
def __init__(self, referenced_files, basedir):
- self._pathmap = {}
+ # type: (Set[str], str) -> None
+ self._pathmap = {} # type: Dict[str, Tuple[str, str]]
+ self.setup(referenced_files, basedir)
+
+ def setup(self, referenced_files, basedir):
+ # type: (Set[str], str) -> None
for src in referenced_files:
ab = abspath(src, basedir)
self._pathmap[src] = (ab, ab)
- def mapper(self, src):
- return self._pathmap[src]
+ def mapper(self, src): # type: (str) -> Tuple[str,str]
+ if "#" in src:
+ i = src.index("#")
+ p = self._pathmap[src[:i]]
+ return (p[0], p[1] + src[i:])
+ else:
+ return self._pathmap[src]
- def files(self):
+ def files(self): # type: () -> List[str]
return self._pathmap.keys()
- def reversemap(self, target):
- for k,v in self._pathmap.items():
+ def items(self): # type: () -> List[Tuple[str,Tuple[str,str]]]
+ return self._pathmap.items()
+
+ def reversemap(self, target): # type: (str) -> Tuple[str, str]
+ for k, v in self._pathmap.items():
if v[1] == target:
return (k, v[0])
return None
+
class DockerPathMapper(PathMapper):
+
def __init__(self, referenced_files, basedir):
- self._pathmap = {}
- self.dirs = {}
+ # type: (Set[str], str) -> None
+ self.dirs = {} # type: Dict[str, Union[bool, str]]
+ super(DockerPathMapper, self).__init__(referenced_files, basedir)
+
+ def setup(self, referenced_files, basedir):
for src in referenced_files:
ab = abspath(src, basedir)
- dir, fn = os.path.split(ab)
+ dirn, fn = os.path.split(ab)
subdir = False
for d in self.dirs:
- if dir.startswith(d):
+ if dirn.startswith(d):
subdir = True
break
if not subdir:
for d in list(self.dirs):
- if d.startswith(dir):
- # 'dir' is a parent of 'd'
+ if d.startswith(dirn):
+ # 'dirn' is a parent of 'd'
del self.dirs[d]
- self.dirs[dir] = True
+ self.dirs[dirn] = True
prefix = "job" + str(random.randint(1, 1000000000)) + "_"
- names = set()
+ names = set() # type: Set[str]
for d in self.dirs:
name = os.path.join("/var/lib/cwl", prefix + os.path.basename(d))
i = 1
while name in names:
i += 1
- name = os.path.join("/var/lib/cwl", prefix + os.path.basename(d) + str(i))
+ name = os.path.join("/var/lib/cwl",
+ prefix + os.path.basename(d) + str(i))
names.add(name)
self.dirs[d] = name
@@ -74,9 +97,11 @@ class DockerPathMapper(PathMapper):
st = os.lstat(deref)
while stat.S_ISLNK(st.st_mode):
rl = os.readlink(deref)
- deref = rl if os.path.isabs(rl) else os.path.join(os.path.dirname(deref), rl)
+ deref = rl if os.path.isabs(rl) else os.path.join(
+ os.path.dirname(deref), rl)
st = os.lstat(deref)
for d in self.dirs:
if ab.startswith(d):
- self._pathmap[src] = (deref, os.path.join(self.dirs[d], ab[len(d)+1:]))
+ self._pathmap[src] = (deref, os.path.join(
+ self.dirs[d], ab[len(d)+1:]))
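
mapper() is now fragment-aware: a reference such as "input.txt#frag" is looked up by the part before "#" and the fragment is re-attached to the container-side path. The same logic on a hand-built path table (paths illustrative):

    pathmap = {"input.txt": ("/data/input.txt", "/var/lib/cwl/job1_x/input.txt")}

    def mapper(src):
        # Split off a "#fragment", map the base path, carry the fragment.
        if "#" in src:
            i = src.index("#")
            local, container = pathmap[src[:i]]
            return (local, container + src[i:])
        return pathmap[src]

    print mapper("input.txt#line1")
    # ('/data/input.txt', '/var/lib/cwl/job1_x/input.txt#line1')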
diff --git a/cwltool/process.py b/cwltool/process.py
index 952291a..94d9aab 100644
--- a/cwltool/process.py
+++ b/cwltool/process.py
@@ -1,3 +1,4 @@
+import abc
import avro.schema
import os
import json
@@ -7,27 +8,29 @@ import yaml
import copy
import logging
import pprint
-from aslist import aslist
+from .utils import aslist, get_feature
import schema_salad.schema
+from schema_salad.ref_resolver import Loader
import urlparse
import pprint
from pkg_resources import resource_stream
import stat
-from builder import Builder
+from .builder import Builder, adjustFileObjs
import tempfile
import glob
-from errors import WorkflowException
-from pathmapper import abspath
-
+from .errors import WorkflowException
+from .pathmapper import abspath
+from typing import Any, Callable, Generator, Union, IO, AnyStr, Tuple, Dict, List, Set
+from collections import Iterable
from rdflib import URIRef
from rdflib.namespace import RDFS, OWL
-
+from .stdfsaccess import StdFsAccess
import errno
+from rdflib import Graph
_logger = logging.getLogger("cwltool")
supportedProcessRequirements = ["DockerRequirement",
- "ExpressionEngineRequirement",
"SchemaDefRequirement",
"EnvVarRequirement",
"CreateFileRequirement",
@@ -36,7 +39,8 @@ supportedProcessRequirements = ["DockerRequirement",
"MultipleInputFeatureRequirement",
"InlineJavascriptRequirement",
"ShellCommandRequirement",
- "StepInputExpressionRequirement"]
+ "StepInputExpressionRequirement",
+ "ResourceRequirement"]
cwl_files = ("Workflow.yml",
"CommandLineTool.yml",
@@ -67,8 +71,8 @@ salad_files = ('metaschema.yml',
'vocab_res_schema.yml',
'vocab_res_src.yml',
'vocab_res_proc.yml')
-
def get_schema():
+ # type: () -> Tuple[Loader, Union[avro.schema.Names, avro.schema.SchemaParseException], Dict[unicode,Any]]
cache = {}
for f in cwl_files:
rs = resource_stream(__name__, 'schemas/draft-3/' + f)
@@ -82,54 +86,32 @@ def get_schema():
return schema_salad.schema.load_schema("https://w3id.org/cwl/CommonWorkflowLanguage.yml", cache=cache)
-def get_feature(self, feature):
- for t in reversed(self.requirements):
- if t["class"] == feature:
- return (t, True)
- for t in reversed(self.hints):
- if t["class"] == feature:
- return (t, False)
- return (None, None)
-
def shortname(inputid):
+ # type: (str) -> str
d = urlparse.urlparse(inputid)
if d.fragment:
return d.fragment.split("/")[-1].split(".")[-1]
else:
return d.path.split("/")[-1]
-class StdFsAccess(object):
- def __init__(self, basedir):
- self.basedir = basedir
-
- def _abs(self, p):
- return abspath(p, self.basedir)
-
- def glob(self, pattern):
- return glob.glob(self._abs(pattern))
-
- def open(self, fn, mode):
- return open(self._abs(fn), mode)
- def exists(self, fn):
- return os.path.exists(self._abs(fn))
+class UnsupportedRequirement(Exception):
+ pass
def checkRequirements(rec, supportedProcessRequirements):
+ # type: (Any, Iterable[Any]) -> None
if isinstance(rec, dict):
if "requirements" in rec:
for r in rec["requirements"]:
if r["class"] not in supportedProcessRequirements:
- raise Exception("Unsupported requirement %s" % r["class"])
- if "scatter" in rec:
- if isinstance(rec["scatter"], list) and rec["scatter"] > 1:
- raise Exception("Unsupported complex scatter type '%s'" % rec.get("scatterMethod"))
+ raise UnsupportedRequirement(u"Unsupported requirement %s" % r["class"])
for d in rec:
checkRequirements(rec[d], supportedProcessRequirements)
if isinstance(rec, list):
for d in rec:
checkRequirements(d, supportedProcessRequirements)
-def adjustFiles(rec, op):
+def adjustFiles(rec, op): # type: (Any, Callable[..., Any]) -> None
"""Apply a mapping function to each File path in the object `rec`."""
if isinstance(rec, dict):
@@ -141,19 +123,25 @@ def adjustFiles(rec, op):
for d in rec:
adjustFiles(d, op)
-def adjustFileObjs(rec, op):
- """Apply an update function to each File object in the object `rec`."""
+def adjustFilesWithSecondary(rec, op, primary=None):
+ """Apply a mapping function to each File path in the object `rec`, propagating
+ the primary file associated with a group of secondary files.
+ """
if isinstance(rec, dict):
if rec.get("class") == "File":
- op(rec)
- for d in rec:
- adjustFileObjs(rec[d], op)
+ rec["path"] = op(rec["path"], primary=primary)
+ adjustFilesWithSecondary(rec.get("secondaryFiles", []), op,
+ primary if primary else rec["path"])
+ else:
+ for d in rec:
+ adjustFilesWithSecondary(rec[d], op)
if isinstance(rec, list):
for d in rec:
- adjustFileObjs(d, op)
+ adjustFilesWithSecondary(d, op, primary)
def formatSubclassOf(fmt, cls, ontology, visited):
+ # type: (str, str, Graph, Set[str]) -> bool
"""Determine if `fmt` is a subclass of `cls`."""
if URIRef(fmt) == URIRef(cls):
@@ -163,52 +151,75 @@ def formatSubclassOf(fmt, cls, ontology, visited):
return False
if fmt in visited:
- return
+ return False
visited.add(fmt)
- fmt = URIRef(fmt)
+ uriRefFmt = URIRef(fmt)
- for s,p,o in ontology.triples( (fmt, RDFS.subClassOf, None) ):
+ for s,p,o in ontology.triples( (uriRefFmt, RDFS.subClassOf, None) ):
# Find parent classes of `fmt` and search upward
if formatSubclassOf(o, cls, ontology, visited):
return True
- for s,p,o in ontology.triples( (fmt, OWL.equivalentClass, None) ):
+ for s,p,o in ontology.triples( (uriRefFmt, OWL.equivalentClass, None) ):
# Find equivalent classes of `fmt` and search horizontally
if formatSubclassOf(o, cls, ontology, visited):
return True
- for s,p,o in ontology.triples( (None, OWL.equivalentClass, fmt) ):
+ for s,p,o in ontology.triples( (None, OWL.equivalentClass, uriRefFmt) ):
# Find equivalent classes of `fmt` and search horizontally
if formatSubclassOf(s, cls, ontology, visited):
return True
return False
-def checkFormat(actualFile, inputFormats, requirements, ontology):
+
+def checkFormat(actualFile, inputFormats, ontology):
+ # type: (Union[Dict[str, Any], List[Dict[str, Any]]], Any, Graph) -> None
for af in aslist(actualFile):
if "format" not in af:
- raise validate.ValidationException("Missing required 'format' for File %s" % af)
+ raise validate.ValidationException(u"Missing required 'format' for File %s" % af)
for inpf in aslist(inputFormats):
if af["format"] == inpf or formatSubclassOf(af["format"], inpf, ontology, set()):
return
- raise validate.ValidationException("Incompatible file format %s required format(s) %s" % (af["format"], inputFormats))
+ raise validate.ValidationException(u"Incompatible file format %s required format(s) %s" % (af["format"], inputFormats))
+
+def fillInDefaults(inputs, job):
+ # type: (List[Dict[str, str]], Dict[str, str]) -> None
+ for inp in inputs:
+ if shortname(inp["id"]) in job:
+ pass
+ elif shortname(inp["id"]) not in job and "default" in inp:
+ job[shortname(inp["id"])] = copy.copy(inp["default"])
+ elif shortname(inp["id"]) not in job and inp["type"][0] == "null":
+ pass
+ else:
+ raise validate.ValidationException("Missing input parameter `%s`" % shortname(inp["id"]))
class Process(object):
+ __metaclass__ = abc.ABCMeta
+
def __init__(self, toolpath_object, **kwargs):
- (_, self.names, _) = get_schema()
+ # type: (Dict[str,Any], **Any) -> None
+ self.metadata = None # type: Dict[str,Any]
+ self.names = None # type: avro.schema.Names
+ n = get_schema()[1]
+ if isinstance(n, avro.schema.SchemaParseException):
+ raise n
+ else:
+ self.names = n
self.tool = toolpath_object
self.requirements = kwargs.get("requirements", []) + self.tool.get("requirements", [])
self.hints = kwargs.get("hints", []) + self.tool.get("hints", [])
+ self.formatgraph = None # type: Graph
if "loader" in kwargs:
self.formatgraph = kwargs["loader"].graph
- else:
- self.formatgraph = None
+ checkRequirements(self.tool, supportedProcessRequirements)
self.validate_hints(self.tool.get("hints", []), strict=kwargs.get("strict"))
- self.schemaDefs = {}
+ self.schemaDefs = {} # type: Dict[str,Dict[unicode, Any]]
sd, _ = self.get_requirement("SchemaDefRequirement")
@@ -220,18 +231,21 @@ class Process(object):
avro.schema.make_avsc_object(av, self.names)
# Build record schema from inputs
- self.inputs_record_schema = {"name": "input_record_schema", "type": "record", "fields": []}
- self.outputs_record_schema = {"name": "outputs_record_schema", "type": "record", "fields": []}
+ self.inputs_record_schema = {
+ "name": "input_record_schema", "type": "record",
+ "fields": []} # type: Dict[unicode, Any]
+ self.outputs_record_schema = {
+ "name": "outputs_record_schema", "type": "record",
+ "fields": []} # type: Dict[unicode, Any]
for key in ("inputs", "outputs"):
for i in self.tool[key]:
c = copy.copy(i)
- doc_url, _ = urlparse.urldefrag(c['id'])
c["name"] = shortname(c["id"])
del c["id"]
if "type" not in c:
- raise validate.ValidationException("Missing `type` in parameter `%s`" % c["name"])
+ raise validate.ValidationException(u"Missing `type` in parameter `%s`" % c["name"])
if "default" in c and "null" not in aslist(c["type"]):
c["type"] = ["null"] + aslist(c["type"])
@@ -239,35 +253,29 @@ class Process(object):
c["type"] = c["type"]
if key == "inputs":
- self.inputs_record_schema["fields"].append(c)
+ self.inputs_record_schema["fields"].append(c) # type: ignore
elif key == "outputs":
- self.outputs_record_schema["fields"].append(c)
+ self.outputs_record_schema["fields"].append(c) # type: ignore
try:
self.inputs_record_schema = schema_salad.schema.make_valid_avro(self.inputs_record_schema, {}, set())
avro.schema.make_avsc_object(self.inputs_record_schema, self.names)
except avro.schema.SchemaParseException as e:
- raise validate.ValidationException("Got error `%s` while prcoessing inputs of %s:\n%s" % (str(e), self.tool["id"], json.dumps(self.inputs_record_schema, indent=4)))
+ raise validate.ValidationException(u"Got error `%s` while processing inputs of %s:\n%s" % (str(e), self.tool["id"], json.dumps(self.inputs_record_schema, indent=4)))
try:
self.outputs_record_schema = schema_salad.schema.make_valid_avro(self.outputs_record_schema, {}, set())
avro.schema.make_avsc_object(self.outputs_record_schema, self.names)
except avro.schema.SchemaParseException as e:
- raise validate.ValidationException("Got error `%s` while prcoessing outputs of %s:\n%s" % (str(e), self.tool["id"], json.dumps(self.outputs_record_schema, indent=4)))
+ raise validate.ValidationException(u"Got error `%s` while processing outputs of %s:\n%s" % (str(e), self.tool["id"], json.dumps(self.outputs_record_schema, indent=4)))
def _init_job(self, joborder, input_basedir, **kwargs):
+ # type: (Dict[str, str], str, **Any) -> Builder
builder = Builder()
builder.job = copy.deepcopy(joborder)
- for i in self.tool["inputs"]:
- d = shortname(i["id"])
- if d not in builder.job and "default" in i:
- builder.job[d] = i["default"]
-
- for r in self.requirements:
- if r["class"] not in supportedProcessRequirements:
- raise WorkflowException("Unsupported process requirement %s" % (r["class"]))
+ fillInDefaults(self.tool["inputs"], builder.job)
# Validate job order
try:
@@ -297,15 +305,42 @@ class Process(object):
for i in self.tool["inputs"]:
d = shortname(i["id"])
if d in builder.job and i.get("format"):
- checkFormat(builder.job[d], builder.do_eval(i["format"]), self.requirements, self.formatgraph)
+ checkFormat(builder.job[d], builder.do_eval(i["format"]), self.formatgraph)
builder.bindings.extend(builder.bind_input(self.inputs_record_schema, builder.job))
+ if self.tool.get("baseCommand"):
+ for n, b in enumerate(aslist(self.tool["baseCommand"])):
+ builder.bindings.append({
+ "position": [-1000000, n],
+ "valueFrom": b
+ })
+
+ if self.tool.get("arguments"):
+ for i, a in enumerate(self.tool["arguments"]):
+ if isinstance(a, dict):
+ a = copy.copy(a)
+ if a.get("position"):
+ a["position"] = [a["position"], i]
+ else:
+ a["position"] = [0, i]
+ a["do_eval"] = a["valueFrom"]
+ a["valueFrom"] = None
+ builder.bindings.append(a)
+ else:
+ builder.bindings.append({
+ "position": [0, i],
+ "valueFrom": a
+ })
+
+ builder.bindings.sort(key=lambda a: a["position"])
+
builder.resources = self.evalResources(builder, kwargs)
return builder
def evalResources(self, builder, kwargs):
+ # type: (Builder, Dict[str, Any]) -> Dict[str, Union[int, str]]
resourceReq, _ = self.get_requirement("ResourceRequirement")
if resourceReq is None:
resourceReq = {}
@@ -346,19 +381,29 @@ class Process(object):
}
def validate_hints(self, hints, strict):
+ # type: (List[Dict[str, Any]], bool) -> None
for r in hints:
try:
if self.names.get_name(r["class"], "") is not None:
validate.validate_ex(self.names.get_name(r["class"], ""), r, strict=strict)
else:
- _logger.info(validate.ValidationException("Unknown hint %s" % (r["class"])))
+ _logger.info(str(validate.ValidationException(
+ u"Unknown hint %s" % (r["class"]))))
except validate.ValidationException as v:
- raise validate.ValidationException("Validating hint `%s`: %s" % (r["class"], str(v)))
+ raise validate.ValidationException(u"Validating hint `%s`: %s" % (r["class"], str(v)))
- def get_requirement(self, feature):
+ def get_requirement(self, feature): # type: (Any) -> Tuple[Any, bool]
return get_feature(self, feature)
-def empty_subtree(dirpath):
+ def visit(self, op):
+ op(self.tool)
+
+ @abc.abstractmethod
+ def job(self, job_order, input_basedir, output_callbacks, **kwargs):
+ # type: (Dict[str, str], str, Callable[[Any, Any], Any], **Any) -> Generator[Any, None, None]
+ return None
+
+def empty_subtree(dirpath): # type: (AnyStr) -> bool
# Test if a directory tree contains any files (does not count empty
# subdirectories)
for d in os.listdir(dirpath):
@@ -376,44 +421,57 @@ def empty_subtree(dirpath):
raise
return True
-_names = set()
-def uniquename(stem):
+_names = set() # type: Set[unicode]
+
+
+def uniquename(stem): # type: (unicode) -> unicode
c = 1
u = stem
while u in _names:
c += 1
- u = "%s_%s" % (stem, c)
+ u = u"%s_%s" % (stem, c)
_names.add(u)
return u
-def scandeps(base, doc, reffields, urlfields):
+def scandeps(base, doc, reffields, urlfields, loadref):
+ # type: (str, Any, Set[str], Set[str], Callable[[str, str], Any]) -> List[Dict[str, str]]
r = []
if isinstance(doc, dict):
+ if "id" in doc:
+ if doc["id"].startswith("file://"):
+ df, _ = urlparse.urldefrag(doc["id"])
+ if base != df:
+ r.append({
+ "class": "File",
+ "path": df
+ })
+ base = df
+
for k, v in doc.iteritems():
if k in reffields:
for u in aslist(v):
- if not isinstance(u, basestring):
- continue
- p = os.path.join(base, u)
- with open(p) as f:
+ if isinstance(u, dict):
+ r.extend(scandeps(base, u, reffields, urlfields, loadref))
+ else:
+ sub = loadref(base, u)
+ subid = urlparse.urljoin(base, u)
deps = {
"class": "File",
- "path": p
- }
- sf = scandeps(os.path.dirname(p), yaml.load(f), reffields, urlfields)
+ "path": subid
+ } # type: Dict[str, Any]
+ sf = scandeps(subid, sub, reffields, urlfields, loadref)
if sf:
deps["secondaryFiles"] = sf
r.append(deps)
elif k in urlfields:
for u in aslist(v):
- p = os.path.join(base, u)
r.append({
"class": "File",
- "path": p
+ "path": urlparse.urljoin(base, u)
})
else:
- r.extend(scandeps(base, v, reffields, urlfields))
+ r.extend(scandeps(base, v, reffields, urlfields, loadref))
elif isinstance(doc, list):
for d in doc:
- r.extend(scandeps(base, d, reffields, urlfields))
+ r.extend(scandeps(base, d, reffields, urlfields, loadref))
return r
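
The baseCommand/arguments handling added to _init_job depends on one invariant: every binding carries a two-element position list and the final sort is lexicographic, so baseCommand entries at [-1000000, n] always precede arguments at [0, i]. A toy demonstration (values illustrative):

    bindings = [
        {"position": [0, 0], "valueFrom": "--count"},
        {"position": [-1000000, 1], "valueFrom": "-l"},
        {"position": [-1000000, 0], "valueFrom": "wc"},
    ]
    bindings.sort(key=lambda b: b["position"])
    print [b["valueFrom"] for b in bindings]  # ['wc', '-l', '--count']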
diff --git a/cwltool/sandboxjs.py b/cwltool/sandboxjs.py
index 3cb3cde..4144cb2 100644
--- a/cwltool/sandboxjs.py
+++ b/cwltool/sandboxjs.py
@@ -2,21 +2,23 @@ import subprocess
import json
import threading
import errno
+import logging
+from typing import Any, Union, TypeVar, Dict, List, Mapping
+
class JavascriptException(Exception):
pass
-def execjs(js, jslib, timeout=None):
+_logger = logging.getLogger("cwltool")
+
+JSON = Union[Dict[Any,Any], List[Any], unicode, int, long, float, bool, None]
+
+def execjs(js, jslib, timeout=None): # type: (Union[Mapping,str], Any, int) -> JSON
nodejs = None
- trynodes = (["nodejs"], ["node"], ["docker", "run",
- "--attach=STDIN", "--attach=STDOUT", "--attach=STDERR",
- "--sig-proxy=true",
- "--interactive",
- "--rm",
- "node:slim"])
+ trynodes = ("nodejs", "node")
for n in trynodes:
try:
- nodejs = subprocess.Popen(n, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ nodejs = subprocess.Popen([n], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
break
except OSError as e:
if e.errno == errno.ENOENT:
@@ -25,10 +27,33 @@ def execjs(js, jslib, timeout=None):
raise
if nodejs is None:
- raise JavascriptException("cwltool requires Node.js engine to evaluate Javascript expressions, but couldn't find it. Tried %s" % (trynodes,))
+ try:
+ nodeimg = "node:slim"
+ dlist = subprocess.check_output(["docker", "images", nodeimg])
+ if "node" not in dlist:
+ nodejsimg = subprocess.check_output(["docker", "pull", nodeimg])
+ _logger.info("Pulled Docker image %s %s", nodeimg, nodejsimg)
+ nodejs = subprocess.Popen(["docker", "run",
+ "--attach=STDIN", "--attach=STDOUT", "--attach=STDERR",
+ "--sig-proxy=true", "--interactive",
+ "--rm", nodeimg],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ except OSError as e:
+ if e.errno == errno.ENOENT:
+ pass
+ else:
+ raise
+ except subprocess.CalledProcessError:
+ pass
+
+ if nodejs is None:
+ raise JavascriptException(
+ u"cwltool requires Node.js engine to evaluate Javascript "
+ "expressions, but couldn't find it. Tried %s, docker run "
+ "node:slim" % u", ".join(trynodes))
- fn = "\"use strict\";%s\n(function()%s)()" % (jslib, js if isinstance(js, basestring) and len(js) > 1 and js[0] == '{' else ("{return (%s);}" % js))
- script = "console.log(JSON.stringify(require(\"vm\").runInNewContext(%s, {})));\n" % json.dumps(fn)
+ fn = u"\"use strict\";\n%s\n(function()%s)()" % (jslib, js if isinstance(js, basestring) and len(js) > 1 and js[0] == '{' else ("{return (%s);}" % js))
+ script = u"console.log(JSON.stringify(require(\"vm\").runInNewContext(%s, {})));\n" % json.dumps(fn)
killed = []
def term():
@@ -47,21 +72,25 @@ def execjs(js, jslib, timeout=None):
stdoutdata, stderrdata = nodejs.communicate(script)
tm.cancel()
+ def fn_linenum(): # type: () -> unicode
+ return u"\n".join(u"%04i %s" % (i+1, b) for i, b in enumerate(fn.split("\n")))
+
if killed:
- raise JavascriptException("Long-running script killed after %s seconds.\nscript was: %s\n" % (timeout, fn))
+ raise JavascriptException(u"Long-running script killed after %s seconds.\nscript was:\n%s\n" % (timeout, fn_linenum()))
if nodejs.returncode != 0:
- raise JavascriptException("Returncode was: %s\nscript was: %s\nstdout was: '%s'\nstderr was: '%s'\n" % (nodejs.returncode, fn, stdoutdata, stderrdata))
+ raise JavascriptException(u"Returncode was: %s\nscript was:\n%s\nstdout was: '%s'\nstderr was: '%s'\n" % (nodejs.returncode, fn_linenum(), stdoutdata, stderrdata))
else:
try:
return json.loads(stdoutdata)
except ValueError as e:
- raise JavascriptException("%s\nscript was: %s\nstdout was: '%s'\nstderr was: '%s'\n" % (e, fn, stdoutdata, stderrdata))
+ raise JavascriptException(u"%s\nscript was:\n%s\nstdout was: '%s'\nstderr was: '%s'\n" % (e, fn_linenum(), stdoutdata, stderrdata))
class SubstitutionError(Exception):
pass
-def scanner(scan):
+
+def scanner(scan): # type: (str) -> List[int]
DEFAULT = 0
DOLLAR = 1
PAREN = 2
@@ -133,7 +162,7 @@ def scanner(scan):
return None
-def interpolate(scan, jslib, timeout=None):
+def interpolate(scan, jslib, timeout=None): # type: (str, Union[str, unicode], int) -> JSON
scan = scan.strip()
parts = []
w = scanner(scan)
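
execjs now prefers a local nodejs/node binary and only falls back to pulling and running the node:slim image; either way the expression runs under node's vm module in an empty context. A reduced sketch of the wrapper script it builds, covering only the plain-expression case (no {...} body handling):

    import json

    def wrap_expression(js, jslib=""):
        # Strict-mode IIFE around the expression, evaluated in a fresh
        # VM context so it cannot reach the host process's globals.
        fn = u"\"use strict\";\n%s\n(function(){return (%s);})()" % (jslib, js)
        return (u"console.log(JSON.stringify("
                u"require(\"vm\").runInNewContext(%s, {})));\n" % json.dumps(fn))

    print wrap_expression("1 + 1")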
diff --git a/cwltool/stdfsaccess.py b/cwltool/stdfsaccess.py
new file mode 100644
index 0000000..e70208b
--- /dev/null
+++ b/cwltool/stdfsaccess.py
@@ -0,0 +1,22 @@
+from typing import Any, IO, List
+from .pathmapper import abspath
+import glob
+import os
+
+
+class StdFsAccess(object):
+
+ def __init__(self, basedir): # type: (str) -> None
+ self.basedir = basedir
+
+ def _abs(self, p): # type: (str) -> str
+ return abspath(p, self.basedir)
+
+ def glob(self, pattern): # type: (str) -> List[str]
+ return glob.glob(self._abs(pattern))
+
+ def open(self, fn, mode): # type: (str, str) -> IO[Any]
+ return open(self._abs(fn), mode)
+
+ def exists(self, fn): # type: (str) -> bool
+ return os.path.exists(self._abs(fn))
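
Factoring StdFsAccess into its own module isolates the three filesystem operations output collection needs (glob, open, exists), so another backend can be substituted. A hypothetical in-memory variant for tests, assuming only that subclasses honour those three methods:

    import fnmatch
    from StringIO import StringIO
    from cwltool.stdfsaccess import StdFsAccess

    class InMemoryFsAccess(StdFsAccess):
        # Hypothetical test backend: `files` maps path -> contents.
        def __init__(self, files):
            self.files = files
        def glob(self, pattern):
            return [p for p in self.files if fnmatch.fnmatch(p, pattern)]
        def open(self, fn, mode):
            return StringIO(self.files[fn])
        def exists(self, fn):
            return fn in self.files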
diff --git a/cwltool/update.py b/cwltool/update.py
index ed7b8dc..cccbc5b 100644
--- a/cwltool/update.py
+++ b/cwltool/update.py
@@ -2,9 +2,12 @@ import sys
import urlparse
import json
import re
-from aslist import aslist
+from .utils import aslist
+from typing import Any, Dict, Callable, List, Tuple, Union
+import traceback
+from schema_salad.ref_resolver import Loader
-def findId(doc, frg):
+def findId(doc, frg): # type: (Any, Any) -> Dict
if isinstance(doc, dict):
if "id" in doc and doc["id"] == frg:
return doc
@@ -20,23 +23,28 @@ def findId(doc, frg):
return f
return None
-def fixType(doc):
+def fixType(doc): # type: (Any) -> Any
if isinstance(doc, list):
return [fixType(f) for f in doc]
- if isinstance(doc, basestring):
+ if isinstance(doc, (str, unicode)):
if doc not in ("null", "boolean", "int", "long", "float", "double", "string", "File", "record", "enum", "array", "Any") and "#" not in doc:
return "#" + doc
return doc
-def _draft2toDraft3dev1(doc, loader, baseuri):
+def _draft2toDraft3dev1(doc, loader, baseuri): # type: (Any, Loader, str) -> Any
try:
if isinstance(doc, dict):
if "import" in doc:
imp = urlparse.urljoin(baseuri, doc["import"])
- r = loader.fetch(imp)
- if isinstance(r, list):
- r = {"@graph": r}
+ impLoaded = loader.fetch(imp)
+ r = None # type: Dict[str, Any]
+ if isinstance(impLoaded, list):
+ r = {"@graph": impLoaded}
+ elif isinstance(impLoaded, dict):
+ r = impLoaded
+ else:
+ raise Exception("Unexpected code path.")
r["id"] = imp
_, frag = urlparse.urldefrag(imp)
if frag:
@@ -78,26 +86,27 @@ def _draft2toDraft3dev1(doc, loader, baseuri):
elif "name" in doc:
err = doc["name"]
import traceback
- raise Exception("Error updating '%s'\n %s\n%s" % (err, e, traceback.format_exc(e)))
+ raise Exception(u"Error updating '%s'\n %s\n%s" % (err, e, traceback.format_exc()))
-def draft2toDraft3dev1(doc, loader, baseuri):
+def draft2toDraft3dev1(doc, loader, baseuri): # type: (Any, Loader, str) -> Any
return (_draft2toDraft3dev1(doc, loader, baseuri), "https://w3id.org/cwl/cwl#draft-3.dev1")
digits = re.compile("\d+")
-def updateScript(sc):
+def updateScript(sc): # type: (str) -> str
sc = sc.replace("$job", "inputs")
sc = sc.replace("$tmpdir", "runtime.tmpdir")
sc = sc.replace("$outdir", "runtime.outdir")
sc = sc.replace("$self", "self")
return sc
-def _updateDev2Script(ent):
+
+def _updateDev2Script(ent): # type: (Any) -> Any
if isinstance(ent, dict) and "engine" in ent:
if ent["engine"] == "cwl:JsonPointer":
sp = ent["script"].split("/")
if sp[0] in ("tmpdir", "outdir"):
- return "$(runtime.%s)" % sp[0]
+ return u"$(runtime.%s)" % sp[0]
else:
if not sp[0]:
sp.pop(0)
@@ -105,19 +114,21 @@ def _updateDev2Script(ent):
sp = [str(i) if digits.match(i) else "'"+i+"'"
for i in sp]
if front == "job":
- return "$(inputs[%s])" % ']['.join(sp)
+ return u"$(inputs[%s])" % ']['.join(sp)
elif front == "context":
- return "$(self[%s])" % ']['.join(sp)
+ return u"$(self[%s])" % ']['.join(sp)
else:
sc = updateScript(ent["script"])
if sc[0] == "{":
return "$" + sc
else:
- return "$(%s)" % sc
+ return u"$(%s)" % sc
else:
return ent
+
def _draftDraft3dev1toDev2(doc, loader, baseuri):
+ # type: (Any, Loader, str) -> Any
doc = _updateDev2Script(doc)
if isinstance(doc, basestring):
return doc
@@ -125,8 +136,13 @@ def _draftDraft3dev1toDev2(doc, loader, baseuri):
# Convert expressions
if isinstance(doc, dict):
if "@import" in doc:
- r, _ = loader.resolve_ref(doc["@import"], base_url=baseuri)
- return _draftDraft3dev1toDev2(r, loader, r["id"])
+ resolved_doc = loader.resolve_ref(
+ doc["@import"], base_url=baseuri)[0]
+ if isinstance(resolved_doc, dict):
+ return _draftDraft3dev1toDev2(
+ resolved_doc, loader, resolved_doc["id"])
+ else:
+ raise Exception("Unexpected codepath")
for a in doc:
doc[a] = _draftDraft3dev1toDev2(doc[a], loader, baseuri)
@@ -154,10 +170,13 @@ def _draftDraft3dev1toDev2(doc, loader, baseuri):
return doc
+
def draftDraft3dev1toDev2(doc, loader, baseuri):
+ # type: (Any, Loader, str) -> Any
return (_draftDraft3dev1toDev2(doc, loader, baseuri), "https://w3id.org/cwl/cwl#draft-3.dev2")
def _draftDraft3dev2toDev3(doc, loader, baseuri):
+ # type: (Any, Loader, str) -> Any
try:
if isinstance(doc, dict):
if "@import" in doc:
@@ -165,11 +184,15 @@ def _draftDraft3dev2toDev3(doc, loader, baseuri):
return doc["@import"]
else:
imp = urlparse.urljoin(baseuri, doc["@import"])
- r = loader.fetch(imp)
- if isinstance(r, list):
- r = {"@graph": r}
+ impLoaded = loader.fetch(imp)
+ if isinstance(impLoaded, list):
+ r = {"@graph": impLoaded}
+ elif isinstance(impLoaded, dict):
+ r = impLoaded
+ else:
+ raise Exception("Unexpected code path.")
r["id"] = imp
- _, frag = urlparse.urldefrag(imp)
+ frag = urlparse.urldefrag(imp)[1]
if frag:
frag = "#" + frag
r = findId(r, frag)
@@ -192,21 +215,27 @@ def _draftDraft3dev2toDev3(doc, loader, baseuri):
elif "name" in doc:
err = doc["name"]
import traceback
- raise Exception("Error updating '%s'\n %s\n%s" % (err, e, traceback.format_exc(e)))
+ raise Exception(u"Error updating '%s'\n %s\n%s" % (err, e, traceback.format_exc()))
def draftDraft3dev2toDev3(doc, loader, baseuri):
+ # type: (Any, Loader, str) -> Any
return (_draftDraft3dev2toDev3(doc, loader, baseuri), "https://w3id.org/cwl/cwl#draft-3.dev3")
def traverseImport(doc, loader, baseuri, func):
+ # type: (Any, Loader, str, Callable[[Any, Loader, str], Any]) -> Any
if "$import" in doc:
if doc["$import"][0] == "#":
return doc["$import"]
else:
imp = urlparse.urljoin(baseuri, doc["$import"])
- r = loader.fetch(imp)
- if isinstance(r, list):
- r = {"$graph": r}
+ impLoaded = loader.fetch(imp)
+ if isinstance(impLoaded, list):
+ r = {"$graph": impLoaded}
+ elif isinstance(impLoaded, dict):
+ r = impLoaded
+ else:
+ raise Exception("Unexpected code path.")
r["id"] = imp
_, frag = urlparse.urldefrag(imp)
if frag:
@@ -214,7 +243,9 @@ def traverseImport(doc, loader, baseuri, func):
r = findId(r, frag)
return func(r, loader, imp)
+
def _draftDraft3dev3toDev4(doc, loader, baseuri):
+ # type: (Any, Loader, str) -> Any
try:
if isinstance(doc, dict):
r = traverseImport(doc, loader, baseuri, _draftDraft3dev3toDev4)
@@ -239,13 +270,15 @@ def _draftDraft3dev3toDev4(doc, loader, baseuri):
elif "name" in doc:
err = doc["name"]
import traceback
- raise Exception("Error updating '%s'\n %s\n%s" % (err, e, traceback.format_exc(e)))
+ raise Exception(u"Error updating '%s'\n %s\n%s" % (err, e, traceback.format_exc()))
def draftDraft3dev3toDev4(doc, loader, baseuri):
+ # type: (Any, Loader, str) -> Any
return (_draftDraft3dev3toDev4(doc, loader, baseuri), "https://w3id.org/cwl/cwl#draft-3.dev4")
def _draftDraft3dev4toDev5(doc, loader, baseuri):
+ # type: (Any, Loader, str) -> Any
try:
if isinstance(doc, dict):
r = traverseImport(doc, loader, baseuri, _draftDraft3dev4toDev5)
@@ -270,18 +303,20 @@ def _draftDraft3dev4toDev5(doc, loader, baseuri):
err = doc["id"]
elif "name" in doc:
err = doc["name"]
- import traceback
- raise Exception("Error updating '%s'\n %s\n%s" % (err, e, traceback.format_exc(e)))
+ raise Exception(u"Error updating '%s'\n %s\n%s" % (err, e, traceback.format_exc()))
def draftDraft3dev4toDev5(doc, loader, baseuri):
+ # type: (Any, Loader, str) -> Any
return (_draftDraft3dev4toDev5(doc, loader, baseuri), "https://w3id.org/cwl/cwl#draft-3.dev5")
def draftDraft3dev5toFinal(doc, loader, baseuri):
+ # type: (Any, Loader, str) -> Any
return (doc, "https://w3id.org/cwl/cwl#draft-3")
def update(doc, loader, baseuri):
+ # type: (Any, Loader, str) -> Any
updates = {
"https://w3id.org/cwl/cwl#draft-2": draft2toDraft3dev1,
"https://w3id.org/cwl/cwl#draft-3.dev1": draftDraft3dev1toDev2,
@@ -290,9 +325,10 @@ def update(doc, loader, baseuri):
"https://w3id.org/cwl/cwl#draft-3.dev4": draftDraft3dev4toDev5,
"https://w3id.org/cwl/cwl#draft-3.dev5": draftDraft3dev5toFinal,
"https://w3id.org/cwl/cwl#draft-3": None
- }
+ } # type: Dict[unicode, Any]
def identity(doc, loader, baseuri):
+ # type: (Any, Loader, str) -> Tuple[Any, Union[str, unicode]]
v = doc.get("cwlVersion")
if v:
return (doc, loader.expand_url(v, ""))
@@ -306,7 +342,7 @@ def update(doc, loader, baseuri):
if version in updates:
nextupdate = updates[version]
else:
- raise Exception("Unrecognized version %s" % version)
+ raise Exception(u"Unrecognized version %s" % version)
doc["cwlVersion"] = version
diff --git a/cwltool/utils.py b/cwltool/utils.py
new file mode 100644
index 0000000..ec33cad
--- /dev/null
+++ b/cwltool/utils.py
@@ -0,0 +1,18 @@
+# no imports from cwltool allowed
+
+from typing import Any, List, Tuple
+
+def aslist(l): # type: (Any) -> List[Any]
+ if isinstance(l, list):
+ return l
+ else:
+ return [l]
+
+def get_feature(self, feature): # type: (Any, Any) -> Tuple[Any, bool]
+ for t in reversed(self.requirements):
+ if t["class"] == feature:
+ return (t, True)
+ for t in reversed(self.hints):
+ if t["class"] == feature:
+ return (t, False)
+ return (None, None)
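
get_feature searches a process's requirements before its hints and reports which kind matched, so callers can treat requirements as mandatory and hints as advisory. Usage on a stand-in object (class and values illustrative):

    from cwltool.utils import get_feature

    class Tool(object):
        requirements = [{"class": "DockerRequirement", "dockerPull": "debian:8"}]
        hints = [{"class": "ResourceRequirement", "coresMin": 2}]

    print get_feature(Tool(), "DockerRequirement")        # (..., True): required
    print get_feature(Tool(), "ResourceRequirement")      # (..., False): a hint
    print get_feature(Tool(), "ShellCommandRequirement")  # (None, None)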
diff --git a/cwltool/workflow.py b/cwltool/workflow.py
index 6ba693e..6b369c8 100644
--- a/cwltool/workflow.py
+++ b/cwltool/workflow.py
@@ -1,8 +1,8 @@
-import job
-import draft2tool
-from aslist import aslist
-from process import Process, get_feature, empty_subtree, shortname, uniquename
-from errors import WorkflowException
+from . import job
+from . import draft2tool
+from .utils import aslist
+from .process import Process, get_feature, empty_subtree, shortname, uniquename
+from .errors import WorkflowException
import copy
import logging
import random
@@ -17,15 +17,17 @@ import tempfile
import shutil
import json
import schema_salad
-import expression
+from . import expression
+from typing import Iterable, List, Callable, Any, Union, Generator, cast
_logger = logging.getLogger("cwltool")
WorkflowStateItem = namedtuple('WorkflowStateItem', ['parameter', 'value'])
def defaultMakeTool(toolpath_object, **kwargs):
+ # type: (Dict[str, Any], **Any) -> Process
if not isinstance(toolpath_object, dict):
- raise WorkflowException("Not a dict: `%s`" % toolpath_object)
+ raise WorkflowException(u"Not a dict: `%s`" % toolpath_object)
if "class" in toolpath_object:
if toolpath_object["class"] == "CommandLineTool":
return draft2tool.CommandLineTool(toolpath_object, **kwargs)
@@ -34,9 +36,9 @@ def defaultMakeTool(toolpath_object, **kwargs):
elif toolpath_object["class"] == "Workflow":
return Workflow(toolpath_object, **kwargs)
- raise WorkflowException("Missing or invalid 'class' field in %s, expecting one of: CommandLineTool, ExpressionTool, Workflow" % toolpath_object["id"])
+ raise WorkflowException(u"Missing or invalid 'class' field in %s, expecting one of: CommandLineTool, ExpressionTool, Workflow" % toolpath_object["id"])
-def findfiles(wo, fn=None):
+def findfiles(wo, fn=None): # type: (Any, List) -> List[Dict[str, Any]]
if fn is None:
fn = []
if isinstance(wo, dict):
@@ -53,6 +55,7 @@ def findfiles(wo, fn=None):
def match_types(sinktype, src, iid, inputobj, linkMerge, valueFrom):
+ # type: (Union[List[str],str], WorkflowStateItem, str, Dict[str, Any], str, str) -> bool
if isinstance(sinktype, list):
# Sink is union type
for st in sinktype:
@@ -78,7 +81,7 @@ def match_types(sinktype, src, iid, inputobj, linkMerge, valueFrom):
else:
inputobj[iid].append(src.value)
else:
- raise WorkflowException("Unrecognized linkMerge enum '%s'" % linkMerge)
+ raise WorkflowException(u"Unrecognized linkMerge enum '%s'" % linkMerge)
return True
elif valueFrom is not None or are_same_type(src.parameter["type"], sinktype) or sinktype == "Any":
# simply assign the value from state to input
@@ -86,22 +89,23 @@ def match_types(sinktype, src, iid, inputobj, linkMerge, valueFrom):
return True
return False
-def are_same_type(one, two):
+def are_same_type(src, sink): # type: (Any, Any) -> bool
"""Check for identical type specifications, ignoring extra keys like inputBinding.
"""
- if isinstance(one, dict) and isinstance(two, dict):
- if one["type"] == "array" and two["type"] == "array":
- return are_same_type(one["items"], two["items"])
- elif one["type"] == two["type"]:
+ if isinstance(src, dict) and isinstance(sink, dict):
+ if src["type"] == "array" and sink["type"] == "array":
+ return are_same_type(src["items"], sink["items"])
+ elif src["type"] == sink["type"]:
return True
else:
return False
else:
- return one == two
+ return src == sink
def object_from_state(state, parms, frag_only, supportsMultipleInput):
- inputobj = {}
+ # type: (Dict[str,WorkflowStateItem], List[Dict[str, Any]], bool, bool) -> Dict[str, str]
+ inputobj = {} # type: Dict[str, str]
for inp in parms:
iid = inp["id"]
if frag_only:
@@ -115,9 +119,9 @@ def object_from_state(state, parms, frag_only, supportsMultipleInput):
if not match_types(inp["type"], state[src], iid, inputobj,
inp.get("linkMerge", ("merge_nested" if len(connections) > 1 else None)),
valueFrom=inp.get("valueFrom")):
- raise WorkflowException("Type mismatch between source '%s' (%s) and sink '%s' (%s)" % (src, state[src].parameter["type"], inp["id"], inp["type"]))
+ raise WorkflowException(u"Type mismatch between source '%s' (%s) and sink '%s' (%s)" % (src, state[src].parameter["type"], inp["id"], inp["type"]))
elif src not in state:
- raise WorkflowException("Connect source '%s' on parameter '%s' does not exist" % (src, inp["id"]))
+ raise WorkflowException(u"Connect source '%s' on parameter '%s' does not exist" % (src, inp["id"]))
else:
return None
elif "default" in inp:
@@ -125,32 +129,39 @@ def object_from_state(state, parms, frag_only, supportsMultipleInput):
elif "valueFrom" in inp:
inputobj[iid] = None
else:
- raise WorkflowException("Value for %s not specified" % (inp["id"]))
+ raise WorkflowException(u"Value for %s not specified" % (inp["id"]))
return inputobj
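
Note that object_from_state defaults linkMerge to merge_nested whenever a sink has more than one incoming source. Per the draft-3 spec the two enum values differ only in how list-valued sources are combined; a standalone sketch of the distinction (not cwltool's own match_types code):

    def merge(values, link_merge):
        # merge_nested: each source contributes exactly one element.
        # merge_flattened: list-valued sources are spliced into the result.
        out = []
        for v in values:
            if link_merge == "merge_flattened" and isinstance(v, list):
                out.extend(v)
            else:
                out.append(v)
        return out

    sources = [["a", "b"], "c"]
    print(merge(sources, "merge_nested"))     # [['a', 'b'], 'c']
    print(merge(sources, "merge_flattened"))  # ['a', 'b', 'c']
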
class WorkflowJobStep(object):
- def __init__(self, step):
+
+ def __init__(self, step): # type: (Any) -> None
self.step = step
self.tool = step.tool
self.id = step.id
self.submitted = False
self.completed = False
- self.iterable = None
- self.name = uniquename("step %s" % shortname(self.id))
+ self.iterable = None # type: Iterable
+ self.name = uniquename(u"step %s" % shortname(self.id))
def job(self, joborder, basedir, output_callback, **kwargs):
+ # type: (Dict[str,str], str, functools.partial[None], **Any) -> Generator
kwargs["part_of"] = self.name
kwargs["name"] = shortname(self.id)
for j in self.step.job(joborder, basedir, output_callback, **kwargs):
yield j
+
class WorkflowJob(object):
+
def __init__(self, workflow, **kwargs):
+ # type: (Workflow, **Any) -> None
self.workflow = workflow
self.tool = workflow.tool
self.steps = [WorkflowJobStep(s) for s in workflow.steps]
self.id = workflow.tool["id"]
+ self.state = None # type: Dict[str, WorkflowStateItem]
+ self.processStatus = None # type: str
if "outdir" in kwargs:
self.outdir = kwargs["outdir"]
elif "tmp_outdir_prefix" in kwargs:
@@ -159,32 +170,34 @@ class WorkflowJob(object):
# tmp_outdir_prefix defaults to tmp, so this is unlikely to be used
self.outdir = tempfile.mkdtemp()
- self.name = uniquename("workflow %s" % kwargs.get("name", shortname(self.workflow.tool["id"])))
+ self.name = uniquename(u"workflow %s" % kwargs.get("name", shortname(self.workflow.tool["id"])))
- _logger.debug("[%s] initialized step from %s", self.name, self.tool["id"])
+ _logger.debug(u"[%s] initialized step from %s", self.name, self.tool["id"])
def receive_output(self, step, outputparms, jobout, processStatus):
+ # type: (WorkflowJobStep, List[Dict[str,str]], Dict[str,str], str) -> None
for i in outputparms:
if "id" in i:
if i["id"] in jobout:
self.state[i["id"]] = WorkflowStateItem(i, jobout[i["id"]])
else:
- _logger.error("Output is missing expected field %s" % i["id"])
+ _logger.error(u"Output is missing expected field %s" % i["id"])
processStatus = "permanentFail"
- _logger.debug("[%s] produced output %s", step.name, json.dumps(jobout, indent=4))
+ _logger.debug(u"[%s] produced output %s", step.name, json.dumps(jobout, indent=4))
if processStatus != "success":
if self.processStatus != "permanentFail":
self.processStatus = processStatus
- _logger.warn("[%s] completion status is %s", step.name, processStatus)
+ _logger.warn(u"[%s] completion status is %s", step.name, processStatus)
else:
- _logger.info("[%s] completion status is %s", step.name, processStatus)
+ _logger.info(u"[%s] completion status is %s", step.name, processStatus)
step.completed = True
def try_make_job(self, step, basedir, **kwargs):
+ # type: (WorkflowJobStep, str, **Any) -> Generator
inputparms = step.tool["inputs"]
outputparms = step.tool["outputs"]
@@ -193,13 +206,13 @@ class WorkflowJob(object):
try:
inputobj = object_from_state(self.state, inputparms, False, supportsMultipleInput)
if inputobj is None:
- _logger.debug("[%s] job step %s not ready", self.name, step.id)
+ _logger.debug(u"[%s] job step %s not ready", self.name, step.id)
return
if step.submitted:
return
- _logger.debug("[%s] starting %s", self.name, step.name)
+ _logger.debug(u"[%s] starting %s", self.name, step.name)
callback = functools.partial(self.receive_output, step, outputparms)
@@ -209,7 +222,7 @@ class WorkflowJob(object):
raise WorkflowException("Workflow step contains valueFrom but StepInputExpressionRequirement not in requirements")
vfinputs = {shortname(k): v for k,v in inputobj.iteritems()}
- def valueFromFunc(k, v):
+ def valueFromFunc(k, v): # type: (Any, Any) -> Any
if k in valueFrom:
return expression.do_eval(valueFrom[k], vfinputs, self.workflow.requirements,
None, None, {}, context=v)
@@ -221,21 +234,33 @@ class WorkflowJob(object):
method = step.tool.get("scatterMethod")
if method is None and len(scatter) != 1:
raise WorkflowException("Must specify scatterMethod when scattering over multiple inputs")
- if "valueFrom" not in kwargs:
- kwargs["valueFrom"] = valueFromFunc
+ kwargs["valueFrom"] = valueFromFunc
+
+ inputobj = {k: valueFromFunc(k, v) if k not in scatter else v
+ for k,v in inputobj.items()}
+
if method == "dotproduct" or method is None:
jobs = dotproduct_scatter(step, inputobj, basedir, scatter,
- callback, **kwargs)
+ cast( # known bug with mypy
+ # https://github.com/python/mypy/issues/797
+                    Callable[[Any], Any], callback), **kwargs)
elif method == "nested_crossproduct":
- jobs = nested_crossproduct_scatter(step, inputobj,
- basedir, scatter, callback, **kwargs)
+ jobs = nested_crossproduct_scatter(step, inputobj, basedir,
+ scatter, cast(Callable[[Any], Any], callback),
+ # known bug in mypy
+ # https://github.com/python/mypy/issues/797
+ **kwargs)
elif method == "flat_crossproduct":
jobs = flat_crossproduct_scatter(step, inputobj, basedir,
- scatter, callback, 0, **kwargs)
+ scatter,
+ cast(Callable[[Any], Any],
+ # known bug in mypy
+ # https://github.com/python/mypy/issues/797
+ callback), 0, **kwargs)
else:
- _logger.debug("[job %s] job input %s", step.name, json.dumps(inputobj, indent=4))
+ _logger.debug(u"[job %s] job input %s", step.name, json.dumps(inputobj, indent=4))
inputobj = {k: valueFromFunc(k, v) for k,v in inputobj.items()}
- _logger.debug("[job %s] evaluated job input to %s", step.name, json.dumps(inputobj, indent=4))
+ _logger.debug(u"[job %s] evaluated job input to %s", step.name, json.dumps(inputobj, indent=4))
jobs = step.job(inputobj, basedir, callback, **kwargs)
step.submitted = True
@@ -244,15 +269,16 @@ class WorkflowJob(object):
yield j
except WorkflowException:
raise
- except Exception as e:
+ except Exception:
_logger.exception("Unhandled exception")
self.processStatus = "permanentFail"
step.completed = True
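
The cast(...) wrappers sprinkled through the scatter dispatch above are purely type-checker hints: at the time, mypy could not treat functools.partial objects as callables of the right type (https://github.com/python/mypy/issues/797). The pattern in isolation:

    from __future__ import print_function
    import functools
    from typing import Any, Callable, cast

    def receive_output(step_name, jobout, status):
        # type: (str, Any, str) -> None
        print(step_name, jobout, status)

    def run_job(callback):
        # type: (Callable[..., Any]) -> None
        callback({"out": 1}, "success")

    partial_cb = functools.partial(receive_output, "step1")
    # cast() is a no-op at runtime; it only tells mypy to trust the type.
    run_job(cast(Callable[..., Any], partial_cb))
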
def run(self, **kwargs):
- _logger.debug("[%s] workflow starting", self.name)
+ _logger.debug(u"[%s] workflow starting", self.name)
def job(self, joborder, basedir, output_callback, move_outputs=True, **kwargs):
+ # type: (Dict[str,str], str, Callable[[Any, Any], Any], bool, **Any) -> Generator[WorkflowJob, None, None]
self.state = {}
self.processStatus = "success"
@@ -266,7 +292,7 @@ class WorkflowJob(object):
elif "default" in i:
self.state[i["id"]] = WorkflowStateItem(i, copy.deepcopy(i["default"]))
else:
- raise WorkflowException("Input '%s' not in input object and does not have a default value." % (i["id"]))
+ raise WorkflowException(u"Input '%s' not in input object and does not have a default value." % (i["id"]))
for s in self.steps:
for out in s.tool["outputs"]:
@@ -275,7 +301,6 @@ class WorkflowJob(object):
output_dirs = set()
completed = 0
- iterables = []
while completed < len(self.steps) and self.processStatus == "success":
made_progress = False
@@ -306,7 +331,7 @@ class WorkflowJob(object):
raise WorkflowException("Output for workflow not available")
if move_outputs:
- targets = set()
+ targets = set() # type: Set[str]
conflicts = set()
outfiles = findfiles(wo)
@@ -328,27 +353,28 @@ class WorkflowJob(object):
dst = os.path.join(self.outdir, src[len(a)+1:])
if dst in conflicts:
sp = os.path.splitext(dst)
- dst = "%s-%s%s" % (sp[0], str(random.randint(1, 1000000000)), sp[1])
+ dst = u"%s-%s%s" % (sp[0], str(random.randint(1, 1000000000)), sp[1])
dirname = os.path.dirname(dst)
if not os.path.exists(dirname):
os.makedirs(dirname)
- _logger.debug("[%s] Moving '%s' to '%s'", self.name, src, dst)
+ _logger.debug(u"[%s] Moving '%s' to '%s'", self.name, src, dst)
shutil.move(src, dst)
f["path"] = dst
for a in output_dirs:
if os.path.exists(a) and empty_subtree(a):
if kwargs.get("rm_tmpdir", True):
- _logger.debug("[%s] Removing intermediate output directory %s", self.name, a)
+ _logger.debug(u"[%s] Removing intermediate output directory %s", self.name, a)
shutil.rmtree(a, True)
- _logger.info("[%s] outdir is %s", self.name, self.outdir)
+ _logger.info(u"[%s] outdir is %s", self.name, self.outdir)
output_callback(wo, self.processStatus)
class Workflow(Process):
def __init__(self, toolpath_object, **kwargs):
+ # type: (Dict[str, Any], **Any) -> None
super(Workflow, self).__init__(toolpath_object, **kwargs)
kwargs["requirements"] = self.requirements
@@ -361,18 +387,26 @@ class Workflow(Process):
# TODO: statically validate data links instead of doing it at runtime.
def job(self, joborder, basedir, output_callback, **kwargs):
+ # type: (Dict[str,str], str, Callable[[Any, Any], Any], **Any) -> Generator[WorkflowJob, None, None]
builder = self._init_job(joborder, basedir, **kwargs)
wj = WorkflowJob(self, **kwargs)
yield wj
- kwargs["part_of"] = "workflow %s" % wj.name
+ kwargs["part_of"] = u"workflow %s" % wj.name
for w in wj.job(builder.job, basedir, output_callback, **kwargs):
yield w
+ def visit(self, op):
+ op(self.tool)
+ for s in self.steps:
+ s.visit(op)
+
class WorkflowStep(Process):
+
def __init__(self, toolpath_object, pos, **kwargs):
+ # type: (Dict[str, Any], int, **Any) -> None
if "id" in toolpath_object:
self.id = toolpath_object["id"]
else:
@@ -381,16 +415,15 @@ class WorkflowStep(Process):
try:
makeTool = kwargs.get("makeTool")
runobj = None
- if isinstance(toolpath_object["run"], basestring):
- runobj, _ = schema_salad.schema.load_and_validate(kwargs["loader"],
- kwargs["avsc_names"],
- toolpath_object["run"],
- True)
+ if isinstance(toolpath_object["run"], (str, unicode)):
+ runobj = schema_salad.schema.load_and_validate(
+ kwargs["loader"], kwargs["avsc_names"],
+ toolpath_object["run"], True)[0]
else:
runobj = toolpath_object["run"]
self.embedded_tool = makeTool(runobj, **kwargs)
except validate.ValidationException as v:
- raise WorkflowException("Tool definition %s failed validation:\n%s" % (toolpath_object["run"], validate.indent(str(v))))
+ raise WorkflowException(u"Tool definition %s failed validation:\n%s" % (toolpath_object["run"], validate.indent(str(v))))
for field in ("inputs", "outputs"):
for i in toolpath_object[field]:
@@ -430,7 +463,7 @@ class WorkflowStep(Process):
inp_map = {i["id"]: i for i in inputparms}
for s in scatter:
if s not in inp_map:
- raise WorkflowException("Scatter parameter '%s' does not correspond to an input parameter of this step, inputs are %s" % (s, inp_map.keys()))
+ raise WorkflowException(u"Scatter parameter '%s' does not correspond to an input parameter of this step, inputs are %s" % (s, inp_map.keys()))
inp_map[s]["type"] = {"type": "array", "items": inp_map[s]["type"]}
@@ -446,6 +479,7 @@ class WorkflowStep(Process):
self.tool["outputs"] = outputparms
def receive_output(self, output_callback, jobout, processStatus):
+ # type: (Callable[...,Any], Dict[str, str], str) -> None
#_logger.debug("WorkflowStep output from run is %s", jobout)
output = {}
for i in self.tool["outputs"]:
@@ -457,6 +491,7 @@ class WorkflowStep(Process):
output_callback(output, processStatus)
def job(self, joborder, basedir, output_callback, **kwargs):
+ # type: (Dict[str, Any], str, Callable[...,Any], **Any) -> Generator
for i in self.tool["inputs"]:
p = i["id"]
field = shortname(p)
@@ -472,22 +507,28 @@ class WorkflowStep(Process):
**kwargs):
yield t
except WorkflowException:
- _logger.error("Exception on step '%s'", kwargs.get("name"))
+ _logger.error(u"Exception on step '%s'", kwargs.get("name"))
raise
except Exception as e:
_logger.exception("Unexpected exception")
raise WorkflowException(str(e))
+ def visit(self, op):
+ self.embedded_tool.visit(op)
+
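
The new visit() methods give callers a simple pre-order traversal over a workflow and every tool embedded in its steps. A rough standalone illustration of the contract (stub classes, not cwltool's real ones):

    class StepStub(object):
        def __init__(self, tool):
            self.tool = tool
        def visit(self, op):
            op(self.tool)

    class WorkflowStub(object):
        def __init__(self, tool, steps):
            self.tool = tool
            self.steps = steps
        def visit(self, op):
            op(self.tool)            # visit the workflow itself first...
            for s in self.steps:
                s.visit(op)          # ...then recurse into each step

    wf = WorkflowStub({"class": "Workflow"},
                      [StepStub({"class": "CommandLineTool"}),
                       StepStub({"class": "ExpressionTool"})])
    classes = []
    wf.visit(lambda t: classes.append(t["class"]))
    print(classes)  # ['Workflow', 'CommandLineTool', 'ExpressionTool']
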
class ReceiveScatterOutput(object):
+
def __init__(self, output_callback, dest):
+ # type: (Callable[..., Any], Dict[str,List[str]]) -> None
self.dest = dest
self.completed = 0
self.processStatus = "success"
- self.total = None
+ self.total = None # type: int
self.output_callback = output_callback
def receive_scatter_output(self, index, jobout, processStatus):
+ # type: (int, Dict[str, str], str) -> None
for k,v in jobout.items():
self.dest[k][index] = v
@@ -500,13 +541,14 @@ class ReceiveScatterOutput(object):
if self.completed == self.total:
self.output_callback(self.dest, self.processStatus)
- def setTotal(self, total):
+ def setTotal(self, total): # type: (int) -> None
self.total = total
if self.completed == self.total:
self.output_callback(self.dest, self.processStatus)
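
ReceiveScatterOutput needs the completed == total check in both receive_scatter_output and setTotal because scatter jobs are yielded lazily: the last job can finish either before or after the dispatcher learns how many jobs there are. Condensed to its essentials:

    from __future__ import print_function

    class Collector(object):  # stripped-down analogue, not cwltool's class
        def __init__(self, on_done):
            self.completed = 0
            self.total = None          # unknown until dispatch finishes
            self.on_done = on_done

        def receive(self, index, result):
            self.completed += 1
            if self.completed == self.total:
                self.on_done()         # last job arrived after the total

        def set_total(self, total):
            self.total = total
            if self.completed == self.total:
                self.on_done()         # every job arrived before the total

    c = Collector(lambda: print("all done"))
    c.receive(0, "x")   # total still None, so nothing fires early
    c.set_total(1)      # prints "all done"
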
def dotproduct_scatter(process, joborder, basedir, scatter_keys, output_callback, **kwargs):
+ # type: (WorkflowJobStep, Dict[str, Any], str, List[str], Callable[..., Any], **Any) -> Generator[WorkflowJob, None, None]
l = None
for s in scatter_keys:
if l is None:
@@ -514,7 +556,7 @@ def dotproduct_scatter(process, joborder, basedir, scatter_keys, output_callback
elif l != len(joborder[s]):
raise WorkflowException("Length of input arrays must be equal when performing dotproduct scatter.")
- output = {}
+ output = {} # type: Dict[str,List[str]]
for i in process.tool["outputs"]:
output[i["id"]] = [None] * l
@@ -532,9 +574,10 @@ def dotproduct_scatter(process, joborder, basedir, scatter_keys, output_callback
def nested_crossproduct_scatter(process, joborder, basedir, scatter_keys, output_callback, **kwargs):
+ # type: (WorkflowJobStep, Dict[str, Any], str, List[str], Callable[..., Any], **Any) -> Generator[WorkflowJob, None, None]
scatter_key = scatter_keys[0]
l = len(joborder[scatter_key])
- output = {}
+ output = {} # type: Dict[str,List[str]]
for i in process.tool["outputs"]:
output[i["id"]] = [None] * l
@@ -548,13 +591,19 @@ def nested_crossproduct_scatter(process, joborder, basedir, scatter_keys, output
for j in process.job(jo, basedir, functools.partial(rc.receive_scatter_output, n), **kwargs):
yield j
else:
- for j in nested_crossproduct_scatter(process, jo, basedir, scatter_keys[1:], functools.partial(rc.receive_scatter_output, n), **kwargs):
+ for j in nested_crossproduct_scatter(process, jo, basedir,
+ scatter_keys[1:], cast( # known bug with mypy
+ # https://github.com/python/mypy/issues/797
+ Callable[[Any], Any],
+ functools.partial(rc.receive_scatter_output, n)),
+ **kwargs):
yield j
rc.setTotal(l)
def crossproduct_size(joborder, scatter_keys):
+ # type: (Dict[str, Any], List[str]) -> int
scatter_key = scatter_keys[0]
if len(scatter_keys) == 1:
sum = len(joborder[scatter_key])
@@ -567,16 +616,20 @@ def crossproduct_size(joborder, scatter_keys):
return sum
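
Since crossproduct_size recurses once per scatter key, summing over the elements of the outer array, the result works out to the product of the scattered array lengths. An equivalent closed form (a sketch, not the shipped recursion):

    def crossproduct_size(joborder, scatter_keys):
        size = 1
        for k in scatter_keys:
            size *= len(joborder[k])
        return size

    print(crossproduct_size({"a": [1, 2], "b": ["x", "y", "z"]}, ["a", "b"]))  # 6
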
def flat_crossproduct_scatter(process, joborder, basedir, scatter_keys, output_callback, startindex, **kwargs):
+ # type: (WorkflowJobStep, Dict[str, Any], str, List[str], Union[ReceiveScatterOutput,Callable[..., Any]], int, **Any) -> Generator[WorkflowJob, None, None]
scatter_key = scatter_keys[0]
l = len(joborder[scatter_key])
+ rc = None # type: ReceiveScatterOutput
if startindex == 0 and not isinstance(output_callback, ReceiveScatterOutput):
- output = {}
+ output = {} # type: Dict[str,List[str]]
for i in process.tool["outputs"]:
output[i["id"]] = [None] * crossproduct_size(joborder, scatter_keys)
rc = ReceiveScatterOutput(output_callback, output)
- else:
+ elif isinstance(output_callback, ReceiveScatterOutput):
rc = output_callback
+ else:
+ raise Exception("Unhandled code path. Please report this.")
put = startindex
for n in range(0, l):
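
For contrast, the three scatter strategies differ only in how they pair up the scattered arrays. Stripped of all job plumbing, the job orders they generate look like this (a standalone sketch, not cwltool's API):

    from __future__ import print_function
    import itertools

    inputs = {"a": [1, 2], "b": ["x", "y"]}

    # dotproduct: arrays must be the same length; element i pairs with element i.
    dot = [{"a": a, "b": b} for a, b in zip(inputs["a"], inputs["b"])]

    # flat_crossproduct: every combination as one flat list; outputs land in a
    # single flat array (which is why startindex threads through above).
    flat = [{"a": a, "b": b}
            for a, b in itertools.product(inputs["a"], inputs["b"])]

    # nested_crossproduct: same combinations, but outputs stay nested, one
    # sub-list per element of the outer scatter key.
    nested = [[{"a": a, "b": b} for b in inputs["b"]] for a in inputs["a"]]

    print(len(dot), len(flat), len(nested))  # 2 4 2
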
diff --git a/gittaggers.py b/gittaggers.py
index 05ce123..53dda8f 100644
--- a/gittaggers.py
+++ b/gittaggers.py
@@ -2,12 +2,15 @@ from setuptools.command.egg_info import egg_info
import subprocess
import time
+
class EggInfoFromGit(egg_info):
+
"""Tag the build with git commit timestamp.
If a build tag has already been set (e.g., "egg_info -b", building
from source package), leave it alone.
"""
+
def git_timestamp_tag(self):
gitinfo = subprocess.check_output(
['git', 'log', '--first-parent', '--max-count=1',
diff --git a/setup.cfg b/setup.cfg
index bd7adb4..421cc91 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,5 +1,8 @@
+[flake8]
+ignore = E124,E128,E129,E201,E202,E225,E226,E231,E265,E271,E302,E303,F401,E402,E501,W503,E731,F811,F821,F841
+
[egg_info]
-tag_build = .20160316204054
+tag_build = .20160504183010
tag_date = 0
tag_svn_revision = 0
diff --git a/setup.py b/setup.py
index 6529439..86e5aca 100644
--- a/setup.py
+++ b/setup.py
@@ -4,9 +4,6 @@ import os
import sys
import shutil
-import ez_setup
-ez_setup.use_setuptools()
-
import setuptools.command.egg_info as egg_info_cmd
from setuptools import setup, find_packages
@@ -36,10 +33,11 @@ setup(name='cwltool',
install_requires=[
'requests',
'PyYAML',
- 'rdflib >= 4.2.0',
+ 'rdflib >= 4.1.0',
'rdflib-jsonld >= 0.3.0',
'shellescape',
- 'schema_salad == 1.7.20160316203940'
+ 'schema_salad == 1.7.20160316203940',
+ 'typing'
],
test_suite='tests',
tests_require=[],
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/cwltool.git