[med-svn] [falcon] 02/04: Imported Upstream version 1.8.5
From: Afif Elghraoui <afif at moszumanska.debian.org>
Date: Mon Dec 12 09:19:06 UTC 2016
This is an automated email from the git hooks/post-receive script.
afif pushed a commit to branch master in repository falcon.
commit d74e7a355f14ad7d9d1810ce7d6423108d82b451
Author: Afif Elghraoui <afif at debian.org>
Date: Sun Dec 11 14:35:04 2016 -0800
Imported Upstream version 1.8.5
---
FALCON-make/makefile | 2 +
FALCON/falcon_kit/bash.py | 7 ++-
FALCON/falcon_kit/mains/get_read_ctg_map.py | 56 +++--------------
FALCON/falcon_kit/mains/run1.py | 8 ++-
FALCON/falcon_kit/pype_tasks.py | 69 +++++++++++++++-----
FALCON/falcon_kit/run_support.py | 8 ++-
FALCON_unzip/falcon_unzip/phasing.py | 1 +
pypeFLOW/pwatcher/network_based.py | 2 +-
pypeFLOW/pypeflow/do_task.py | 81 +++++++++++++++++-------
pypeFLOW/pypeflow/simple_pwatcher_bridge.py | 97 ++++++++++++++++++-----------
10 files changed, 202 insertions(+), 129 deletions(-)
diff --git a/FALCON-make/makefile b/FALCON-make/makefile
index 52b42d3..e712e6e 100644
--- a/FALCON-make/makefile
+++ b/FALCON-make/makefile
@@ -37,6 +37,8 @@ install-pypeFLOW:
cd ${FALCON_WORKSPACE}/pypeFLOW; pip uninstall -v .; pip install -v ${FALCON_PIP_USER} ${FALCON_PIP_EDIT} .
install-FALCON: install-pypeFLOW
cd ${FALCON_WORKSPACE}/FALCON; pip uninstall -v .; pip install -v ${FALCON_PIP_USER} ${FALCON_PIP_EDIT} .
+install-FALCON_unzip: install-pypeFLOW
+ cd ${FALCON_WORKSPACE}/FALCON_unzip; pip uninstall -v .; pip install -v ${FALCON_PIP_USER} ${FALCON_PIP_EDIT} .
install-git-sym:
# TODO: copy vs. symlink?
ln -sf $(abspath ${FALCON_WORKSPACE}/git-sym/git-sym) ${FALCON_PREFIX}/bin/git-sym
diff --git a/FALCON/falcon_kit/bash.py b/FALCON/falcon_kit/bash.py
index dbf3378..1f64a96 100644
--- a/FALCON/falcon_kit/bash.py
+++ b/FALCON/falcon_kit/bash.py
@@ -55,7 +55,7 @@ def write_script(script, script_fn, job_done_fn=None):
exe = write_sub_script(ofs, script)
def write_script_and_wrapper_top(script, wrapper_fn, job_done):
- """
+ """NOT USED.
Write script to a filename based on wrapper_fn, in same directory.
Write wrapper to call it,
and to write job_done on success and job_done.exit on any exit.
@@ -92,6 +92,8 @@ def select_rundir(tmpdir, wdir, script):
return '{}/{}/falcontmp/{}/{}'.format(tmpdir, user, wdir, digest)
def write_script_and_wrapper_for_tmp(tmpdir, script, wrapper_fn, job_done):
+ """NOT USED. This will be done in pypeFLOW.
+ """
wdir = os.path.dirname(os.path.abspath(wrapper_fn))
root, ext = os.path.splitext(os.path.basename(wrapper_fn))
sub_script_bfn = root + '.xsub' + ext
@@ -117,7 +119,8 @@ rm -rf {rdir}
return write_script_and_wrapper_top(tmp_wrapper_script, wrapper_fn, job_done)
def get_write_script_and_wrapper(config):
- """Return a function.
+ """NOT USED. This will be done in pypeFLOW.
+ Return a function.
For now, we actually use only config['use_tmpdir'], a boolean.
"""
use_tmpdir = config.get('use_tmpdir', None)
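
(The wrapper helpers above are now dead code; pypeFLOW writes the wrapper itself, as the do_task.py and simple_pwatcher_bridge.py hunks further down show. For reference, the convention their docstrings describe, run the real script, then record job_done on success and job_done.exit on any exit, can be sketched roughly as below. write_script_with_wrapper and the exact wrapper text are illustrative, not the upstream helper.)

    import os

    def write_script_with_wrapper(script_fn, wrapper_fn, job_done_fn):
        # Hypothetical reimplementation of the convention described in the
        # docstrings above (not the upstream helper): run the real script,
        # touch job_done on success, and always record the exit code in
        # job_done.exit when the wrapper exits.
        wrapper = """#!/bin/bash
    set -e
    trap 'echo $? > {job_done}.exit' EXIT
    bash {script}
    touch {job_done}
    """.format(script=script_fn, job_done=job_done_fn)
        with open(wrapper_fn, 'w') as ofs:
            ofs.write(wrapper)
        os.chmod(wrapper_fn, 0o755)
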
diff --git a/FALCON/falcon_kit/mains/get_read_ctg_map.py b/FALCON/falcon_kit/mains/get_read_ctg_map.py
index 91470ff..0ef0825 100644
--- a/FALCON/falcon_kit/mains/get_read_ctg_map.py
+++ b/FALCON/falcon_kit/mains/get_read_ctg_map.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import
from pypeflow.simple_pwatcher_bridge import (PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
makePypeLocalFile, fn, PypeTask)
PypeThreadTaskBase = MyFakePypeThreadTaskBase
-from falcon_kit.fc_asm_graph import AsmGraph
+from .. import pype_tasks
import argparse
import glob
import logging
@@ -11,55 +11,13 @@ import subprocess as sp
import shlex
import os
+LOG = logging.getLogger(__name__)
+
def make_dirs(d):
if not os.path.isdir(d):
+ LOG.debug('mkdirs {}'.format(d))
os.makedirs(d)
-def dump_rawread_ids(self):
- rawread_db = fn(self.rawread_db)
- rawread_id_file = fn(self.rawread_id_file)
- os.system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (rawread_db, rawread_id_file))
-
-def dump_pread_ids(self):
- pread_db = fn(self.pread_db)
- pread_id_file = fn(self.pread_id_file)
- os.system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (pread_db, pread_id_file))
-
-def generate_read_to_ctg_map(self):
- rawread_id_file = fn(self.rawread_id_file)
- pread_id_file = fn(self.pread_id_file)
- read_to_contig_map = fn(self.read_to_contig_map)
-
- pread_did_to_rid = open(pread_id_file).read().split('\n')
- rid_to_oid = open(rawread_id_file).read().split('\n')
-
- asm_G = AsmGraph(fn(self.sg_edges_list),
- fn(self.utg_data),
- fn(self.ctg_paths))
-
- pread_to_contigs = {}
-
- with open(read_to_contig_map, 'w') as f:
- for ctg in asm_G.ctg_data:
- if ctg[-1] == 'R':
- continue
- ctg_g = asm_G.get_sg_for_ctg(ctg)
- for n in ctg_g.nodes():
- pid = int(n.split(':')[0])
-
- rid = pread_did_to_rid[pid].split('/')[1]
- rid = int(int(rid)/10)
- oid = rid_to_oid[rid]
- k = (pid, rid, oid)
- pread_to_contigs.setdefault(k, set())
- pread_to_contigs[k].add(ctg)
-
-
- for k in pread_to_contigs:
- pid, rid, oid = k
- for ctg in list(pread_to_contigs[ k ]):
- print >>f, '%09d %09d %s %s' % (pid, rid, oid, ctg)
-
def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
read_map_dir = os.path.abspath(os.path.join(asm_dir, 'read_maps'))
make_dirs(read_map_dir)
@@ -82,7 +40,7 @@ def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
inputs = {'rawread_db': rawread_db},
outputs = {'rawread_id_file': rawread_id_file},
)
- wf.addTask(task(dump_rawread_ids))
+ wf.addTask(task(pype_tasks.task_dump_rawread_ids))
pread_db = makePypeLocalFile(os.path.join(pread_dir, 'preads.db'))
pread_id_file = makePypeLocalFile(os.path.join(read_map_dir, 'dump_pread_ids', 'pread_ids'))
@@ -91,7 +49,7 @@ def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
inputs = {'pread_db': pread_db},
outputs = {'pread_id_file': pread_id_file},
)
- wf.addTask(task(dump_pread_ids))
+ wf.addTask(task(pype_tasks.task_dump_pread_ids))
wf.refreshTargets() # block
@@ -111,7 +69,7 @@ def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
inputs = inputs,
outputs = {'read_to_contig_map': read_to_contig_map},
)
- wf.addTask(task(generate_read_to_ctg_map))
+ wf.addTask(task(pype_tasks.task_generate_read_to_ctg_map))
wf.refreshTargets() # block
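
(The three helpers deleted above reappear further down as falcon_kit.pype_tasks.task_dump_rawread_ids, task_dump_pread_ids, and task_generate_read_to_ctg_map. They keep the old calling convention: a task function receives one object whose attributes are its declared inputs and outputs, and fn() turns each back into a plain path. A minimal stand-in showing only that convention, with no pypeFLOW dependency; FakePlf, task_copy, and the file names are invented for illustration.)

    class FakePlf(object):
        # Stand-in for pypeflow's PypeLocalFile: it just carries a path.
        def __init__(self, path):
            self.path = path

    def fn(plf):
        # Same role as the fn() imported above: PypeLocalFile -> plain path.
        return plf.path

    def task_copy(self):
        # A task function sees its declared inputs/outputs as attributes of
        # 'self', exactly like task_dump_rawread_ids below.
        with open(fn(self.src)) as ifs, open(fn(self.dst), 'w') as ofs:
            ofs.write(ifs.read())

    class _Job(object):
        pass

    job = _Job()
    job.src = FakePlf('in.txt')
    job.dst = FakePlf('out.txt')
    with open('in.txt', 'w') as f:
        f.write('example\n')
    task_copy(job)  # the workflow normally binds the task and makes this call
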
diff --git a/FALCON/falcon_kit/mains/run1.py b/FALCON/falcon_kit/mains/run1.py
index d7bb13d..2e5f99d 100644
--- a/FALCON/falcon_kit/mains/run1.py
+++ b/FALCON/falcon_kit/mains/run1.py
@@ -125,7 +125,9 @@ def main1(prog_name, input_config_fn, logger_config_fn=None):
job_queue=config['job_queue'],
sge_option=config.get('sge_option', ''),
watcher_type=config['pwatcher_type'],
- watcher_directory=config['pwatcher_directory'])
+ watcher_directory=config['pwatcher_directory'],
+ use_tmpdir=config.get('use_tmpdir'),
+ )
run(wf, config,
os.path.abspath(input_config_fn),
input_fofn_plf=input_fofn_plf,
@@ -261,6 +263,7 @@ def run(wf, config,
make_task = PypeTask(
inputs = {
'gathered': las_fopfn_plf,
+ 'db': raw_reads_db_plf,
},
outputs = {
'scattered': scattered_plf,
@@ -452,3 +455,6 @@ def main(argv=sys.argv):
help='(Optional)JSON config for standard Python logging module')
args = parser.parse_args(argv[1:])
main1(argv[0], args.config, args.logger)
+
+if __name__ == '__main__':
+ main()
diff --git a/FALCON/falcon_kit/pype_tasks.py b/FALCON/falcon_kit/pype_tasks.py
index 25857e0..169bf5e 100644
--- a/FALCON/falcon_kit/pype_tasks.py
+++ b/FALCON/falcon_kit/pype_tasks.py
@@ -37,14 +37,12 @@ def task_make_fofn_abs_raw(self):
script_fn = 'noop.sh'
open(script_fn, 'w').write('echo NOOP raw')
self.generated_script_fn = script_fn
- mkdir(os.path.dirname(fn(self.o_fofn)))
support.make_fofn_abs(fn(self.i_fofn), fn(self.o_fofn))
def task_make_fofn_abs_preads(self):
script_fn = 'noop.sh'
open(script_fn, 'w').write('echo NOOP preads')
self.generated_script_fn = script_fn
- mkdir(os.path.dirname(fn(self.o_fofn)))
support.make_fofn_abs(fn(self.i_fofn), fn(self.o_fofn))
def task_build_rdb(self):
@@ -89,7 +87,6 @@ def task_build_pdb(self): #essential the same as build_rdb() but the subtle dif
def task_run_db2falcon(self):
wd = self.parameters['wd']
- mkdir(wd)
#self.las_fofn # TODO: Are there any implicit dependencies, or can we drop this?
job_done = fn(self.db2falcon_done)
preads4falcon_fn = fn(self.preads4falcon)
@@ -141,7 +138,6 @@ def task_report_pre_assembly(self):
# i_length_cutoff_fn was created long ago, so no filesystem issues.
length_cutoff = support.get_length_cutoff(length_cutoff, i_length_cutoff_fn)
cwd = self.parameters['cwd']
- mkdir(cwd)
script_fn = os.path.join(cwd , 'run_report_pre_assembly.sh')
job_done = os.path.join(cwd, 'report_pa_done')
kwds = {
@@ -162,7 +158,6 @@ def task_run_daligner(self):
daligner_script = self.parameters['daligner_script']
job_uid = self.parameters['job_uid']
cwd = self.parameters['cwd']
- #mkdir(cwd)
db_prefix = self.parameters['db_prefix']
config = self.parameters['config']
script_dir = os.path.join(cwd)
@@ -196,7 +191,6 @@ def task_run_las_merge(self):
script = self.parameters['merge_script']
job_id = self.parameters['job_id'] # aka 'block'
cwd = self.parameters['cwd']
- mkdir(cwd)
gathered_dict = read_gathered_las(gathered_las_fn)
las_paths = gathered_dict[job_id]
@@ -221,6 +215,7 @@ def task_run_las_merge(self):
def task_run_consensus(self):
las_fn = fn(self.las)
+ db_fn = fn(self.db)
out_file_fn = fn(self.out_file)
out_done = 'out.done' #fn(self.out_done)
job_id = self.parameters['job_id']
@@ -228,9 +223,8 @@ def task_run_consensus(self):
config = self.parameters['config']
prefix = self.parameters['prefix']
p_id = int(job_id)
- script_dir = os.path.join( cwd )
- script_fn = os.path.join( script_dir , 'c_%05d.sh' % (p_id))
- db_fn = os.path.abspath('{cwd}/../../{prefix}'.format(**locals())) # ASSUMING 2-levels deep. TODO: DB should be an input.
+ script_dir = os.path.join(cwd)
+ script_fn = os.path.join(script_dir, 'c_%05d.sh' % (p_id))
#merge_job_dir = os.path.dirname(merged_las_fn)
#las_fn = os.path.abspath('{merge_job_dir}/{prefix}.{job_id}.las'.format(**locals()))
args = {
@@ -317,9 +311,10 @@ def task_merge_scatter(self):
content = json.dumps(tasks, sort_keys=True, indent=4, separators=(',', ': '))
open(scatter_fn, 'w').write(content)
def task_consensus_scatter(self):
- scatter_fn = self.scattered
+ scattered_fn = self.scattered
gathered_fn = self.gathered
- wd = os.path.dirname(scatter_fn)
+ db_fn = self.db
+ wd = os.path.dirname(scattered_fn)
par = self.parameters
db_prefix = par['db_prefix']
config = par['config']
@@ -344,6 +339,7 @@ def task_consensus_scatter(self):
'sge_option': config['sge_option_cns'],
}
inputs = {'las': las_fn,
+ 'db': db_fn,
}
outputs = {'out_file': out_file_fn,
#'out_done': out_done_fn,
@@ -359,7 +355,7 @@ def task_consensus_scatter(self):
}
tasks.append(task_desc)
content = json.dumps(tasks, sort_keys=True, indent=4, separators=(',', ': '))
- open(scatter_fn, 'w').write(content)
+ open(scattered_fn, 'w').write(content)
def task_daligner_gather(self):
"""Find all .las leaves so far.
@@ -369,8 +365,6 @@ def task_daligner_gather(self):
nblock = self.parameters['nblock']
LOG.debug('nblock=%d, out_dir:\n%s'%(nblock, out_dict))
job_rundirs = [os.path.dirname(fn(dal_done)) for dal_done in out_dict.values()]
- wdir = os.path.dirname(gathered_fn)
- #mkdir(wdir)
with open(gathered_fn, 'w') as ofs:
for block, las_path in support.daligner_gather_las(job_rundirs):
ofs.write('{} {}\n'.format(block, las_path))
@@ -396,3 +390,50 @@ def task_merge_gather(self):
#pread_dir = os.path.dirname(wdir) # by convention, for now
# Generate las.fofn in run-dir. # No longer needed!
#system('find {}/m_*/ -name "preads.*.las" >| {}'.format(pread_dir, las_fofn_fn))
+
+
+def task_dump_rawread_ids(self):
+ rawread_db = fn(self.rawread_db)
+ rawread_id_file = fn(self.rawread_id_file)
+ system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (rawread_db, rawread_id_file))
+
+def task_dump_pread_ids(self):
+ pread_db = fn(self.pread_db)
+ pread_id_file = fn(self.pread_id_file)
+ system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (pread_db, pread_id_file))
+
+def task_generate_read_to_ctg_map(self):
+ from .fc_asm_graph import AsmGraph
+ rawread_id_file = fn(self.rawread_id_file)
+ pread_id_file = fn(self.pread_id_file)
+ read_to_contig_map = fn(self.read_to_contig_map)
+
+ pread_did_to_rid = open(pread_id_file).read().split('\n')
+ rid_to_oid = open(rawread_id_file).read().split('\n')
+
+ asm_G = AsmGraph(fn(self.sg_edges_list),
+ fn(self.utg_data),
+ fn(self.ctg_paths))
+
+ pread_to_contigs = {}
+
+ with open(read_to_contig_map, 'w') as f:
+ for ctg in asm_G.ctg_data:
+ if ctg[-1] == 'R':
+ continue
+ ctg_g = asm_G.get_sg_for_ctg(ctg)
+ for n in ctg_g.nodes():
+ pid = int(n.split(':')[0])
+
+ rid = pread_did_to_rid[pid].split('/')[1]
+ rid = int(int(rid)/10)
+ oid = rid_to_oid[rid]
+ k = (pid, rid, oid)
+ pread_to_contigs.setdefault(k, set())
+ pread_to_contigs[k].add(ctg)
+
+
+ for k in pread_to_contigs:
+ pid, rid, oid = k
+ for ctg in list(pread_to_contigs[ k ]):
+ print >>f, '%09d %09d %s %s' % (pid, rid, oid, ctg)
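
(task_generate_read_to_ctg_map, moved here from get_read_ctg_map.py, emits one whitespace-separated record per read/contig pair: zero-padded pread id, zero-padded raw-read id, original read name, contig name. A minimal reader for that format; load_read_to_ctg_map and the example path are illustrative, not part of FALCON.)

    def load_read_to_ctg_map(path):
        """Parse the 4-column file written by task_generate_read_to_ctg_map:
        zero-padded pread id, zero-padded raw-read id, original read name,
        contig name; one line per read/contig pair."""
        mapping = {}
        with open(path) as f:
            for line in f:
                fields = line.split()
                if len(fields) != 4:
                    continue  # tolerate blank or malformed lines
                pid, rid, oid, ctg = fields
                mapping.setdefault((int(pid), int(rid), oid), set()).add(ctg)
        return mapping

    # e.g. contigs_by_read = load_read_to_ctg_map('read_maps/read_to_contig_map')
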
diff --git a/FALCON/falcon_kit/run_support.py b/FALCON/falcon_kit/run_support.py
index a3c56cb..a41f124 100644
--- a/FALCON/falcon_kit/run_support.py
+++ b/FALCON/falcon_kit/run_support.py
@@ -39,7 +39,9 @@ def update_env_in_script(fn, names):
ofs.write(content)
def use_tmpdir_for_files(basenames, src_dir, link_dir):
- """Generate script to copy db files to tmpdir (for speed).
+ """NOT USED. Kept only for reference. This will be done in pypeFLOW.
+
+ Generate script to copy db files to tmpdir (for speed).
- Choose tmp_dir, based on src_dir name.
- rsync basenames into tmp_dir # after 'flock', per file
- symlink from link_dir into tmp_dir.
@@ -291,7 +293,7 @@ def get_dict_from_old_falcon_cfg(config):
tmpdir = config.get(section, 'use_tmpdir')
if '/' in tmpdir:
tempfile.tempdir = tmpdir
- use_tmpdir = True
+ use_tmpdir = tmpdir
else:
use_tmpdir = config.getboolean(section, 'use_tmpdir')
else:
@@ -507,7 +509,7 @@ def run_falcon_asm(config, las_fofn_fn, preads4falcon_fasta_fn, db_file_fn, job_
def run_report_pre_assembly(i_raw_reads_db_fn, i_preads_fofn_fn, genome_length, length_cutoff, o_json_fn, job_done, script_fn):
script = bash.script_run_report_pre_assembly(i_raw_reads_db_fn, i_preads_fofn_fn, genome_length, length_cutoff, o_json_fn)
- bash.write_script_and_wrapper_top(script, script_fn, job_done)
+ bash.write_script(script, script_fn, job_done)
def run_daligner(daligner_script, db_prefix, config, job_done, script_fn):
bash.write_script(daligner_script, script_fn, job_done)
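
(The use_tmpdir change above now preserves an explicit path, anything containing '/', instead of collapsing it to True, so the path can be forwarded to pypeFLOW via run1.py. A minimal sketch of that parsing logic, assuming a ConfigParser section named 'General' as in fc_run.cfg; parse_use_tmpdir and the section name are illustrative only.)

    try:
        import configparser                      # Python 3
    except ImportError:
        import ConfigParser as configparser      # Python 2.7, as FALCON uses
    import tempfile

    def parse_use_tmpdir(cfg_fn, section='General'):
        """Mirror the logic above: a value containing '/' is an explicit tmp
        directory and is kept as a string; anything else is a boolean."""
        config = configparser.ConfigParser()
        config.read(cfg_fn)
        if not config.has_option(section, 'use_tmpdir'):
            return False
        raw = config.get(section, 'use_tmpdir')
        if '/' in raw:
            tempfile.tempdir = raw   # also steer Python's tempfile module
            return raw               # keep the path itself, not just True
        return config.getboolean(section, 'use_tmpdir')
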
diff --git a/FALCON_unzip/falcon_unzip/phasing.py b/FALCON_unzip/falcon_unzip/phasing.py
index 5592869..6bd3151 100644
--- a/FALCON_unzip/falcon_unzip/phasing.py
+++ b/FALCON_unzip/falcon_unzip/phasing.py
@@ -519,6 +519,7 @@ def phasing(args):
parameters["base_dir"] = base_dir
generate_association_table_task = PypeTask( inputs = { "vmap_file": vmap_file },
outputs = { "atable_file": atable_file },
+ parameters = parameters,
) (generate_association_table)
wf.addTasks([generate_association_table_task])
diff --git a/pypeFLOW/pwatcher/network_based.py b/pypeFLOW/pwatcher/network_based.py
index 126c773..803431e 100755
--- a/pypeFLOW/pwatcher/network_based.py
+++ b/pypeFLOW/pwatcher/network_based.py
@@ -223,7 +223,7 @@ class ReuseAddrServer(SocketServer.TCPServer):
self.server_job_list[file][0] = os.path.getmtime(fn)
self.server_job_list[file][3] = rc
else:
- self.server_job_list[file] = [os.getmtime(fn), None, None, rc]
+ self.server_job_list[file] = [os.path.getmtime(fn), None, None, rc]
def __init__(self, server_address, RequestHandlerClass, server_directories):
self.allow_reuse_address = True
self.server_log_dir, self.server_pid_dir, self.server_exitrc_dir = server_directories
diff --git a/pypeFLOW/pypeflow/do_task.py b/pypeFLOW/pypeflow/do_task.py
index b3b36a8..3e9a868 100644
--- a/pypeFLOW/pypeflow/do_task.py
+++ b/pypeFLOW/pypeflow/do_task.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python2.7
-from . import do_support
+from . import do_support, util
import argparse
import contextlib
import importlib
@@ -71,7 +71,15 @@ def cd(newdir):
def mkdirs(path):
if not os.path.isdir(path):
- os.makedirs(path)
+ cmd = 'mkdir -p {}'.format(path)
+ util.system(cmd)
+def rmdirs(path):
+ if os.path.isdir(path):
+ if len(path) < 20 and 'home' in path:
+ LOG.error('Refusing to rm {!r} since it might be your homedir.'.format(path))
+ return
+ cmd = 'rm -rf {}'.format(path)
+ util.system(cmd)
def wait_for(fn):
global TIMEOUT
@@ -99,6 +107,22 @@ class OldTaskRunner(object):
self.inputs = inputs
self.outputs = outputs
+def run_python_func(func, inputs, outputs, parameters):
+ if False:
+ kwds = dict()
+ kwds.update(inputs)
+ kwds.update(outputs)
+ kwds.update(parameters)
+ func(**kwds)
+ else:
+ # old way, for now
+ cwd = os.getcwd()
+ parameters['cwd'] = cwd
+ self = OldTaskRunner(inputs, outputs, parameters)
+ func(self=self)
+ script_fn = getattr(self, 'generated_script_fn', None)
+ if script_fn is not None:
+ do_support.run_bash(script_fn)
def run(json_fn, timeout, tmpdir):
if isinstance(timeout, int):
global TIMEOUT
@@ -109,28 +133,41 @@ def run(json_fn, timeout, tmpdir):
LOG.debug(pprint.pformat(cfg))
for fn in cfg['inputs'].values():
wait_for(fn)
+ inputs = cfg['inputs']
+ outputs = cfg['outputs']
+ parameters = cfg['parameters']
python_function_name = cfg['python_function']
func = get_func(python_function_name)
- try:
- if False:
- kwds = dict()
- kwds.update(cfg['inputs'])
- kwds.update(cfg['outputs'])
- kwds.update(cfg['parameters'])
- func(**kwds)
- else:
- # old way, for now
- cwd = os.getcwd()
- cfg['parameters']['cwd'] = cwd
- self = OldTaskRunner(cfg['inputs'], cfg['outputs'], cfg['parameters'])
- func(self=self)
- script_fn = getattr(self, 'generated_script_fn', None)
- if script_fn is not None:
- do_support.run_bash(script_fn)
- except TypeError:
- # Report the actual function spec.
- LOG.error('For function "{}", {}'.format(python_function_name, inspect.getargspec(func)))
- raise
+ myinputs = dict(inputs)
+ myoutputs = dict(outputs)
+ finaloutdir = os.getcwd()
+ if tmpdir:
+ import getpass
+ user = getpass.getuser()
+ pid = os.getpid()
+ myrundir = '{tmpdir}/{user}/pypetmp/{finaloutdir}'.format(**locals())
+ rmdirs(myrundir)
+ mkdirs(myrundir)
+ # TODO(CD): Copy inputs w/ flock.
+ else:
+ myrundir = finaloutdir
+ with util.cd(myrundir):
+ try:
+ run_python_func(func, myinputs, myoutputs, parameters)
+ except TypeError:
+ # Report the actual function spec.
+ LOG.error('For function "{}", {}'.format(python_function_name, inspect.getargspec(func)))
+ raise
+ if tmpdir:
+ """
+ for k,v in outputs.iteritems():
+ cmd = 'mv -f {} {}'.format(
+ os.path.join(myrundir, v),
+ os.path.join(finaloutdir, v))
+ util.system(cmd)
+ """
+ cmd = 'rsync -av {}/ {}; rm -rf {}'.format(myrundir, finaloutdir, myrundir)
+ util.system(cmd)
for fn in cfg['outputs'].values():
wait_for(fn)
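
(The new --tmpdir path in do_task.run() builds a per-user scratch directory that mirrors the final run directory, executes the task there, then rsyncs everything back and deletes the scratch copy. A condensed sketch of that flow, assuming rsync is on PATH; run_in_scratch and run_task are illustrative names, not the upstream functions, and the homedir guard from rmdirs() is omitted.)

    import getpass
    import os
    import subprocess

    def run_in_scratch(tmpdir, run_task):
        """Run a task in {tmpdir}/{user}/pypetmp/{final_out_dir}, then rsync
        the results back into final_out_dir and delete the scratch copy."""
        final_out_dir = os.getcwd()
        scratch = '{}/{}/pypetmp/{}'.format(tmpdir, getpass.getuser(), final_out_dir)
        subprocess.check_call('rm -rf {}'.format(scratch), shell=True)
        subprocess.check_call('mkdir -p {}'.format(scratch), shell=True)
        here = os.getcwd()
        os.chdir(scratch)
        try:
            run_task()  # the python_function named in task.json
        finally:
            os.chdir(here)
        # Copy results back, then drop the scratch directory, as run() does above.
        subprocess.check_call(
            'rsync -av {}/ {}; rm -rf {}'.format(scratch, final_out_dir, scratch),
            shell=True)
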
diff --git a/pypeFLOW/pypeflow/simple_pwatcher_bridge.py b/pypeFLOW/pypeflow/simple_pwatcher_bridge.py
index 5c068be..e47bb76 100644
--- a/pypeFLOW/pypeflow/simple_pwatcher_bridge.py
+++ b/pypeFLOW/pypeflow/simple_pwatcher_bridge.py
@@ -12,6 +12,7 @@ import os
import pprint
import random
import sys
+import tempfile
import time
import traceback
@@ -180,7 +181,8 @@ class Workflow(object):
record them as 'node.needs', and update the global pypetask2node table.
"""
needs = set()
- node = PypeNode(pypetask.name, pypetask.wdir, pypetask, needs) #, pypetask.generated_script_fn)
+ use_tmpdir = self.use_tmpdir # TODO(CD): Let pypetask.use_tmpdir override this.
+ node = PypeNode(pypetask.name, pypetask.wdir, pypetask, needs, use_tmpdir) #, pypetask.generated_script_fn)
self.pypetask2node[pypetask] = node
for key, plf in pypetask.inputs.iteritems():
if plf.producer is None:
@@ -281,13 +283,14 @@ class Workflow(object):
raise Exception('We had {} failures. {} tasks remain unsatisfied.'.format(
failures, len(unsatg)))
- def __init__(self, watcher, job_type, job_queue, max_jobs, sge_option=''):
+ def __init__(self, watcher, job_type, job_queue, max_jobs, use_tmpdir, sge_option=''):
self.graph = networkx.DiGraph()
self.tq = PwatcherTaskQueue(watcher=watcher, job_type=job_type, job_queue=job_queue, sge_option=sge_option) # TODO: Inject this.
assert max_jobs > 0, 'max_jobs needs to be set. If you use the "blocking" process-watcher, it is also the number of threads.'
self.max_jobs = max_jobs
self.sentinels = dict() # sentinel_done_fn -> Node
self.pypetask2node = dict()
+ self.use_tmpdir = use_tmpdir
class NodeBase(object):
"""Graph node.
@@ -365,23 +368,29 @@ class PypeNode(NodeBase):
"""
def generate_script(self):
pt = self.pypetask
+ wdir = pt.wdir # For now, assume dir already exists.
+ inputs = {k:v.path for k,v in pt.inputs.items()}
+ outputs = {k:os.path.relpath(v.path, wdir) for k,v in pt.outputs.items()}
+ for v in outputs.values():
+ assert not os.path.isabs(v), '{!r} is not relative'.format(v)
task_desc = {
- 'inputs': {k:v.path for k,v in pt.inputs.items()},
- 'outputs': {k:v.path for k,v in pt.outputs.items()},
+ 'inputs': inputs,
+ 'outputs': outputs,
'parameters': pt.parameters,
'python_function': pt.func_name,
}
task_content = json.dumps(task_desc, sort_keys=True, indent=4, separators=(',', ': '))
- task_json_fn = os.path.join(pt.wdir, 'task.json')
+ task_json_fn = os.path.join(wdir, 'task.json')
open(task_json_fn, 'w').write(task_content)
python = 'python2.7' # sys.executable fails sometimes because of binwrapper: SE-152
- cmd = '{} -m pypeflow.do_task {}'.format(python, task_json_fn)
+ tmpdir_flag = '--tmpdir {}'.format(self.use_tmpdir) if self.use_tmpdir else ''
+ cmd = '{} -m pypeflow.do_task {} {}'.format(python, tmpdir_flag, task_json_fn)
script_content = """#!/bin/bash
hostname
env | sort
pwd
-time {}
-""".format(cmd)
+time {cmd}
+""".format(**locals())
script_fn = os.path.join(pt.wdir, 'task.sh')
open(script_fn, 'w').write(script_content)
return script_fn
@@ -397,9 +406,10 @@ time {}
LOG.exception('name={!r} URL={!r}'.format(pt.name, pt.URL))
raise
return generated_script_fn
- def __init__(self, name, wdir, pypetask, needs): #, script_fn):
+ def __init__(self, name, wdir, pypetask, needs, use_tmpdir): #, script_fn):
super(PypeNode, self).__init__(name, wdir, needs)
self.pypetask = pypetask
+ self.use_tmpdir = use_tmpdir
# This global exists only because we continue to support the old PypeTask style,
# where a PypeLocalFile does not know the PypeTask which produces it.
@@ -464,13 +474,14 @@ def only_path(p):
return p.path
else:
return p
-def PypeTask(inputs, outputs, parameters=None, wdir=None, name=None):
+def PypeTask(inputs, outputs, parameters=None, wdir=None):
"""A slightly messy factory because we want to support both strings and PypeLocalFiles, for now.
This can alter dict values in inputs/outputs if they were not already PypeLocalFiles.
"""
if wdir is None:
+ #wdir = parameters.get('wdir', name) # One of these must be a string!
wdir = find_work_dir([only_path(v) for v in outputs.values()])
- this = _PypeTask(inputs, outputs, parameters, wdir, name)
+ this = _PypeTask(inputs, outputs, wdir, parameters)
#basedir = os.path.basename(wdir)
basedir = this.name
if basedir in PRODUCERS:
@@ -478,43 +489,47 @@ def PypeTask(inputs, outputs, parameters=None, wdir=None, name=None):
basedir, PRODUCERS[basedir], this))
LOG.debug('Added PRODUCERS[{!r}] = {!r}'.format(basedir, this))
PRODUCERS[basedir] = this
- for key, val in outputs.items():
- if not isinstance(val, PypeLocalFile):
- # If relative, then it is relative to our wdir.
- #LOG.warning('Making PLF: {!r} {!r}'.format(val, this))
- if not os.path.isabs(val):
- if not wdir:
- raise Exception('No wdir for {!r}'.format(this))
- val = os.path.join(wdir, val)
- #LOG.warning('Muking PLF: {!r} {!r}'.format(val, this))
- val = PypeLocalFile(val, this)
- outputs[key] = val
- else:
- val.producer = this
- for key, val in inputs.items():
- if not isinstance(val, PypeLocalFile):
- assert os.path.isabs(val), 'Inputs cannot be relative at this point: {!r} in {!r}'.format(val, this)
- inputs[key] = findPypeLocalFile(val)
- common = set(inputs.keys()) & set(outputs.keys())
- assert (not common), 'Keys in both inputs and outputs of PypeTask({}): {!r}'.format(wdir, common)
+ this.canonicalize()
+ this.assert_canonical()
LOG.debug('Built {!r}'.format(this))
return this
class _PypeTask(object):
"""Adaptor from old PypeTask API.
"""
+ def canonicalize(self):
+ for key, val in self.outputs.items():
+ if not isinstance(val, PypeLocalFile):
+ # If relative, then it is relative to our wdir.
+ #LOG.warning('Making PLF: {!r} {!r}'.format(val, self))
+ if not os.path.isabs(val):
+ val = os.path.join(self.wdir, val)
+ #LOG.warning('Muking PLF: {!r} {!r}'.format(val, self))
+ val = PypeLocalFile(val, self)
+ self.outputs[key] = val
+ else:
+ val.producer = self
+ for key, val in self.inputs.items():
+ if not isinstance(val, PypeLocalFile):
+ assert os.path.isabs(val), 'Inputs cannot be relative at self point: {!r} in {!r}'.format(val, self)
+ self.inputs[key] = findPypeLocalFile(val)
+ def assert_canonical(self):
+ # Output values will be updated after PypeTask init, so refer back to self as producer.
+ for k,v in self.inputs.iteritems():
+ assert os.path.isabs(v.path), 'For {!r}, input {!r} is not absolute'.format(self.wdir, v)
+ for k,v in self.outputs.iteritems():
+ assert os.path.isabs(v.path), 'For {!r}, output {!r} is not absolute'.format(self.wdir, v)
+ common = set(self.inputs.keys()) & set(self.outputs.keys())
+ assert (not common), 'Keys in both inputs and outputs of PypeTask({}): {!r}'.format(wdir, common)
def __call__(self, func):
self.func = func
self.func_name = '{}.{}'.format(func.__module__, func.__name__)
return self
def __repr__(self):
return 'PypeTask({!r}, {!r}, {!r}, {!r})'.format(self.name, self.wdir, pprint.pformat(self.outputs), pprint.pformat(self.inputs))
- def __init__(self, inputs, outputs, parameters=None, wdir=None, name=None):
+ def __init__(self, inputs, outputs, wdir, parameters=None):
if parameters is None:
parameters = {}
- if wdir is None:
- wdir = parameters.get('wdir', name) # One of these must be a string!
- if name is None:
- name = os.path.relpath(wdir)
+ name = os.path.relpath(wdir)
URL = 'task://localhost/{}'.format(name)
self.inputs = inputs
self.outputs = outputs
@@ -526,6 +541,7 @@ class _PypeTask(object):
# setattr(self, key, os.path.abspath(bn))
#for key, bn in outputs.iteritems():
# setattr(self, key, os.path.abspath(bn))
+ assert self.wdir, 'No wdir for {!r} {!r}'.format(self.name, self.URL)
LOG.debug('Created {!r}'.format(self))
class DummyPypeTask(_PypeTask):
@@ -545,6 +561,7 @@ def PypeProcWatcherWorkflow(
watcher_directory='mypwatcher',
max_jobs = 24, # must be > 0, but not too high
sge_option = None,
+ use_tmpdir = None,
**attributes):
"""Factory for the workflow.
"""
@@ -557,8 +574,14 @@ def PypeProcWatcherWorkflow(
LOG.warning('In simple_pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl))
LOG.info('In simple_pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl))
watcher = pwatcher_impl.get_process_watcher(watcher_directory)
- LOG.info('job_type={!r}, job_queue={!r}, sge_option={!r}'.format(job_type, job_queue, sge_option))
- return Workflow(watcher, job_type=job_type, job_queue=job_queue, max_jobs=max_jobs, sge_option=sge_option)
+ if use_tmpdir:
+ try:
+ if not os.path.isabs(use_tmpdir):
+ use_tmpdir = os.path.abspath(use_tmpdir)
+ except (TypeError, AttributeError):
+ use_tmpdir = tempfile.gettempdir()
+ LOG.info('job_type={!r}, job_queue={!r}, sge_option={!r}, use_tmpdir={!r}'.format(job_type, job_queue, sge_option, use_tmpdir))
+ return Workflow(watcher, job_type=job_type, job_queue=job_queue, max_jobs=max_jobs, sge_option=sge_option, use_tmpdir=use_tmpdir)
#th = MyPypeFakeThreadsHandler('mypwatcher', job_type, job_queue)
#mq = MyMessageQueue()
#se = MyFakeShutdownEvent() # TODO: Save pwatcher state on ShutdownEvent. (Not needed for blocking pwatcher. Mildly useful for fs_based.)
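
(With the generate_script() changes above, each node's task.json records absolute input paths but wdir-relative output paths, and task.sh forwards the workflow-level tmpdir to pypeflow.do_task. For a hypothetical consensus node the two generated files would look roughly like this; the directory name, paths, and parameter values are invented, only the key names and the task.sh template come from the code above.)

    0-rawreads/cns_00001/task.json (illustrative):

        {
            "inputs": {
                "las": "/abs/run/0-rawreads/m_00001/raw_reads.1.las",
                "db": "/abs/run/0-rawreads/raw_reads.db"
            },
            "outputs": {
                "out_file": "out.00001.fasta"
            },
            "parameters": {"job_id": "00001"},
            "python_function": "falcon_kit.pype_tasks.task_run_consensus"
        }

    0-rawreads/cns_00001/task.sh (illustrative):

        #!/bin/bash
        hostname
        env | sort
        pwd
        time python2.7 -m pypeflow.do_task --tmpdir /tmp task.json
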
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/falcon.git