[med-svn] [falcon] 02/04: Imported Upstream version 1.8.5

Afif Elghraoui afif at moszumanska.debian.org
Mon Dec 12 09:19:06 UTC 2016


This is an automated email from the git hooks/post-receive script.

afif pushed a commit to branch master
in repository falcon.

commit d74e7a355f14ad7d9d1810ce7d6423108d82b451
Author: Afif Elghraoui <afif at debian.org>
Date:   Sun Dec 11 14:35:04 2016 -0800

    Imported Upstream version 1.8.5
---
 FALCON-make/makefile                        |  2 +
 FALCON/falcon_kit/bash.py                   |  7 ++-
 FALCON/falcon_kit/mains/get_read_ctg_map.py | 56 +++--------------
 FALCON/falcon_kit/mains/run1.py             |  8 ++-
 FALCON/falcon_kit/pype_tasks.py             | 69 +++++++++++++++-----
 FALCON/falcon_kit/run_support.py            |  8 ++-
 FALCON_unzip/falcon_unzip/phasing.py        |  1 +
 pypeFLOW/pwatcher/network_based.py          |  2 +-
 pypeFLOW/pypeflow/do_task.py                | 81 +++++++++++++++++-------
 pypeFLOW/pypeflow/simple_pwatcher_bridge.py | 97 ++++++++++++++++++-----------
 10 files changed, 202 insertions(+), 129 deletions(-)

diff --git a/FALCON-make/makefile b/FALCON-make/makefile
index 52b42d3..e712e6e 100644
--- a/FALCON-make/makefile
+++ b/FALCON-make/makefile
@@ -37,6 +37,8 @@ install-pypeFLOW:
 	cd ${FALCON_WORKSPACE}/pypeFLOW; pip uninstall -v .; pip install -v ${FALCON_PIP_USER} ${FALCON_PIP_EDIT} .
 install-FALCON: install-pypeFLOW
 	cd ${FALCON_WORKSPACE}/FALCON; pip uninstall -v .; pip install -v ${FALCON_PIP_USER} ${FALCON_PIP_EDIT} .
+install-FALCON_unzip: install-pypeFLOW
+	cd ${FALCON_WORKSPACE}/FALCON_unzip; pip uninstall -v .; pip install -v ${FALCON_PIP_USER} ${FALCON_PIP_EDIT} .
 install-git-sym:
 	# TODO: copy vs. symlink?
 	ln -sf $(abspath ${FALCON_WORKSPACE}/git-sym/git-sym) ${FALCON_PREFIX}/bin/git-sym
diff --git a/FALCON/falcon_kit/bash.py b/FALCON/falcon_kit/bash.py
index dbf3378..1f64a96 100644
--- a/FALCON/falcon_kit/bash.py
+++ b/FALCON/falcon_kit/bash.py
@@ -55,7 +55,7 @@ def write_script(script, script_fn, job_done_fn=None):
         exe = write_sub_script(ofs, script)
 
 def write_script_and_wrapper_top(script, wrapper_fn, job_done):
-    """
+    """NOT USED.
     Write script to a filename based on wrapper_fn, in same directory.
     Write wrapper to call it,
      and to write job_done on success and job_done.exit on any exit.
@@ -92,6 +92,8 @@ def select_rundir(tmpdir, wdir, script):
     return '{}/{}/falcontmp/{}/{}'.format(tmpdir, user, wdir, digest)
 
 def write_script_and_wrapper_for_tmp(tmpdir, script, wrapper_fn, job_done):
+    """NOT USED. This will be done in pypeFLOW.
+    """
     wdir = os.path.dirname(os.path.abspath(wrapper_fn))
     root, ext = os.path.splitext(os.path.basename(wrapper_fn))
     sub_script_bfn = root + '.xsub' + ext
@@ -117,7 +119,8 @@ rm -rf {rdir}
     return write_script_and_wrapper_top(tmp_wrapper_script, wrapper_fn, job_done)
 
 def get_write_script_and_wrapper(config):
-    """Return a function.
+    """NOT USED. This will be done in pypeFLOW.
+    Return a function.
     For now, we actually use only config['use_tmpdir'], a boolean.
     """
     use_tmpdir = config.get('use_tmpdir', None)
diff --git a/FALCON/falcon_kit/mains/get_read_ctg_map.py b/FALCON/falcon_kit/mains/get_read_ctg_map.py
index 91470ff..0ef0825 100644
--- a/FALCON/falcon_kit/mains/get_read_ctg_map.py
+++ b/FALCON/falcon_kit/mains/get_read_ctg_map.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import
 from pypeflow.simple_pwatcher_bridge import (PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
         makePypeLocalFile, fn, PypeTask)
 PypeThreadTaskBase = MyFakePypeThreadTaskBase
-from falcon_kit.fc_asm_graph import AsmGraph
+from .. import pype_tasks
 import argparse
 import glob
 import logging
@@ -11,55 +11,13 @@ import subprocess as sp
 import shlex
 import os
 
+LOG = logging.getLogger(__name__)
+
 def make_dirs(d):
     if not os.path.isdir(d):
+        LOG.debug('mkdirs {}'.format(d))
         os.makedirs(d)
 
-def dump_rawread_ids(self):
-    rawread_db = fn(self.rawread_db)
-    rawread_id_file = fn(self.rawread_id_file)
-    os.system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (rawread_db, rawread_id_file))
-
-def dump_pread_ids(self):
-    pread_db = fn(self.pread_db)
-    pread_id_file = fn(self.pread_id_file)
-    os.system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (pread_db, pread_id_file))
-
-def generate_read_to_ctg_map(self):
-    rawread_id_file = fn(self.rawread_id_file)
-    pread_id_file = fn(self.pread_id_file)
-    read_to_contig_map = fn(self.read_to_contig_map)
-
-    pread_did_to_rid = open(pread_id_file).read().split('\n')
-    rid_to_oid = open(rawread_id_file).read().split('\n')
-
-    asm_G = AsmGraph(fn(self.sg_edges_list),
-                        fn(self.utg_data),
-                        fn(self.ctg_paths))
-
-    pread_to_contigs = {}
-
-    with open(read_to_contig_map, 'w') as f:
-        for ctg in asm_G.ctg_data:
-            if ctg[-1] == 'R':
-                continue
-            ctg_g = asm_G.get_sg_for_ctg(ctg)
-            for n in ctg_g.nodes():
-                pid = int(n.split(':')[0])
-
-                rid = pread_did_to_rid[pid].split('/')[1]
-                rid = int(int(rid)/10)
-                oid = rid_to_oid[rid]
-                k = (pid, rid, oid)
-                pread_to_contigs.setdefault(k, set())
-                pread_to_contigs[k].add(ctg)
-
-
-        for k in pread_to_contigs:
-            pid, rid, oid = k
-            for ctg in list(pread_to_contigs[ k ]):
-                print >>f, '%09d %09d %s %s' % (pid, rid, oid, ctg)
-
 def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
     read_map_dir = os.path.abspath(os.path.join(asm_dir, 'read_maps'))
     make_dirs(read_map_dir)
@@ -82,7 +40,7 @@ def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
               inputs = {'rawread_db': rawread_db},
               outputs = {'rawread_id_file': rawread_id_file},
     )
-    wf.addTask(task(dump_rawread_ids))
+    wf.addTask(task(pype_tasks.task_dump_rawread_ids))
 
     pread_db = makePypeLocalFile(os.path.join(pread_dir, 'preads.db'))
     pread_id_file = makePypeLocalFile(os.path.join(read_map_dir, 'dump_pread_ids', 'pread_ids'))
@@ -91,7 +49,7 @@ def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
                inputs = {'pread_db': pread_db},
                outputs = {'pread_id_file': pread_id_file},
     )
-    wf.addTask(task(dump_pread_ids))
+    wf.addTask(task(pype_tasks.task_dump_pread_ids))
 
     wf.refreshTargets() # block
 
@@ -111,7 +69,7 @@ def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
               inputs = inputs,
               outputs = {'read_to_contig_map': read_to_contig_map},
     )
-    wf.addTask(task(generate_read_to_ctg_map))
+    wf.addTask(task(pype_tasks.task_generate_read_to_ctg_map))
 
     wf.refreshTargets() # block
 
diff --git a/FALCON/falcon_kit/mains/run1.py b/FALCON/falcon_kit/mains/run1.py
index d7bb13d..2e5f99d 100644
--- a/FALCON/falcon_kit/mains/run1.py
+++ b/FALCON/falcon_kit/mains/run1.py
@@ -125,7 +125,9 @@ def main1(prog_name, input_config_fn, logger_config_fn=None):
             job_queue=config['job_queue'],
             sge_option=config.get('sge_option', ''),
             watcher_type=config['pwatcher_type'],
-            watcher_directory=config['pwatcher_directory'])
+            watcher_directory=config['pwatcher_directory'],
+            use_tmpdir=config.get('use_tmpdir'),
+    )
     run(wf, config,
             os.path.abspath(input_config_fn),
             input_fofn_plf=input_fofn_plf,
@@ -261,6 +263,7 @@ def run(wf, config,
         make_task = PypeTask(
                 inputs = {
                     'gathered': las_fopfn_plf,
+                    'db': raw_reads_db_plf,
                 },
                 outputs = {
                     'scattered': scattered_plf,
@@ -452,3 +455,6 @@ def main(argv=sys.argv):
         help='(Optional)JSON config for standard Python logging module')
     args = parser.parse_args(argv[1:])
     main1(argv[0], args.config, args.logger)
+
+if __name__ == '__main__':
+    main()
diff --git a/FALCON/falcon_kit/pype_tasks.py b/FALCON/falcon_kit/pype_tasks.py
index 25857e0..169bf5e 100644
--- a/FALCON/falcon_kit/pype_tasks.py
+++ b/FALCON/falcon_kit/pype_tasks.py
@@ -37,14 +37,12 @@ def task_make_fofn_abs_raw(self):
     script_fn = 'noop.sh'
     open(script_fn, 'w').write('echo NOOP raw')
     self.generated_script_fn = script_fn
-    mkdir(os.path.dirname(fn(self.o_fofn)))
     support.make_fofn_abs(fn(self.i_fofn), fn(self.o_fofn))
 
 def task_make_fofn_abs_preads(self):
     script_fn = 'noop.sh'
     open(script_fn, 'w').write('echo NOOP preads')
     self.generated_script_fn = script_fn
-    mkdir(os.path.dirname(fn(self.o_fofn)))
     support.make_fofn_abs(fn(self.i_fofn), fn(self.o_fofn))
 
 def task_build_rdb(self):
@@ -89,7 +87,6 @@ def task_build_pdb(self):  #essential the same as build_rdb() but the subtle dif
 
 def task_run_db2falcon(self):
     wd = self.parameters['wd']
-    mkdir(wd)
     #self.las_fofn # TODO: Are there any implicit dependencies, or can we drop this?
     job_done = fn(self.db2falcon_done)
     preads4falcon_fn = fn(self.preads4falcon)
@@ -141,7 +138,6 @@ def task_report_pre_assembly(self):
     # i_length_cutoff_fn was created long ago, so no filesystem issues.
     length_cutoff = support.get_length_cutoff(length_cutoff, i_length_cutoff_fn)
     cwd = self.parameters['cwd']
-    mkdir(cwd)
     script_fn = os.path.join(cwd , 'run_report_pre_assembly.sh')
     job_done = os.path.join(cwd, 'report_pa_done')
     kwds = {
@@ -162,7 +158,6 @@ def task_run_daligner(self):
     daligner_script = self.parameters['daligner_script']
     job_uid = self.parameters['job_uid']
     cwd = self.parameters['cwd']
-    #mkdir(cwd)
     db_prefix = self.parameters['db_prefix']
     config = self.parameters['config']
     script_dir = os.path.join(cwd)
@@ -196,7 +191,6 @@ def task_run_las_merge(self):
     script = self.parameters['merge_script']
     job_id = self.parameters['job_id'] # aka 'block'
     cwd = self.parameters['cwd']
-    mkdir(cwd)
 
     gathered_dict = read_gathered_las(gathered_las_fn)
     las_paths = gathered_dict[job_id]
@@ -221,6 +215,7 @@ def task_run_las_merge(self):
 
 def task_run_consensus(self):
     las_fn = fn(self.las)
+    db_fn = fn(self.db)
     out_file_fn = fn(self.out_file)
     out_done = 'out.done' #fn(self.out_done)
     job_id = self.parameters['job_id']
@@ -228,9 +223,8 @@ def task_run_consensus(self):
     config = self.parameters['config']
     prefix = self.parameters['prefix']
     p_id = int(job_id)
-    script_dir = os.path.join( cwd )
-    script_fn =  os.path.join( script_dir , 'c_%05d.sh' % (p_id))
-    db_fn = os.path.abspath('{cwd}/../../{prefix}'.format(**locals())) # ASSUMING 2-levels deep. TODO: DB should be an input.
+    script_dir = os.path.join(cwd)
+    script_fn = os.path.join(script_dir, 'c_%05d.sh' % (p_id))
     #merge_job_dir = os.path.dirname(merged_las_fn)
     #las_fn = os.path.abspath('{merge_job_dir}/{prefix}.{job_id}.las'.format(**locals()))
     args = {
@@ -317,9 +311,10 @@ def task_merge_scatter(self):
     content = json.dumps(tasks, sort_keys=True, indent=4, separators=(',', ': '))
     open(scatter_fn, 'w').write(content)
 def task_consensus_scatter(self):
-    scatter_fn = self.scattered
+    scattered_fn = self.scattered
     gathered_fn = self.gathered
-    wd = os.path.dirname(scatter_fn)
+    db_fn = self.db
+    wd = os.path.dirname(scattered_fn)
     par = self.parameters
     db_prefix = par['db_prefix']
     config = par['config']
@@ -344,6 +339,7 @@ def task_consensus_scatter(self):
                        'sge_option': config['sge_option_cns'],
         }
         inputs =  {'las': las_fn,
+                   'db': db_fn,
         }
         outputs = {'out_file': out_file_fn,
                    #'out_done': out_done_fn,
@@ -359,7 +355,7 @@ def task_consensus_scatter(self):
         }
         tasks.append(task_desc)
     content = json.dumps(tasks, sort_keys=True, indent=4, separators=(',', ': '))
-    open(scatter_fn, 'w').write(content)
+    open(scattered_fn, 'w').write(content)
 
 def task_daligner_gather(self):
     """Find all .las leaves so far.
@@ -369,8 +365,6 @@ def task_daligner_gather(self):
     nblock = self.parameters['nblock']
     LOG.debug('nblock=%d, out_dir:\n%s'%(nblock, out_dict))
     job_rundirs = [os.path.dirname(fn(dal_done)) for dal_done in out_dict.values()]
-    wdir = os.path.dirname(gathered_fn)
-    #mkdir(wdir)
     with open(gathered_fn, 'w') as ofs:
         for block, las_path in support.daligner_gather_las(job_rundirs):
             ofs.write('{} {}\n'.format(block, las_path))
@@ -396,3 +390,50 @@ def task_merge_gather(self):
     #pread_dir = os.path.dirname(wdir) # by convention, for now
     # Generate las.fofn in run-dir. # No longer needed!
     #system('find {}/m_*/ -name "preads.*.las" >| {}'.format(pread_dir, las_fofn_fn))
+
+
+def task_dump_rawread_ids(self):
+    rawread_db = fn(self.rawread_db)
+    rawread_id_file = fn(self.rawread_id_file)
+    system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (rawread_db, rawread_id_file))
+
+def task_dump_pread_ids(self):
+    pread_db = fn(self.pread_db)
+    pread_id_file = fn(self.pread_id_file)
+    system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (pread_db, pread_id_file))
+
+def task_generate_read_to_ctg_map(self):
+    from .fc_asm_graph import AsmGraph
+    rawread_id_file = fn(self.rawread_id_file)
+    pread_id_file = fn(self.pread_id_file)
+    read_to_contig_map = fn(self.read_to_contig_map)
+
+    pread_did_to_rid = open(pread_id_file).read().split('\n')
+    rid_to_oid = open(rawread_id_file).read().split('\n')
+
+    asm_G = AsmGraph(fn(self.sg_edges_list),
+                        fn(self.utg_data),
+                        fn(self.ctg_paths))
+
+    pread_to_contigs = {}
+
+    with open(read_to_contig_map, 'w') as f:
+        for ctg in asm_G.ctg_data:
+            if ctg[-1] == 'R':
+                continue
+            ctg_g = asm_G.get_sg_for_ctg(ctg)
+            for n in ctg_g.nodes():
+                pid = int(n.split(':')[0])
+
+                rid = pread_did_to_rid[pid].split('/')[1]
+                rid = int(int(rid)/10)
+                oid = rid_to_oid[rid]
+                k = (pid, rid, oid)
+                pread_to_contigs.setdefault(k, set())
+                pread_to_contigs[k].add(ctg)
+
+
+        for k in pread_to_contigs:
+            pid, rid, oid = k
+            for ctg in list(pread_to_contigs[ k ]):
+                print >>f, '%09d %09d %s %s' % (pid, rid, oid, ctg)
diff --git a/FALCON/falcon_kit/run_support.py b/FALCON/falcon_kit/run_support.py
index a3c56cb..a41f124 100644
--- a/FALCON/falcon_kit/run_support.py
+++ b/FALCON/falcon_kit/run_support.py
@@ -39,7 +39,9 @@ def update_env_in_script(fn, names):
         ofs.write(content)
 
 def use_tmpdir_for_files(basenames, src_dir, link_dir):
-    """Generate script to copy db files to tmpdir (for speed).
+    """NOT USED. Kept only for reference. This will be done in pypeFLOW.
+
+    Generate script to copy db files to tmpdir (for speed).
     - Choose tmp_dir, based on src_dir name.
     - rsync basenames into tmp_dir  # after 'flock', per file
     - symlink from link_dir into tmp_dir.
@@ -291,7 +293,7 @@ def get_dict_from_old_falcon_cfg(config):
         tmpdir = config.get(section, 'use_tmpdir')
         if '/' in tmpdir:
             tempfile.tempdir = tmpdir
-            use_tmpdir = True
+            use_tmpdir = tmpdir
         else:
             use_tmpdir = config.getboolean(section, 'use_tmpdir')
     else:
@@ -507,7 +509,7 @@ def run_falcon_asm(config, las_fofn_fn, preads4falcon_fasta_fn, db_file_fn, job_
 
 def run_report_pre_assembly(i_raw_reads_db_fn, i_preads_fofn_fn, genome_length, length_cutoff, o_json_fn, job_done, script_fn):
     script = bash.script_run_report_pre_assembly(i_raw_reads_db_fn, i_preads_fofn_fn, genome_length, length_cutoff, o_json_fn)
-    bash.write_script_and_wrapper_top(script, script_fn, job_done)
+    bash.write_script(script, script_fn, job_done)
 
 def run_daligner(daligner_script, db_prefix, config, job_done, script_fn):
     bash.write_script(daligner_script, script_fn, job_done)
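
For readers tracking the use_tmpdir change above: get_dict_from_old_falcon_cfg() now keeps the actual directory path when the config value looks like a path, instead of collapsing it to True, so the path can be threaded through to pypeFLOW. A minimal sketch of that interpretation, outside the patch and with a hypothetical helper name:

    # Illustration only (not part of the patch): a path-like value is kept as the
    # tmpdir, anything else is treated as a plain boolean.
    def interpret_use_tmpdir(value):
        if '/' in value:
            return value                    # e.g. 'use_tmpdir = /scratch' -> '/scratch'
        return value.strip().lower() in ('true', 'yes', 'on', '1')

    assert interpret_use_tmpdir('/scratch') == '/scratch'
    assert interpret_use_tmpdir('true') is True
    assert interpret_use_tmpdir('false') is False
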
diff --git a/FALCON_unzip/falcon_unzip/phasing.py b/FALCON_unzip/falcon_unzip/phasing.py
index 5592869..6bd3151 100644
--- a/FALCON_unzip/falcon_unzip/phasing.py
+++ b/FALCON_unzip/falcon_unzip/phasing.py
@@ -519,6 +519,7 @@ def phasing(args):
     parameters["base_dir"] = base_dir
     generate_association_table_task = PypeTask( inputs = { "vmap_file": vmap_file },
                                       outputs = { "atable_file": atable_file },
+                                      parameters = parameters,
     ) (generate_association_table)
 
     wf.addTasks([generate_association_table_task])
diff --git a/pypeFLOW/pwatcher/network_based.py b/pypeFLOW/pwatcher/network_based.py
index 126c773..803431e 100755
--- a/pypeFLOW/pwatcher/network_based.py
+++ b/pypeFLOW/pwatcher/network_based.py
@@ -223,7 +223,7 @@ class ReuseAddrServer(SocketServer.TCPServer):
                         self.server_job_list[file][0] = os.path.getmtime(fn)
                         self.server_job_list[file][3] = rc
                     else:
-                        self.server_job_list[file] = [os.getmtime(fn), None, None, rc]
+                        self.server_job_list[file] = [os.path.getmtime(fn), None, None, rc]
     def __init__(self, server_address, RequestHandlerClass, server_directories):
         self.allow_reuse_address = True
         self.server_log_dir, self.server_pid_dir, self.server_exitrc_dir = server_directories
diff --git a/pypeFLOW/pypeflow/do_task.py b/pypeFLOW/pypeflow/do_task.py
index b3b36a8..3e9a868 100644
--- a/pypeFLOW/pypeflow/do_task.py
+++ b/pypeFLOW/pypeflow/do_task.py
@@ -1,5 +1,5 @@
 #!/usr/bin/env python2.7
-from . import do_support
+from . import do_support, util
 import argparse
 import contextlib
 import importlib
@@ -71,7 +71,15 @@ def cd(newdir):
 
 def mkdirs(path):
     if not os.path.isdir(path):
-        os.makedirs(path)
+        cmd = 'mkdir -p {}'.format(path)
+        util.system(cmd)
+def rmdirs(path):
+    if os.path.isdir(path):
+        if len(path) < 20 and 'home' in path:
+            LOG.error('Refusing to rm {!r} since it might be your homedir.'.format(path))
+            return
+        cmd = 'rm -rf {}'.format(path)
+        util.system(cmd)
 
 def wait_for(fn):
     global TIMEOUT
@@ -99,6 +107,22 @@ class OldTaskRunner(object):
         self.inputs = inputs
         self.outputs = outputs
 
+def run_python_func(func, inputs, outputs, parameters):
+    if False:
+        kwds = dict()
+        kwds.update(inputs)
+        kwds.update(outputs)
+        kwds.update(parameters)
+        func(**kwds)
+    else:
+        # old way, for now
+        cwd = os.getcwd()
+        parameters['cwd'] = cwd
+        self = OldTaskRunner(inputs, outputs, parameters)
+        func(self=self)
+        script_fn = getattr(self, 'generated_script_fn', None)
+        if script_fn is not None:
+            do_support.run_bash(script_fn)
 def run(json_fn, timeout, tmpdir):
     if isinstance(timeout, int):
         global TIMEOUT
@@ -109,28 +133,41 @@ def run(json_fn, timeout, tmpdir):
     LOG.debug(pprint.pformat(cfg))
     for fn in cfg['inputs'].values():
         wait_for(fn)
+    inputs = cfg['inputs']
+    outputs = cfg['outputs']
+    parameters = cfg['parameters']
     python_function_name = cfg['python_function']
     func = get_func(python_function_name)
-    try:
-        if False:
-            kwds = dict()
-            kwds.update(cfg['inputs'])
-            kwds.update(cfg['outputs'])
-            kwds.update(cfg['parameters'])
-            func(**kwds)
-        else:
-            # old way, for now
-            cwd = os.getcwd()
-            cfg['parameters']['cwd'] = cwd
-            self = OldTaskRunner(cfg['inputs'], cfg['outputs'], cfg['parameters'])
-            func(self=self)
-            script_fn = getattr(self, 'generated_script_fn', None)
-            if script_fn is not None:
-                do_support.run_bash(script_fn)
-    except TypeError:
-        # Report the actual function spec.
-        LOG.error('For function "{}", {}'.format(python_function_name, inspect.getargspec(func)))
-        raise
+    myinputs = dict(inputs)
+    myoutputs = dict(outputs)
+    finaloutdir = os.getcwd()
+    if tmpdir:
+        import getpass
+        user = getpass.getuser()
+        pid = os.getpid()
+        myrundir = '{tmpdir}/{user}/pypetmp/{finaloutdir}'.format(**locals())
+        rmdirs(myrundir)
+        mkdirs(myrundir)
+        # TODO(CD): Copy inputs w/ flock.
+    else:
+        myrundir = finaloutdir
+    with util.cd(myrundir):
+        try:
+            run_python_func(func, myinputs, myoutputs, parameters)
+        except TypeError:
+            # Report the actual function spec.
+            LOG.error('For function "{}", {}'.format(python_function_name, inspect.getargspec(func)))
+            raise
+    if tmpdir:
+        """
+        for k,v in outputs.iteritems():
+            cmd = 'mv -f {} {}'.format(
+                os.path.join(myrundir, v),
+                os.path.join(finaloutdir, v))
+            util.system(cmd)
+        """
+        cmd = 'rsync -av {}/ {}; rm -rf {}'.format(myrundir, finaloutdir, myrundir)
+        util.system(cmd)
     for fn in cfg['outputs'].values():
         wait_for(fn)
 
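The do_task.py hunk above is the core of the change: when a tmpdir is given, the task runs inside a per-user scratch directory and the results are rsync'ed back to the real output directory afterwards. A self-contained sketch of that pattern, with hypothetical helper and path names (the patch itself goes through util.system() and util.cd()):

    # Sketch of the run-in-tmpdir pattern; names and paths are illustrative.
    import getpass
    import os
    import subprocess

    def run_in_tmpdir(tmpdir, final_outdir, run_task):
        """Run run_task() in a scratch mirror of final_outdir, then sync results back."""
        rundir = os.path.join(tmpdir, getpass.getuser(), 'pypetmp',
                              final_outdir.lstrip('/'))
        subprocess.check_call(['rm', '-rf', rundir])    # start from a clean scratch dir
        os.makedirs(rundir)
        cwd = os.getcwd()
        os.chdir(rundir)
        try:
            run_task()                                  # task writes its outputs here
        finally:
            os.chdir(cwd)
        # Copy outputs to the real directory, then drop the scratch copy.
        subprocess.check_call('rsync -av {}/ {}'.format(rundir, final_outdir), shell=True)
        subprocess.check_call(['rm', '-rf', rundir])
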
diff --git a/pypeFLOW/pypeflow/simple_pwatcher_bridge.py b/pypeFLOW/pypeflow/simple_pwatcher_bridge.py
index 5c068be..e47bb76 100644
--- a/pypeFLOW/pypeflow/simple_pwatcher_bridge.py
+++ b/pypeFLOW/pypeflow/simple_pwatcher_bridge.py
@@ -12,6 +12,7 @@ import os
 import pprint
 import random
 import sys
+import tempfile
 import time
 import traceback
 
@@ -180,7 +181,8 @@ class Workflow(object):
         record them as 'node.needs', and update the global pypetask2node table.
         """
         needs = set()
-        node = PypeNode(pypetask.name, pypetask.wdir, pypetask, needs) #, pypetask.generated_script_fn)
+        use_tmpdir = self.use_tmpdir # TODO(CD): Let pypetask.use_tmpdir override this.
+        node = PypeNode(pypetask.name, pypetask.wdir, pypetask, needs, use_tmpdir) #, pypetask.generated_script_fn)
         self.pypetask2node[pypetask] = node
         for key, plf in pypetask.inputs.iteritems():
             if plf.producer is None:
@@ -281,13 +283,14 @@ class Workflow(object):
             raise Exception('We had {} failures. {} tasks remain unsatisfied.'.format(
                 failures, len(unsatg)))
 
-    def __init__(self, watcher, job_type, job_queue, max_jobs, sge_option=''):
+    def __init__(self, watcher, job_type, job_queue, max_jobs, use_tmpdir, sge_option=''):
         self.graph = networkx.DiGraph()
         self.tq = PwatcherTaskQueue(watcher=watcher, job_type=job_type, job_queue=job_queue, sge_option=sge_option) # TODO: Inject this.
         assert max_jobs > 0, 'max_jobs needs to be set. If you use the "blocking" process-watcher, it is also the number of threads.'
         self.max_jobs = max_jobs
         self.sentinels = dict() # sentinel_done_fn -> Node
         self.pypetask2node = dict()
+        self.use_tmpdir = use_tmpdir
 
 class NodeBase(object):
     """Graph node.
@@ -365,23 +368,29 @@ class PypeNode(NodeBase):
     """
     def generate_script(self):
         pt = self.pypetask
+        wdir = pt.wdir # For now, assume dir already exists.
+        inputs = {k:v.path for k,v in pt.inputs.items()}
+        outputs = {k:os.path.relpath(v.path, wdir) for k,v in pt.outputs.items()}
+        for v in outputs.values():
+            assert not os.path.isabs(v), '{!r} is not relative'.format(v)
         task_desc = {
-                'inputs': {k:v.path for k,v in pt.inputs.items()},
-                'outputs': {k:v.path for k,v in pt.outputs.items()},
+                'inputs': inputs,
+                'outputs': outputs,
                 'parameters': pt.parameters,
                 'python_function': pt.func_name,
         }
         task_content = json.dumps(task_desc, sort_keys=True, indent=4, separators=(',', ': '))
-        task_json_fn = os.path.join(pt.wdir, 'task.json')
+        task_json_fn = os.path.join(wdir, 'task.json')
         open(task_json_fn, 'w').write(task_content)
         python = 'python2.7' # sys.executable fails sometimes because of binwrapper: SE-152
-        cmd = '{} -m pypeflow.do_task {}'.format(python, task_json_fn)
+        tmpdir_flag = '--tmpdir {}'.format(self.use_tmpdir) if self.use_tmpdir else ''
+        cmd = '{} -m pypeflow.do_task {} {}'.format(python, tmpdir_flag, task_json_fn)
         script_content = """#!/bin/bash
 hostname
 env | sort
 pwd
-time {}
-""".format(cmd)
+time {cmd}
+""".format(**locals())
         script_fn = os.path.join(pt.wdir, 'task.sh')
         open(script_fn, 'w').write(script_content)
         return script_fn
@@ -397,9 +406,10 @@ time {}
             LOG.exception('name={!r} URL={!r}'.format(pt.name, pt.URL))
             raise
         return generated_script_fn
-    def __init__(self, name, wdir, pypetask, needs): #, script_fn):
+    def __init__(self, name, wdir, pypetask, needs, use_tmpdir): #, script_fn):
         super(PypeNode, self).__init__(name, wdir, needs)
         self.pypetask = pypetask
+        self.use_tmpdir = use_tmpdir
 
 # This global exists only because we continue to support the old PypeTask style,
 # where a PypeLocalFile does not know the PypeTask which produces it.
@@ -464,13 +474,14 @@ def only_path(p):
         return p.path
     else:
         return p
-def PypeTask(inputs, outputs, parameters=None, wdir=None, name=None):
+def PypeTask(inputs, outputs, parameters=None, wdir=None):
     """A slightly messy factory because we want to support both strings and PypeLocalFiles, for now.
     This can alter dict values in inputs/outputs if they were not already PypeLocalFiles.
     """
     if wdir is None:
+        #wdir = parameters.get('wdir', name) # One of these must be a string!
         wdir = find_work_dir([only_path(v) for v in outputs.values()])
-    this = _PypeTask(inputs, outputs, parameters, wdir, name)
+    this = _PypeTask(inputs, outputs, wdir, parameters)
     #basedir = os.path.basename(wdir)
     basedir = this.name
     if basedir in PRODUCERS:
@@ -478,43 +489,47 @@ def PypeTask(inputs, outputs, parameters=None, wdir=None, name=None):
             basedir, PRODUCERS[basedir], this))
     LOG.debug('Added PRODUCERS[{!r}] = {!r}'.format(basedir, this))
     PRODUCERS[basedir] = this
-    for key, val in outputs.items():
-        if not isinstance(val, PypeLocalFile):
-            # If relative, then it is relative to our wdir.
-            #LOG.warning('Making PLF: {!r} {!r}'.format(val, this))
-            if not os.path.isabs(val):
-                if not wdir:
-                    raise Exception('No wdir for {!r}'.format(this))
-                val = os.path.join(wdir, val)
-            #LOG.warning('Muking PLF: {!r} {!r}'.format(val, this))
-            val = PypeLocalFile(val, this)
-            outputs[key] = val
-        else:
-            val.producer = this
-    for key, val in inputs.items():
-        if not isinstance(val, PypeLocalFile):
-            assert os.path.isabs(val), 'Inputs cannot be relative at this point: {!r} in {!r}'.format(val, this)
-            inputs[key] = findPypeLocalFile(val)
-    common = set(inputs.keys()) & set(outputs.keys())
-    assert (not common), 'Keys in both inputs and outputs of PypeTask({}): {!r}'.format(wdir, common)
+    this.canonicalize()
+    this.assert_canonical()
     LOG.debug('Built {!r}'.format(this))
     return this
 class _PypeTask(object):
     """Adaptor from old PypeTask API.
     """
+    def canonicalize(self):
+        for key, val in self.outputs.items():
+            if not isinstance(val, PypeLocalFile):
+                # If relative, then it is relative to our wdir.
+                #LOG.warning('Making PLF: {!r} {!r}'.format(val, self))
+                if not os.path.isabs(val):
+                    val = os.path.join(self.wdir, val)
+                #LOG.warning('Muking PLF: {!r} {!r}'.format(val, self))
+                val = PypeLocalFile(val, self)
+                self.outputs[key] = val
+            else:
+                val.producer = self
+        for key, val in self.inputs.items():
+            if not isinstance(val, PypeLocalFile):
+                assert os.path.isabs(val), 'Inputs cannot be relative at this point: {!r} in {!r}'.format(val, self)
+                self.inputs[key] = findPypeLocalFile(val)
+    def assert_canonical(self):
+        # Output values will be updated after PypeTask init, so refer back to self as producer.
+        for k,v in self.inputs.iteritems():
+            assert os.path.isabs(v.path), 'For {!r}, input {!r} is not absolute'.format(self.wdir, v)
+        for k,v in self.outputs.iteritems():
+            assert os.path.isabs(v.path), 'For {!r}, output {!r} is not absolute'.format(self.wdir, v)
+        common = set(self.inputs.keys()) & set(self.outputs.keys())
+        assert (not common), 'Keys in both inputs and outputs of PypeTask({}): {!r}'.format(self.wdir, common)
     def __call__(self, func):
         self.func = func
         self.func_name = '{}.{}'.format(func.__module__, func.__name__)
         return self
     def __repr__(self):
         return 'PypeTask({!r}, {!r}, {!r}, {!r})'.format(self.name, self.wdir, pprint.pformat(self.outputs), pprint.pformat(self.inputs))
-    def __init__(self, inputs, outputs, parameters=None, wdir=None, name=None):
+    def __init__(self, inputs, outputs, wdir, parameters=None):
         if parameters is None:
             parameters = {}
-        if wdir is None:
-            wdir = parameters.get('wdir', name) # One of these must be a string!
-        if name is None:
-            name = os.path.relpath(wdir)
+        name = os.path.relpath(wdir)
         URL = 'task://localhost/{}'.format(name)
         self.inputs = inputs
         self.outputs = outputs
@@ -526,6 +541,7 @@ class _PypeTask(object):
         #    setattr(self, key, os.path.abspath(bn))
         #for key, bn in outputs.iteritems():
         #    setattr(self, key, os.path.abspath(bn))
+        assert self.wdir, 'No wdir for {!r} {!r}'.format(self.name, self.URL)
         LOG.debug('Created {!r}'.format(self))
 
 class DummyPypeTask(_PypeTask):
@@ -545,6 +561,7 @@ def PypeProcWatcherWorkflow(
         watcher_directory='mypwatcher',
         max_jobs = 24, # must be > 0, but not too high
         sge_option = None,
+        use_tmpdir = None,
         **attributes):
     """Factory for the workflow.
     """
@@ -557,8 +574,14 @@ def PypeProcWatcherWorkflow(
     LOG.warning('In simple_pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl))
     LOG.info('In simple_pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl))
     watcher = pwatcher_impl.get_process_watcher(watcher_directory)
-    LOG.info('job_type={!r}, job_queue={!r}, sge_option={!r}'.format(job_type, job_queue, sge_option))
-    return Workflow(watcher, job_type=job_type, job_queue=job_queue, max_jobs=max_jobs, sge_option=sge_option)
+    if use_tmpdir:
+        try:
+            if not os.path.isabs(use_tmpdir):
+                use_tmpdir = os.path.abspath(use_tmpdir)
+        except (TypeError, AttributeError):
+            use_tmpdir = tempfile.gettempdir()
+    LOG.info('job_type={!r}, job_queue={!r}, sge_option={!r}, use_tmpdir={!r}'.format(job_type, job_queue, sge_option, use_tmpdir))
+    return Workflow(watcher, job_type=job_type, job_queue=job_queue, max_jobs=max_jobs, sge_option=sge_option, use_tmpdir=use_tmpdir)
     #th = MyPypeFakeThreadsHandler('mypwatcher', job_type, job_queue)
     #mq = MyMessageQueue()
     #se = MyFakeShutdownEvent() # TODO: Save pwatcher state on ShutdownEvent. (Not needed for blocking pwatcher. Mildly useful for fs_based.)
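
Taken together, the pypeFLOW changes thread use_tmpdir from the falcon config through PypeProcWatcherWorkflow() and Workflow into each PypeNode, which appends a --tmpdir flag to the generated do_task invocation. A small sketch of how that command line is composed, mirroring PypeNode.generate_script() above (the JSON path is made up for illustration):

    # Mirrors the command construction in PypeNode.generate_script(); illustration only.
    def task_command(task_json_fn, use_tmpdir=None):
        python = 'python2.7'  # the patch pins the interpreter instead of sys.executable (SE-152)
        tmpdir_flag = '--tmpdir {}'.format(use_tmpdir) if use_tmpdir else ''
        return '{} -m pypeflow.do_task {} {}'.format(python, tmpdir_flag, task_json_fn)

    print(task_command('/work/0-rawreads/task.json'))                     # local run
    print(task_command('/work/0-rawreads/task.json', use_tmpdir='/tmp'))  # run under /tmp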

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/falcon.git


