[med-svn] [falcon] 01/12: Imported Upstream version 1.8.3
Afif Elghraoui
afif at moszumanska.debian.org
Sun Nov 27 05:38:30 UTC 2016
This is an automated email from the git hooks/post-receive script.
afif pushed a commit to branch master
in repository falcon.
commit 58a5f3293b83f7d435c8cd3a39912134455746a6
Author: Afif Elghraoui <afif at debian.org>
Date: Sat Nov 26 20:06:30 2016 -0800
Imported Upstream version 1.8.3
---
FALCON-examples/git-sym.makefile | 2 +
FALCON-examples/makefile | 6 +-
FALCON-examples/run/greg200k-sv2/README.md | 1 +
FALCON-examples/run/greg200k-sv2/fc_run.cfg | 44 ++
FALCON-examples/run/greg200k-sv2/fc_unzip.cfg | 21 +
FALCON-examples/run/greg200k-sv2/input.fofn | 2 +
FALCON-examples/run/greg200k-sv2/makefile | 10 +
FALCON-examples/run/synth0/makefile | 8 -
FALCON-make/makefile | 2 +-
FALCON/falcon_kit/bash.py | 4 +-
FALCON/falcon_kit/mains/fetch_reads.py | 72 +--
FALCON/falcon_kit/mains/get_read_ctg_map.py | 180 +++---
FALCON/falcon_kit/mains/pr_ctg_track.py | 6 +-
FALCON/falcon_kit/mains/rr_ctg_track.py | 10 +-
FALCON/falcon_kit/mains/run1.py | 10 +-
FALCON/test/test_functional.py | 1 -
pypeFLOW/pypeflow/common.py | 149 -----
pypeFLOW/pypeflow/controller.py | 875 -------------------------
pypeFLOW/pypeflow/data.py | 315 ---------
pypeFLOW/pypeflow/pwatcher_bridge.py | 391 -----------
pypeFLOW/pypeflow/pwatcher_workflow.py | 9 +
pypeFLOW/pypeflow/simple_pwatcher_bridge.py | 14 +-
pypeFLOW/pypeflow/task.py | 892 --------------------------
pypeFLOW/setup.py | 9 +-
24 files changed, 247 insertions(+), 2786 deletions(-)
diff --git a/FALCON-examples/git-sym.makefile b/FALCON-examples/git-sym.makefile
index aecdd97..ed9b703 100644
--- a/FALCON-examples/git-sym.makefile
+++ b/FALCON-examples/git-sym.makefile
@@ -27,3 +27,5 @@ synth5k.2016-11-02:
curl -L https://downloads.pacbcloud.com/public/data/git-sym/synth5k.2016-11-02.tgz | tar xvfz -
ecoli.m140913_050931_42139_c100713652400000001823152404301535_s1_p0:
curl -L https://downloads.pacbcloud.com/public/data/git-sym/ecoli.m140913_050931_42139_c100713652400000001823152404301535_s1_p0.subreads.tar | tar xvf -
+greg200k-sv2:
+ curl -L https://downloads.pacbcloud.com/public/data/git-sym/greg200k-sv2.tar | tar xvf -
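For reference, the new greg200k-sv2 target is just a streamed download-and-unpack. A rough Python 2 equivalent of the same `curl | tar xvf -` pipeline (same URL as above; only for illustration, git-sym itself drives the makefile rule):

    import tarfile
    import urllib2

    # Stream the uncompressed tar straight from the HTTP response, like `curl ... | tar xvf -`.
    url = 'https://downloads.pacbcloud.com/public/data/git-sym/greg200k-sv2.tar'
    tarfile.open(fileobj=urllib2.urlopen(url), mode='r|').extractall()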
diff --git a/FALCON-examples/makefile b/FALCON-examples/makefile
index 57a7459..d600c35 100644
--- a/FALCON-examples/makefile
+++ b/FALCON-examples/makefile
@@ -8,13 +8,9 @@ setup-%:
git-sym check run/$*
# Our only integration test, for now.
test:
- python -c 'import pypeflow.common; print pypeflow.common'
+ python -c 'import pypeflow.pwatcher_workflow; print pypeflow.pwatcher_workflow'
python -c 'import falcon_kit; print falcon_kit.falcon'
${MAKE} run-synth0
${MAKE} -C run/synth0 test
- #${MAKE} -C run/synth0 clean
- #${MAKE} -C run/synth0 go0 # still test the old pypeflow too, for now
- ${MAKE} -C run/synth0 clean
- ${MAKE} -C run/synth0 go1 # should be the same as go
.PHONY: default
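The revised test target only smoke-tests that the new modules import (pypeflow.common is gone; see the deletions further down). Written out as a small Python 2 script instead of one-liners, the same check is simply:

    # Import checks only; no workflow is run. Assumes pypeflow and falcon_kit are
    # installed (e.g. via FALCON-make).
    import pypeflow.pwatcher_workflow
    import falcon_kit

    print pypeflow.pwatcher_workflow
    print falcon_kit.falcon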
diff --git a/FALCON-examples/run/greg200k-sv2/README.md b/FALCON-examples/run/greg200k-sv2/README.md
new file mode 100644
index 0000000..85c3a80
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/README.md
@@ -0,0 +1 @@
+This needs to be updated again. Still a WIP.
diff --git a/FALCON-examples/run/greg200k-sv2/fc_run.cfg b/FALCON-examples/run/greg200k-sv2/fc_run.cfg
new file mode 100755
index 0000000..a8fb8c4
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/fc_run.cfg
@@ -0,0 +1,44 @@
+[General]
+# list of files of the initial bas.h5 files
+input_fofn = input.fofn
+#input_fofn = preads.fofn
+
+job_type = local
+
+input_type = raw
+#input_type = preads
+
+#openending = True
+
+# The length cutoff used for seed reads used for initial mapping
+length_cutoff = 1000
+genome_size = 200000
+#seed_coverage = 60
+
+# The length cutoff used for seed reads used for pre-assembly
+length_cutoff_pr = 1
+
+
+sge_option_da = -pe smp 4 -q bigmem
+sge_option_la = -pe smp 20 -q bigmem
+sge_option_pda = -pe smp 6 -q bigmem
+sge_option_pla = -pe smp 16 -q bigmem
+sge_option_fc = -pe smp 24 -q bigmem
+sge_option_cns = -pe smp 8 -q bigmem
+
+#192 ?
+pa_concurrent_jobs = 12
+cns_concurrent_jobs = 12
+ovlp_concurrent_jobs = 12
+
+pa_HPCdaligner_option = -v -dal128 -t16 -e0.8 -M24 -l3200 -k18 -h480 -w8 -s100
+ovlp_HPCdaligner_option = -v -dal128 -M24 -k24 -h1024 -e.9 -l2500 -s100
+
+pa_DBsplit_option = -a -x500 -s100
+ovlp_DBsplit_option = -s100
+
+falcon_sense_option = --output_multi --min_cov_aln 4 --min_idt 0.70 --min_cov 4 --max_n_read 200 --n_core 8
+falcon_sense_skip_contained = False
+
+overlap_filtering_setting = --max_diff 120 --max_cov 120 --min_cov 2 --n_core 12
+#dazcon = 1
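fc_run.cfg is a plain INI file; fc_run.py does its own parsing inside falcon_kit (see run1.py further down), but the format can be inspected with the stdlib parser. A minimal sketch (Python 2; assumes the file above is saved as fc_run.cfg in the current directory):

    import ConfigParser

    cp = ConfigParser.ConfigParser()
    cp.read('fc_run.cfg')
    # A couple of the knobs that matter most for this 200k example genome:
    print cp.getint('General', 'length_cutoff')   # 1000
    print cp.getint('General', 'genome_size')     # 200000
    print cp.get('General', 'pa_HPCdaligner_option')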
diff --git a/FALCON-examples/run/greg200k-sv2/fc_unzip.cfg b/FALCON-examples/run/greg200k-sv2/fc_unzip.cfg
new file mode 100644
index 0000000..c9de3af
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/fc_unzip.cfg
@@ -0,0 +1,21 @@
+[General]
+job_type = SGE
+job_type = local
+
+[Unzip]
+
+input_fofn= input.fofn
+input_bam_fofn= input_bam.fofn
+#smrt_bin= /mnt/secondary/builds/full/3.0.0/prod/smrtanalysis_3.0.0.153854/smrtcmds/bin/
+#smrt_bin=/mnt/secondary/builds/full/3.0.1/prod/current-build_smrtanalysis/smrtcmds/bin/
+smrt_bin=/mnt/secondary/builds/full/3.0.0/prod/current-build_smrtanalysis/smrtcmds/bin/
+sge_phasing= -pe smp 12 -q bigmem
+sge_quiver= -pe smp 12 -q sequel-farm
+sge_track_reads= -pe smp 12 -q default
+sge_blasr_aln= -pe smp 24 -q bigmem
+sge_hasm= -pe smp 48 -q bigmem
+unzip_concurrent_jobs = 64
+quiver_concurrent_jobs = 64
+
+unzip_concurrent_jobs = 12
+quiver_concurrent_jobs = 12
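Note the duplicated keys above (job_type, unzip_concurrent_jobs, and quiver_concurrent_jobs each appear twice). With an INI parser in the ConfigParser family the last assignment wins, so the effective settings are job_type = local and 12 concurrent jobs; the earlier SGE-flavoured values are effectively dead lines. A quick way to confirm (Python 2, assuming the file is saved as fc_unzip.cfg and that fc_unzip.py reads it with the same last-value-wins behaviour):

    import ConfigParser

    cp = ConfigParser.ConfigParser()
    cp.read('fc_unzip.cfg')
    print cp.get('General', 'job_type')              # local (the second assignment wins)
    print cp.get('Unzip', 'unzip_concurrent_jobs')   # 12
    print cp.get('Unzip', 'quiver_concurrent_jobs')  # 12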
diff --git a/FALCON-examples/run/greg200k-sv2/input.fofn b/FALCON-examples/run/greg200k-sv2/input.fofn
new file mode 100644
index 0000000..99cce2d
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/input.fofn
@@ -0,0 +1,2 @@
+data/greg200k-sv2/subreads1.dexta
+data/greg200k-sv2/subreads2.dexta
diff --git a/FALCON-examples/run/greg200k-sv2/makefile b/FALCON-examples/run/greg200k-sv2/makefile
new file mode 100644
index 0000000..099e5ae
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/makefile
@@ -0,0 +1,10 @@
+falcon:
+ fc_run.py fc_run.cfg
+unzip:
+ fc_unzip.py fc_unzip.cfg
+quiver:
+ fc_quiver.py fc_unzip.cfg
+clean:
+ rm -rf 3-*/ mypwatcher/ pwatcher.dir
+cleaner: clean
+ rm -rf 0-*/ 1-*/ 2-*/ all.log mypwatcher/ scripts/ sge_log/
diff --git a/FALCON-examples/run/synth0/makefile b/FALCON-examples/run/synth0/makefile
index d471427..e7ef88e 100644
--- a/FALCON-examples/run/synth0/makefile
+++ b/FALCON-examples/run/synth0/makefile
@@ -5,14 +5,6 @@ go: run
${MAKE} test
run:
fc_run fc_run.cfg logging.json
-go1: run1
- ${MAKE} test
-run1:
- fc_run1 fc_run.cfg logging.ini
-go0: run0
- ${MAKE} test
-run0:
- fc_run0 fc_run.cfg logging.ini
test:
./check.py
clean:
diff --git a/FALCON-make/makefile b/FALCON-make/makefile
index f6c0d95..52b42d3 100644
--- a/FALCON-make/makefile
+++ b/FALCON-make/makefile
@@ -53,7 +53,7 @@ show:
echo "FALCON_PIP_EDIT=${FALCON_PIP_EDIT}"
echo "FALCON_PIP_USER=${FALCON_PIP_USER}"
check:
- python -c 'import pypeflow.common; print pypeflow.common'
+ python -c 'import pypeflow.simple_pwatcher_bridge; print pypeflow.simple_pwatcher_bridge'
python -c 'import falcon_kit; print falcon_kit.falcon'
extra:
pip install ${FALCON_PIP_USER} Cython
diff --git a/FALCON/falcon_kit/bash.py b/FALCON/falcon_kit/bash.py
index fabe142..dbf3378 100644
--- a/FALCON/falcon_kit/bash.py
+++ b/FALCON/falcon_kit/bash.py
@@ -262,8 +262,8 @@ ln -sf ${{db_dir}}/{db_prefix}.db .
ln -sf ${{db_dir}}/.{db_prefix}.dust.anno .
ln -sf ${{db_dir}}/.{db_prefix}.dust.data .
{daligner_cmd}
-rm -f *.C?.las *.C?.S.las
-rm -f *.N?.las *.N?.S.las
+rm -f *.C?.las *.C?.S.las *.C??.las *.C??.S.las *.C???.las *.C???.S.las
+rm -f *.N?.las *.N?.S.las *.N??.las *.N??.S.las *.N???.las *.N???.S.las
""".format(db_dir=db_dir, db_prefix=db_prefix, daligner_cmd=daligner_cmd)
yield job_uid, bash
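The widened cleanup patterns above matter once the numeric part of the .las suffix has more than one digit: *.C?.las only matches a single digit, while the new *.C??.las and *.C???.las variants also catch two- and three-digit suffixes. A small illustration with fnmatch (the filenames here are made up; only the .C<digits>.las suffix is relevant):

    import fnmatch

    names = ['job.C3.las', 'job.C3.S.las', 'job.C12.las', 'job.C512.las']
    old = ['*.C?.las', '*.C?.S.las']
    new = old + ['*.C??.las', '*.C??.S.las', '*.C???.las', '*.C???.S.las']
    print [n for n in names if any(fnmatch.fnmatch(n, p) for p in old)]  # first two only
    print [n for n in names if any(fnmatch.fnmatch(n, p) for p in new)]  # all four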
diff --git a/FALCON/falcon_kit/mains/fetch_reads.py b/FALCON/falcon_kit/mains/fetch_reads.py
index c4576c9..9f7b458 100644
--- a/FALCON/falcon_kit/mains/fetch_reads.py
+++ b/FALCON/falcon_kit/mains/fetch_reads.py
@@ -6,45 +6,41 @@ import sys
import re
-
-
def fetch_ref_and_reads(base_dir, fofn, ctg_id, out_dir, min_ctg_lenth):
-
-
read_fofn = fofn
if out_dir == None:
- out_dir = os.path.join( base_dir, "3-unzip/reads")
+ out_dir = os.path.join(base_dir, '3-unzip/reads')
- ctg_fa = os.path.join( base_dir, "2-asm-falcon/p_ctg.fa")
- read_map_dir = os.path.join( base_dir, "2-asm-falcon/read_maps" )
+ ctg_fa = os.path.join(base_dir, '2-asm-falcon/p_ctg.fa')
+ read_map_dir = os.path.join(base_dir, '2-asm-falcon/read_maps')
- rawread_id_file = os.path.join( read_map_dir, "raw_read_ids" )
- pread_id_file = os.path.join( read_map_dir, "pread_ids" )
+ rawread_id_file = os.path.join(read_map_dir, 'dump_rawread_ids', 'rawread_ids')
+ pread_id_file = os.path.join(read_map_dir, 'dump_pread_ids', 'pread_ids')
- rid_to_oid = open(rawread_id_file).read().split("\n") #daligner raw read id to the original ids
- pid_to_fid = open(pread_id_file).read().split("\n") #daligner pread id to the fake ids
+ rid_to_oid = open(rawread_id_file).read().split('\n') #daligner raw read id to the original ids
+ pid_to_fid = open(pread_id_file).read().split('\n') #daligner pread id to the fake ids
def pid_to_oid(pid):
fid = pid_to_fid[int(pid)]
- rid = int(fid.split("/")[1])/10
+ rid = int(fid.split('/')[1])/10
return rid_to_oid[int(rid)]
ref_fasta = FastaReader(ctg_fa)
all_ctg_ids = set()
for s in ref_fasta:
s_id = s.name.split()[0]
- if ctg_id != "all" and s_id != ctg_id:
+ if ctg_id != 'all' and s_id != ctg_id:
continue
if len(s.sequence) < min_ctg_lenth:
continue
- if ctg_id != "all":
- ref_out = open( os.path.join( out_dir, "%s_ref.fa" % ctg_id), "w" )
+ if ctg_id != 'all':
+ ref_out = open( os.path.join( out_dir, '%s_ref.fa' % ctg_id), 'w' )
else:
- ref_out = open( os.path.join( out_dir, "%s_ref.fa" % s_id), "w" )
+ ref_out = open( os.path.join( out_dir, '%s_ref.fa' % s_id), 'w' )
- print >>ref_out, ">%s" % s_id
+ print >>ref_out, '>%s' % s_id
print >>ref_out, s.sequence
all_ctg_ids.add(s_id)
ref_out.close()
@@ -53,71 +49,71 @@ def fetch_ref_and_reads(base_dir, fofn, ctg_id, out_dir, min_ctg_lenth):
read_set = {}
ctg_id_hits = {}
- map_fn = os.path.join(read_map_dir,"rawread_to_contigs")
- with open(map_fn, "r") as f:
+ map_fn = os.path.join(read_map_dir,'rawread_to_contigs')
+ with open(map_fn, 'r') as f:
for row in f:
row = row.strip().split()
hit_ctg = row[1]
- hit_ctg = hit_ctg.split("-")[0]
+ hit_ctg = hit_ctg.split('-')[0]
if int(row[3]) == 0:
o_id = rid_to_oid[int(row[0])]
read_set[o_id] = hit_ctg
ctg_id_hits[hit_ctg] = ctg_id_hits.get(hit_ctg, 0) + 1
- map_fn = os.path.join(read_map_dir,"pread_to_contigs")
- with open(map_fn, "r") as f:
+ map_fn = os.path.join(read_map_dir,'pread_to_contigs')
+ with open(map_fn, 'r') as f:
for row in f:
row = row.strip().split()
hit_ctg = row[1]
- hit_ctg = hit_ctg.split("-")[0]
+ hit_ctg = hit_ctg.split('-')[0]
if hit_ctg not in read_set and int(row[3]) == 0:
o_id = pid_to_oid(row[0])
read_set[o_id] = hit_ctg
ctg_id_hits[hit_ctg] = ctg_id_hits.get(hit_ctg, 0) + 1
- with open(os.path.join( out_dir, "ctg_list"),"w") as f:
+ with open(os.path.join( out_dir, 'ctg_list'),'w') as f:
for ctg_id in sorted(list(all_ctg_ids)):
if ctg_id_hits.get(ctg_id, 0) < 5:
continue
- if ctg_id[-1] not in ["F", "R"]: #ignore small circle contigs, they need different approach
+ if ctg_id[-1] not in ['F', 'R']: #ignore small circle contigs, they need different approach
continue
print >>f, ctg_id
read_out_files = {}
- with open(read_fofn, "r") as f:
+ with open(read_fofn, 'r') as f:
for r_fn in f:
r_fn = r_fn.strip()
read_fa_file = FastaReader(r_fn)
for r in read_fa_file:
rid = r.name.split()[0]
if rid not in read_set:
- ctg_id = "unassigned"
+ ctg_id = 'unassigned'
else:
ctg_id = read_set[rid]
- if ctg_id == "NA" or ctg_id not in all_ctg_ids:
- ctg_id = "unassigned"
+ if ctg_id == 'NA' or ctg_id not in all_ctg_ids:
+ ctg_id = 'unassigned'
if ctg_id not in read_out_files:
- read_out = open( os.path.join( out_dir, "%s_reads.fa" % ctg_id), "w" )
+ read_out = open( os.path.join( out_dir, '%s_reads.fa' % ctg_id), 'w' )
read_out_files[ctg_id] = 1
else:
- read_out = open( os.path.join( out_dir, "%s_reads.fa" % ctg_id), "a" )
+ read_out = open( os.path.join( out_dir, '%s_reads.fa' % ctg_id), 'a' )
- print >>read_out, ">"+rid
+ print >>read_out, '>'+rid
print >>read_out, r.sequence
read_out.close()
def parse_args(argv):
parser = argparse.ArgumentParser(description='using the read to contig mapping data to partition the reads grouped by contigs')
- parser.add_argument('--base_dir', type=str, default="./", help='the base working dir of a falcon assembly')
- parser.add_argument('--fofn', type=str, default="./input.fofn", help='path to the file of the list of raw read fasta files')
- parser.add_argument('--ctg_id', type=str, default="all", help='contig identifier in the contig fasta file')
+ parser.add_argument('--base_dir', type=str, default='./', help='the base working dir of a falcon assembly')
+ parser.add_argument('--fofn', type=str, default='./input.fofn', help='path to the file of the list of raw read fasta files')
+ parser.add_argument('--ctg_id', type=str, default='all', help='contig identifier in the contig fasta file')
parser.add_argument('--out_dir', default=None, type=str, help='the output base_dir, default to `base_dir/3-unzip/reads` directory')
parser.add_argument('--min_ctg_lenth', default=20000, type=int, help='the minimum length of the contig for the outputs, default=20000')
- #parser.add_argument('--ctg_fa', type=str, default="./2-asm-falcon/p_ctg.fa", help='path to the contig fasta file')
- #parser.add_argument('--read_map_dir', type=str, default="./2-asm-falcon/read_maps", help='path to the read-contig map directory')
+ #parser.add_argument('--ctg_fa', type=str, default='./2-asm-falcon/p_ctg.fa', help='path to the contig fasta file')
+ #parser.add_argument('--read_map_dir', type=str, default='./2-asm-falcon/read_maps', help='path to the read-contig map directory')
# we can run this in parallel mode in the future
#parser.add_argument('--n_core', type=int, default=4,
# help='number of processes used for generating consensus')
@@ -129,5 +125,5 @@ def main(argv=sys.argv):
args = parse_args(argv)
fetch_ref_and_reads(**vars(args))
-if __name__ == "__main__":
+if __name__ == '__main__':
main()
diff --git a/FALCON/falcon_kit/mains/get_read_ctg_map.py b/FALCON/falcon_kit/mains/get_read_ctg_map.py
index 7b85f95..349322f 100644
--- a/FALCON/falcon_kit/mains/get_read_ctg_map.py
+++ b/FALCON/falcon_kit/mains/get_read_ctg_map.py
@@ -1,10 +1,14 @@
-from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn
-from pypeflow.task import PypeTask, PypeThreadTaskBase, PypeTaskBase
-from pypeflow.controller import PypeWorkflow, PypeMPWorkflow, PypeThreadWorkflow
-from falcon_kit.FastaReader import FastaReader
+from __future__ import absolute_import
+#from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn
+#from pypeflow.task import PypeTask, PypeThreadTaskBase, PypeTaskBase
+#from pypeflow.controller import PypeWorkflow, PypeMPWorkflow, PypeThreadWorkflow
+from pypeflow.simple_pwatcher_bridge import (PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
+ makePypeLocalFile, fn, PypeTask)
+PypeThreadTaskBase = MyFakePypeThreadTaskBase
from falcon_kit.fc_asm_graph import AsmGraph
import argparse
import glob
+import logging
import sys
import subprocess as sp
import shlex
@@ -14,97 +18,106 @@ def make_dirs(d):
if not os.path.isdir(d):
os.makedirs(d)
+def dump_rawread_ids(self):
+ rawread_db = fn(self.rawread_db)
+ rawread_id_file = fn(self.rawread_id_file)
+ os.system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (rawread_db, rawread_id_file))
-def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
-
- read_map_dir = os.path.abspath(os.path.join(asm_dir, "read_maps"))
- make_dirs(read_map_dir)
+def dump_pread_ids(self):
+ pread_db = fn(self.pread_db)
+ pread_id_file = fn(self.pread_id_file)
+ os.system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (pread_db, pread_id_file))
- PypeMPWorkflow.setNumThreadAllowed(12, 12)
- wf = PypeMPWorkflow()
+def generate_read_to_ctg_map(self):
+ rawread_id_file = fn(self.rawread_id_file)
+ pread_id_file = fn(self.pread_id_file)
+ read_to_contig_map = fn(self.read_to_contig_map)
- rawread_db = makePypeLocalFile( os.path.join( rawread_dir, "raw_reads.db" ) )
- rawread_id_file = makePypeLocalFile( os.path.join( read_map_dir, "raw_read_ids" ) )
+ pread_did_to_rid = open(pread_id_file).read().split('\n')
+ rid_to_oid = open(rawread_id_file).read().split('\n')
- @PypeTask( inputs = {"rawread_db": rawread_db},
- outputs = {"rawread_id_file": rawread_id_file},
- TaskType = PypeThreadTaskBase,
- URL = "task://localhost/dump_rawread_ids" )
- def dump_rawread_ids(self):
- rawread_db = fn( self.rawread_db )
- rawread_id_file = fn( self.rawread_id_file )
- os.system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (rawread_db, rawread_id_file) )
+ asm_G = AsmGraph(fn(self.sg_edges_list),
+ fn(self.utg_data),
+ fn(self.ctg_paths))
- wf.addTask( dump_rawread_ids )
-
- pread_db = makePypeLocalFile( os.path.join( pread_dir, "preads.db" ) )
- pread_id_file = makePypeLocalFile( os.path.join( read_map_dir, "pread_ids" ) )
-
- @PypeTask( inputs = {"pread_db": pread_db},
- outputs = {"pread_id_file": pread_id_file},
- TaskType = PypeThreadTaskBase,
- URL = "task://localhost/dump_pread_ids" )
- def dump_pread_ids(self):
- pread_db = fn( self.pread_db )
- pread_id_file = fn( self.pread_id_file )
- os.system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (pread_db, pread_id_file) )
+ pread_to_contigs = {}
- wf.addTask( dump_pread_ids )
+ with open(read_to_contig_map, 'w') as f:
+ for ctg in asm_G.ctg_data:
+ if ctg[-1] == 'R':
+ continue
+ ctg_g = asm_G.get_sg_for_ctg(ctg)
+ for n in ctg_g.nodes():
+ pid = int(n.split(':')[0])
- wf.refreshTargets() # block
+ rid = pread_did_to_rid[pid].split('/')[1]
+ rid = int(int(rid)/10)
+ oid = rid_to_oid[rid]
+ k = (pid, rid, oid)
+ pread_to_contigs.setdefault(k, set())
+ pread_to_contigs[k].add(ctg)
- sg_edges_list = makePypeLocalFile( os.path.join(asm_dir, "sg_edges_list") )
- utg_data = makePypeLocalFile( os.path.join(asm_dir, "utg_data") )
- ctg_paths = makePypeLocalFile( os.path.join(asm_dir, "ctg_paths") )
- inputs = { "rawread_id_file": rawread_id_file,
- "pread_id_file": pread_id_file,
- "sg_edges_list": sg_edges_list,
- "utg_data": utg_data,
- "ctg_paths": ctg_paths }
+ for k in pread_to_contigs:
+ pid, rid, oid = k
+ for ctg in list(pread_to_contigs[ k ]):
+ print >>f, '%09d %09d %s %s' % (pid, rid, oid, ctg)
- read_to_contig_map = makePypeLocalFile( os.path.join(read_map_dir, "read_to_contig_map") )
+def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
+ read_map_dir = os.path.abspath(os.path.join(asm_dir, 'read_maps'))
+ make_dirs(read_map_dir)
- @PypeTask( inputs = inputs,
- outputs = {"read_to_contig_map": read_to_contig_map},
+ wf = PypeProcWatcherWorkflow(
+ max_jobs=12,
+ )
+ """
+ job_type=config['job_type'],
+ job_queue=config['job_queue'],
+ sge_option=config.get('sge_option', ''),
+ watcher_type=config['pwatcher_type'],
+ watcher_directory=config['pwatcher_directory'])
+ """
+
+ rawread_db = makePypeLocalFile(os.path.join(rawread_dir, 'raw_reads.db'))
+ rawread_id_file = makePypeLocalFile(os.path.join(read_map_dir, 'dump_rawread_ids', 'rawread_ids'))
+
+ task = PypeTask(
+ inputs = {'rawread_db': rawread_db},
+ outputs = {'rawread_id_file': rawread_id_file},
+ TaskType = PypeThreadTaskBase,
+ URL = 'task://localhost/dump_rawread_ids')
+ wf.addTask(task(dump_rawread_ids))
+
+ pread_db = makePypeLocalFile(os.path.join(pread_dir, 'preads.db'))
+ pread_id_file = makePypeLocalFile(os.path.join(read_map_dir, 'dump_pread_ids', 'pread_ids'))
+
+ task = PypeTask(
+ inputs = {'pread_db': pread_db},
+ outputs = {'pread_id_file': pread_id_file},
TaskType = PypeThreadTaskBase,
- URL = "task://localhost/get_ctg_read_map" )
- def generate_read_to_ctg_map(self):
- rawread_id_file = fn( self.rawread_id_file )
- pread_id_file = fn( self.pread_id_file )
- read_to_contig_map = fn( self.read_to_contig_map )
-
- pread_did_to_rid = open(pread_id_file).read().split("\n")
- rid_to_oid = open(rawread_id_file).read().split("\n")
+ URL = 'task://localhost/dump_pread_ids' )
+ wf.addTask(task(dump_pread_ids))
- asm_G = AsmGraph(fn(self.sg_edges_list),
- fn(self.utg_data),
- fn(self.ctg_paths) )
-
- pread_to_contigs = {}
-
- with open(read_to_contig_map, "w") as f:
- for ctg in asm_G.ctg_data:
- if ctg[-1] == "R":
- continue
- ctg_g = asm_G.get_sg_for_ctg(ctg)
- for n in ctg_g.nodes():
- pid = int(n.split(":")[0])
+ wf.refreshTargets() # block
- rid = pread_did_to_rid[pid].split("/")[1]
- rid = int(int(rid)/10)
- oid = rid_to_oid[rid]
- k = (pid, rid, oid)
- pread_to_contigs.setdefault( k, set() )
- pread_to_contigs[ k ].add( ctg )
+ sg_edges_list = makePypeLocalFile( os.path.join(asm_dir, 'sg_edges_list') )
+ utg_data = makePypeLocalFile( os.path.join(asm_dir, 'utg_data') )
+ ctg_paths = makePypeLocalFile( os.path.join(asm_dir, 'ctg_paths') )
+ inputs = { 'rawread_id_file': rawread_id_file,
+ 'pread_id_file': pread_id_file,
+ 'sg_edges_list': sg_edges_list,
+ 'utg_data': utg_data,
+ 'ctg_paths': ctg_paths }
- for k in pread_to_contigs:
- pid, rid, oid = k
- for ctg in list(pread_to_contigs[ k ]):
- print >>f, "%09d %09d %s %s" % (pid, rid, oid, ctg)
+ read_to_contig_map = makePypeLocalFile(os.path.join(read_map_dir, 'get_ctg_read_map', 'read_to_contig_map'))
- wf.addTask( generate_read_to_ctg_map )
+ task = PypeTask(
+ inputs = inputs,
+ outputs = {'read_to_contig_map': read_to_contig_map},
+ TaskType = PypeThreadTaskBase,
+ URL = 'task://localhost/get_ctg_read_map')
+ wf.addTask(task(generate_read_to_ctg_map))
wf.refreshTargets() # block
@@ -112,18 +125,19 @@ def parse_args(argv):
parser = argparse.ArgumentParser(description='generate `2-asm-falcon/read_maps/read_to_contig_map` that contains the \
information from the chain of mapping: (contig id) -> (internal p-read id) -> (internal raw-read id) -> (original read id)',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
- parser.add_argument('--basedir', type=str, default="./", help='the base working dir of a FALCON assembly')
+ parser.add_argument('--basedir', type=str, default='./', help='the base working dir of a FALCON assembly')
args = parser.parse_args(argv[1:])
return args
def main(argv=sys.argv):
+ logging.basicConfig()
args = parse_args(argv)
basedir = args.basedir
- rawread_dir = os.path.abspath( os.path.join( basedir, "0-rawreads" ) )
- pread_dir = os.path.abspath( os.path.join( basedir, "1-preads_ovl" ) )
- asm_dir = os.path.abspath( os.path.join( basedir, "2-asm-falcon") )
+ rawread_dir = os.path.abspath(os.path.join( basedir, '0-rawreads'))
+ pread_dir = os.path.abspath(os.path.join( basedir, '1-preads_ovl'))
+ asm_dir = os.path.abspath(os.path.join( basedir, '2-asm-falcon'))
get_read_ctg_map(rawread_dir=rawread_dir, pread_dir=pread_dir, asm_dir=asm_dir)
-if __name__ == "__main__":
+if __name__ == '__main__':
main()
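Taken together, the rewrite above is a port from the old decorator-based PypeTask/controller API to pypeflow.simple_pwatcher_bridge: task bodies become plain module-level functions taking self, and PypeTask(...) now returns a wrapper that is applied to the function before wf.addTask(). Condensed to its shape (the names my_step, in.txt, and out.txt are placeholders, not part of this commit):

    from pypeflow.simple_pwatcher_bridge import (PypeProcWatcherWorkflow,
            MyFakePypeThreadTaskBase, makePypeLocalFile, fn, PypeTask)

    def my_step(self):
        # Inputs/outputs arrive as attributes on self; fn() gives their filesystem paths.
        open(fn(self.out_txt), 'w').write(open(fn(self.in_txt)).read())

    wf = PypeProcWatcherWorkflow(max_jobs=12)
    in_txt = makePypeLocalFile('in.txt')
    out_txt = makePypeLocalFile('run/my_step/out.txt')
    task = PypeTask(inputs={'in_txt': in_txt},
                    outputs={'out_txt': out_txt},
                    TaskType=MyFakePypeThreadTaskBase,
                    URL='task://localhost/my_step')
    wf.addTask(task(my_step))
    wf.refreshTargets()  # blocks until everything is up to date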
diff --git a/FALCON/falcon_kit/mains/pr_ctg_track.py b/FALCON/falcon_kit/mains/pr_ctg_track.py
index f516866..f1b06fc 100644
--- a/FALCON/falcon_kit/mains/pr_ctg_track.py
+++ b/FALCON/falcon_kit/mains/pr_ctg_track.py
@@ -54,8 +54,8 @@ def tr_stage1(readlines, min_len, bestn, pid_to_ctg):
def run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn):
io.LOG('preparing tr_stage1')
io.logstats()
- asm_dir = os.path.abspath( os.path.join(base_dir, "2-asm-falcon") )
- pid_to_ctg = get_pid_to_ctg( os.path.join(asm_dir, "read_maps/read_to_contig_map" ) )
+ asm_dir = os.path.abspath(os.path.join(base_dir, '2-asm-falcon'))
+ pid_to_ctg = get_pid_to_ctg(os.path.join(asm_dir, 'read_maps', 'get_ctg_read_map', 'read_to_contig_map'))
inputs = []
for fn in file_list:
inputs.append( (run_tr_stage1, db_fn, fn, min_len, bestn, pid_to_ctg) )
@@ -135,7 +135,7 @@ def track_reads(n_core, base_dir, min_len, bestn, debug, silent, stream):
def parse_args(argv):
parser = argparse.ArgumentParser(description='scan the pread overlap information to identify the best hit from the preads \
-to the contigs with read_to_contig_map generated by `fc_get_read_ctg_map` in `2-asm-falcon/read_maps/read_to_contig_map`',
+to the contigs with read_to_contig_map generated by `fc_get_read_ctg_map` in `2-asm-falcon/read_maps/get_ctg_read_map/read_to_contig_map`',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--n_core', type=int, default=4,
help='number of processes used for tracking reads; '
diff --git a/FALCON/falcon_kit/mains/rr_ctg_track.py b/FALCON/falcon_kit/mains/rr_ctg_track.py
index ad07f48..397d382 100644
--- a/FALCON/falcon_kit/mains/rr_ctg_track.py
+++ b/FALCON/falcon_kit/mains/rr_ctg_track.py
@@ -54,8 +54,8 @@ def tr_stage1(readlines, min_len, bestn, rid_to_ctg):
def run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn):
io.LOG('preparing tr_stage1')
io.logstats()
- asm_dir = os.path.abspath( os.path.join(base_dir, "2-asm-falcon") )
- rid_to_ctg = get_rid_to_ctg( os.path.join(asm_dir, "read_maps/read_to_contig_map" ) )
+ asm_dir = os.path.abspath(os.path.join(base_dir, '2-asm-falcon'))
+ rid_to_ctg = get_rid_to_ctg(os.path.join(asm_dir, 'read_maps', 'get_ctg_read_map', 'read_to_contig_map'))
inputs = []
for fn in file_list:
inputs.append( (run_tr_stage1, db_fn, fn, min_len, bestn, rid_to_ctg) )
@@ -75,12 +75,12 @@ def run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn):
else:
heappushpop( bread_to_areads[k], item )
- #rid_to_oid = open(os.path.join( rawread_dir, "raw_read_ids" ) ).read().split("\n")
+ #rid_to_oid = open(os.path.join(rawread_dir, 'dump_rawread_ids', 'raw_read_ids')).read().split('\n')
"""
For each b-read, we find the best contig map through the b->a->contig map.
"""
- with open( os.path.join(asm_dir, "read_maps/rawread_to_contigs"), "w") as out_f:
+ with open(os.path.join(asm_dir, 'read_maps/rawread_to_contigs'), 'w') as out_f:
for bread in bread_to_areads:
ctg_score = {}
@@ -145,7 +145,7 @@ def track_reads(n_core, base_dir, min_len, bestn, debug, silent, stream):
def parse_args(argv):
parser = argparse.ArgumentParser(description='scan the raw read overlap information to identify the best hit from the reads \
-to the contigs with read_to_contig_map generated by `fc_get_read_ctg_map` in `2-asm-falcon/read_maps/read_to_contig_map`',
+to the contigs with read_to_contig_map generated by `fc_get_read_ctg_map` in `2-asm-falcon/read_maps/get_ctg_read_map/read_to_contig_map`',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--n_core', type=int, default=4,
help='number of processes used for tracking reads; '
diff --git a/FALCON/falcon_kit/mains/run1.py b/FALCON/falcon_kit/mains/run1.py
index cdfb05b..8fd7352 100644
--- a/FALCON/falcon_kit/mains/run1.py
+++ b/FALCON/falcon_kit/mains/run1.py
@@ -1,9 +1,9 @@
from .. import run_support as support
from .. import bash, pype_tasks
from ..util.system import only_these_symlinks
-from pypeflow.pwatcher_bridge import PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase
-from pypeflow.data import makePypeLocalFile, fn
-from pypeflow.task import PypeTask
+#from pypeflow.pwatcher_bridge import PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase
+#from pypeflow.data import makePypeLocalFile, fn
+#from pypeflow.task import PypeTask
from pypeflow.simple_pwatcher_bridge import (PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
makePypeLocalFile, fn, PypeTask)
import argparse
@@ -133,7 +133,6 @@ def main1(prog_name, input_config_fn, logger_config_fn=None):
fc_run_logger.exception('Failed to parse config "{}".'.format(input_config_fn))
raise
input_fofn_plf = makePypeLocalFile(config['input_fofn'])
- #Workflow = PypeProcWatcherWorkflow
wf = PypeProcWatcherWorkflow(job_type=config['job_type'],
job_queue=config['job_queue'],
sge_option=config.get('sge_option', ''),
@@ -142,13 +141,12 @@ def main1(prog_name, input_config_fn, logger_config_fn=None):
run(wf, config,
os.path.abspath(input_config_fn),
input_fofn_plf=input_fofn_plf,
- setNumThreadAllowed=PypeProcWatcherWorkflow.setNumThreadAllowed)
+ )
def run(wf, config,
input_config_fn,
input_fofn_plf,
- setNumThreadAllowed,
):
"""
Preconditions (for now):
diff --git a/FALCON/test/test_functional.py b/FALCON/test/test_functional.py
index 832ba1f..aafb6d4 100644
--- a/FALCON/test/test_functional.py
+++ b/FALCON/test/test_functional.py
@@ -30,7 +30,6 @@ def test_get_daligner_job_descriptions_small():
example_se161 = os.path.join(thisdir, 'se161.sh')
def test_get_daligner_job_descriptions_se161():
- # when there is only 1 block, a special case
example_HPCdaligner = open(example_se161)
result = f.get_daligner_job_descriptions(
example_HPCdaligner, 'raw_reads', single=False)
diff --git a/pypeFLOW/pypeflow/common.py b/pypeFLOW/pypeflow/common.py
deleted file mode 100644
index 4c12aef..0000000
--- a/pypeFLOW/pypeflow/common.py
+++ /dev/null
@@ -1,149 +0,0 @@
-
-# @author Jason Chin
-#
-# Copyright (C) 2010 by Jason Chin
-# Copyright (C) 2011 by Jason Chin, Pacific Biosciences
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""
-
-PypeCommon: provide the common base classes and general module level utility functions
- and constants for PypeEngine
-
-"""
-
-from urlparse import urlparse
-
-import rdflib
-try:
- from rdflib import ConjunctiveGraph as Graph #work for rdflib-3.1.0
- # need to install rdfextras for rdflib-3.0.0
- rdflib.plugin.register('sparql', rdflib.query.Processor,
- 'rdfextras.sparql.processor', 'Processor')
- rdflib.plugin.register('sparql', rdflib.query.Result,
- 'rdfextras.sparql.query', 'SPARQLQueryResult')
-except Exception:
- from rdflib.Graph import ConjunctiveGraph as Graph #work for rdflib-2.4.2
-from rdflib import Namespace
-from rdflib import Literal
-from rdflib import URIRef
-
-from subprocess import Popen, PIPE
-import time
-
-pypeNS = Namespace("pype://v0.1/")
-
-class PypeError(Exception):
- def __init__(self, msg):
- Exception.__init__(self, msg) # to make __repr__() show class name
- self.msg = msg
- def __str__(self):
- return repr(self.msg)
-
-class NotImplementedError(PypeError):
- pass
-
-class URLSchemeNotSupportYet(PypeError):
- pass
-
-class PypeObject(object):
-
- """
-
- Base class for all PypeObjects
-
- Every PypeObject should have an URL.
- The instance attributes can be set by using keyword argument in __init__().
-
- """
-
- def __init__(self, URL, **attributes):
-
- URLParseResult = urlparse(URL)
- if URLParseResult.scheme not in self.__class__.supportedURLScheme:
- raise URLSchemeNotSupportYet("%s is not supported yet" % URLParseResult.scheme )
- else:
- self.URL = URL
- for k,v in attributes.iteritems():
- if k not in self.__dict__:
- self.__dict__[k] = v
-
- def _updateURL(self, newURL):
- URLParseResult = urlparse(self.URL)
- newURLParseResult = urlparse(newURL)
- if URLParseResult.scheme != newURLParseResult.scheme:
- raise PypeError, "the URL scheme can not be changed for obj %s" % self.URL
- self.URL = newURL
-
- @property
- def _RDFGraph(self):
- graph = Graph()
-
- for k, v in self.__dict__.iteritems():
- if k == "URL": continue
- if k[0] == "_": continue
- if hasattr(v, "URL"):
- graph.add( ( URIRef(self.URL), pypeNS[k], URIRef(v.URL) ) )
- return graph
-
-
-
- @property
- def RDFXML(self):
-
- """
- RDF XML representation of the everything related to the PypeObject
- """
-
- return self._RDFGraph.serialize()
-
-def runShellCmd(args,**kwargs):
-
- """
- Utility function that runs a shell script command.
- It blocks until the command is finished. The return value
- from the shell command is returned
- """
-
- p = Popen(args,**kwargs)
- pStatus = None
- while 1:
- time.sleep(0.2)
- pStatus = p.poll()
- if pStatus != None:
- break
- return pStatus
-
-def runSgeSyncJob(args):
-
- """
- Utility function that runs a shell script with SGE.
- It blocks until the command is finished. The return value
- from the shell command is returned
- """
-
- p = Popen(args)
- pStatus = None
- while 1:
- time.sleep(0.1)
- pStatus = p.poll()
- if pStatus != None:
- break
- return pStatus
diff --git a/pypeFLOW/pypeflow/controller.py b/pypeFLOW/pypeflow/controller.py
deleted file mode 100644
index ce39e89..0000000
--- a/pypeFLOW/pypeflow/controller.py
+++ /dev/null
@@ -1,875 +0,0 @@
-# @author Jason Chin
-#
-# Copyright (C) 2010 by Jason Chin
-# Copyright (C) 2011 by Jason Chin, Pacific Biosciences
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""
-
-PypeController: This module provides the PypeWorkflow that controlls how a workflow is excuted.
-
-"""
-import sys
-import datetime
-import multiprocessing
-import threading
-import time
-import traceback
-import logging
-import Queue
-from cStringIO import StringIO
-from urlparse import urlparse
-
-# TODO(CD): When we stop using Python 2.5, use relative-imports and remove this dir from PYTHONPATH.
-from common import PypeError, PypeObject, Graph, URIRef, pypeNS
-from data import PypeDataObjectBase, PypeSplittableLocalFile
-from task import PypeTaskBase, PypeTaskCollection, PypeThreadTaskBase, getFOFNMapTasks
-from task import TaskInitialized, TaskDone, TaskFail
-
-logger = logging.getLogger(__name__)
-
-class TaskExecutionError(PypeError):
- pass
-class TaskTypeError(PypeError):
- pass
-class TaskFailureError(PypeError):
- pass
-class LateTaskFailureError(PypeError):
- pass
-
-class PypeNode(object):
- """
- Representing a node in the dependence DAG.
- """
-
- def __init__(self, obj):
- self.obj = obj
- self._outNodes = set()
- self._inNodes = set()
-
- def addAnOutNode(self, obj):
- self._outNodes.add(obj)
-
- def addAnInNode(self, obj):
- self._inNodes.add(obj)
-
- def removeAnOutNode(self, obj):
- self._outNodes.remove(obj)
-
- def removeAnInNode(self, obj):
- self._inNodes.remove(obj)
-
- @property
- def inDegree(self):
- return len(self._inNodes)
-
- @property
- def outDegree(self):
- return len(self._outNodes)
-
- @property
- def depth(self):
- if self.inDegree == 0:
- return 1
- return 1 + max([ node.depth for node in self._inNodes ])
-
-class PypeGraph(object):
- """
- Representing a dependence DAG with PypeObjects.
- """
-
- def __init__(self, RDFGraph, subGraphNodes=None):
- """
- Construct an internal DAG with PypeObject given an RDF graph.
- A sub-graph can be constructed if subGraphNodes is not "None"
- """
-
- self._RDFGraph = RDFGraph
- self._allEdges = set()
- self._allNodes = set()
- self.url2Node ={}
-
- for row in self._RDFGraph.query('SELECT ?s ?o WHERE {?s pype:prereq ?o . }', initNs=dict(pype=pypeNS)):
- if subGraphNodes != None:
- if row[0] not in subGraphNodes: continue
- if row[1] not in subGraphNodes: continue
-
- sURL, oURL = str(row[0]), str(row[1])
-
- self.url2Node[sURL] = self.url2Node.get( sURL, PypeNode(str(row[0])) )
- self.url2Node[oURL] = self.url2Node.get( oURL, PypeNode(str(row[1])) )
-
- n1 = self.url2Node[oURL]
- n2 = self.url2Node[sURL]
-
- n1.addAnOutNode(n2)
- n2.addAnInNode(n1)
-
- anEdge = (n1, n2)
- self._allNodes.add( n1 )
- self._allNodes.add( n2 )
- self._allEdges.add( anEdge )
-
- def __getitem__(self, url):
- """PypeGraph["URL"] ==> PypeNode"""
- return self.url2Node[url]
-
- def tSort(self): #return a topoloical sort node list
- """
- Output topological sorted list of the graph element.
- It raises a TeskExecutionError if a circle is detected.
- """
- edges = self._allEdges.copy()
-
- S = [x for x in self._allNodes if x.inDegree == 0]
- L = []
- while len(S) != 0:
- n = S.pop()
- L.append(n)
- outNodes = n._outNodes.copy()
- for m in outNodes:
- edges.remove( (n, m) )
- n.removeAnOutNode(m)
- m.removeAnInNode(n)
- if m.inDegree == 0:
- S.append(m)
-
- if len(edges) != 0:
- raise TaskExecutionError(" Circle detectd in the dependency graph ")
- else:
- return [x.obj for x in L]
-
-class PypeWorkflow(PypeObject):
- """
- Representing a PypeWorkflow. PypeTask and PypeDataObjects can be added
- into the workflow and executed through the instanct methods.
-
- >>> import os, time
- >>> from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn
- >>> from pypeflow.task import *
- >>> try:
- ... os.makedirs("/tmp/pypetest")
- ... _ = os.system("rm -f /tmp/pypetest/*")
- ... except Exception:
- ... pass
- >>> time.sleep(1)
- >>> fin = makePypeLocalFile("/tmp/pypetest/testfile_in", readOnly=False)
- >>> fout = makePypeLocalFile("/tmp/pypetest/testfile_out", readOnly=False)
- >>> @PypeTask(outputDataObjs={"test_out":fout},
- ... inputDataObjs={"test_in":fin},
- ... parameters={"a":'I am "a"'}, **{"b":'I am "b"'})
- ... def test(self):
- ... print test.test_in.localFileName
- ... print test.test_out.localFileName
- ... os.system( "touch %s" % fn(test.test_out) )
- ... pass
- >>> os.system( "touch %s" % (fn(fin)) )
- 0
- >>> from pypeflow.controller import PypeWorkflow
- >>> wf = PypeWorkflow()
- >>> wf.addTask(test)
- >>> def finalize(self):
- ... def f():
- ... print "in finalize:", self._status
- ... return f
- >>> test.finalize = finalize(test) # For testing only. Please don't do this in your code. The PypeTask.finalized() is intended to be overriden by subclasses.
- >>> wf.refreshTargets( objs = [fout] )
- /tmp/pypetest/testfile_in
- /tmp/pypetest/testfile_out
- in finalize: done
- True
- """
-
- supportedURLScheme = ["workflow"]
-
- def __init__(self, URL = None, **attributes ):
-
- if URL == None:
- URL = "workflow://" + __file__+"/%d" % id(self)
-
- self._pypeObjects = {}
-
- PypeObject.__init__(self, URL, **attributes)
-
- self._referenceRDFGraph = None #place holder for a reference RDF
-
-
- def addObject(self, obj):
- self.addObjects([obj])
-
- def addObjects(self, objs):
- """
- Add data objects into the workflow. One can add also task object to the workflow using this method for
- non-threaded workflow.
- """
- for obj in objs:
- if obj.URL in self._pypeObjects:
- if id(self._pypeObjects[obj.URL]) != id(obj):
- raise PypeError, "Add different objects with the same URL %s" % obj.URL
- else:
- continue
- self._pypeObjects[obj.URL] = obj
-
- def addTask(self, taskObj):
- self.addTasks([taskObj])
-
-
- def addTasks(self, taskObjs):
- """
- Add tasks into the workflow. The dependent input and output data objects are added automatically too.
- It sets the message queue used for communicating between the task thread and the main thread. One has
- to use addTasks() or addTask() to add task objects to a threaded workflow.
- """
- for taskObj in taskObjs:
- if isinstance(taskObj, PypeTaskCollection):
- for subTaskObj in taskObj.getTasks() + taskObj.getScatterGatherTasks():
- self.addObjects(subTaskObj.inputDataObjs.values())
- self.addObjects(subTaskObj.outputDataObjs.values())
- self.addObjects(subTaskObj.mutableDataObjs.values())
- self.addObject(subTaskObj)
-
- else:
- for dObj in taskObj.inputDataObjs.values() +\
- taskObj.outputDataObjs.values() +\
- taskObj.mutableDataObjs.values() :
- if isinstance(dObj, PypeSplittableLocalFile):
- self.addObjects([dObj._completeFile])
- self.addObjects([dObj])
-
- self.addObject(taskObj)
-
-
- def removeTask(self, taskObj):
- self.removeTasks([taskObj])
-
- def removeTasks(self, taskObjs ):
- """
- Remove tasks from the workflow.
- """
- self.removeObjects(taskObjs)
-
- def removeObjects(self, objs):
- """
- Remove objects from the workflow. If the object cannot be found, a PypeError is raised.
- """
- for obj in objs:
- if obj.URL in self._pypeObjects:
- del self._pypeObjects[obj.URL]
- else:
- raise PypeError, "Unable to remove %s from the graph. (Object not found)" % obj.URL
-
- def updateURL(self, oldURL, newURL):
- obj = self._pypeObjects[oldURL]
- obj._updateURL(newURL)
- self._pypeObjects[newURL] = obj
- del self._pypeObjects[oldURL]
-
-
-
- @property
- def _RDFGraph(self):
- # expensive to recompute
- graph = Graph()
- for URL, obj in self._pypeObjects.iteritems():
- for s,p,o in obj._RDFGraph:
- graph.add( (s,p,o) )
- return graph
-
- def setReferenceRDFGraph(self, fn):
- self._referenceRDFGraph = Graph()
- self._referenceRDFGraph.load(fn)
- refMD5s = self._referenceRDFGraph.subject_objects(pypeNS["codeMD5digest"])
- for URL, md5digest in refMD5s:
- obj = self._pypeObjects[str(URL)]
- obj.setReferenceMD5(md5digest)
-
- def _graphvizDot(self, shortName=False):
- graph = self._RDFGraph
- dotStr = StringIO()
- shapeMap = {"file":"box", "state":"box", "task":"component"}
- colorMap = {"file":"yellow", "state":"cyan", "task":"green"}
- dotStr.write( 'digraph "%s" {\n rankdir=LR;' % self.URL)
- for URL in self._pypeObjects.keys():
- URLParseResult = urlparse(URL)
- if URLParseResult.scheme not in shapeMap:
- continue
- else:
- shape = shapeMap[URLParseResult.scheme]
- color = colorMap[URLParseResult.scheme]
-
- s = URL
- if shortName == True:
- s = URLParseResult.scheme + "://..." + URLParseResult.path.split("/")[-1]
- dotStr.write( '"%s" [shape=%s, fillcolor=%s, style=filled];\n' % (s, shape, color))
-
- for row in graph.query('SELECT ?s ?o WHERE {?s pype:prereq ?o . }', initNs=dict(pype=pypeNS)):
- s, o = row
- if shortName == True:
- s = urlparse(s).scheme + "://..." + urlparse(s).path.split("/")[-1]
- o = urlparse(o).scheme + "://..." + urlparse(o).path.split("/")[-1]
- dotStr.write( '"%s" -> "%s";\n' % (o, s))
- for row in graph.query('SELECT ?s ?o WHERE {?s pype:hasMutable ?o . }', initNs=dict(pype=pypeNS)):
- s, o = row
- if shortName == True:
- s = urlparse(s).scheme + "://..." + urlparse(s).path.split("/")[-1]
- o = urlparse(o).scheme + "://..." + urlparse(o).path.split("/")[-1]
- dotStr.write( '"%s" -- "%s" [arrowhead=both, style=dashed ];\n' % (s, o))
- dotStr.write ("}")
- return dotStr.getvalue()
-
- @property
- def graphvizDot(self):
- return self._graphvizDot()
-
- @property
- def graphvizShortNameDot(self):
- return self._graphvizDot(shortName = True)
-
- @property
- def makeFileStr(self):
- """
- generate a string that has the information of the execution dependency in
- a "Makefile" like format. It can be written into a "Makefile" and
- executed by "make".
- """
- for URL in self._pypeObjects.keys():
- URLParseResult = urlparse(URL)
- if URLParseResult.scheme != "task": continue
- taskObj = self._pypeObjects[URL]
- if not hasattr(taskObj, "script"):
- raise TaskTypeError("can not convert non shell script based workflow to a makefile")
- makeStr = StringIO()
- for URL in self._pypeObjects.keys():
- URLParseResult = urlparse(URL)
- if URLParseResult.scheme != "task": continue
- taskObj = self._pypeObjects[URL]
- inputFiles = taskObj.inputDataObjs
- outputFiles = taskObj.outputDataObjs
- #for oStr in [o.localFileName for o in outputFiles.values()]:
- if 1:
- oStr = " ".join( [o.localFileName for o in outputFiles.values()])
-
- iStr = " ".join([i.localFileName for i in inputFiles.values()])
- makeStr.write( "%s:%s\n" % ( oStr, iStr ) )
- makeStr.write( "\t%s\n\n" % taskObj.script )
- makeStr.write("all: %s" % " ".join([o.localFileName for o in outputFiles.values()]) )
- return makeStr.getvalue()
-
- @staticmethod
- def getSortedURLs(rdfGraph, objs):
- if len(objs) != 0:
- connectedPypeNodes = set()
- for obj in objs:
- if isinstance(obj, PypeSplittableLocalFile):
- obj = obj._completeFile
- for x in rdfGraph.transitive_objects(URIRef(obj.URL), pypeNS["prereq"]):
- connectedPypeNodes.add(x)
- tSortedURLs = PypeGraph(rdfGraph, connectedPypeNodes).tSort( )
- else:
- tSortedURLs = PypeGraph(rdfGraph).tSort( )
- return tSortedURLs
-
- def refreshTargets(self, objs = [], callback = (None, None, None) ):
- """
- Execute the DAG to reach all objects in the "objs" argument.
- """
- tSortedURLs = self.getSortedURLs(self._RDFGraph, objs)
- for URL in tSortedURLs:
- obj = self._pypeObjects[URL]
- if not isinstance(obj, PypeTaskBase):
- continue
- else:
- obj()
- obj.finalize()
- self._runCallback(callback)
- return True
-
- def _runCallback(self, callback = (None, None, None ) ):
- if callback[0] != None and callable(callback[0]):
- argv = []
- kwargv = {}
- if callback[1] != None and isinstance( callback[1], type(list()) ):
- argv = callback[1]
- else:
- raise TaskExecutionError( "callback argument type error")
-
- if callback[2] != None and isinstance( callback[1], type(dict()) ):
- kwargv = callback[2]
- else:
- raise TaskExecutionError( "callback argument type error")
-
- callback[0](*argv, **kwargv)
-
- elif callback[0] != None:
- raise TaskExecutionError( "callback is not callable")
-
- @property
- def dataObjects( self ):
- return [ o for o in self._pypeObjects.values( ) if isinstance( o, PypeDataObjectBase )]
-
- @property
- def tasks( self ):
- return [ o for o in self._pypeObjects.values( ) if isinstance( o, PypeTaskBase )]
-
- @property
- def inputDataObjects(self):
- graph = self._RDFGraph
- inputObjs = []
- for obj in self.dataObjects:
- r = graph.query('SELECT ?o WHERE {<%s> pype:prereq ?o . }' % obj.URL, initNs=dict(pype=pypeNS))
- if len(r) == 0:
- inputObjs.append(obj)
- return inputObjs
-
- @property
- def outputDataObjects(self):
- graph = self._RDFGraph
- outputObjs = []
- for obj in self.dataObjects:
- r = graph.query('SELECT ?s WHERE {?s pype:prereq <%s> . }' % obj.URL, initNs=dict(pype=pypeNS))
- if len(r) == 0:
- outputObjs.append(obj)
- return outputObjs
-
-def PypeMPWorkflow(URL = None, **attributes):
- """Factory for the workflow using multiprocessing.
- """
- th = _PypeProcsHandler()
- mq = multiprocessing.Queue()
- se = multiprocessing.Event()
- return _PypeConcurrentWorkflow(URL=URL, thread_handler=th, messageQueue=mq, shutdown_event=se,
- attributes=attributes)
-
-def PypeThreadWorkflow(URL = None, **attributes):
- """Factory for the workflow using threading.
- """
- th = _PypeThreadsHandler()
- mq = Queue.Queue()
- se = threading.Event()
- return _PypeConcurrentWorkflow(URL=URL, thread_handler=th, messageQueue=mq, shutdown_event=se,
- attributes=attributes)
-
-class _PypeConcurrentWorkflow(PypeWorkflow):
- """
- Representing a PypeWorkflow that can excute tasks concurrently using threads. It
- assume all tasks block until they finish. PypeTask and PypeDataObjects can be added
- into the workflow and executed through the instance methods.
- """
-
- CONCURRENT_THREAD_ALLOWED = 16
- MAX_NUMBER_TASK_SLOT = CONCURRENT_THREAD_ALLOWED
-
- @classmethod
- def setNumThreadAllowed(cls, nT, nS):
- """
- Override the default number of threads used to run the tasks with this method.
- """
- cls.CONCURRENT_THREAD_ALLOWED = nT
- cls.MAX_NUMBER_TASK_SLOT = nS
-
- def __init__(self, URL, thread_handler, messageQueue, shutdown_event, attributes):
- PypeWorkflow.__init__(self, URL, **attributes )
- self.thread_handler = thread_handler
- self.messageQueue = messageQueue
- self.shutdown_event = shutdown_event
- self.jobStatusMap = dict()
-
- def addTasks(self, taskObjs):
- """
- Add tasks into the workflow. The dependent input and output data objects are added automatically too.
- It sets the message queue used for communicating between the task thread and the main thread. One has
- to use addTasks() or addTask() to add task objects to a threaded workflow.
- """
- for taskObj in taskObjs:
- if isinstance(taskObj, PypeTaskCollection):
- for subTaskObj in taskObj.getTasks() + taskObj.getScatterGatherTasks():
- if not isinstance(subTaskObj, PypeThreadTaskBase):
- raise TaskTypeError("Only PypeThreadTask can be added into a PypeThreadWorkflow. The task object %s has type %s " % (subTaskObj.URL, repr(type(subTaskObj))))
- subTaskObj.setMessageQueue(self.messageQueue)
- subTaskObj.setShutdownEvent(self.shutdown_event)
- else:
- if not isinstance(taskObj, PypeThreadTaskBase):
- raise TaskTypeError("Only PypeThreadTask can be added into a PypeThreadWorkflow. The task object has type %s " % repr(type(taskObj)))
- taskObj.setMessageQueue(self.messageQueue)
- taskObj.setShutdownEvent(self.shutdown_event)
-
- PypeWorkflow.addTasks(self, taskObjs)
-
- def refreshTargets(self, objs=None,
- callback=(None, None, None),
- updateFreq=None,
- exitOnFailure=True):
- if objs is None:
- objs = []
- task2thread = {}
- try:
- rtn = self._refreshTargets(task2thread, objs = objs, callback = callback, updateFreq = updateFreq, exitOnFailure = exitOnFailure)
- return rtn
- except:
- tb = traceback.format_exc()
- self.shutdown_event.set()
- logger.critical("Any exception caught in RefreshTargets() indicates an unrecoverable error. Shutting down...")
- shutdown_msg = """
- "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
- "! Please wait for all threads / processes to terminate !"
- "! Also, maybe use 'ps' or 'qstat' to check all threads,!"
- "! processes and/or jobs are terminated cleanly. !"
- "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
- """
- import warnings
- warnings.warn(shutdown_msg)
- th = self.thread_handler
- threads = list(task2thread.values())
- logger.warning("#tasks=%d, #alive=%d" %(len(threads), th.alive(threads)))
- try:
- while th.alive(threads):
- th.join(threads, 2)
- logger.warning("Now, #tasks=%d, #alive=%d" %(len(threads), th.alive(threads)))
- except (KeyboardInterrupt, SystemExit) as e:
- logger.debug("Interrupted while joining threads (while handling exception from RefreshTargets()). Trying to terminate any working processes before final exit.")
- th.notifyTerminate(threads)
- raise Exception('Caused by:\n' + tb)
-
-
- def _refreshTargets(self, task2thread, objs,
- callback,
- updateFreq,
- exitOnFailure):
- thread = self.thread_handler.create
-
- rdfGraph = self._RDFGraph # expensive to recompute, should not change during execution
- tSortedURLs = self.getSortedURLs(rdfGraph, objs)
-
- sortedTaskList = [ (str(u), self._pypeObjects[u], self._pypeObjects[u].getStatus()) for u in tSortedURLs
- if isinstance(self._pypeObjects[u], PypeTaskBase) ]
- self.jobStatusMap = dict( ( (t[0], t[2]) for t in sortedTaskList ) )
- logger.info("# of tasks in complete graph: %d" %(
- len(sortedTaskList),
- ))
-
- prereqJobURLMap = {}
-
- for URL, taskObj, tStatus in sortedTaskList:
- prereqJobURLs = [str(u) for u in rdfGraph.transitive_objects(URIRef(URL), pypeNS["prereq"])
- if isinstance(self._pypeObjects[str(u)], PypeTaskBase) and str(u) != URL ]
-
- prereqJobURLMap[URL] = prereqJobURLs
-
- logger.debug("Determined prereqs for %r to be %r" % (URL, ", ".join(prereqJobURLs)))
-
- if taskObj.nSlots > self.MAX_NUMBER_TASK_SLOT:
- raise TaskExecutionError("%s requests more %s task slots which is more than %d task slots allowed" %
- (str(URL), taskObj.nSlots, self.MAX_NUMBER_TASK_SLOT) )
-
- sleep_time = 0
- nSubmittedJob = 0
- usedTaskSlots = 0
- loopN = 0
- lastUpdate = None
- activeDataObjs = set() #keep a set of output data object. repeats are illegal.
- mutableDataObjs = set() #keep a set of mutable data object. a task will be delayed if a running task has the same output.
- updatedTaskURLs = set() #to avoid extra stat-calls
- failedJobCount = 0
- succeededJobCount = 0
- jobsReadyToBeSubmitted = []
-
- while 1:
-
- loopN += 1
- if not ((loopN - 1) & loopN):
- # exponential back-off for logging
- logger.info("tick: %d, #updatedTasks: %d, sleep_time=%f" %(loopN, len(updatedTaskURLs), sleep_time))
-
- for URL, taskObj, tStatus in sortedTaskList:
- if self.jobStatusMap[URL] != TaskInitialized:
- continue
- logger.debug(" #outputDataObjs: %d; #mutableDataObjs: %d" %(
- len(taskObj.outputDataObjs.values()),
- len(taskObj.mutableDataObjs.values()),
- ))
- prereqJobURLs = prereqJobURLMap[URL]
-
- logger.debug(' preqs of %s:' %URL)
- for u in prereqJobURLs:
- logger.debug(' %s: %s' %(self.jobStatusMap[u], u))
- if any(self.jobStatusMap[u] != "done" for u in prereqJobURLs):
- # Note: If self.jobStatusMap[u] raises, then the sorting was wrong.
- #logger.debug('Prereqs not done! %s' %URL)
- continue
- # Check for mutable collisions; delay task if any.
- outputCollision = False
- for dataObj in taskObj.mutableDataObjs.values():
- for fromTaskObjURL, mutableDataObjURL in mutableDataObjs:
- if dataObj.URL == mutableDataObjURL and taskObj.URL != fromTaskObjURL:
- logger.debug("mutable output collision detected for data object %r betw %r and %r" %(
- dataObj, dataObj.URL, mutableDataObjURL))
- outputCollision = True
- break
- if outputCollision:
- continue
- # Check for illegal collisions.
- if len(activeDataObjs) < 100:
- # O(n^2) on active tasks, but pretty fast.
- for dataObj in taskObj.outputDataObjs.values():
- for fromTaskObjURL, activeDataObjURL in activeDataObjs:
- if dataObj.URL == activeDataObjURL and taskObj.URL != fromTaskObjURL:
- raise Exception("output collision detected for data object %r betw %r and %r" %(
- dataObj, dataObj.URL, activeDataObjURL))
- # We use 'updatedTaskURLs' to short-circuit 'isSatisfied()', to avoid many stat-calls.
- # Note: Sorting should prevent FileNotExistError in isSatisfied().
- if not (set(prereqJobURLs) & updatedTaskURLs) and taskObj.isSatisfied():
- #taskObj.setStatus(pypeflow.task.TaskDone) # Safe b/c thread is not running yet, and all prereqs are done.
- logger.info(' Skipping already done task: %s' %(URL,))
- logger.debug(' (Status was %s)' %(self.jobStatusMap[URL],))
- taskObj.setStatus(TaskDone) # to avoid re-stat on subsequent call to refreshTargets()
- self.jobStatusMap[str(URL)] = TaskDone # to avoid re-stat on *this* call
- successfullTask = self._pypeObjects[URL]
- successfullTask.finalize()
- continue
- self.jobStatusMap[str(URL)] = "ready" # in case not all ready jobs are given threads immediately, to avoid re-stat
- jobsReadyToBeSubmitted.append( (URL, taskObj) )
- for dataObj in taskObj.outputDataObjs.values():
- logger.debug( "add active data obj: %s" %(dataObj,))
- activeDataObjs.add( (taskObj.URL, dataObj.URL) )
- for dataObj in taskObj.mutableDataObjs.values():
- logger.debug( "add mutable data obj: %s" %(dataObj,))
- mutableDataObjs.add( (taskObj.URL, dataObj.URL) )
-
- logger.debug( "#jobsReadyToBeSubmitted: %d" % len(jobsReadyToBeSubmitted) )
-
- numAliveThreads = self.thread_handler.alive(task2thread.values())
- logger.debug( "Total # of running threads: %d; alive tasks: %d; sleep=%f, loopN=%d" % (
- threading.activeCount(), numAliveThreads, sleep_time, loopN) )
- if numAliveThreads == 0 and len(jobsReadyToBeSubmitted) == 0 and self.messageQueue.empty():
- #TODO: better job status detection. messageQueue should be empty and all return condition should be "done", or "fail"
- logger.info( "_refreshTargets() finished with no thread running and no new job to submit" )
- for URL in task2thread:
- assert self.jobStatusMap[str(URL)] in ("done", "fail"), "status(%s)==%r" %(
- URL, self.jobStatusMap[str(URL)])
- break # End of loop!
-
- while jobsReadyToBeSubmitted:
- URL, taskObj = jobsReadyToBeSubmitted[0]
- numberOfEmptySlot = self.MAX_NUMBER_TASK_SLOT - usedTaskSlots
- logger.debug( "#empty_slots = %d/%d; #jobs_ready=%d" % (numberOfEmptySlot, self.MAX_NUMBER_TASK_SLOT, len(jobsReadyToBeSubmitted)))
- if numberOfEmptySlot >= taskObj.nSlots and numAliveThreads < self.CONCURRENT_THREAD_ALLOWED:
- t = thread(target = taskObj)
- t.start()
- task2thread[URL] = t
- nSubmittedJob += 1
- usedTaskSlots += taskObj.nSlots
- numAliveThreads += 1
- self.jobStatusMap[URL] = "submitted"
- # Note that we re-submit completed tasks whenever refreshTargets() is called.
- logger.debug("Submitted %r" %URL)
- logger.debug(" Details: %r" %taskObj)
- jobsReadyToBeSubmitted.pop(0)
- else:
- break
-
- time.sleep(sleep_time)
- if updateFreq != None:
- elapsedSeconds = updateFreq if lastUpdate==None else (datetime.datetime.now()-lastUpdate).seconds
- if elapsedSeconds >= updateFreq:
- self._update( elapsedSeconds )
- lastUpdate = datetime.datetime.now( )
-
- sleep_time = sleep_time + 0.1 if (sleep_time < 1) else 1
- while not self.messageQueue.empty():
- sleep_time = 0 # Wait very briefly while messages are coming in.
- URL, message = self.messageQueue.get()
- updatedTaskURLs.add(URL)
- self.jobStatusMap[str(URL)] = message
- logger.debug("message for %s: %r" %(URL, message))
-
- if message in ["done"]:
- successfullTask = self._pypeObjects[str(URL)]
- nSubmittedJob -= 1
- usedTaskSlots -= successfullTask.nSlots
- logger.info("Success (%r). Joining %r..." %(message, URL))
- task2thread[URL].join(timeout=10)
- #del task2thread[URL]
- succeededJobCount += 1
- successfullTask.finalize()
- for o in successfullTask.outputDataObjs.values():
- activeDataObjs.remove( (successfullTask.URL, o.URL) )
- for o in successfullTask.mutableDataObjs.values():
- mutableDataObjs.remove( (successfullTask.URL, o.URL) )
- elif message in ["fail"]:
- failedTask = self._pypeObjects[str(URL)]
- nSubmittedJob -= 1
- usedTaskSlots -= failedTask.nSlots
- logger.info("Failure (%r). Joining %r..." %(message, URL))
- task2thread[URL].join(timeout=10)
- #del task2thread[URL]
- failedJobCount += 1
- failedTask.finalize()
- for o in failedTask.outputDataObjs.values():
- activeDataObjs.remove( (failedTask.URL, o.URL) )
- for o in failedTask.mutableDataObjs.values():
- mutableDataObjs.remove( (failedTask.URL, o.URL) )
- elif message in ["started, runflag: 1"]:
- logger.info("Queued %s ..." %repr(URL))
- elif message in ["started, runflag: 0"]:
- logger.debug("Queued %s (already completed) ..." %repr(URL))
- raise Exception('It should not be possible to start an already completed task.')
- else:
- logger.warning("Got unexpected message %r from URL %r." %(message, URL))
-
- for u,s in sorted(self.jobStatusMap.items()):
- logger.debug("task status: %r, %r, used slots: %d" % (str(u),str(s), self._pypeObjects[str(u)].nSlots))
-
- if failedJobCount != 0 and (exitOnFailure or succeededJobCount == 0):
- raise TaskFailureError("Counted %d failure(s) with 0 successes so far." %failedJobCount)
-
-
- for u,s in sorted(self.jobStatusMap.items()):
- logger.debug("task status: %s, %r" % (u, s))
-
- self._runCallback(callback)
- if failedJobCount != 0:
- # Slightly different exception when !exitOnFailure.
- raise LateTaskFailureError("Counted a total of %d failure(s) and %d success(es)." %(
- failedJobCount, succeededJobCount))
- return True #TODO: There is no reason to return anything anymore.
-
- def _update(self, elapsed):
- """Can be overridden to provide timed updates during execution"""
- pass
-
- def _graphvizDot(self, shortName=False):
-
- graph = self._RDFGraph
- dotStr = StringIO()
- shapeMap = {"file":"box", "state":"box", "task":"component"}
- colorMap = {"file":"yellow", "state":"cyan", "task":"green"}
- dotStr.write( 'digraph "%s" {\n rankdir=LR;' % self.URL)
-
-
- for URL in self._pypeObjects.keys():
- URLParseResult = urlparse(URL)
- if URLParseResult.scheme not in shapeMap:
- continue
- else:
- shape = shapeMap[URLParseResult.scheme]
- color = colorMap[URLParseResult.scheme]
-
- s = URL
- if shortName == True:
- s = URLParseResult.scheme + "://..." + URLParseResult.path.split("/")[-1]
-
- if URLParseResult.scheme == "task":
- jobStatus = self.jobStatusMap.get(URL, None)
- if jobStatus != None:
- if jobStatus == "fail":
- color = 'red'
- elif jobStatus == "done":
- color = 'green'
- else:
- color = 'white'
-
- dotStr.write( '"%s" [shape=%s, fillcolor=%s, style=filled];\n' % (s, shape, color))
-
- for row in graph.query('SELECT ?s ?o WHERE {?s pype:prereq ?o . }', initNs=dict(pype=pypeNS)):
- s, o = row
- if shortName == True:
- s = urlparse(s).scheme + "://..." + urlparse(s).path.split("/")[-1]
- o = urlparse(o).scheme + "://..." + urlparse(o).path.split("/")[-1]
- dotStr.write( '"%s" -> "%s";\n' % (o, s))
- for row in graph.query('SELECT ?s ?o WHERE {?s pype:hasMutable ?o . }', initNs=dict(pype=pypeNS)):
- s, o = row
- if shortName == True:
- s = urlparse(s).scheme + "://..." + urlparse(s).path.split("/")[-1]
- o = urlparse(o).scheme + "://..." + urlparse(o).path.split("/")[-1]
- dotStr.write( '"%s" -- "%s" [arrowhead=both, style=dashed ];\n' % (s, o))
- dotStr.write ("}")
- return dotStr.getvalue()
-
-# For a class-method:
-PypeThreadWorkflow.setNumThreadAllowed = _PypeConcurrentWorkflow.setNumThreadAllowed
-PypeMPWorkflow.setNumThreadAllowed = _PypeConcurrentWorkflow.setNumThreadAllowed
-
-class _PypeThreadsHandler(object):
- """Stateless method delegator, for injection.
- """
- def create(self, target):
- thread = threading.Thread(target=target)
- thread.daemon = True # so it will terminate on exit
- return thread
- def alive(self, threads):
- return sum(thread.is_alive() for thread in threads)
- def join(self, threads, timeout):
- then = datetime.datetime.now()
- for thread in threads:
- assert thread is not threading.current_thread()
- if thread.is_alive():
- to = max(0, timeout - (datetime.datetime.now() - then).seconds)
- thread.join(to)
- def notifyTerminate(self, threads):
- """Assume these are daemon threads.
- We will attempt to join them all quickly, but non-daemon threads may
- eventually block the program from quitting.
- """
- self.join(threads, 1)
-
-class _PypeProcsHandler(object):
- """Stateless method delegator, for injection.
- """
- def create(self, target):
- proc = multiprocessing.Process(target=target)
- return proc
- def alive(self, procs):
- return sum(proc.is_alive() for proc in procs)
- def join(self, procs, timeout):
- then = datetime.datetime.now()
- for proc in procs:
- if proc.is_alive():
-                proc.join(max(0, timeout - (datetime.datetime.now() - then).seconds))
- def notifyTerminate(self, procs):
- """This can orphan sub-processes.
- """
- for proc in procs:
- if proc.is_alive():
- proc.terminate()
-
-
-def defaultOutputTemplate(fn):
- return fn + ".out"
-
-def applyFOFN( task_fun = None,
- fofonFileName = None,
- outTemplateFunc = defaultOutputTemplate,
- nproc = 8 ):
-
- tasks = getFOFNMapTasks( FOFNFileName = fofonFileName,
- outTemplateFunc = outTemplateFunc,
- TaskType=PypeThreadTaskBase,
- parameters = dict(nSlots = 1))( task_fun )
-
- wf = PypeThreadWorkflow()
- wf.CONCURRENT_THREAD_ALLOWED = nproc
- wf.MAX_NUMBER_TASK_SLOT = nproc
- wf.addTasks(tasks)
- wf.refreshTargets(exitOnFailure=False)
-
-
-if __name__ == "__main__":
- import doctest
- doctest.testmod()
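
Aside on the deleted controller loop above: _refreshTargets() only hands a ready task to a new thread while two budgets have headroom, the slot budget (MAX_NUMBER_TASK_SLOT versus the task's nSlots) and the thread budget (CONCURRENT_THREAD_ALLOWED versus the number of alive threads). A minimal stand-alone sketch of that admission test follows; Task, submit_ready, and start_thread are illustrative names, not pypeFLOW API.

    # Minimal sketch of the slot/thread admission test from the deleted loop.
    # 'Task', 'submit_ready', and 'start_thread' are illustrative names only.
    import collections

    class Task(object):
        def __init__(self, name, nSlots=1):
            self.name = name
            self.nSlots = nSlots

    def submit_ready(ready_queue, max_slots, max_threads, used_slots, alive_threads, start_thread):
        """Start queued tasks while both the slot and thread budgets have headroom."""
        while ready_queue:
            task = ready_queue[0]
            empty_slots = max_slots - used_slots
            if empty_slots >= task.nSlots and alive_threads < max_threads:
                start_thread(task)           # e.g. threading.Thread(target=task).start()
                used_slots += task.nSlots
                alive_threads += 1
                ready_queue.popleft()
            else:
                break                        # wait for a running task to free slots
        return used_slots, alive_threads

    if __name__ == '__main__':
        q = collections.deque([Task('a', 1), Task('b', 4), Task('c', 1)])
        started = []
        submit_ready(q, max_slots=4, max_threads=2, used_slots=0, alive_threads=0,
                     start_thread=lambda t: started.append(t.name))
        print(started)                       # ['a']: 'b' needs 4 slots but only 3 remain
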
diff --git a/pypeFLOW/pypeflow/data.py b/pypeFLOW/pypeflow/data.py
deleted file mode 100644
index 43a5dc2..0000000
--- a/pypeFLOW/pypeflow/data.py
+++ /dev/null
@@ -1,315 +0,0 @@
-
-# @author Jason Chin
-#
-# Copyright (C) 2010 by Jason Chin
-# Copyright (C) 2011 by Jason Chin, Pacific Biosciences
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""
-
-PypeData: This module defines the general interface and class for PypeData Objects.
-
-"""
-
-
-from urlparse import urlparse, urljoin
-import platform
-import os, shutil
-from common import pypeNS, PypeObject, PypeError, NotImplementedError
-import logging
-
-logger = logging.getLogger(__name__)
-
-class FileNotExistError(PypeError):
- pass
-
-class TypeMismatchError(PypeError):
- pass
-
-def fn(obj):
- return obj.localFileName
-
-class PypeDataObjectBase(PypeObject):
-
- """
- Represent the common interface for a PypeData object.
- """
-
- def __init__(self, URL, **attributes):
- PypeObject.__init__(self, URL, **attributes)
- self.verification = []
-        self._mutable = False
-
- @property
- def timeStamp(self):
-        raise NotImplementedError, self.__repr__()
-
- @property
- def isMutable(self):
-        return self._mutable
-
- @property
- def exists(self):
- raise NotImplementedError
-
- def addVerifyFunction( self, verifyFunction ):
- self.verification.append( verifyFunction )
-
- def __str__( self ):
- return self.URL
-
- def _updateURL(self, newURL):
- super(PypeDataObjectBase, self)._updateURL(newURL)
- self._updatePath()
-
- def _updatePath(self):
- URLParseResult = urlparse(self.URL)
- self.localFileName = URLParseResult.path
- self._path = self.localFileName #for local file, _path is the same as full local file name
-
-class PypeLocalFile(PypeDataObjectBase):
-
- """
- Represent a PypeData object that can be accessed as a file in a local
- filesystem.
-
- >>> f = PypeLocalFile("file://localhost/test.txt")
- >>> f.localFileName == "/test.txt"
- True
- >>> fn(f)
- '/test.txt'
- >>> f = PypeLocalFile("file://localhost/test.txt", False, isFasta = True)
- >>> f.isFasta == True
- True
- """
- def __repr__(self): return "PypeLocalFile(%r, %r)" %(self.URL, self._path)
- supportedURLScheme = ["file", "state"]
- def __init__(self, URL, readOnly = False, **attributes):
- PypeDataObjectBase.__init__(self, URL, **attributes)
- self._updatePath()
- self.readOnly = readOnly
- self._mutable = attributes.get("mutable", False)
-
- @property
- def timeStamp(self):
- if not os.path.exists(self.localFileName):
- logger.warning('failed: os.path.exists("%s")\n%s'%(
- self.localFileName,repr(os.listdir(os.path.dirname(self.localFileName)))))
- raise FileNotExistError("No such file:%r on %r" % (self.localFileName, platform.node()) )
- return os.stat(self.localFileName).st_mtime
-
- @property
- def exists(self):
- return os.path.exists(self.localFileName)
-
- def verify(self):
- logger.debug("Verifying contents of %s" % self.URL)
- # Get around the NFS problem
- os.listdir(os.path.dirname(self.path))
-
- errors = [ ]
- for verifyFn in self.verification:
- try:
- errors.extend( verifyFn(self.path) )
- except Exception, e:
- errors.append( str(e) )
- if len(errors) > 0:
- for e in errors:
- logger.error(e)
- return errors
-
- @property
- def path(self):
- if self._path == None:
-            raise IOError, "Must resolve this file (%s) with a context " \
-                "before you can access .path" % (self.URL,)
- return self._path
-
- def clean(self):
- if os.path.exists( self.path ):
- logger.info("Removing %s" % self.path )
- if os.path.isdir( self.path ):
- shutil.rmtree( self.path )
- else:
- os.remove( self.path )
-
-class PypeHDF5Dataset(PypeDataObjectBase): #stub for now Mar 17, 2010
-
- """
- Represent a PypeData object that is an HDF5 dataset.
- Not implemented yet.
- """
-
- supportedURLScheme = ["hdf5ds"]
- def __init__(self, URL, readOnly = False, **attributes):
- PypeDataObjectBase.__init__(self, URL, **attributes)
- URLParseResult = urlparse(URL)
- self.localFileName = URLParseResult.path
- #the rest of the URL goes to HDF5 DS
-
-
-class PypeLocalFileCollection(PypeDataObjectBase): #stub for now Mar 17, 2010
-
- """
- Represent a PypeData object that is a composition of multiple files.
-    It provides a container that allows the tasks to choose one file or all of the files to
-    process.
- """
-
- supportedURLScheme = ["files"]
- def __init__(self, URL, readOnly = False, select = 1, **attributes):
- """
-        Currently we only support select = 1,
-        namely, we only pass the first file added to the collection to the tasks.
- """
- PypeDataObjectBase.__init__(self, URL, **attributes)
- URLParseResult = urlparse(URL)
- self.compositedDataObjName = URLParseResult.path
- self.localFileName = None
- self._path = None
- self.readOnly = readOnly
- self.verification = []
- self.localFiles = [] # a list of all files within the obj
- self.select = select
-
- def addLocalFile(self, pLocalFile):
- if not isinstance(pLocalFile, PypeLocalFile):
-            raise TypeMismatchError, "only PypeLocalFile objects can be added into a PypeLocalFileCollection"
- self.localFiles.append(pLocalFile)
- if self.select == 1:
- self.localFileName = self.localFiles[0].localFileName
- self._path = self.localFileName
-
- @property
- def timeStamp(self):
- if self.localFileName == None:
-            raise PypeError, "No PypeLocalFile has been added into the PypeLocalFileCollection yet"
- if not os.path.exists(self.localFileName):
- raise FileNotExistError("No such file:%s on %s" % (self.localFileName, platform.node()) )
- return os.stat(self.localFileName).st_mtime
-
- @property
- def exists(self):
- if self.localFileName == None:
-            raise PypeError, "No PypeLocalFile has been added into the PypeLocalFileCollection yet"
- return os.path.exists(self.localFileName)
-
-
-class PypeSplittableLocalFile(PypeDataObjectBase):
- """
-    Represent a PypeData object that has two different local file representations:
- (1) the whole file (could be a virtual one)
- (2) the split files
-
- * Such data object can have either a scatter task attached or a gather task attached.
- * If a scatter task is attached, the task will be inserted to generate the scattered files.
- * If a gather task is attached, the task will be inserted to generate the whole file.
-    * If neither a scatter task nor a gather task is specified, then the file is most likely intermediate.
-      Namely, the whole-file representation is not used anywhere else.
-    * One cannot specify both a scatter task and a gather task for the same object, since that would create a cycle.
- """
- supportedURLScheme = ["splittablefile"]
-
- def __init__(self, URL, readOnly = False, nChunk = 1, **attributes):
- PypeDataObjectBase.__init__(self, URL, **attributes)
- self._updatePath()
- self.readOnly = readOnly
- self.verification = []
- self._scatterTask = None
- self._gatherTask = None
- self._splittedFiles = []
- self.nChunk = nChunk
-
- URLParseResult = urlparse(self.URL)
- cfURL = "file://%s%s" % (URLParseResult.netloc, URLParseResult.path)
-
- self._completeFile = PypeLocalFile(cfURL, readOnly, **attributes)
-
- dirname, basename = os.path.split(self._path)
-
- for i in range(nChunk):
- chunkBasename = "%03d_%s" % (i, basename)
- if dirname != "":
- chunkURL = "file://%s%s/%s" % (URLParseResult.netloc, dirname, chunkBasename)
- else:
- chunkURL = "file://%s/%s" % (URLParseResult.netloc, chunkBasename)
-
- subfile = PypeLocalFile(chunkURL, readOnly, **attributes)
- self._splittedFiles.append(subfile)
-
- def setGatherTask(self, TaskCreator, TaskType, function):
- assert self._scatterTask == None
- inputDataObjs = dict( ( ("subfile%03d" % c[0], c[1])
- for c in enumerate(self._splittedFiles) ) )
- outputDataObjs = {"completeFile": self._completeFile}
- gatherTask = TaskCreator( inputDataObjs = inputDataObjs,
- outputDataObjs = outputDataObjs,
- URL = "task://gather/%s" % self._path ,
- TaskType=TaskType) ( function )
- self._gatherTask = gatherTask
-
- def setScatterTask(self, TaskCreator, TaskType, function):
- assert self._gatherTask == None
- outputDataObjs = dict( ( ("subfile%03d" % c[0], c[1])
- for c in enumerate(self._splittedFiles) ) )
- inputDataObjs = {"completeFile": self._completeFile}
- scatterTask = TaskCreator( inputDataObjs = inputDataObjs,
- outputDataObjs = outputDataObjs,
- URL = "task://scatter/%s" % self._path ,
- TaskType=TaskType) ( function )
- self._scatterTask = scatterTask
-
- def getGatherTask(self):
- return self._gatherTask
-
- def getScatterTask(self):
- return self._scatterTask
-
- def getSplittedFiles(self):
- return self._splittedFiles
-
- @property
- def timeStamp(self):
- return self._completeFile.timeStamp
-
-def makePypeLocalFile(aLocalFileName, readOnly = False, scheme="file", **attributes):
- """
- >>> f = makePypeLocalFile("/tmp/test.txt")
- >>> f.localFileName == "/tmp/test.txt"
- True
- >>> fn(f)
- '/tmp/test.txt'
- """
- aLocalFileName = os.path.abspath(aLocalFileName)
-
- #if aLocalFileName.startswith("/"):
- # aLocalFileName.lstrip("/")
-
- return PypeLocalFile("%s://localhost%s" % (scheme, aLocalFileName), readOnly, **attributes)
-
-def makePypeLocalStateFile(stateName, readOnly = False, **attributes):
- dirname, basename = os.path.split(stateName)
- stateFileName = os.path.join(dirname, "."+basename)
- return makePypeLocalFile( stateFileName, readOnly = readOnly, scheme = "state", **attributes)
-
-if __name__ == "__main__":
- import doctest
- doctest.testmod()
-
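
For reference, the deleted data.py builds its PypeLocalFile URLs by prefixing an absolute path with "<scheme>://localhost" and recovers the path again via urlparse, which is what makePypeLocalFile() and fn() above boil down to. A stand-alone sketch of that round trip (not importing pypeFLOW; the helper names are illustrative):

    # Stand-alone sketch of the URL round trip behind makePypeLocalFile()/fn().
    import os
    try:
        from urllib.parse import urlparse    # Python 3
    except ImportError:
        from urlparse import urlparse        # Python 2, as used in the original module

    def make_local_file_url(path, scheme='file'):
        # makePypeLocalFile(): absolute path -> "<scheme>://localhost<path>"
        return '%s://localhost%s' % (scheme, os.path.abspath(path))

    def local_path(url):
        # fn()/localFileName: URL -> filesystem path
        return urlparse(url).path

    if __name__ == '__main__':
        url = make_local_file_url('/tmp/test.txt')
        print(url)               # file://localhost/tmp/test.txt
        print(local_path(url))   # /tmp/test.txt
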
diff --git a/pypeFLOW/pypeflow/pwatcher_bridge.py b/pypeFLOW/pypeflow/pwatcher_bridge.py
deleted file mode 100644
index d8b78c8..0000000
--- a/pypeFLOW/pypeflow/pwatcher_bridge.py
+++ /dev/null
@@ -1,391 +0,0 @@
-"""Bridge pattern to adapt pypeFLOW with pwatcher.
-
-This is a bit messy, but it avoids re-writing the useful bits
-of pypeFLOW.
-
-With PypeProcWatcherWorkflow, the refreshTargets() loop will
-be single-threaded!
-"""
-from pypeflow.task import PypeTask, PypeThreadTaskBase, PypeTaskBase, TaskFunctionError
-from pypeflow.data import fn
-import pwatcher.blocking
-import pwatcher.fs_based
-import pwatcher.network_based
-import pypeflow.controller
-import pypeflow.task
-import collections
-import datetime
-import glob
-import hashlib
-import json
-import logging
-import os
-import pprint
-import re
-import sys
-import time
-import traceback
-
-log = logging.getLogger(__name__)
-
-def PypeProcWatcherWorkflow(
- URL = None,
- job_type='local',
- job_queue='UNSPECIFIED_QUEUE',
- watcher_type='fs_based',
- watcher_directory='mypwatcher',
- max_jobs=None, # ignore here for now
- **attributes):
- """Factory for the workflow using our new
- filesystem process watcher.
- """
- if watcher_type == 'blocking':
- pwatcher_impl = pwatcher.blocking
- elif watcher_type == 'network_based':
- pwatcher_impl = pwatcher.network_based
- else:
- pwatcher_impl = pwatcher.fs_based
- log.warning('In pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl))
- log.info('In pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl))
- watcher = pwatcher_impl.get_process_watcher(watcher_directory)
- log.info('job_type={!r}, job_queue={!r}'.format(job_type, job_queue))
- th = MyPypeFakeThreadsHandler(watcher, job_type, job_queue)
- mq = MyMessageQueue()
- se = MyFakeShutdownEvent() # TODO: Save pwatcher state on ShutdownEvent. (Not needed for blocking pwatcher. Mildly useful for fs_based.)
- return pypeflow.controller._PypeConcurrentWorkflow(URL=URL, thread_handler=th, messageQueue=mq, shutdown_event=se,
- attributes=attributes)
-
-PypeProcWatcherWorkflow.setNumThreadAllowed = pypeflow.controller._PypeConcurrentWorkflow.setNumThreadAllowed
-
-class Fred(object):
- """Fake thread.
- """
- INITIALIZED = 10
- STARTED = 20
- RUNNING = 30
- JOINED = 40
- def is_alive(self):
- return self.__status in (Fred.STARTED, Fred.RUNNING)
- def start(self):
- self.__status = Fred.STARTED
- self.__th.enqueue(self)
- def join(self, timeout=None):
- # Maybe we should wait until the sentinel is visible in the filesystem?
- # Also, if we were STARTED but not RUNNING, then we never did anything!
-
- #assert self.__status is not Fred.JOINED # Nope. Might be called a 2nd time by notifyTerminate().
- self.__status = Fred.JOINED
- # And our own special methods.
- def task(self):
- return self.__target
- def generate(self):
- self.__target()
- def setTargetStatus(self, status):
- self.__target.setStatus(status)
- def endrun(self, status):
- """By convention for now, status is one of:
- 'DEAD'
- 'UNSUBMITTED' (a pseudo-status defined in the ready-loop of alive())
- 'EXIT rc'
- """
- name = status.split()[0]
- if name == 'DEAD':
- log.warning(''.join(traceback.format_stack()))
- log.error('Task {}\n is DEAD, meaning no HEARTBEAT, but this can be a race-condition. If it was not killed, then restarting might suffice. Otherwise, you might have excessive clock-skew.'.format(self.brief()))
- self.setTargetStatus(pypeflow.task.TaskFail) # for lack of anything better
- elif name == 'UNSUBMITTED':
- log.warning(''.join(traceback.format_stack()))
- log.error('Task {}\n is UNSUBMITTED, meaning job-submission somehow failed. Possibly a re-start would work. Otherwise, you need to investigate.'.format(self.brief()))
- self.setTargetStatus(pypeflow.task.TaskFail) # for lack of anything better
- elif name != 'EXIT':
- raise Exception('Unexpected status {!r}'.format(name))
- else:
- code = int(status.split()[1])
- if 0 == code:
- self.__target.check_missing()
- # TODO: If missing, just leave the status as TaskInitialized?
- else:
- log.error('Task {} failed with exit-code={}'.format(self.brief(), code))
- self.setTargetStatus(pypeflow.task.TaskFail) # for lack of anything better
- self.__target.finish()
- def brief(self):
- return 'Fred{}'.format(self.__target.brief())
- def __repr__(self):
- return 'FRED with taskObj={!r}'.format(self.__target)
- def __init__(self, target, th):
- assert isinstance(target, MyFakePypeThreadTaskBase)
- self.__target = target # taskObj?
- self.__th = th # thread handler
- self.__status = Fred.INITIALIZED
-
-class MyMessageQueue(object):
- def empty(self):
- return not self.__msgq
- def get(self):
- return self.__msgq.popleft()
- def put(self, msg):
- self.__msgq.append(msg)
- def __init__(self):
- self.__msgq = collections.deque()
-
-class MyFakeShutdownEvent(object):
- """I do not see any code which actually uses the
- shutdown_event, but if needed someday, we can use this.
- """
- def set(self):
- pass
-
-_prev_q = {} # To avoid excessive log output.
-
-class MyPypeFakeThreadsHandler(object):
- """Stateless method delegator, for injection.
- """
- def create(self, target):
- thread = Fred(target=target, th=self)
- return thread
- def alive(self, threads):
- ready = dict()
- while self.__jobq:
- fred = self.__jobq.popleft()
- taskObj = fred.task()
- fred.generate() # -> taskObj->generated_script_fn by convention
- #log.info('param:\n%s' %pprint.pformat(taskObj.parameters)) # I do not think these change.
- try:
- script_fn = taskObj.generated_script_fn # BY CONVENTION
- except AttributeError:
- log.warning('Missing taskObj.generated_script_fn for task. Maybe we did not need it? Skipping and continuing.')
- fred.endrun('EXIT 0')
- continue
- log.info('script_fn:%s' %repr(script_fn))
- content = open(script_fn).read()
- digest = hashlib.sha256(content).hexdigest()
- jobid = 'J{}'.format(digest)
- log.info('jobid=%s' %jobid)
- taskObj.jobid = jobid
- ready[jobid] = fred
- self.__known[jobid] = fred
- if ready:
- # Start anything in the 'ready' queue.
- # Note: It is safe to run this block always, but we save a
- # call to pwatcher with 'if ready'.
- log.debug('ready dict keys:\n%s' %pprint.pformat(ready.keys()))
- jobids = dict()
- #sge_option='-pe smp 8 -q default'
- for jobid, fred in ready.iteritems():
- generated_script_fn = fred.task().generated_script_fn
- rundir, basename = os.path.split(os.path.abspath(generated_script_fn))
- cmd = '/bin/bash {}'.format(basename)
- sge_option = fred.task().parameters.get('sge_option', None)
- job_type = fred.task().parameters.get('job_type', None)
- job_queue = fred.task().parameters.get('job_queue', None)
- job_nprocs = fred.task().parameters.get('job_nprocs', None)
- jobids[jobid] = {
- 'cmd': cmd,
- 'rundir': rundir,
- # These are optional:
- 'job_type': job_type,
- 'job_queue': job_queue,
- 'job_nprocs': job_nprocs,
- 'sge_option': sge_option,
- }
- # Also send the default type and queue-name.
- watcher_args = {
- 'jobids': jobids,
- 'job_type': self.__job_type,
- 'job_queue': self.__job_queue,
- }
- result = self.watcher.run(**watcher_args)
- log.debug('Result of watcher.run()={}'.format(repr(result)))
- submitted = result['submitted']
- self.__running.update(submitted)
- #log.info("QQQ ADDED: {}".format(jobid))
- for jobid in set(jobids.keys()) - set(submitted):
- fred = ready[jobid]
- fred.endrun('UNSUBMITTED')
-
- watcher_args = {
- 'jobids': list(self.__running),
- 'which': 'list',
- }
- q = self.watcher.query(**watcher_args)
- #log.debug('In alive(), result of query:%s' %repr(q))
- global _prev_q
- if q != _prev_q:
- #log.debug('In alive(), updated result of query:%s' %repr(q))
- _prev_q = q
- _prev_q = None
- for jobid, status in q['jobids'].iteritems():
- if status.startswith('EXIT') or status.startswith('DEAD'):
- self.__running.remove(jobid)
- #log.info("QQQ REMOVED: {}".format(jobid))
- fred = self.__known[jobid]
- try:
- fred.endrun(status)
- except Exception as e:
- msg = 'Failed to clean-up FakeThread: jobid={} status={}'.format(jobid, repr(status))
- log.exception(msg)
- raise
- #log.info('len(jobq)==%d' %len(self.__jobq))
- #log.info(''.join(traceback.format_stack()))
- return sum(thread.is_alive() for thread in threads)
- def join(self, threads, timeout):
- then = datetime.datetime.now()
- for thread in threads:
- #assert thread is not threading.current_thread()
- if thread.is_alive():
- to = max(0, timeout - (datetime.datetime.now() - then).seconds)
- # This is called only in the refreshTargets() catch, so
- # it can simply terminate all threads.
- thread.join(to)
- self.notifyTerminate(threads)
- def notifyTerminate(self, threads):
- """Assume these are daemon threads.
- We will attempt to join them all quickly, but non-daemon threads may
- eventually block the program from quitting.
- """
- pass #self.join(threads, 1)
- # TODO: Terminate only the jobs for 'threads'.
- # For now, use 'known' instead of 'infer' b/c we
- # also want to kill merely queued jobs, though that is currently difficult.
- watcher_args = {
- 'jobids': list(self.__running),
- 'which': 'known',
- }
- q = self.watcher.delete(**watcher_args)
- log.debug('In notifyTerminate(), result of delete:%s' %repr(q))
-
-
- # And our special methods.
- def enqueue(self, fred):
- self.__jobq.append(fred)
- def __init__(self, watcher, job_type, job_queue=None):
- """
- job_type and job_queue are defaults, possibly over-ridden for specific jobs.
- Note: job_queue is a string, not a collection. If None, then it would need to
- come via per-job settings.
- """
- self.watcher = watcher
- self.__job_type = job_type
- self.__job_queue = job_queue
- self.__jobq = collections.deque()
- self.__running = set()
- self.__known = dict()
-
-def makedirs(path):
- if not os.path.isdir(path):
- log.debug('makedirs {!r}'.format(path))
- os.makedirs(path)
-
-class MyFakePypeThreadTaskBase(PypeThreadTaskBase):
- """Fake for PypeConcurrentWorkflow.
- It demands a subclass, even though we do not use threads at all.
- Here, we override everything that it overrides. PypeTaskBase defaults are fine.
- """
- @property
- def nSlots(self):
-        """(I am not sure what happens if > 1, but we will not need that. ~cdunn)
-        Return the required number of slots to run. The total number of slots is determined by
-        PypeThreadWorkflow.MAX_NUMBER_TASK_SLOT. Increase this number by passing the desired number
-        through the "parameters" argument (e.g. parameters={"nSlots":2}) to avoid computationally
-        intensive jobs running concurrently on the local machine. One can set the max number of threads
-        of a workflow by PypeThreadWorkflow.setNumThreadAllowed().
- """
- try:
- nSlots = self.parameters["nSlots"]
- except AttributeError:
- nSlots = 1
- except KeyError:
- nSlots = 1
- return nSlots
-
- def setMessageQueue(self, q):
- self._queue = q
-
- def setShutdownEvent(self, e):
- self.shutdown_event = e
-
- def __call__(self, *argv, **kwargv):
- """Trap all exceptions, set fail flag, SEND MESSAGE, log, and re-raise.
- """
- try:
- return self.runInThisThread(*argv, **kwargv)
- except: # and re-raise
- #log.exception('%s __call__ failed:\n%r' %(type(self).__name__, self))
- self._status = pypeflow.task.TaskFail # TODO: Do not touch internals of base class.
- self._queue.put( (self.URL, pypeflow.task.TaskFail) )
- raise
-
- def runInThisThread(self, *argv, **kwargv):
- """
-        Similar to PypeTaskBase.run(), but it provides some machinery to pass information
-        back to the main thread that runs this task in a separate thread, through a standard Python
-        queue from the Queue module.
-
- We expect this to be used *only* for tasks which generate run-scripts.
- Our version does not actually run the script. Instead, we expect the script-filename to be returned
- by run().
- """
- if self._queue == None:
- raise Exception('There seems to be a case when self.queue==None, so we need to let this block simply return.')
- self._queue.put( (self.URL, "started, runflag: %d" % True) )
- return self.run(*argv, **kwargv)
-
- # We must override this from PypeTaskBase b/c we do *not* produce outputs
- # immediately.
- def run(self, *argv, **kwargv):
- argv = list(argv)
- argv.extend(self._argv)
- kwargv.update(self._kwargv)
-
- inputDataObjs = self.inputDataObjs
- self.syncDirectories([o.localFileName for o in inputDataObjs.values()])
-
- outputDataObjs = self.outputDataObjs
- for datao in outputDataObjs.values():
- makedirs(os.path.dirname(fn(datao)))
- parameters = self.parameters
-
- log.info('Running task from function %s()' %(self._taskFun.__name__))
- rtn = self._runTask(self, *argv, **kwargv)
-
- if self.inputDataObjs != inputDataObjs or self.parameters != parameters:
- raise TaskFunctionError("The 'inputDataObjs' and 'parameters' should not be modified in %s" % self.URL)
- # Jason, note that this only tests whether self.parameters was rebound.
- # If it is altered, then so is parameters, so the check would pass.
- # TODO(CD): What is the importance of this test? Should it be fixed or deleted?
-
- return True # to indicate that it run, since we no longer rely on runFlag
-
- def check_missing(self):
- timeout_s = 30
- sleep_s = .1
- self.syncDirectories([o.localFileName for o in self.outputDataObjs.values()]) # Sometimes helps in NFS.
- lastUpdate = datetime.datetime.now()
- while timeout_s > (datetime.datetime.now()-lastUpdate).seconds:
- missing = [(k,o) for (k,o) in self.outputDataObjs.iteritems() if not o.exists]
- if missing:
- log.debug("%s failed to generate all outputs; %s; missing:\n%s" %(
- self.URL, repr(self._status),
- pprint.pformat(missing),
- ))
- #import commands
- #cmd = 'pstree -pg -u cdunn'
- #output = commands.getoutput(cmd)
- #log.debug('`%s`:\n%s' %(cmd, output))
- dirs = set(os.path.dirname(o.localFileName) for o in self.outputDataObjs.itervalues())
- for d in dirs:
- log.debug('listdir(%s): %s' %(d, repr(os.listdir(d))))
- #self._status = pypeflow.task.TaskFail
- time.sleep(sleep_s)
- sleep_s *= 2.0
- else:
- self._status = pypeflow.task.TaskDone
- break
- else:
- log.info('timeout(%ss) in check_missing()' %timeout_s)
- self._status = pypeflow.task.TaskFail
-
- # And our own special methods.
- def finish(self):
- self.syncDirectories([o.localFileName for o in self.outputDataObjs.values()])
- self._queue.put( (self.URL, self._status) )
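
In the deleted bridge above, alive() keys every ready task by a job id derived from the SHA-256 digest of its generated run-script ('J' + hexdigest), so identical script contents always map to the same id. A small stand-alone sketch of that derivation (the temporary-file handling is illustrative only):

    # Sketch of the jobid derivation used in MyPypeFakeThreadsHandler.alive().
    import hashlib
    import tempfile

    def jobid_for_script(script_fn):
        # Read bytes so hashlib behaves the same on Python 2 and 3.
        with open(script_fn, 'rb') as reader:
            digest = hashlib.sha256(reader.read()).hexdigest()
        return 'J{}'.format(digest)

    if __name__ == '__main__':
        with tempfile.NamedTemporaryFile('w', suffix='.sh', delete=False) as tf:
            tf.write('#!/bin/bash\necho hello\n')
        print(jobid_for_script(tf.name))     # same script contents -> same jobid
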
diff --git a/pypeFLOW/pypeflow/pwatcher_workflow.py b/pypeFLOW/pypeflow/pwatcher_workflow.py
new file mode 100644
index 0000000..0ca0384
--- /dev/null
+++ b/pypeFLOW/pypeflow/pwatcher_workflow.py
@@ -0,0 +1,9 @@
+from .simple_pwatcher_bridge import (
+ PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
+ makePypeLocalFile, fn, PypeTask)
+PypeThreadTaskBase = MyFakePypeThreadTaskBase
+
+__all__ = [
+ 'PypeProcWatcherWorkflow', 'PypeThreadTaskBase',
+ 'makePypeLocalFile', 'fn', 'PypeTask',
+]
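
The new pypeflow/pwatcher_workflow.py is only an import shim: it re-exports the workflow entry points from simple_pwatcher_bridge and aliases MyFakePypeThreadTaskBase as PypeThreadTaskBase, so existing "from pypeflow.pwatcher_workflow import ..." call sites keep working. Assuming this version of pypeFLOW is importable, the aliasing can be checked directly:

    # Quick check of the re-exports (requires this pypeFLOW on the Python path).
    import pypeflow.pwatcher_workflow as pw
    import pypeflow.simple_pwatcher_bridge as spb

    assert pw.PypeProcWatcherWorkflow is spb.PypeProcWatcherWorkflow
    assert pw.PypeThreadTaskBase is spb.MyFakePypeThreadTaskBase   # aliased in the shim
    assert pw.makePypeLocalFile is spb.makePypeLocalFile
    assert pw.fn is spb.fn
    assert pw.PypeTask is spb.PypeTask
    print('pwatcher_workflow re-exports the simple_pwatcher_bridge symbols')
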
diff --git a/pypeFLOW/pypeflow/simple_pwatcher_bridge.py b/pypeFLOW/pypeflow/simple_pwatcher_bridge.py
index d1c7467..3b2fe59 100644
--- a/pypeFLOW/pypeflow/simple_pwatcher_bridge.py
+++ b/pypeFLOW/pypeflow/simple_pwatcher_bridge.py
@@ -77,6 +77,9 @@ class PwatcherTaskQueue(object):
rundir, basename = os.path.split(os.path.abspath(generated_script_fn))
cmd = '/bin/bash {}'.format(basename)
+ LOG.debug('In rundir={!r}, sge_option={!r}, __sge_option={!r}'.format(
+ rundir,
+ node.pypetask.parameters.get('sge_option'), self.__sge_option))
sge_option = node.pypetask.parameters.get('sge_option', self.__sge_option)
job_type = node.pypetask.parameters.get('job_type', None)
job_queue = node.pypetask.parameters.get('job_queue', None)
@@ -241,13 +244,13 @@ class Workflow(object):
len(unsubmitted), len(to_submit), unsubmitted))
#ready.update(unsubmitted) # Resubmit only in pwatcher, if at all.
submitted.update(to_submit - unsubmitted)
- LOG.debug('N in queue: {}'.format(len(submitted)))
+ LOG.debug('N in queue: {} (max_jobs={})'.format(len(submitted), self.max_jobs))
recently_done = set(self.tq.check_done())
if not recently_done:
if not submitted:
LOG.warning('Nothing is happening, and we had {} failures. Should we quit? Instead, we will just sleep.'.format(failures))
#break
- LOG.info('sleep {}'.format(sleep_time))
+ LOG.info('sleep {}s'.format(sleep_time))
time.sleep(sleep_time)
sleep_time = sleep_time + 0.1 if (sleep_time < updateFreq) else updateFreq
continue
@@ -430,8 +433,8 @@ def find_work_dir(paths):
"""
dirnames = set(os.path.dirname(os.path.normpath(p)) for p in paths)
if len(dirnames) != 1:
- raise Exception('Cannot find work-dir for paths in multiple (or zero) dirs: {!r}'.format(
- paths))
+ raise Exception('Cannot find work-dir for paths in multiple (or zero) dirs: {} in {}'.format(
+ pprint.pformat(paths), pprint.pformat(dirnames)))
d = dirnames.pop()
return os.path.abspath(d)
class PypeLocalFile(object):
@@ -554,13 +557,12 @@ def PypeProcWatcherWorkflow(
LOG.warning('In simple_pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl))
LOG.info('In simple_pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl))
watcher = pwatcher_impl.get_process_watcher(watcher_directory)
- LOG.info('job_type={!r}, job_queue={!r}'.format(job_type, job_queue))
+ LOG.info('job_type={!r}, job_queue={!r}, sge_option={!r}'.format(job_type, job_queue, sge_option))
return Workflow(watcher, job_type=job_type, job_queue=job_queue, max_jobs=max_jobs, sge_option=sge_option)
#th = MyPypeFakeThreadsHandler('mypwatcher', job_type, job_queue)
#mq = MyMessageQueue()
#se = MyFakeShutdownEvent() # TODO: Save pwatcher state on ShutdownEvent. (Not needed for blocking pwatcher. Mildly useful for fs_based.)
#return pypeflow.controller._PypeConcurrentWorkflow(URL=URL, thread_handler=th, messageQueue=mq, shutdown_event=se,
# attributes=attributes)
-PypeProcWatcherWorkflow.setNumThreadAllowed = lambda x, y: None
__all__ = ['PypeProcWatcherWorkflow', 'fn', 'makePypeLocalFile', 'MyFakePypeThreadTaskBase', 'PypeTask']
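
The new debug line in PwatcherTaskQueue makes the sge_option precedence visible: a value in the task's own parameters wins, and only if it is absent does the workflow-wide default (self.__sge_option, now also logged by the factory) apply. A tiny sketch of that precedence, with illustrative names:

    # Sketch of the per-task vs. workflow-wide sge_option precedence shown above.
    def resolve_sge_option(task_parameters, workflow_default):
        return task_parameters.get('sge_option', workflow_default)

    if __name__ == '__main__':
        print(resolve_sge_option({'sge_option': '-pe smp 8 -q bigmem'}, '-pe smp 4 -q default'))
        print(resolve_sge_option({}, '-pe smp 4 -q default'))
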
diff --git a/pypeFLOW/pypeflow/task.py b/pypeFLOW/pypeflow/task.py
deleted file mode 100644
index 32564ab..0000000
--- a/pypeFLOW/pypeflow/task.py
+++ /dev/null
@@ -1,892 +0,0 @@
-
-# @author Jason Chin
-#
-# Copyright (C) 2010 by Jason Chin
-# Copyright (C) 2011 by Jason Chin, Pacific Biosciences
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""
-
-PypeTask: This module provides the PypeTask class and the decorators that can convert
-a regular Python function into a PypeTask instance.
-
-"""
-
-import pprint
-import inspect
-import hashlib
-import logging
-import copy
-import sys
-import time
-
-PYTHONVERSION = sys.version_info[:2]
-if PYTHONVERSION == (2,5):
- import simplejson as json
-else:
- import json
-
-import os
-import shlex
-
-from common import PypeError, PypeObject, pypeNS, runShellCmd, Graph, URIRef, Literal
-from data import FileNotExistError, PypeSplittableLocalFile, makePypeLocalFile
-
-logger = logging.getLogger(__name__)
-
-class TaskFunctionError(PypeError):
- pass
-
-# These must be strings.
-TaskInitialized = "TaskInitialized"
-TaskDone = "done"
-TaskFail = "fail"
-# TODO(CD): Make user-code compare by variable name.
-
-class PypeTaskBase(PypeObject):
- """
-    Represent a PypeTask. Subclass it for different kinds of
-    tasks.
- """
-
- supportedURLScheme = ["task"]
-
- def __init__(self, URL, *argv, **kwargv):
-
- """
- Constructor of a PypeTask.
- """
-
- PypeObject.__init__(self, URL, **kwargv)
-
- self._argv = argv
- self._kwargv = kwargv
- self._taskFun = kwargv['_taskFun']
- self._referenceMD5 = None
- self._status = TaskInitialized
- self._queue = None
- self.shutdown_event = None
-
-
- for defaultAttr in ["inputDataObjs", "outputDataObjs", "parameters", "mutableDataObjs"]:
- if defaultAttr not in self.__dict__:
- self.__dict__[defaultAttr] = {}
-
- # "input" and "output" short cut
- if "inputs" in kwargv:
- self.inputDataObjs.update(kwargv["inputs"])
- del kwargv["inputs"]
-
- if "outputs" in kwargv:
- self.outputDataObjs.update(kwargv["outputs"])
- del kwargv["outputs"]
-
- if "mutables" in kwargv:
- self.mutableDataObjs.update(kwargv["mutables"])
- del kwargv["mutables"]
-
- #the keys in inputDataObjs/outputDataObjs/parameters will become a task attribute
- for defaultAttr in ["inputDataObjs", "outputDataObjs", "mutableDataObjs", "parameters"]:
- vars(self).update(self.__dict__[defaultAttr])
-
- if "chunk_id" in kwargv:
- self.chunk_id = kwargv["chunk_id"]
-
- self._codeMD5digest = kwargv.get("_codeMD5digest", "")
- self._paramMD5digest = kwargv.get("_paramMD5digest", "")
- self._compareFunctions = kwargv.get("_compareFunctions", [ timeStampCompare ])
-
- for o in self.outputDataObjs.values():
- if o.readOnly == True:
- raise PypeError, "Cannot assign read only data object %s for task %s" % (o.URL, self.URL)
-
- @property
- def status(self):
- return self._status
-
- def setInputs( self, inputDataObjs ):
- self.inputDataObjs = inputDataObjs
- vars(self).update( inputDataObjs )
-
- def setOutputs( self, outputDataObjs ):
- self.outputDataObjs = outputDataObjs
- vars(self).update( outputDataObjs )
-
- def setReferenceMD5(self, md5Str):
- self._referenceMD5 = md5Str
-
- def _getRunFlag(self):
- """Determine whether the PypeTask should be run. It can be overridden in
- subclass to allow more flexible rules.
-        a subclass to allow more flexible rules.
- runFlag = False
- if self._referenceMD5 is not None and self._referenceMD5 != self._codeMD5digest:
- self._referenceMD5 = self._codeMD5digest
- # Code has changed.
- return True
- return any( [ f(self.inputDataObjs, self.outputDataObjs, self.parameters) for f in self._compareFunctions] )
-
- def isSatisfied(self):
- """Compare dependencies. (Kinda expensive.)
- Note: Do not call this while the task is actually running!
- """
- return not self._getRunFlag()
-
- def getStatus(self):
- """
- Note: Do not call this while the task is actually running!
- """
- return self._status
-
- def setStatus(self, status):
- """
- Note: Do not call this while the task is actually running!
- """
- assert status in (TaskInitialized, TaskDone, TaskFail)
- self._status = status
-
- def _runTask(self, *argv, **kwargv):
- """
- The method to run the decorated function _taskFun(). It is called through run() of
- the PypeTask object and it should never be called directly
-
-        TODO: the arg processing is still a mess; we need to find a better way to do this
- """
- if PYTHONVERSION == (2,5): #TODO(CD): Does this even work anymore?
- (args, varargs, varkw, defaults) = inspect.getargspec(self._taskFun)
- #print (args, varargs, varkw, defaults)
- else:
- argspec = inspect.getargspec(self._taskFun)
- (args, varargs, varkw, defaults) = argspec.args, argspec.varargs, argspec.keywords, argspec.defaults
-
- if varkw != None:
- return self._taskFun(self, *argv, **kwargv)
- elif varargs != None:
- return self._taskFun(self, *argv)
- elif len(args) != 0:
- nkwarg = {}
- if defaults != None:
- defaultArg = args[-len(defaults):]
- for a in defaultArg:
- nkwarg[a] = kwargv[a]
- return self._taskFun(self, *argv, **nkwarg)
- else:
- return self._taskFun(self)
- else:
- return self._taskFun(self)
-
- @property
- def _RDFGraph(self):
- graph = Graph()
- for k,v in self.__dict__.iteritems():
- if k == "URL": continue
- if k[0] == "_": continue
- if k in ["inputDataObjs", "outputDataObjs", "mutableDataObjs", "parameters"]:
- if k == "inputDataObjs":
- for ft, f in v.iteritems():
- graph.add( (URIRef(self.URL), pypeNS["prereq"], URIRef(f.URL) ) )
- elif k == "outputDataObjs":
- for ft, f in v.iteritems():
- graph.add( (URIRef(f.URL), pypeNS["prereq"], URIRef(self.URL) ) )
- elif k == "mutableDataObjs":
- for ft, f in v.iteritems():
- graph.add( (URIRef(self.URL), pypeNS["hasMutable"], URIRef(f.URL) ) )
- elif k == "parameters":
- graph.add( (URIRef(self.URL), pypeNS["hasParameters"], Literal(json.dumps(v)) ) )
-
- continue
-
- if k in self.inputDataObjs:
- graph.add( ( URIRef(self.URL), pypeNS["inputDataObject"], URIRef(v.URL) ) )
- continue
-
- if k in self.outputDataObjs:
- graph.add( ( URIRef(self.URL), pypeNS["outputDataObject"], URIRef(v.URL) ) )
- continue
-
- if k in self.mutableDataObjs:
- graph.add( ( URIRef(self.URL), pypeNS["mutableDataObject"], URIRef(v.URL) ) )
- continue
-
- if hasattr(v, "URL"):
- graph.add( ( URIRef(self.URL), pypeNS[k], URIRef(v.URL) ) )
-
- graph.add( ( URIRef(self.URL), pypeNS["codeMD5digest"], Literal(self._codeMD5digest) ) )
- graph.add( ( URIRef(self.URL), pypeNS["parameterMD5digest"], Literal(self._paramMD5digest) ) )
-
- return graph
-
- def __call__(self, *argv, **kwargv):
- """Trap all exceptions, set fail flag, log, and re-raise.
- If you need to do more, then over-ride this method.
- """
- try:
- return self.run(*argv, **kwargv)
- except: # and re-raise
- logger.exception('PypeTaskBase failed unexpectedly:\n%r' %self)
- self._status = TaskFail
- raise
-
- @staticmethod
- def syncDirectories(fns):
-        # Need the following loop to force the stupid Isilon to update the metadata in the directory;
-        # otherwise, the file would appear to be non-existent... sigh, this is a hard-earned hack (>5 hours).
- # Yes, a friend at AMD had this problem too. Painful. ~cd
- for d in set(os.path.dirname(fn) for fn in fns):
- try:
- os.listdir(d)
- except OSError:
- pass
-
- def run(self, *argv, **kwargv):
- """Determine whether a task should be run when called.
-        If the dependencies are not satisfied,
- then the _taskFun() will be called to generate the output data objects.
-
- Derived class can over-ride this method, but if __call__ is over-ridden,
- then derived must call this explicitly.
- """
- argv = list(argv)
- argv.extend(self._argv)
- kwargv.update(self._kwargv)
-
- inputDataObjs = self.inputDataObjs
- self.syncDirectories([o.localFileName for o in inputDataObjs.values()])
-
- outputDataObjs = self.outputDataObjs
- parameters = self.parameters
-
- logger.info('Running task from function %s()' %(self._taskFun.__name__))
- rtn = self._runTask(self, *argv, **kwargv)
-
- if self.inputDataObjs != inputDataObjs or self.parameters != parameters:
- raise TaskFunctionError("The 'inputDataObjs' and 'parameters' should not be modified in %s" % self.URL)
- missing = [(k,o) for (k,o) in self.outputDataObjs.iteritems() if not o.exists]
- if missing:
- logger.debug("%s fails to generate all outputs; missing:\n%s" %(self.URL, pprint.pformat(missing)))
- self._status = TaskFail
- else:
- self._status = TaskDone
-
- return True # to indicate that it run, since we no longer rely on runFlag
-
- def __repr__(self):
- r = dict()
- r['_status'] = self._status
- r['inputDataObjs'] = self.inputDataObjs
- r['outputDataObjs'] = self.outputDataObjs
- r['mutableDataObjs'] = self.mutableDataObjs
- r['parameters'] = self.parameters
- r['URL'] = getattr(self, 'URL', 'No URL?')
- r['__class__.__name__'] = self.__class__.__name__
- return pprint.pformat(r)
- def brief(self):
- r = dict()
- r['URL'] = self.URL
- return pprint.pformat(r)
-
- def finalize(self):
- """
-        This method is intended to be overridden by subclasses to provide extra processing that is not
-        directly related to processing the input and output data. For the thread workflow, this
-        method will be called in the main thread after a task is finished, regardless of the job status.
- """
- pass
-
-class PypeThreadTaskBase(PypeTaskBase):
-
- """
- Represent a PypeTask that can be run within a thread.
-    Subclass it for different kinds of tasks.
- """
-
- @property
- def nSlots(self):
- """
-        Return the required number of slots to run. The total number of slots is determined by
-        PypeThreadWorkflow.MAX_NUMBER_TASK_SLOT. Increase this number by passing the desired number
-        through the "parameters" argument (e.g. parameters={"nSlots":2}) to avoid computationally
-        intensive jobs running concurrently on the local machine. One can set the max number of threads
-        of a workflow by PypeThreadWorkflow.setNumThreadAllowed().
- """
- try:
- nSlots = self.parameters["nSlots"]
- except AttributeError:
- nSlots = 1
- except KeyError:
- nSlots = 1
- return nSlots
-
-
- def setMessageQueue(self, q):
- self._queue = q
-
- def setShutdownEvent(self, e):
- self.shutdown_event = e
-
- def __call__(self, *argv, **kwargv):
- """Trap all exceptions, set fail flag, SEND MESSAGE, log, and re-raise.
- """
- try:
- return self.runInThisThread(*argv, **kwargv)
- except: # and re-raise
- logger.exception('PypeTaskBase failed:\n%r' %self)
- self._status = TaskFail # TODO: Do not touch internals of base class.
- self._queue.put( (self.URL, TaskFail) )
- raise
-
- def runInThisThread(self, *argv, **kwargv):
- """
-        Similar to PypeTaskBase.run(), but it provides some machinery to pass information
-        back to the main thread that runs this task in a separate thread, through a standard Python
-        queue from the Queue module.
- """
- if self._queue == None:
- logger.debug('Testing threads w/out queue?')
- self.run(*argv, **kwargv)
- # return
- # raise until we know what this should do.
- raise Exception('There seems to be a case when self.queue==None, so we need to let this block simply return.')
-
- self._queue.put( (self.URL, "started, runflag: %d" % True) )
- self.run(*argv, **kwargv)
-
- self.syncDirectories([o.localFileName for o in self.outputDataObjs.values()])
-
- self._queue.put( (self.URL, self._status) )
-
-class PypeDistributiableTaskBase(PypeThreadTaskBase):
-
- """
-    Represent a PypeTask that can be run within a thread or submitted to
-    a grid-engine-like job scheduling system.
-    Subclass it for different kinds of tasks.
- """
-
- def __init__(self, URL, *argv, **kwargv):
- PypeTaskBase.__init__(self, URL, *argv, **kwargv)
- self.distributed = True
-
-
-class PypeTaskCollection(PypeObject):
-
- """
-    Represent an object that encapsulates a number of tasks.
- """
-
- supportedURLScheme = ["tasks"]
- def __init__(self, URL, tasks = [], scatterGatherTasks = [], **kwargv):
- PypeObject.__init__(self, URL, **kwargv)
- self._tasks = tasks[:]
- self._scatterGatherTasks = scatterGatherTasks[:]
-
- def addTask(self, task):
- self._tasks.append(task)
-
- def getTasks(self):
- return self._tasks
-
- def addScatterGatherTask(self, task):
- self._scatterGatherTasks.append(task)
-
- def getScatterGatherTasks(self):
- return self._scatterGatherTasks
-
- def __getitem__(self, k):
- return self._tasks[k]
-
-_auto_names = set()
-def _unique_name(name):
- """
- >>> def foo(): pass
- >>> _unique_name('foo')
- 'foo'
- >>> _unique_name('foo')
- 'foo.01'
- >>> _unique_name('foo')
- 'foo.02'
- """
- if name in _auto_names:
- n = 0
- while True:
- n += 1
- try_name = '%s.%02d' %(name, n)
- if try_name not in _auto_names:
- break
- name = try_name
- _auto_names.add(name)
- return name
-def _auto_task_url(taskFun):
- # Note: in doctest, the filename would be weird.
- return "task://" + inspect.getfile(taskFun) + "/"+ _unique_name(taskFun.func_name)
-
-def PypeTask(**kwargv):
-
- """
- A decorator that converts a function into a PypeTaskBase object.
-
- >>> import os, time
- >>> from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn
- >>> from pypeflow.task import *
- >>> try:
- ... os.makedirs("/tmp/pypetest")
- ... _ = os.system("rm -f /tmp/pypetest/*")
- ... except Exception:
- ... pass
- >>> time.sleep(.1)
- >>> fin = makePypeLocalFile("/tmp/pypetest/testfile_in", readOnly=False)
- >>> fout = makePypeLocalFile("/tmp/pypetest/testfile_out", readOnly=False)
- >>> @PypeTask(outputs={"test_out":fout},
- ... inputs={"test_in":fin},
- ... parameters={"a":'I am "a"'}, **{"b":'I am "b"'})
- ... def test(self):
- ... print test.test_in.localFileName
- ... print test.test_out.localFileName
- ... os.system( "touch %s" % fn(test.test_out) )
- ... print self.test_in.localFileName
- ... print self.test_out.localFileName
- ... pass
- >>> type(test)
- <class 'pypeflow.task.PypeTaskBase'>
- >>> test.test_in.localFileName
- '/tmp/pypetest/testfile_in'
- >>> test.test_out.localFileName
- '/tmp/pypetest/testfile_out'
- >>> os.system( "touch %s" % ( fn(fin)) )
- 0
- >>> timeStampCompare(test.inputDataObjs, test.outputDataObjs, test.parameters)
- True
- >>> print test._getRunFlag()
- True
- >>> test()
- /tmp/pypetest/testfile_in
- /tmp/pypetest/testfile_out
- /tmp/pypetest/testfile_in
- /tmp/pypetest/testfile_out
- True
- >>> timeStampCompare(test.inputDataObjs, test.outputDataObjs, test.parameters)
- False
- >>> print test._getRunFlag()
- False
- >>> print test.a
- I am "a"
- >>> print test.b
- I am "b"
- >>> os.system( "touch %s" % (fn(fin)) )
- 0
- >>> # test PypeTask.finalize()
- >>> from controller import PypeWorkflow
- >>> wf = PypeWorkflow()
- >>> wf.addTask(test)
- >>> def finalize(self):
- ... def f():
- ... print "in finalize:", self._status
- ... return f
-    >>> test.finalize = finalize(test) # For testing only. Please don't do this in your code. The PypeTask.finalize() is intended to be overridden by subclasses.
- >>> wf.refreshTargets( objs = [fout] )
- /tmp/pypetest/testfile_in
- /tmp/pypetest/testfile_out
- /tmp/pypetest/testfile_in
- /tmp/pypetest/testfile_out
- in finalize: done
- True
-    >>> # The following code shows how to set up a task with a PypeThreadWorkflow that allows running multiple tasks in parallel.
- >>> from pypeflow.controller import PypeThreadWorkflow
- >>> wf = PypeThreadWorkflow()
- >>> @PypeTask(outputDataObjs={"test_out":fout},
- ... inputDataObjs={"test_in":fin},
- ... TaskType=PypeThreadTaskBase,
- ... parameters={"a":'I am "a"'}, **{"b":'I am "b"'})
- ... def test(self):
- ... print test.test_in.localFileName
- ... print test.test_out.localFileName
- ... os.system( "touch %s" % fn(test.test_out) )
- ... print self.test_in.localFileName
- ... print self.test_out.localFileName
- >>> wf.addTask(test)
- >>> def finalize(self):
- ... def f():
- ... print "in finalize:", self._status
- ... return f
-    >>> test.finalize = finalize(test) # For testing only. Please don't do this in your code. The PypeTask.finalize() is intended to be overridden by subclasses.
- >>> wf.refreshTargets( objs = [fout] ) #doctest: +SKIP
- """
-
- def f(taskFun):
-
- TaskType = kwargv.get("TaskType", PypeTaskBase)
-
- if "TaskType" in kwargv:
- del kwargv["TaskType"]
-
- kwargv["_taskFun"] = taskFun
-
- if kwargv.get("URL",None) == None:
- kwargv["URL"] = _auto_task_url(taskFun)
- try:
- kwargv["_codeMD5digest"] = hashlib.md5(inspect.getsource(taskFun)).hexdigest()
-        except IOError: # Python 2.7 seems to have problems getting source code from a docstring; this is a workaround to keep the doctest working
- kwargv["_codeMD5digest"] = ""
- kwargv["_paramMD5digest"] = hashlib.md5(repr(kwargv)).hexdigest()
-
- newKwargv = copy.copy(kwargv)
- inputDataObjs = kwargv.get("inputDataObjs",{})
- inputDataObjs.update(kwargv.get("inputs", {}))
- outputDataObjs = kwargv.get("outputDataObjs",{})
- outputDataObjs.update(kwargv.get("outputs", {}))
- newInputs = {}
- for inputKey, inputDO in inputDataObjs.items():
- if isinstance(inputDO, PypeSplittableLocalFile):
- newInputs[inputKey] = inputDO._completeFile
- else:
- newInputs[inputKey] = inputDO
-
- newOutputs = {}
- for outputKey, outputDO in outputDataObjs.items():
- if isinstance(outputDO, PypeSplittableLocalFile):
- newOutputs[outputKey] = outputDO._completeFile
- else:
- newOutputs[outputKey] = outputDO
-
- newKwargv["inputDataObjs"] = newInputs
- newKwargv["outputDataObjs"] = newOutputs
- task = TaskType(**newKwargv)
- task.__doc__ = taskFun.__doc__
- return task
-
- return f
-
-def PypeShellTask(**kwargv):
-
- """
- A function that converts a shell script into a PypeTaskBase object.
-
- >>> import os, time
- >>> from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn
- >>> from pypeflow.task import *
- >>> try:
- ... os.makedirs("/tmp/pypetest")
- ... _ = os.system("rm -f /tmp/pypetest/*")
- ... except Exception:
- ... pass
- >>> time.sleep(.1)
- >>> fin = makePypeLocalFile("/tmp/pypetest/testfile_in", readOnly=False)
- >>> fout = makePypeLocalFile("/tmp/pypetest/testfile_out", readOnly=False)
- >>> f = open("/tmp/pypetest/shellTask.sh","w")
- >>> f.write( "touch %s" % (fn(fout)))
- >>> f.close()
- >>> shellTask = PypeShellTask(outputDataObjs={"test_out":fout},
- ... inputDataObjs={"test_in":fin},
- ... parameters={"a":'I am "a"'}, **{"b":'I am "b"'})
- >>> shellTask = shellTask("/tmp/pypetest/shellTask.sh")
- >>> type(shellTask)
- <class 'pypeflow.task.PypeTaskBase'>
- >>> print fn(shellTask.test_in)
- /tmp/pypetest/testfile_in
- >>> os.system( "touch %s" % fn(fin) )
- 0
- >>> timeStampCompare(shellTask.inputDataObjs, shellTask.outputDataObjs, shellTask.parameters)
- True
- >>> print shellTask._getRunFlag()
- True
- >>> shellTask() # run task
- True
- >>> timeStampCompare(shellTask.inputDataObjs, shellTask.outputDataObjs, shellTask.parameters)
- False
- >>> print shellTask._getRunFlag()
- False
- >>> shellTask()
- True
- """
-
- def f(scriptToRun):
- def taskFun(self):
- """make shell script using a template"""
- """run shell command"""
- shellCmd = "/bin/bash %s" % scriptToRun
- runShellCmd(shlex.split(shellCmd))
-
- kwargv["script"] = scriptToRun
- return PypeTask(**kwargv)(taskFun)
-
- return f
-
-
-def PypeSGETask(**kwargv):
-
- """
- Similar to PypeShellTask, but the shell script job will be executed through SGE.
- """
-
- def f(scriptToRun):
-
- def taskFun():
- """make shell script using the template"""
- """run shell command"""
- shellCmd = "qsub -sync y -S /bin/bash %s" % scriptToRun
- runShellCmd(shlex.split(shellCmd))
-
- kwargv["script"] = scriptToRun
-
- return PypeTask(**kwargv)(taskFun)
-
- return f
-
-def PypeDistributibleTask(**kwargv):
-
- """
- Similar to PypeShellTask and PypeSGETask, with an additional argument "distributed" to decide
-    whether a job is to be run through the local shell or SGE.
- """
-
- distributed = kwargv.get("distributed", False)
- def f(scriptToRun):
- def taskFun(self):
- """make shell script using the template"""
- """run shell command"""
- if distributed == True:
- shellCmd = "qsub -sync y -S /bin/bash %s" % scriptToRun
- else:
- shellCmd = "/bin/bash %s" % scriptToRun
-
- runShellCmd(shlex.split(shellCmd))
-
- kwargv["script"] = scriptToRun
- return PypeTask(**kwargv)(taskFun)
-
- return f
-
-
-def PypeScatteredTasks(**kwargv):
-
- def f(taskFun):
-
- TaskType = kwargv.get("TaskType", PypeTaskBase)
-
- if "TaskType" in kwargv:
- del kwargv["TaskType"]
-
- kwargv["_taskFun"] = taskFun
-
- inputDataObjs = kwargv["inputDataObjs"]
- outputDataObjs = kwargv["outputDataObjs"]
- nChunk = None
- scatteredInput = []
-
- if kwargv.get("URL", None) == None:
- kwargv["URL"] = "tasks://" + inspect.getfile(taskFun) + "/"+ taskFun.func_name
-
- tasks = PypeTaskCollection(kwargv["URL"])
-
- for inputKey, inputDO in inputDataObjs.items():
- if hasattr(inputDO, "nChunk"):
- if nChunk != None:
- assert inputDO.nChunk == nChunk
- else:
- nChunk = inputDO.nChunk
- if inputDO.getScatterTask() != None:
- tasks.addScatterGatherTask( inputDO.getScatterTask() )
-
- scatteredInput.append( inputKey )
-
- for outputKey, outputDO in outputDataObjs.items():
- if hasattr(outputDO, "nChunk"):
- if nChunk != None:
- assert outputDO.nChunk == nChunk
- if outputDO.getGatherTask() != None:
- tasks.addScatterGatherTask( outputDO.getGatherTask() )
- else:
- nChunk = outputDO.nChunk
-
-
- for i in range(nChunk):
-
- newKwargv = copy.copy(kwargv)
-
- subTaskInput = {}
- for inputKey, inputDO in inputDataObjs.items():
- if inputKey in scatteredInput:
- subTaskInput[inputKey] = inputDO.getSplittedFiles()[i]
- else:
- subTaskInput[inputKey] = inputDO
-
- subTaskOutput = {}
- for outputKey, outputDO in outputDataObjs.items():
- subTaskOutput[outputKey] = outputDO.getSplittedFiles()[i]
-
- newKwargv["inputDataObjs"] = subTaskInput
- newKwargv["outputDataObjs"] = subTaskOutput
-
- #newKwargv["URL"] = "task://" + inspect.getfile(taskFun) + "/"+ taskFun.func_name + "/%03d" % i
- newKwargv["URL"] = kwargv["URL"].replace("tasks","task") + "/%03d" % i
-
- try:
- newKwargv["_codeMD5digest"] = hashlib.md5(inspect.getsource(taskFun)).hexdigest()
- except IOError:
- # Python 2.7 has trouble retrieving source for functions defined inside a docstring;
- # this is a workaround to keep the doctest examples working.
- newKwargv["_codeMD5digest"] = ""
-
- newKwargv["_paramMD5digest"] = hashlib.md5(repr(kwargv)).hexdigest()
- newKwargv["chunk_id"] = i
-
-
- tasks.addTask( TaskType(**newKwargv) )
- return tasks
- return f
-
-getPypeScatteredTasks = PypeScatteredTasks
-
-def PypeFOFNMapTasks(**kwargv):
- """
- A special decorator that takes a FOFN (file of file names) as the main
- input and generates one task per file listed in the FOFN, using that
- file as the task's input.
-
- Example:
-
- def outTemplate(fn):
- return fn + ".out"
-
- def alignTask(self, **kwargv):
- in_f = self.in_f
- out_f = self.out_f
- #do something with in_f, and write something to out_f
-
- tasks = PypeFOFNMapTasks(FOFNFileName = "./file.fofn",
- outTemplateFunc = outTemplate,
- TaskType = PypeThreadTaskBase,
- parameters = dict(nSlots = 8))( alignTask )
- """
-
- def f(taskFun):
-
- TaskType = kwargv.get("TaskType", PypeTaskBase)
-
- if "TaskType" in kwargv:
- del kwargv["TaskType"]
-
- kwargv["_taskFun"] = taskFun
-
- FOFNFileName = kwargv["FOFNFileName"]
- outTemplateFunc = kwargv["outTemplateFunc"]
-
- if kwargv.get("URL", None) == None:
- kwargv["URL"] = "tasks://" + inspect.getfile(taskFun) + "/"+ taskFun.func_name
-
- tasks = PypeTaskCollection(kwargv["URL"])
-
- with open(FOFNFileName,"r") as FOFN:
-
- newKwargv = copy.copy(kwargv)
-
- for fn in FOFN:
-
- fn = fn.strip()
-
- if len(fn) == 0:
- continue
-
- newKwargv["inputDataObjs"] = {"in_f": makePypeLocalFile(fn) }
- outfileName = outTemplateFunc(fn)
- newKwargv["outputDataObjs"] = {"out_f": makePypeLocalFile(outfileName) }
- newKwargv["URL"] = kwargv["URL"].replace("tasks","task") + "/%s" % hashlib.md5(fn).hexdigest()
-
- try:
- newKwargv["_codeMD5digest"] = hashlib.md5(inspect.getsource(taskFun)).hexdigest()
- except IOError:
- # Python 2.7 has trouble retrieving source for functions defined inside a docstring;
- # this is a workaround to keep the doctest examples working.
- newKwargv["_codeMD5digest"] = ""
-
-
- newKwargv["_paramMD5digest"] = hashlib.md5(repr(kwargv)).hexdigest()
-
- tasks.addTask( TaskType(**newKwargv) )
-
- allFOFNOutDataObjs = dict( [ ("FOFNout%03d" % t[0], t[1].in_f) for t in enumerate(tasks) ] )
-
- def pseudoScatterTask(**kwargv):
- pass
-
- newKwargv = dict( inputDataObjs = {"FOFNin": makePypeLocalFile(FOFNFileName)},
- outputDataObjs = allFOFNOutDataObjs,
- _taskFun = pseudoScatterTask,
- _compareFunctions = [lambda inObjs, outObj, params: False], #this task is never meant to be run
- URL = "task://pseudoScatterTask/%s" % FOFNFileName)
-
- tasks.addTask( TaskType(**newKwargv) )
-
- return tasks
-
- return f
-
-getFOFNMapTasks = PypeFOFNMapTasks
-
-def timeStampCompare( inputDataObjs, outputDataObjs, parameters) :
-
- """
- Given the inputDataObjs and the outputDataObjs, determine whether any
- object in inputDataObjs was created or modified later than any object
- in outputDataObjs.
- """
-
- runFlag = False
-
- inputDataObjsTS = []
- for ft, f in inputDataObjs.iteritems():
- if not f.exists:
- wait_s = 60
- logger.warning('input does not exist yet (in this filesystem): %r - waiting up to %ds' %(f, wait_s))
- for i in range(wait_s):
- time.sleep(1)
- if f.exists:
- logger.warning('now exists: %r (after only %ds)' %(f, i))
- break
- # At this point, if it still does not exist, the entire workflow will fail.
- # What else can we do? The user's filesystem has too much latency.
- inputDataObjsTS.append((f.timeStamp, 'A', f))
-
- outputDataObjsTS = []
- for ft, f in outputDataObjs.iteritems():
- if not f.exists:
- logger.debug('output does not exist yet: %r'%f)
- runFlag = True
- break
- else:
- # 'A' < 'B', so outputs are 'later' if timestamps match.
- outputDataObjsTS.append((f.timeStamp, 'B', f))
-
- if not outputDataObjs:
- # 0 outputs => always run
- runFlag = True
-
- if not runFlag and inputDataObjs: # 0 inputs would imply that existence of outputs is enough.
- minOut = min(outputDataObjsTS)
- maxIn = max(inputDataObjsTS)
- if minOut < maxIn:
- logger.debug('timestamp of output < input: %r < %r'%(minOut, maxIn))
- runFlag = True
-
- return runFlag
-
-if __name__ == "__main__":
- import doctest
- doctest.testmod()
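The timeStampCompare function removed above implements a make-style freshness check: rerun when an output is missing, when there are no declared outputs at all, or when the newest input is newer than the oldest output (ties count as up to date; the 60-second wait for inputs is an extra safeguard for slow filesystems). A minimal standalone sketch of that rule over plain file paths -- the name needs_rerun and the use of os.path.getmtime instead of PypeLocalFile timestamps are illustrative, not part of pypeflow:

    import os

    def needs_rerun(input_paths, output_paths):
        """Make-style check: should the task run (again)?"""
        if not output_paths:
            return True   # no declared outputs => always run
        if any(not os.path.exists(p) for p in output_paths):
            return True   # a missing output => run
        if not input_paths:
            return False  # outputs exist and there are no inputs to compare against
        oldest_out = min(os.path.getmtime(p) for p in output_paths)
        newest_in = max(os.path.getmtime(p) for p in input_paths)
        return newest_in > oldest_out  # an input changed after the outputs were written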
diff --git a/pypeFLOW/setup.py b/pypeFLOW/setup.py
index df77bec..1397b35 100644
--- a/pypeFLOW/setup.py
+++ b/pypeFLOW/setup.py
@@ -2,22 +2,19 @@ from setuptools import setup, Extension, find_packages
setup(
name = 'pypeflow',
- version='0.1.1',
+ version='1.0.0',
author='J. Chin',
author_email='cschin at infoecho.net',
license='LICENSE.txt',
packages = [
'pypeflow',
- 'pwatcher', # a separate package for here for convenience, for now
+ 'pwatcher', # a separate package here for convenience, for now
'pwatcher.mains',
],
package_dir = {'':'.'},
zip_safe = False,
install_requires=[
- 'rdflib == 3.4.0',
- 'rdfextras >= 0.1',
- 'html5lib == 0.999999',
- 'networkx >=1.7, <=1.10',
+ 'networkx >=1.7, <=1.11',
],
entry_points = {'console_scripts': [
'pwatcher-main=pwatcher.mains.pwatcher:main',
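The trimmed install_requires above leaves networkx, capped to the 1.x series, as pypeflow's only declared runtime dependency. To confirm that an installed environment still satisfies that pin, one could use setuptools' pkg_resources; the requirement string below is copied from the hunk above, the rest is illustrative:

    import pkg_resources

    req = pkg_resources.Requirement.parse('networkx >=1.7, <=1.11')
    dist = pkg_resources.get_distribution('networkx')  # raises DistributionNotFound if absent
    print('networkx %s satisfies "%s": %s' % (dist.version, req, dist in req))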
--