[med-svn] [python-pyflow] 01/04: New upstream version 1.1.14
Andreas Tille
tille at debian.org
Sun Feb 12 07:21:37 UTC 2017
This is an automated email from the git hooks/post-receive script.
tille pushed a commit to branch master
in repository python-pyflow.
commit 2e215d91603725da789e6129fc5e2e14f7399c1a
Author: Andreas Tille <tille at debian.org>
Date: Sat Feb 11 20:44:24 2017 +0100
New upstream version 1.1.14
---
 pyflow/doc/ChangeLog.txt     |   2 +
 pyflow/src/pyflow.py         | 142 +++++++++++++++++++++++++++++++++++++------
 scratch/bench/README.md      |   3 +
 scratch/bench/manyThreads.py |  41 +++++++++++++
 4 files changed, 171 insertions(+), 17 deletions(-)
diff --git a/pyflow/doc/ChangeLog.txt b/pyflow/doc/ChangeLog.txt
index bc0bcab..0ce9f92 100644
--- a/pyflow/doc/ChangeLog.txt
+++ b/pyflow/doc/ChangeLog.txt
@@ -1,3 +1,5 @@
+v1.1.14 20170112
+* STREL-391 improve task throughput for nodes with 100's of cores
 v1.1.13 20160414
 * fix rare issue with sets of dependent checkpoint tasks
 * fix for travis CI script from Dominic Jodoin
diff --git a/pyflow/src/pyflow.py b/pyflow/src/pyflow.py
index 0ed516d..8af84ed 100644
--- a/pyflow/src/pyflow.py
+++ b/pyflow/src/pyflow.py
@@ -1132,8 +1132,6 @@ class CommandTaskRunner(BaseTaskRunner) :
         @param tmpDir: location to write files containing output from
                        the task wrapper script (and not the wrapped task)
         """
-        import pickle
-
         BaseTaskRunner.__init__(self, runStatus, taskStr, sharedFlowLog, setRunstate)

         self.cmd = cmd
@@ -1145,29 +1143,38 @@ class CommandTaskRunner(BaseTaskRunner) :
         self.errFile = errFile
         self.tmpDir = tmpDir
         self.schedulerArgList = schedulerArgList
+        self.runid = runid
+        self.taskStr = taskStr

         if not os.path.isfile(self.taskWrapper) :
             raise Exception("Can't find task wrapper script: %s" % self.taskWrapper)
+
+    def initFileSystemItems(self):
+        import pickle
+
         ensureDir(self.tmpDir)
         self.wrapFile = os.path.join(self.tmpDir, "pyflowTaskWrapper.signal.txt")

         # setup all the data to be passed to the taskWrapper and put this in argFile:
-        taskInfo = { 'nCores' : nCores,
-                     'outFile' : outFile, 'errFile' : errFile,
-                     'cwd' : cmd.cwd, 'env' : cmd.env,
-                     'cmd' : cmd.cmd, 'isShellCmd' : (cmd.type == "str") }
+        taskInfo = { 'nCores' : self.nCores,
+                     'outFile' : self.outFile, 'errFile' : self.errFile,
+                     'cwd' : self.cmd.cwd, 'env' : self.cmd.env,
+                     'cmd' : self.cmd.cmd, 'isShellCmd' : (self.cmd.type == "str") }
         argFile = os.path.join(self.tmpDir, "taskWrapperParameters.pickle")
         pickle.dump(taskInfo, open(argFile, "w"))

-        self.wrapperCmd = [self.taskWrapper, runid, taskStr, argFile]
-
+        self.wrapperCmd = [self.taskWrapper, self.runid, self.taskStr, argFile]

     def _run(self) :
         """
         Outer loop of _run() handles task retry behavior:
         """
+
+        # these initialization steps only need to happen once:
+        self.initFileSystemItems()
+
         startTime = time.time()
         retries = 0
         retInfo = Bunch(retval=1, taskExitMsg="", isAllowRetry=False)
@@ -1610,6 +1617,39 @@ class SGETaskRunner(CommandTaskRunner) :

+class TaskFileWriter(StoppableThread) :
+    """
+    This class runs on a separate thread and is
+    responsible for updating the task state and
+    task info files
+    """
+
+    def __init__(self, writeFunc) :
+        StoppableThread.__init__(self)
+        # parameter copy:
+        self.writeFunc = writeFunc
+        # thread settings:
+        self.setDaemon(True)
+        self.setName("TaskFileWriter-Thread")
+
+        self.isWrite = threading.Event()
+
+    def run(self) :
+        while not self.stopped() :
+            self._writeIfSet()
+            time.sleep(5)
+            self.isWrite.wait()
+
+    def flush(self):
+        self._writeIfSet()
+
+    def _writeIfSet(self) :
+        if self.isWrite.isSet() :
+            self.isWrite.clear()
+            self.writeFunc()
+
+
+
 class TaskManager(StoppableThread) :
     """
     This class runs on a separate thread from workflowRunner,
@@ -1993,14 +2033,14 @@ class TaskNode(object) :
     Represents an individual task in the task graph
     """

-    def __init__(self, tdag, lock, init_id, namespace, label, payload, isContinued, isFinishedEvent) :
-        self.tdag = tdag
+    def __init__(self, lock, init_id, namespace, label, payload, isContinued, isFinishedEvent, isWriteTaskStatus) :
         self.lock = lock
         self.id = init_id
         self.namespace = namespace
         self.label = label
         self.payload = payload
         self.isContinued = isContinued
+        self.isWriteTaskStatus = isWriteTaskStatus

         # if true, do not execute this task or honor it as a dependency for child tasks
         self.isIgnoreThis = False
@@ -2107,7 +2147,7 @@ class TaskNode(object) :
         else :
             self.runstateUpdateTimeStamp = updateTimeStamp
         self.runstate = runstate
-        self.tdag.writeTaskStatus()
+        self.isWriteTaskStatus.set()

     #def getParents(self) :
     #    return self.parents
@@ -2176,6 +2216,10 @@ class TaskDAG(object) :
         # unique id for each task in each run -- not persistent across continued runs:
         self.taskId = 0

+        # as tasks are added, occasionally spool task info to disk, and record the last
+        # task index written + 1
+        self.lastTaskIdWritten = 0
+
         # it will be easier for people to read the task status file if
         # the tasks are in approximately the same order as they were
         # added by the workflow:
@@ -2191,6 +2235,9 @@ class TaskDAG(object) :
         # cycle applies
         self.isFinishedEvent = threading.Event()

+        self.isWriteTaskInfo = None
+        self.isWriteTaskStatus = None
+
     @lockMethod
     def isTaskPresent(self, namespace, label) :
         return ((namespace, label) in self.labelMap)
@@ -2368,7 +2415,7 @@ class TaskDAG(object) :
         else:
             raise Exception("Task: '%s' is already in TaskDAG" % (fullLabel))

-        task = TaskNode(self, self.lock, self.taskId, namespace, label, payload, isContinued, self.isFinishedEvent)
+        task = TaskNode(self.lock, self.taskId, namespace, label, payload, isContinued, self.isFinishedEvent, self.isWriteTaskStatus)

         self.taskId += 1
@@ -2399,8 +2446,8 @@ class TaskDAG(object) :
             task.isReset=True

         if not isContinued:
-            self.writeTaskInfo(task)
-            self.writeTaskStatus()
+            self.isWriteTaskInfo.set()
+            self.isWriteTaskStatus.set()

         # determine if this is an ignoreTasksAfter node
         if label in self.ignoreTasksAfter :
@@ -2518,9 +2565,8 @@ class TaskDAG(object) :
             return val

-
     @lockMethod
-    def writeTaskInfo(self, task) :
+    def writeTaskInfoOld(self, task) :
         """
         appends a description of new tasks to the taskInfo file
         """
@@ -2553,6 +2599,57 @@ class TaskDAG(object) :
             fp.write(taskline + "\n")
         fp.close()
+    @lockMethod
+    def writeTaskInfo(self) :
+        """
+        appends a description of all new tasks to the taskInfo file
+        """
+
+        def getTaskLineFromTask(task) :
+            """
+            translate a task into its single-line summary format in the taskInfo file
+            """
+            depstring = ""
+            if len(task.parents) :
+                depstring = ",".join([p.label for p in task.parents])
+
+            cmdstring = ""
+            nCores = "0"
+            memMb = "0"
+            priority = "0"
+            isForceLocal = "0"
+            payload = task.payload
+            cwdstring = ""
+            if payload.type() == "command" :
+                cmdstring = str(payload.cmd)
+                nCores = str(payload.nCores)
+                memMb = str(payload.memMb)
+                priority = str(payload.priority)
+                isForceLocal = boolToStr(payload.isForceLocal)
+                cwdstring = payload.cmd.cwd
+            elif payload.type() == "workflow" :
+                cmdstring = payload.name()
+            else :
+                assert 0
+            return "\t".join((task.label, task.namespace, payload.type(),
+                              nCores, memMb, priority,
+                              isForceLocal, depstring, cwdstring, cmdstring))
+
+        assert (self.lastTaskIdWritten <= self.taskId)
+
+        if self.lastTaskIdWritten == self.taskId : return
+
+        newTaskLines = []
+        while self.lastTaskIdWritten < self.taskId :
+            task = self.labelMap[self.addOrder[self.lastTaskIdWritten]]
+            newTaskLines.append(getTaskLineFromTask(task))
+            self.lastTaskIdWritten += 1
+
+        fp = open(self.taskInfoFile, "a")
+        for taskLine in newTaskLines :
+            fp.write(taskLine + "\n")
+        fp.close()
+
 # workflowRunner:
@@ -3925,7 +4022,6 @@ class WorkflowRunner(object) :
                 stackDumpFp.close()

-
     def _runWorkflow(self, param) :
         #
         # Primary workflow logic when nothing goes wrong:
@@ -3964,6 +4060,9 @@ class WorkflowRunner(object) :
                 runStatus.errorCode = 1
                 runStatus.errorMessage = "Thread: '%s', has stopped without a traceable cause" % (trun.getName())

+        self._taskInfoWriter.flush()
+        self._taskStatusWriter.flush()
+
         return self._evalWorkflow(runStatus)
@@ -3999,6 +4098,15 @@ class WorkflowRunner(object) :
         if cdata.param.isContinue :
             self._setupContinuedWorkflow()

+        self._taskInfoWriter = TaskFileWriter(self._tdag.writeTaskInfo)
+        self._taskStatusWriter = TaskFileWriter(self._tdag.writeTaskStatus)
+
+        self._tdag.isWriteTaskInfo = self._taskInfoWriter.isWrite
+        self._tdag.isWriteTaskStatus = self._taskStatusWriter.isWrite
+
+        self._taskInfoWriter.start()
+        self._taskStatusWriter.start()
+
     def _createContinuedStateFile(self) :
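
Taken together, the pyflow.py hunks above move task info/status file writes off the
task state-transition path: a state change now merely sets a threading.Event, and the
new TaskFileWriter daemon thread wakes up to perform the actual write, coalescing many
per-task updates into a single file operation (writeTaskInfo likewise appends only the
tasks added since lastTaskIdWritten). A minimal standalone sketch of this coalescing
pattern, using illustrative names rather than pyflow's own API:

    import threading
    import time

    class CoalescingWriter(threading.Thread) :
        """
        Sketch of the TaskFileWriter idea: many producer threads set an
        event; one daemon thread performs the (expensive) file write at
        most once per polling interval, covering the whole batch.
        """
        def __init__(self, writeFunc, pollSec=5) :
            threading.Thread.__init__(self)
            self.daemon = True
            self.writeFunc = writeFunc
            self.pollSec = pollSec
            self.isWrite = threading.Event()

        def request(self) :
            # producers call this instead of writing the file themselves
            self.isWrite.set()

        def run(self) :
            while True :
                self.isWrite.wait()        # block until at least one update arrives
                time.sleep(self.pollSec)   # let further updates accumulate
                self.isWrite.clear()
                self.writeFunc()           # one write covers everything so far

Under this scheme the write cost is bounded by the polling interval rather than by the
number of task updates, which is what makes the difference on nodes running hundreds
of concurrent tasks.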
diff --git a/scratch/bench/README.md b/scratch/bench/README.md
new file mode 100644
index 0000000..f013811
--- /dev/null
+++ b/scratch/bench/README.md
@@ -0,0 +1,3 @@
+The manyThreads benchmark highlights the effect of the optimizations introduced by
+STREL-391, which improve total task throughput when a very high number of cores is
+available on a single machine.
diff --git a/scratch/bench/manyThreads.py b/scratch/bench/manyThreads.py
new file mode 100644
index 0000000..68d7643
--- /dev/null
+++ b/scratch/bench/manyThreads.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+
+import os.path
+import sys
+
+# add module path by hand
+#
+scriptDir = os.path.abspath(os.path.dirname(__file__))
+sys.path.append(scriptDir + "/../../pyflow/src")
+
+from pyflow import WorkflowRunner
+
+
+
+class SimpleWorkflow(WorkflowRunner) :
+    """
+    A workflow designed to demonstrate the runtime impact
+    of STREL-391
+    """
+
+    def __init__(self) :
+        pass
+
+    def workflow(self) :
+        for i in range(4000) :
+            self.addTask("task%s" % (i), ["sleep", "0"])
+
+
+
+# Instantiate the workflow
+#
+# parameters are passed into the workflow via its constructor:
+#
+wflow = SimpleWorkflow()
+
+# Run the workflow:
+#
+retval = wflow.run(mode="local", nCores=400, isQuiet=True)
+
+sys.exit(retval)
+
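
The benchmark can be timed directly from a source checkout (a hypothetical invocation;
pyflow 1.1.x is a Python 2 codebase):

    python scratch/bench/manyThreads.py

With 4000 no-op tasks on 400 local cores, per-task bookkeeping dominates the runtime,
so batching the taskInfo/taskStatus file writes, rather than writing on every task
addition and state change, is exactly what this script is built to measure.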
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-pyflow.git