[med-svn] [falcon] 01/12: Imported Upstream version 1.8.3

Afif Elghraoui afif at moszumanska.debian.org
Sun Nov 27 05:38:30 UTC 2016


This is an automated email from the git hooks/post-receive script.

afif pushed a commit to branch master
in repository falcon.

commit 58a5f3293b83f7d435c8cd3a39912134455746a6
Author: Afif Elghraoui <afif at debian.org>
Date:   Sat Nov 26 20:06:30 2016 -0800

    Imported Upstream version 1.8.3
---
 FALCON-examples/git-sym.makefile              |   2 +
 FALCON-examples/makefile                      |   6 +-
 FALCON-examples/run/greg200k-sv2/README.md    |   1 +
 FALCON-examples/run/greg200k-sv2/fc_run.cfg   |  44 ++
 FALCON-examples/run/greg200k-sv2/fc_unzip.cfg |  21 +
 FALCON-examples/run/greg200k-sv2/input.fofn   |   2 +
 FALCON-examples/run/greg200k-sv2/makefile     |  10 +
 FALCON-examples/run/synth0/makefile           |   8 -
 FALCON-make/makefile                          |   2 +-
 FALCON/falcon_kit/bash.py                     |   4 +-
 FALCON/falcon_kit/mains/fetch_reads.py        |  72 +--
 FALCON/falcon_kit/mains/get_read_ctg_map.py   | 180 +++---
 FALCON/falcon_kit/mains/pr_ctg_track.py       |   6 +-
 FALCON/falcon_kit/mains/rr_ctg_track.py       |  10 +-
 FALCON/falcon_kit/mains/run1.py               |  10 +-
 FALCON/test/test_functional.py                |   1 -
 pypeFLOW/pypeflow/common.py                   | 149 -----
 pypeFLOW/pypeflow/controller.py               | 875 -------------------------
 pypeFLOW/pypeflow/data.py                     | 315 ---------
 pypeFLOW/pypeflow/pwatcher_bridge.py          | 391 -----------
 pypeFLOW/pypeflow/pwatcher_workflow.py        |   9 +
 pypeFLOW/pypeflow/simple_pwatcher_bridge.py   |  14 +-
 pypeFLOW/pypeflow/task.py                     | 892 --------------------------
 pypeFLOW/setup.py                             |   9 +-
 24 files changed, 247 insertions(+), 2786 deletions(-)

diff --git a/FALCON-examples/git-sym.makefile b/FALCON-examples/git-sym.makefile
index aecdd97..ed9b703 100644
--- a/FALCON-examples/git-sym.makefile
+++ b/FALCON-examples/git-sym.makefile
@@ -27,3 +27,5 @@ synth5k.2016-11-02:
 	curl -L https://downloads.pacbcloud.com/public/data/git-sym/synth5k.2016-11-02.tgz | tar xvfz -
 ecoli.m140913_050931_42139_c100713652400000001823152404301535_s1_p0:
 	curl -L https://downloads.pacbcloud.com/public/data/git-sym/ecoli.m140913_050931_42139_c100713652400000001823152404301535_s1_p0.subreads.tar | tar xvf -
+greg200k-sv2:
+	curl -L https://downloads.pacbcloud.com/public/data/git-sym/greg200k-sv2.tar | tar xvf -
diff --git a/FALCON-examples/makefile b/FALCON-examples/makefile
index 57a7459..d600c35 100644
--- a/FALCON-examples/makefile
+++ b/FALCON-examples/makefile
@@ -8,13 +8,9 @@ setup-%:
 	git-sym check run/$*
 # Our only integration test, for now.
 test:
-	python -c 'import pypeflow.common; print pypeflow.common'
+	python -c 'import pypeflow.pwatcher_workflow; print pypeflow.pwatcher_workflow'
 	python -c 'import falcon_kit; print falcon_kit.falcon'
 	${MAKE} run-synth0
 	${MAKE} -C run/synth0 test
-	#${MAKE} -C run/synth0 clean
-	#${MAKE} -C run/synth0 go0 # still test the old pypeflow too, for now
-	${MAKE} -C run/synth0 clean
-	${MAKE} -C run/synth0 go1 # should be the same as go
 
 .PHONY: default
diff --git a/FALCON-examples/run/greg200k-sv2/README.md b/FALCON-examples/run/greg200k-sv2/README.md
new file mode 100644
index 0000000..85c3a80
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/README.md
@@ -0,0 +1 @@
+This needs to be updated again. Still a WIP.
diff --git a/FALCON-examples/run/greg200k-sv2/fc_run.cfg b/FALCON-examples/run/greg200k-sv2/fc_run.cfg
new file mode 100755
index 0000000..a8fb8c4
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/fc_run.cfg
@@ -0,0 +1,44 @@
+[General]
+# file of filenames (fofn) listing the initial bas.h5 files
+input_fofn = input.fofn
+#input_fofn = preads.fofn
+
+job_type = local
+
+input_type = raw
+#input_type = preads
+
+#openending = True
+
+# The length cutoff for seed reads used for initial mapping
+length_cutoff = 1000
+genome_size = 200000
+#seed_coverage = 60
+
+# The length cutoff for seed reads used for pre-assembly
+length_cutoff_pr = 1
+
+
+sge_option_da = -pe smp 4 -q bigmem
+sge_option_la = -pe smp 20 -q bigmem
+sge_option_pda = -pe smp 6 -q bigmem 
+sge_option_pla = -pe smp 16 -q bigmem
+sge_option_fc = -pe smp 24 -q bigmem
+sge_option_cns = -pe smp 8 -q bigmem
+
+#192 ?
+pa_concurrent_jobs = 12
+cns_concurrent_jobs = 12
+ovlp_concurrent_jobs = 12
+
+pa_HPCdaligner_option =  -v -dal128 -t16 -e0.8 -M24 -l3200 -k18 -h480 -w8 -s100
+ovlp_HPCdaligner_option =  -v -dal128  -M24 -k24 -h1024 -e.9 -l2500 -s100
+
+pa_DBsplit_option = -a -x500 -s100
+ovlp_DBsplit_option = -s100
+
+falcon_sense_option = --output_multi --min_cov_aln 4 --min_idt 0.70 --min_cov 4 --max_n_read 200 --n_core 8 
+falcon_sense_skip_contained = False
+
+overlap_filtering_setting = --max_diff 120 --max_cov 120 --min_cov 2 --n_core 12
+#dazcon = 1
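
For reference, fc_run.cfg above is a plain INI file read by fc_run.py. A
minimal sketch of consuming it, assuming Python 2's stdlib ConfigParser (the
FALCON code in this diff is Python 2); the option names come from the file
itself, and the parsing details are illustrative only:

    import ConfigParser  # Python 2 module name

    cfg = ConfigParser.ConfigParser()
    cfg.read('fc_run.cfg')
    length_cutoff = cfg.getint('General', 'length_cutoff')  # 1000
    genome_size = cfg.getint('General', 'genome_size')      # 200000
    pa_opts = cfg.get('General', 'pa_HPCdaligner_option')
    print length_cutoff, genome_size, pa_opts
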
diff --git a/FALCON-examples/run/greg200k-sv2/fc_unzip.cfg b/FALCON-examples/run/greg200k-sv2/fc_unzip.cfg
new file mode 100644
index 0000000..c9de3af
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/fc_unzip.cfg
@@ -0,0 +1,21 @@
+[General]
+job_type = SGE
+job_type = local
+
+[Unzip]
+
+input_fofn= input.fofn
+input_bam_fofn= input_bam.fofn
+#smrt_bin= /mnt/secondary/builds/full/3.0.0/prod/smrtanalysis_3.0.0.153854/smrtcmds/bin/
+#smrt_bin=/mnt/secondary/builds/full/3.0.1/prod/current-build_smrtanalysis/smrtcmds/bin/
+smrt_bin=/mnt/secondary/builds/full/3.0.0/prod/current-build_smrtanalysis/smrtcmds/bin/
+sge_phasing= -pe smp 12 -q bigmem
+sge_quiver= -pe smp 12 -q sequel-farm
+sge_track_reads= -pe smp 12 -q default
+sge_blasr_aln=  -pe smp 24 -q bigmem 
+sge_hasm=  -pe smp 48 -q bigmem
+unzip_concurrent_jobs = 64
+quiver_concurrent_jobs = 64
+
+unzip_concurrent_jobs = 12
+quiver_concurrent_jobs = 12
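
Note that fc_unzip.cfg assigns job_type (in [General]) and both
*_concurrent_jobs options (in [Unzip]) twice. Assuming Python 2's
ConfigParser does the reading, a repeated option silently takes the last
value, so the effective settings are job_type = local and 12 concurrent
jobs. A quick check:

    import ConfigParser

    cfg = ConfigParser.ConfigParser()
    cfg.read('fc_unzip.cfg')
    print cfg.get('General', 'job_type')             # local (last wins)
    print cfg.get('Unzip', 'unzip_concurrent_jobs')  # 12 (last wins)
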
diff --git a/FALCON-examples/run/greg200k-sv2/input.fofn b/FALCON-examples/run/greg200k-sv2/input.fofn
new file mode 100644
index 0000000..99cce2d
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/input.fofn
@@ -0,0 +1,2 @@
+data/greg200k-sv2/subreads1.dexta
+data/greg200k-sv2/subreads2.dexta
diff --git a/FALCON-examples/run/greg200k-sv2/makefile b/FALCON-examples/run/greg200k-sv2/makefile
new file mode 100644
index 0000000..099e5ae
--- /dev/null
+++ b/FALCON-examples/run/greg200k-sv2/makefile
@@ -0,0 +1,10 @@
+falcon:
+	fc_run.py fc_run.cfg
+unzip:
+	fc_unzip.py fc_unzip.cfg
+quiver:
+	fc_quiver.py fc_unzip.cfg
+clean:
+	rm -rf 3-*/ mypwatcher/ pwatcher.dir
+cleaner: clean
+	rm -rf 0-*/ 1-*/ 2-*/ all.log mypwatcher/ scripts/ sge_log/
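
(Usage note: the targets above presumably run in pipeline order: make
falcon, then make unzip, then make quiver. Judging by the rm lists, clean
drops only the unzip stage (3-*/) plus the process-watcher state, while
cleaner also removes the assembly stages 0-*/ through 2-*/, the logs,
generated scripts/, and sge_log/.)
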
diff --git a/FALCON-examples/run/synth0/makefile b/FALCON-examples/run/synth0/makefile
index d471427..e7ef88e 100644
--- a/FALCON-examples/run/synth0/makefile
+++ b/FALCON-examples/run/synth0/makefile
@@ -5,14 +5,6 @@ go: run
 	${MAKE} test
 run:
 	fc_run fc_run.cfg logging.json
-go1: run1
-	${MAKE} test
-run1:
-	fc_run1 fc_run.cfg logging.ini
-go0: run0
-	${MAKE} test
-run0:
-	fc_run0 fc_run.cfg logging.ini
 test:
 	./check.py
 clean:
diff --git a/FALCON-make/makefile b/FALCON-make/makefile
index f6c0d95..52b42d3 100644
--- a/FALCON-make/makefile
+++ b/FALCON-make/makefile
@@ -53,7 +53,7 @@ show:
 	echo "FALCON_PIP_EDIT=${FALCON_PIP_EDIT}"
 	echo "FALCON_PIP_USER=${FALCON_PIP_USER}"
 check:
-	python -c 'import pypeflow.common; print pypeflow.common'
+	python -c 'import pypeflow.simple_pwatcher_bridge; print pypeflow.simple_pwatcher_bridge'
 	python -c 'import falcon_kit; print falcon_kit.falcon'
 extra:
 	pip install ${FALCON_PIP_USER} Cython
diff --git a/FALCON/falcon_kit/bash.py b/FALCON/falcon_kit/bash.py
index fabe142..dbf3378 100644
--- a/FALCON/falcon_kit/bash.py
+++ b/FALCON/falcon_kit/bash.py
@@ -262,8 +262,8 @@ ln -sf ${{db_dir}}/{db_prefix}.db .
 ln -sf ${{db_dir}}/.{db_prefix}.dust.anno .
 ln -sf ${{db_dir}}/.{db_prefix}.dust.data .
 {daligner_cmd}
-rm -f *.C?.las *.C?.S.las
-rm -f *.N?.las *.N?.S.las
+rm -f *.C?.las *.C?.S.las *.C??.las *.C??.S.las *.C???.las *.C???.S.las
+rm -f *.N?.las *.N?.S.las *.N??.las *.N??.S.las *.N???.las *.N???.S.las
 """.format(db_dir=db_dir, db_prefix=db_prefix, daligner_cmd=daligner_cmd)
         yield job_uid, bash
 
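A side note on the widened cleanup globs above: a shell '?' matches exactly
one character, so '*.C?.las' only matches block numbers 0-9, which is
presumably why the two- and three-digit variants were added for larger
databases. A sketch of the matching rule using Python's fnmatch (same
wildcard semantics; the filenames are hypothetical):

    from fnmatch import fnmatch

    print fnmatch('raw_reads.12.C3.las', '*.C?.las')      # True
    print fnmatch('raw_reads.12.C34.las', '*.C?.las')     # False: two digits
    print fnmatch('raw_reads.12.C34.las', '*.C??.las')    # True
    print fnmatch('raw_reads.12.C345.las', '*.C???.las')  # True
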
diff --git a/FALCON/falcon_kit/mains/fetch_reads.py b/FALCON/falcon_kit/mains/fetch_reads.py
index c4576c9..9f7b458 100644
--- a/FALCON/falcon_kit/mains/fetch_reads.py
+++ b/FALCON/falcon_kit/mains/fetch_reads.py
@@ -6,45 +6,41 @@ import sys
 import re
 
 
-
-
 def fetch_ref_and_reads(base_dir, fofn, ctg_id, out_dir, min_ctg_lenth):
-
-
     read_fofn = fofn
     if out_dir == None:
-        out_dir = os.path.join( base_dir, "3-unzip/reads")
+        out_dir = os.path.join(base_dir, '3-unzip/reads')
 
-    ctg_fa = os.path.join( base_dir, "2-asm-falcon/p_ctg.fa")
-    read_map_dir = os.path.join( base_dir, "2-asm-falcon/read_maps" )
+    ctg_fa = os.path.join(base_dir, '2-asm-falcon/p_ctg.fa')
+    read_map_dir = os.path.join(base_dir, '2-asm-falcon/read_maps')
 
-    rawread_id_file = os.path.join( read_map_dir, "raw_read_ids" )
-    pread_id_file = os.path.join( read_map_dir, "pread_ids" )
+    rawread_id_file = os.path.join(read_map_dir, 'dump_rawread_ids', 'rawread_ids')
+    pread_id_file = os.path.join(read_map_dir, 'dump_pread_ids', 'pread_ids')
 
-    rid_to_oid = open(rawread_id_file).read().split("\n")  #daligner raw read id to the original ids
-    pid_to_fid = open(pread_id_file).read().split("\n")  #daligner pread id to the fake ids
+    rid_to_oid = open(rawread_id_file).read().split('\n')  #daligner raw read id to the original ids
+    pid_to_fid = open(pread_id_file).read().split('\n')  #daligner pread id to the fake ids
 
     def pid_to_oid(pid):
         fid = pid_to_fid[int(pid)]
-        rid = int(fid.split("/")[1])/10
+        rid = int(fid.split('/')[1])/10
         return rid_to_oid[int(rid)]
 
     ref_fasta = FastaReader(ctg_fa)
     all_ctg_ids = set()
     for s in ref_fasta:
         s_id = s.name.split()[0]
-        if ctg_id != "all" and s_id != ctg_id:
+        if ctg_id != 'all' and s_id != ctg_id:
             continue
 
         if len(s.sequence) < min_ctg_lenth:
             continue
 
-        if ctg_id != "all":
-            ref_out = open( os.path.join( out_dir, "%s_ref.fa" % ctg_id), "w" )
+        if ctg_id != 'all':
+            ref_out = open( os.path.join( out_dir, '%s_ref.fa' % ctg_id), 'w' )
         else:
-            ref_out = open( os.path.join( out_dir, "%s_ref.fa" % s_id), "w" )
+            ref_out = open( os.path.join( out_dir, '%s_ref.fa' % s_id), 'w' )
 
-        print >>ref_out, ">%s" % s_id
+        print >>ref_out, '>%s' % s_id
         print >>ref_out, s.sequence
         all_ctg_ids.add(s_id)
         ref_out.close()
@@ -53,71 +49,71 @@ def fetch_ref_and_reads(base_dir, fofn, ctg_id, out_dir, min_ctg_lenth):
     read_set = {}
     ctg_id_hits = {}
 
-    map_fn = os.path.join(read_map_dir,"rawread_to_contigs")
-    with open(map_fn, "r") as f:
+    map_fn = os.path.join(read_map_dir,'rawread_to_contigs')
+    with open(map_fn, 'r') as f:
         for row in f:
             row = row.strip().split()
             hit_ctg = row[1]
-            hit_ctg = hit_ctg.split("-")[0]
+            hit_ctg = hit_ctg.split('-')[0]
             if int(row[3]) == 0:
                 o_id = rid_to_oid[int(row[0])]
                 read_set[o_id] = hit_ctg
                 ctg_id_hits[hit_ctg] = ctg_id_hits.get(hit_ctg, 0) + 1
 
-    map_fn = os.path.join(read_map_dir,"pread_to_contigs")
-    with open(map_fn, "r") as f:
+    map_fn = os.path.join(read_map_dir,'pread_to_contigs')
+    with open(map_fn, 'r') as f:
         for row in f:
             row = row.strip().split()
             hit_ctg = row[1]
-            hit_ctg = hit_ctg.split("-")[0]
+            hit_ctg = hit_ctg.split('-')[0]
             if hit_ctg not in read_set and int(row[3]) == 0:
                 o_id = pid_to_oid(row[0])
                 read_set[o_id] = hit_ctg
                 ctg_id_hits[hit_ctg] = ctg_id_hits.get(hit_ctg, 0) + 1
 
 
-    with open(os.path.join( out_dir, "ctg_list"),"w") as f:
+    with open(os.path.join( out_dir, 'ctg_list'),'w') as f:
         for ctg_id in sorted(list(all_ctg_ids)):
             if ctg_id_hits.get(ctg_id, 0) < 5:
                 continue
-            if ctg_id[-1] not in ["F", "R"]: #ignore small circle contigs, they need different approach
+            if ctg_id[-1] not in ['F', 'R']: #ignore small circle contigs, they need a different approach
                 continue
             print >>f, ctg_id
 
     read_out_files = {}
-    with open(read_fofn, "r") as f:
+    with open(read_fofn, 'r') as f:
         for r_fn in f:
             r_fn = r_fn.strip()
             read_fa_file = FastaReader(r_fn)
             for r in read_fa_file:
                 rid = r.name.split()[0]
                 if rid not in read_set:
-                    ctg_id = "unassigned"
+                    ctg_id = 'unassigned'
                 else:
                     ctg_id = read_set[rid]
 
-                if ctg_id == "NA" or ctg_id not in all_ctg_ids:
-                    ctg_id = "unassigned"
+                if ctg_id == 'NA' or ctg_id not in all_ctg_ids:
+                    ctg_id = 'unassigned'
 
                 if ctg_id not in read_out_files:
-                    read_out = open( os.path.join( out_dir, "%s_reads.fa" % ctg_id), "w" )
+                    read_out = open( os.path.join( out_dir, '%s_reads.fa' % ctg_id), 'w' )
                     read_out_files[ctg_id] = 1
                 else:
-                    read_out = open( os.path.join( out_dir, "%s_reads.fa" % ctg_id), "a" )
+                    read_out = open( os.path.join( out_dir, '%s_reads.fa' % ctg_id), 'a' )
 
-                print >>read_out, ">"+rid
+                print >>read_out, '>'+rid
                 print >>read_out, r.sequence
                 read_out.close()
 
 def parse_args(argv):
     parser = argparse.ArgumentParser(description='using the read to contig mapping data to partition the reads grouped by contigs')
-    parser.add_argument('--base_dir', type=str, default="./", help='the base working dir of a falcon assembly')
-    parser.add_argument('--fofn', type=str, default="./input.fofn", help='path to the file of the list of raw read fasta files')
-    parser.add_argument('--ctg_id', type=str, default="all", help='contig identifier in the contig fasta file')
+    parser.add_argument('--base_dir', type=str, default='./', help='the base working dir of a falcon assembly')
+    parser.add_argument('--fofn', type=str, default='./input.fofn', help='path to the file of the list of raw read fasta files')
+    parser.add_argument('--ctg_id', type=str, default='all', help='contig identifier in the contig fasta file')
     parser.add_argument('--out_dir', default=None, type=str, help='the output base_dir, default to `base_dir/3-unzip/reads` directory')
     parser.add_argument('--min_ctg_lenth', default=20000, type=int, help='the minimum length of the contig for the outputs, default=20000')
-    #parser.add_argument('--ctg_fa', type=str, default="./2-asm-falcon/p_ctg.fa", help='path to the contig fasta file')
-    #parser.add_argument('--read_map_dir', type=str, default="./2-asm-falcon/read_maps", help='path to the read-contig map directory')
+    #parser.add_argument('--ctg_fa', type=str, default='./2-asm-falcon/p_ctg.fa', help='path to the contig fasta file')
+    #parser.add_argument('--read_map_dir', type=str, default='./2-asm-falcon/read_maps', help='path to the read-contig map directory')
     # we can run this in parallel mode in the future
     #parser.add_argument('--n_core', type=int, default=4,
     #                    help='number of processes used for generating consensus')
@@ -129,5 +125,5 @@ def main(argv=sys.argv):
     args = parse_args(argv)
     fetch_ref_and_reads(**vars(args))
 
-if __name__ == "__main__":
+if __name__ == '__main__':
     main()
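
For reference, the id chain behind pid_to_oid above: the second
'/'-separated field of a pread's "fake id" apparently encodes 10x the
daligner raw-read id, which then indexes the rawread_ids table. A worked
sketch with hypothetical values (the real code uses lists loaded from the
id files):

    pid_to_fid = {7: '000007/0004560/0_9876'}  # hypothetical fake id
    rid_to_oid = {456: 'm1234/0/456_0'}        # hypothetical original id

    fid = pid_to_fid[7]
    rid = int(fid.split('/')[1]) / 10          # 4560 / 10 -> 456
    print rid_to_oid[rid]                      # m1234/0/456_0
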
diff --git a/FALCON/falcon_kit/mains/get_read_ctg_map.py b/FALCON/falcon_kit/mains/get_read_ctg_map.py
index 7b85f95..349322f 100644
--- a/FALCON/falcon_kit/mains/get_read_ctg_map.py
+++ b/FALCON/falcon_kit/mains/get_read_ctg_map.py
@@ -1,10 +1,14 @@
-from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn
-from pypeflow.task import PypeTask, PypeThreadTaskBase, PypeTaskBase
-from pypeflow.controller import PypeWorkflow, PypeMPWorkflow, PypeThreadWorkflow
-from falcon_kit.FastaReader import FastaReader
+from __future__ import absolute_import
+#from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn
+#from pypeflow.task import PypeTask, PypeThreadTaskBase, PypeTaskBase
+#from pypeflow.controller import PypeWorkflow, PypeMPWorkflow, PypeThreadWorkflow
+from pypeflow.simple_pwatcher_bridge import (PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
+        makePypeLocalFile, fn, PypeTask)
+PypeThreadTaskBase = MyFakePypeThreadTaskBase
 from falcon_kit.fc_asm_graph import AsmGraph
 import argparse
 import glob
+import logging
 import sys
 import subprocess as sp
 import shlex
@@ -14,97 +18,106 @@ def make_dirs(d):
     if not os.path.isdir(d):
         os.makedirs(d)
 
+def dump_rawread_ids(self):
+    rawread_db = fn(self.rawread_db)
+    rawread_id_file = fn(self.rawread_id_file)
+    os.system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (rawread_db, rawread_id_file))
 
-def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
-
-    read_map_dir = os.path.abspath(os.path.join(asm_dir, "read_maps"))
-    make_dirs(read_map_dir)
+def dump_pread_ids(self):
+    pread_db = fn(self.pread_db)
+    pread_id_file = fn(self.pread_id_file)
+    os.system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (pread_db, pread_id_file))
 
-    PypeMPWorkflow.setNumThreadAllowed(12, 12)
-    wf = PypeMPWorkflow()
+def generate_read_to_ctg_map(self):
+    rawread_id_file = fn(self.rawread_id_file)
+    pread_id_file = fn(self.pread_id_file)
+    read_to_contig_map = fn(self.read_to_contig_map)
 
-    rawread_db = makePypeLocalFile( os.path.join( rawread_dir, "raw_reads.db" ) )
-    rawread_id_file = makePypeLocalFile( os.path.join( read_map_dir, "raw_read_ids" ) )
+    pread_did_to_rid = open(pread_id_file).read().split('\n')
+    rid_to_oid = open(rawread_id_file).read().split('\n')
 
-    @PypeTask( inputs = {"rawread_db": rawread_db},
-               outputs =  {"rawread_id_file": rawread_id_file},
-               TaskType = PypeThreadTaskBase,
-               URL = "task://localhost/dump_rawread_ids" )
-    def dump_rawread_ids(self):
-        rawread_db = fn( self.rawread_db )
-        rawread_id_file = fn( self.rawread_id_file )
-        os.system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (rawread_db, rawread_id_file) )
+    asm_G = AsmGraph(fn(self.sg_edges_list),
+                        fn(self.utg_data),
+                        fn(self.ctg_paths))
 
-    wf.addTask( dump_rawread_ids )
-
-    pread_db = makePypeLocalFile( os.path.join( pread_dir, "preads.db" ) )
-    pread_id_file = makePypeLocalFile( os.path.join( read_map_dir, "pread_ids" ) )
-
-    @PypeTask( inputs = {"pread_db": pread_db},
-               outputs =  {"pread_id_file": pread_id_file},
-               TaskType = PypeThreadTaskBase,
-               URL = "task://localhost/dump_pread_ids" )
-    def dump_pread_ids(self):
-        pread_db = fn( self.pread_db )
-        pread_id_file = fn( self.pread_id_file )
-        os.system("DBshow -n %s | tr -d '>' | LD_LIBRARY_PATH= awk '{print $1}' > %s" % (pread_db, pread_id_file) )
+    pread_to_contigs = {}
 
-    wf.addTask( dump_pread_ids )
+    with open(read_to_contig_map, 'w') as f:
+        for ctg in asm_G.ctg_data:
+            if ctg[-1] == 'R':
+                continue
+            ctg_g = asm_G.get_sg_for_ctg(ctg)
+            for n in ctg_g.nodes():
+                pid = int(n.split(':')[0])
 
-    wf.refreshTargets() # block
+                rid = pread_did_to_rid[pid].split('/')[1]
+                rid = int(int(rid)/10)
+                oid = rid_to_oid[rid]
+                k = (pid, rid, oid)
+                pread_to_contigs.setdefault(k, set())
+                pread_to_contigs[k].add(ctg)
 
-    sg_edges_list = makePypeLocalFile( os.path.join(asm_dir, "sg_edges_list") )
-    utg_data = makePypeLocalFile( os.path.join(asm_dir, "utg_data") )
-    ctg_paths = makePypeLocalFile( os.path.join(asm_dir, "ctg_paths") )
 
-    inputs = { "rawread_id_file": rawread_id_file,
-               "pread_id_file": pread_id_file,
-               "sg_edges_list": sg_edges_list,
-               "utg_data": utg_data,
-               "ctg_paths": ctg_paths }
+        for k in pread_to_contigs:
+            pid, rid, oid = k
+            for ctg in list(pread_to_contigs[ k ]):
+                print >>f, '%09d %09d %s %s' % (pid, rid, oid, ctg)
 
-    read_to_contig_map = makePypeLocalFile( os.path.join(read_map_dir, "read_to_contig_map") )
+def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
+    read_map_dir = os.path.abspath(os.path.join(asm_dir, 'read_maps'))
+    make_dirs(read_map_dir)
 
-    @PypeTask( inputs = inputs,
-               outputs = {"read_to_contig_map": read_to_contig_map},
+    wf = PypeProcWatcherWorkflow(
+            max_jobs=12,
+    )
+    """
+            job_type=config['job_type'],
+            job_queue=config['job_queue'],
+            sge_option=config.get('sge_option', ''),
+            watcher_type=config['pwatcher_type'],
+            watcher_directory=config['pwatcher_directory'])
+    """
+
+    rawread_db = makePypeLocalFile(os.path.join(rawread_dir, 'raw_reads.db'))
+    rawread_id_file = makePypeLocalFile(os.path.join(read_map_dir, 'dump_rawread_ids', 'rawread_ids'))
+
+    task = PypeTask(
+              inputs = {'rawread_db': rawread_db},
+              outputs = {'rawread_id_file': rawread_id_file},
+              TaskType = PypeThreadTaskBase,
+              URL = 'task://localhost/dump_rawread_ids')
+    wf.addTask(task(dump_rawread_ids))
+
+    pread_db = makePypeLocalFile(os.path.join(pread_dir, 'preads.db'))
+    pread_id_file = makePypeLocalFile(os.path.join(read_map_dir, 'dump_pread_ids', 'pread_ids'))
+
+    task = PypeTask(
+               inputs = {'pread_db': pread_db},
+               outputs = {'pread_id_file': pread_id_file},
                TaskType = PypeThreadTaskBase,
-               URL = "task://localhost/get_ctg_read_map" )
-    def generate_read_to_ctg_map(self):
-        rawread_id_file = fn( self.rawread_id_file )
-        pread_id_file = fn( self.pread_id_file )
-        read_to_contig_map = fn( self.read_to_contig_map )
-
-        pread_did_to_rid = open(pread_id_file).read().split("\n")
-        rid_to_oid = open(rawread_id_file).read().split("\n")
+               URL = 'task://localhost/dump_pread_ids' )
+    wf.addTask(task(dump_pread_ids))
 
-        asm_G = AsmGraph(fn(self.sg_edges_list),
-                         fn(self.utg_data),
-                         fn(self.ctg_paths) )
-
-        pread_to_contigs = {}
-
-        with open(read_to_contig_map, "w") as f:
-            for ctg in asm_G.ctg_data:
-                if ctg[-1] == "R":
-                    continue
-                ctg_g = asm_G.get_sg_for_ctg(ctg)
-                for n in ctg_g.nodes():
-                    pid = int(n.split(":")[0])
+    wf.refreshTargets() # block
 
-                    rid = pread_did_to_rid[pid].split("/")[1]
-                    rid = int(int(rid)/10)
-                    oid = rid_to_oid[rid]
-                    k = (pid, rid, oid)
-                    pread_to_contigs.setdefault( k, set() )
-                    pread_to_contigs[ k ].add( ctg )
+    sg_edges_list = makePypeLocalFile( os.path.join(asm_dir, 'sg_edges_list') )
+    utg_data = makePypeLocalFile( os.path.join(asm_dir, 'utg_data') )
+    ctg_paths = makePypeLocalFile( os.path.join(asm_dir, 'ctg_paths') )
 
+    inputs = { 'rawread_id_file': rawread_id_file,
+               'pread_id_file': pread_id_file,
+               'sg_edges_list': sg_edges_list,
+               'utg_data': utg_data,
+               'ctg_paths': ctg_paths }
 
-            for k in pread_to_contigs:
-                pid, rid, oid = k
-                for ctg in list(pread_to_contigs[ k ]):
-                    print >>f, "%09d %09d %s %s" % (pid, rid, oid, ctg)
+    read_to_contig_map = makePypeLocalFile(os.path.join(read_map_dir, 'get_ctg_read_map', 'read_to_contig_map'))
 
-    wf.addTask( generate_read_to_ctg_map )
+    task = PypeTask(
+              inputs = inputs,
+              outputs = {'read_to_contig_map': read_to_contig_map},
+              TaskType = PypeThreadTaskBase,
+              URL = 'task://localhost/get_ctg_read_map')
+    wf.addTask(task(generate_read_to_ctg_map))
 
     wf.refreshTargets() # block
 
@@ -112,18 +125,19 @@ def parse_args(argv):
     parser = argparse.ArgumentParser(description='generate `2-asm-falcon/read_maps/read_to_contig_map` that contains the \
 information from the chain of mapping: (contig id) -> (internal p-read id) -> (internal raw-read id) -> (original read id)',
               formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-    parser.add_argument('--basedir', type=str, default="./", help='the base working dir of a FALCON assembly')
+    parser.add_argument('--basedir', type=str, default='./', help='the base working dir of a FALCON assembly')
     args = parser.parse_args(argv[1:])
     return args
 
 def main(argv=sys.argv):
+    logging.basicConfig()
     args = parse_args(argv)
     basedir = args.basedir
-    rawread_dir = os.path.abspath( os.path.join( basedir, "0-rawreads" )  )
-    pread_dir = os.path.abspath( os.path.join( basedir, "1-preads_ovl" ) )
-    asm_dir = os.path.abspath( os.path.join( basedir, "2-asm-falcon") )
+    rawread_dir = os.path.abspath(os.path.join( basedir, '0-rawreads'))
+    pread_dir = os.path.abspath(os.path.join( basedir, '1-preads_ovl'))
+    asm_dir = os.path.abspath(os.path.join( basedir, '2-asm-falcon'))
 
     get_read_ctg_map(rawread_dir=rawread_dir, pread_dir=pread_dir, asm_dir=asm_dir)
 
-if __name__ == "__main__":
+if __name__ == '__main__':
     main()
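
The read_to_contig_map rows written above follow '%09d %09d %s %s', i.e.
pread id, raw-read id, original read id, contig. A minimal sketch of
parsing them back (illustrative only; pr_ctg_track.py and rr_ctg_track.py
below use their own get_*_to_ctg helpers):

    rid_to_ctg = {}
    with open('read_to_contig_map') as f:
        for line in f:
            pid, rid, oid, ctg = line.split()
            rid_to_ctg.setdefault(int(rid), set()).add(ctg)
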
diff --git a/FALCON/falcon_kit/mains/pr_ctg_track.py b/FALCON/falcon_kit/mains/pr_ctg_track.py
index f516866..f1b06fc 100644
--- a/FALCON/falcon_kit/mains/pr_ctg_track.py
+++ b/FALCON/falcon_kit/mains/pr_ctg_track.py
@@ -54,8 +54,8 @@ def tr_stage1(readlines, min_len, bestn, pid_to_ctg):
 def run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn):
     io.LOG('preparing tr_stage1')
     io.logstats()
-    asm_dir = os.path.abspath( os.path.join(base_dir, "2-asm-falcon") )
-    pid_to_ctg = get_pid_to_ctg( os.path.join(asm_dir, "read_maps/read_to_contig_map" ) )
+    asm_dir = os.path.abspath(os.path.join(base_dir, '2-asm-falcon'))
+    pid_to_ctg = get_pid_to_ctg(os.path.join(asm_dir, 'read_maps', 'get_ctg_read_map', 'read_to_contig_map'))
     inputs = []
     for fn in file_list:
         inputs.append( (run_tr_stage1, db_fn, fn, min_len, bestn, pid_to_ctg) )
@@ -135,7 +135,7 @@ def track_reads(n_core, base_dir, min_len, bestn, debug, silent, stream):
 
 def parse_args(argv):
     parser = argparse.ArgumentParser(description='scan the pread overlap information to identify the best hit from the preads \
-to the contigs with read_to_contig_map generated by `fc_get_read_ctg_map` in `2-asm-falcon/read_maps/read_to_contig_map`',
+to the contigs with read_to_contig_map generated by `fc_get_read_ctg_map` in `2-asm-falcon/read_maps/get_ctg_read_map/read_to_contig_map`',
             formatter_class=argparse.ArgumentDefaultsHelpFormatter)
     parser.add_argument('--n_core', type=int, default=4,
                         help='number of processes used for tracking reads; '
diff --git a/FALCON/falcon_kit/mains/rr_ctg_track.py b/FALCON/falcon_kit/mains/rr_ctg_track.py
index ad07f48..397d382 100644
--- a/FALCON/falcon_kit/mains/rr_ctg_track.py
+++ b/FALCON/falcon_kit/mains/rr_ctg_track.py
@@ -54,8 +54,8 @@ def tr_stage1(readlines, min_len, bestn, rid_to_ctg):
 def run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn):
     io.LOG('preparing tr_stage1')
     io.logstats()
-    asm_dir = os.path.abspath( os.path.join(base_dir, "2-asm-falcon") )
-    rid_to_ctg = get_rid_to_ctg( os.path.join(asm_dir, "read_maps/read_to_contig_map" ) )
+    asm_dir = os.path.abspath(os.path.join(base_dir, '2-asm-falcon'))
+    rid_to_ctg = get_rid_to_ctg(os.path.join(asm_dir, 'read_maps', 'get_ctg_read_map', 'read_to_contig_map'))
     inputs = []
     for fn in file_list:
         inputs.append( (run_tr_stage1, db_fn, fn, min_len, bestn, rid_to_ctg) )
@@ -75,12 +75,12 @@ def run_track_reads(exe_pool, base_dir, file_list, min_len, bestn, db_fn):
                 else:
                     heappushpop( bread_to_areads[k], item )
 
-    #rid_to_oid = open(os.path.join( rawread_dir, "raw_read_ids" ) ).read().split("\n")
+    #rid_to_oid = open(os.path.join(rawread_dir, 'dump_rawread_ids', 'raw_read_ids')).read().split('\n')
 
     """
     For each b-read, we find the best contig map through the b->a->contig map.
     """
-    with open( os.path.join(asm_dir, "read_maps/rawread_to_contigs"), "w") as out_f:
+    with open(os.path.join(asm_dir, 'read_maps/rawread_to_contigs'), 'w') as out_f:
         for bread in bread_to_areads:
 
             ctg_score = {}
@@ -145,7 +145,7 @@ def track_reads(n_core, base_dir, min_len, bestn, debug, silent, stream):
 
 def parse_args(argv):
     parser = argparse.ArgumentParser(description='scan the raw read overlap information to identify the best hit from the reads \
-to the contigs with read_to_contig_map generated by `fc_get_read_ctg_map` in `2-asm-falcon/read_maps/read_to_contig_map`',
+to the contigs with read_to_contig_map generated by `fc_get_read_ctg_map` in `2-asm-falcon/read_maps/get_ctg_read_map/read_to_contig_map`',
             formatter_class=argparse.ArgumentDefaultsHelpFormatter)
     parser.add_argument('--n_core', type=int, default=4,
                         help='number of processes used for tracking reads; '
diff --git a/FALCON/falcon_kit/mains/run1.py b/FALCON/falcon_kit/mains/run1.py
index cdfb05b..8fd7352 100644
--- a/FALCON/falcon_kit/mains/run1.py
+++ b/FALCON/falcon_kit/mains/run1.py
@@ -1,9 +1,9 @@
 from .. import run_support as support
 from .. import bash, pype_tasks
 from ..util.system import only_these_symlinks
-from pypeflow.pwatcher_bridge import PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase
-from pypeflow.data import makePypeLocalFile, fn
-from pypeflow.task import PypeTask
+#from pypeflow.pwatcher_bridge import PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase
+#from pypeflow.data import makePypeLocalFile, fn
+#from pypeflow.task import PypeTask
 from pypeflow.simple_pwatcher_bridge import (PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
         makePypeLocalFile, fn, PypeTask)
 import argparse
@@ -133,7 +133,6 @@ def main1(prog_name, input_config_fn, logger_config_fn=None):
         fc_run_logger.exception('Failed to parse config "{}".'.format(input_config_fn))
         raise
     input_fofn_plf = makePypeLocalFile(config['input_fofn'])
-    #Workflow = PypeProcWatcherWorkflow
     wf = PypeProcWatcherWorkflow(job_type=config['job_type'],
             job_queue=config['job_queue'],
             sge_option=config.get('sge_option', ''),
@@ -142,13 +141,12 @@ def main1(prog_name, input_config_fn, logger_config_fn=None):
     run(wf, config,
             os.path.abspath(input_config_fn),
             input_fofn_plf=input_fofn_plf,
-            setNumThreadAllowed=PypeProcWatcherWorkflow.setNumThreadAllowed)
+    )
 
 
 def run(wf, config,
         input_config_fn,
         input_fofn_plf,
-        setNumThreadAllowed,
         ):
     """
     Preconditions (for now):
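
(Note: PypeProcWatcherWorkflow no longer exposes the old
setNumThreadAllowed hook; where a job cap is needed, it is passed at
construction time instead, as in the max_jobs=12 call in
get_read_ctg_map.py above.)
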
diff --git a/FALCON/test/test_functional.py b/FALCON/test/test_functional.py
index 832ba1f..aafb6d4 100644
--- a/FALCON/test/test_functional.py
+++ b/FALCON/test/test_functional.py
@@ -30,7 +30,6 @@ def test_get_daligner_job_descriptions_small():
 example_se161 = os.path.join(thisdir, 'se161.sh')
 
 def test_get_daligner_job_descriptions_se161():
-    # when there is only 1 block, a special case
     example_HPCdaligner = open(example_se161)
     result = f.get_daligner_job_descriptions(
             example_HPCdaligner, 'raw_reads', single=False)
diff --git a/pypeFLOW/pypeflow/common.py b/pypeFLOW/pypeflow/common.py
deleted file mode 100644
index 4c12aef..0000000
--- a/pypeFLOW/pypeflow/common.py
+++ /dev/null
@@ -1,149 +0,0 @@
-
-# @author Jason Chin
-#
-# Copyright (C) 2010 by Jason Chin 
-# Copyright (C) 2011 by Jason Chin, Pacific Biosciences
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""
-
-PypeCommon: provide the common base classes and general module level utility functions
-            and constants for PypeEngine
-
-"""
-
-from urlparse import urlparse
-
-import rdflib
-try:
-    from rdflib import ConjunctiveGraph as Graph #work for rdflib-3.1.0
-    # need to install rdfextras for rdflib-3.0.0
-    rdflib.plugin.register('sparql', rdflib.query.Processor,
-                           'rdfextras.sparql.processor', 'Processor')
-    rdflib.plugin.register('sparql', rdflib.query.Result,
-                           'rdfextras.sparql.query', 'SPARQLQueryResult')
-except Exception:
-    from rdflib.Graph import ConjunctiveGraph as Graph #work for rdflib-2.4.2
-from rdflib import Namespace
-from rdflib import Literal
-from rdflib import URIRef
-
-from subprocess import Popen, PIPE
-import time
-
-pypeNS = Namespace("pype://v0.1/")
-
-class PypeError(Exception):
-    def __init__(self, msg):
-        Exception.__init__(self, msg)  # to make __repr__() show class name
-        self.msg = msg
-    def __str__(self):
-        return repr(self.msg)
-
-class NotImplementedError(PypeError):
-    pass
-
-class URLSchemeNotSupportYet(PypeError):
-    pass
-
-class PypeObject(object):
-
-    """ 
-
-    Base class for all PypeObjects
-
-    Every PypeObject should have an URL.
-    The instance attributes can be set by using keyword argument in __init__(). 
-
-    """
-
-    def __init__(self, URL, **attributes):
-
-        URLParseResult = urlparse(URL)
-        if URLParseResult.scheme not in self.__class__.supportedURLScheme:
-            raise URLSchemeNotSupportYet("%s is not supported yet" % URLParseResult.scheme )
-        else:
-            self.URL = URL
-            for k,v in attributes.iteritems():
-                if k not in self.__dict__:
-                    self.__dict__[k] = v
-
-    def _updateURL(self, newURL):
-        URLParseResult = urlparse(self.URL)
-        newURLParseResult = urlparse(newURL)
-        if URLParseResult.scheme != newURLParseResult.scheme:
-            raise PypeError, "the URL scheme can not be changed for obj %s" % self.URL
-        self.URL = newURL
-     
-    @property 
-    def _RDFGraph(self):
-        graph = Graph()
-
-        for k, v in self.__dict__.iteritems():
-            if k == "URL": continue
-            if k[0] == "_": continue
-            if hasattr(v, "URL"):
-                graph.add( ( URIRef(self.URL), pypeNS[k], URIRef(v.URL) ) )
-        return graph
-
-
-    
-    @property
-    def RDFXML(self):
-
-        """ 
-        RDF XML representation of the everything related to the PypeObject 
-        """
-
-        return self._RDFGraph.serialize() 
-
-def runShellCmd(args,**kwargs):
-
-    """ 
-    Utility function that runs a shell script command. 
-    It blocks until the command is finished. The return value
-    from the shell command is returned
-    """
-
-    p = Popen(args,**kwargs)
-    pStatus = None
-    while 1:
-        time.sleep(0.2)
-        pStatus = p.poll()
-        if pStatus != None:
-            break
-    return pStatus
-
-def runSgeSyncJob(args):
-
-    """ 
-    Utility function that runs a shell script with SGE. 
-    It blocks until the command is finished. The return value
-    from the shell command is returned
-    """
-
-    p = Popen(args)
-    pStatus = None
-    while 1:
-        time.sleep(0.1)
-        pStatus = p.poll()
-        if pStatus != None:
-            break
-    return pStatus
diff --git a/pypeFLOW/pypeflow/controller.py b/pypeFLOW/pypeflow/controller.py
deleted file mode 100644
index ce39e89..0000000
--- a/pypeFLOW/pypeflow/controller.py
+++ /dev/null
@@ -1,875 +0,0 @@
-# @author Jason Chin
-#
-# Copyright (C) 2010 by Jason Chin 
-# Copyright (C) 2011 by Jason Chin, Pacific Biosciences
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""
-
-PypeController: This module provides the PypeWorkflow that controlls how a workflow is excuted.
-
-"""
-import sys
-import datetime
-import multiprocessing
-import threading 
-import time 
-import traceback
-import logging
-import Queue
-from cStringIO import StringIO 
-from urlparse import urlparse
-
-# TODO(CD): When we stop using Python 2.5, use relative-imports and remove this dir from PYTHONPATH.
-from common import PypeError, PypeObject, Graph, URIRef, pypeNS
-from data import PypeDataObjectBase, PypeSplittableLocalFile
-from task import PypeTaskBase, PypeTaskCollection, PypeThreadTaskBase, getFOFNMapTasks
-from task import TaskInitialized, TaskDone, TaskFail
-
-logger = logging.getLogger(__name__)
-
-class TaskExecutionError(PypeError):
-    pass
-class TaskTypeError(PypeError):
-    pass
-class TaskFailureError(PypeError):
-    pass
-class LateTaskFailureError(PypeError):
-    pass
-
-class PypeNode(object):
-    """ 
-    Representing a node in the dependence DAG. 
-    """
-
-    def __init__(self, obj):
-        self.obj = obj
-        self._outNodes = set()
-        self._inNodes = set()
-
-    def addAnOutNode(self, obj):
-        self._outNodes.add(obj)
-        
-    def addAnInNode(self, obj):
-        self._inNodes.add(obj)
-
-    def removeAnOutNode(self, obj):
-        self._outNodes.remove(obj)
-
-    def removeAnInNode(self, obj):
-        self._inNodes.remove(obj)
-
-    @property
-    def inDegree(self):
-        return len(self._inNodes)
-
-    @property
-    def outDegree(self):
-        return len(self._outNodes)
-    
-    @property
-    def depth(self):
-        if self.inDegree == 0:
-            return 1
-        return 1 + max([ node.depth for node in self._inNodes ])
-
-class PypeGraph(object):
-    """ 
-    Representing a dependence DAG with PypeObjects. 
-    """
-
-    def __init__(self, RDFGraph, subGraphNodes=None):
-        """
-        Construct an internal DAG with PypeObject given an RDF graph.
-        A sub-graph can be constructed if subGraphNodes is not "None"
-        """
-
-        self._RDFGraph = RDFGraph
-        self._allEdges = set()
-        self._allNodes = set()
-        self.url2Node ={}
-
-        for row in self._RDFGraph.query('SELECT ?s ?o WHERE {?s pype:prereq ?o . }', initNs=dict(pype=pypeNS)):
-            if subGraphNodes != None:
-                if row[0] not in subGraphNodes: continue
-                if row[1] not in subGraphNodes: continue
-            
-            sURL, oURL = str(row[0]), str(row[1])
-            
-            self.url2Node[sURL] = self.url2Node.get( sURL, PypeNode(str(row[0])) )
-            self.url2Node[oURL] = self.url2Node.get( oURL, PypeNode(str(row[1])) )
-
-            n1 = self.url2Node[oURL]
-            n2 = self.url2Node[sURL]
-            
-            n1.addAnOutNode(n2)
-            n2.addAnInNode(n1)
-            
-            anEdge = (n1, n2)
-            self._allNodes.add( n1 )
-            self._allNodes.add( n2 )
-            self._allEdges.add( anEdge )
-            
-    def __getitem__(self, url):
-        """PypeGraph["URL"] ==> PypeNode"""
-        return self.url2Node[url]
-
-    def tSort(self): #return a topoloical sort node list
-        """
-        Output topological sorted list of the graph element. 
-        It raises a TeskExecutionError if a circle is detected.
-        """
-        edges = self._allEdges.copy()
-        
-        S = [x for x in self._allNodes if x.inDegree == 0]
-        L = []
-        while len(S) != 0:
-            n = S.pop()
-            L.append(n)
-            outNodes = n._outNodes.copy()
-            for m in outNodes:
-                edges.remove( (n, m) )
-                n.removeAnOutNode(m)
-                m.removeAnInNode(n)
-                if m.inDegree == 0:
-                    S.append(m)
-        
-        if len(edges) != 0:
-            raise TaskExecutionError(" Circle detectd in the dependency graph ")
-        else:
-            return [x.obj for x in L]
-                    
-class PypeWorkflow(PypeObject):
-    """ 
-    Representing a PypeWorkflow. PypeTask and PypeDataObjects can be added
-    into the workflow and executed through the instanct methods.
-
-    >>> import os, time 
-    >>> from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn
-    >>> from pypeflow.task import *
-    >>> try:
-    ...     os.makedirs("/tmp/pypetest")
-    ...     _ = os.system("rm -f /tmp/pypetest/*")
-    ... except Exception:
-    ...     pass
-    >>> time.sleep(1)
-    >>> fin = makePypeLocalFile("/tmp/pypetest/testfile_in", readOnly=False)
-    >>> fout = makePypeLocalFile("/tmp/pypetest/testfile_out", readOnly=False)
-    >>> @PypeTask(outputDataObjs={"test_out":fout},
-    ...           inputDataObjs={"test_in":fin},
-    ...           parameters={"a":'I am "a"'}, **{"b":'I am "b"'})
-    ... def test(self):
-    ...     print test.test_in.localFileName
-    ...     print test.test_out.localFileName
-    ...     os.system( "touch %s" % fn(test.test_out) )
-    ...     pass
-    >>> os.system( "touch %s" %  (fn(fin))  )
-    0
-    >>> from pypeflow.controller import PypeWorkflow
-    >>> wf = PypeWorkflow()
-    >>> wf.addTask(test)
-    >>> def finalize(self):
-    ...     def f():
-    ...         print "in finalize:", self._status
-    ...     return f
-    >>> test.finalize = finalize(test)  # For testing only. Please don't do this in your code. The PypeTask.finalized() is intended to be overriden by subclasses. 
-    >>> wf.refreshTargets( objs = [fout] )
-    /tmp/pypetest/testfile_in
-    /tmp/pypetest/testfile_out
-    in finalize: done
-    True
-    """
-
-    supportedURLScheme = ["workflow"]
-
-    def __init__(self, URL = None, **attributes ):
-
-        if URL == None:
-            URL = "workflow://" + __file__+"/%d" % id(self)
-
-        self._pypeObjects = {}
-
-        PypeObject.__init__(self, URL, **attributes)
-
-        self._referenceRDFGraph = None #place holder for a reference RDF
-
-        
-    def addObject(self, obj):
-        self.addObjects([obj])
-
-    def addObjects(self, objs):
-        """
-        Add data objects into the workflow. One can add also task object to the workflow using this method for
-        non-threaded workflow.
-        """
-        for obj in objs:
-            if obj.URL in self._pypeObjects:
-                if id(self._pypeObjects[obj.URL]) != id(obj):
-                    raise PypeError, "Add different objects with the same URL %s" % obj.URL
-                else:
-                    continue
-            self._pypeObjects[obj.URL] = obj
-
-    def addTask(self, taskObj):
-        self.addTasks([taskObj])
-
-
-    def addTasks(self, taskObjs):
-        """
-        Add tasks into the workflow. The dependent input and output data objects are added automatically too. 
-        It sets the message queue used for communicating between the task thread and the main thread. One has
-        to use addTasks() or addTask() to add task objects to a threaded workflow.
-        """
-        for taskObj in taskObjs:
-            if isinstance(taskObj, PypeTaskCollection):
-                for subTaskObj in taskObj.getTasks() + taskObj.getScatterGatherTasks():
-                    self.addObjects(subTaskObj.inputDataObjs.values())
-                    self.addObjects(subTaskObj.outputDataObjs.values())
-                    self.addObjects(subTaskObj.mutableDataObjs.values())
-                    self.addObject(subTaskObj)
-
-            else:
-                for dObj in taskObj.inputDataObjs.values() +\
-                            taskObj.outputDataObjs.values() +\
-                            taskObj.mutableDataObjs.values() :
-                    if isinstance(dObj, PypeSplittableLocalFile):
-                        self.addObjects([dObj._completeFile])
-                    self.addObjects([dObj])
-
-                self.addObject(taskObj)
-
-            
-    def removeTask(self, taskObj):
-        self.removeTasks([taskObj])
-        
-    def removeTasks(self, taskObjs ):
-        """
-        Remove tasks from the workflow.
-        """
-        self.removeObjects(taskObjs)
-            
-    def removeObjects(self, objs):
-        """
-        Remove objects from the workflow. If the object cannot be found, a PypeError is raised.
-        """
-        for obj in objs:
-            if obj.URL in self._pypeObjects:
-                del self._pypeObjects[obj.URL]
-            else:
-                raise PypeError, "Unable to remove %s from the graph. (Object not found)" % obj.URL
-
-    def updateURL(self, oldURL, newURL):
-        obj = self._pypeObjects[oldURL]
-        obj._updateURL(newURL)
-        self._pypeObjects[newURL] = obj
-        del self._pypeObjects[oldURL]
-
-
-            
-    @property
-    def _RDFGraph(self):
-        # expensive to recompute
-        graph = Graph()
-        for URL, obj in self._pypeObjects.iteritems():
-            for s,p,o in obj._RDFGraph:
-                graph.add( (s,p,o) )
-        return graph
-
-    def setReferenceRDFGraph(self, fn):
-        self._referenceRDFGraph = Graph()
-        self._referenceRDFGraph.load(fn)
-        refMD5s = self._referenceRDFGraph.subject_objects(pypeNS["codeMD5digest"])
-        for URL, md5digest in refMD5s:
-            obj = self._pypeObjects[str(URL)]
-            obj.setReferenceMD5(md5digest)
-
-    def _graphvizDot(self, shortName=False):
-        graph = self._RDFGraph
-        dotStr = StringIO()
-        shapeMap = {"file":"box", "state":"box", "task":"component"}
-        colorMap = {"file":"yellow", "state":"cyan", "task":"green"}
-        dotStr.write( 'digraph "%s" {\n rankdir=LR;' % self.URL)
-        for URL in self._pypeObjects.keys():
-            URLParseResult = urlparse(URL)
-            if URLParseResult.scheme not in shapeMap:
-                continue
-            else:
-                shape = shapeMap[URLParseResult.scheme]
-                color = colorMap[URLParseResult.scheme]
-
-                s = URL
-                if shortName == True:
-                    s = URLParseResult.scheme + "://..." + URLParseResult.path.split("/")[-1] 
-                dotStr.write( '"%s" [shape=%s, fillcolor=%s, style=filled];\n' % (s, shape, color))
-
-        for row in graph.query('SELECT ?s ?o WHERE {?s pype:prereq ?o . }', initNs=dict(pype=pypeNS)):
-            s, o = row
-            if shortName == True:
-                    s = urlparse(s).scheme + "://..." + urlparse(s).path.split("/")[-1] 
-                    o = urlparse(o).scheme + "://..." + urlparse(o).path.split("/")[-1] 
-            dotStr.write( '"%s" -> "%s";\n' % (o, s))
-        for row in graph.query('SELECT ?s ?o WHERE {?s pype:hasMutable ?o . }', initNs=dict(pype=pypeNS)):
-            s, o = row
-            if shortName == True:
-                    s = urlparse(s).scheme + "://..." + urlparse(s).path.split("/")[-1] 
-                    o = urlparse(o).scheme + "://..." + urlparse(o).path.split("/")[-1] 
-            dotStr.write( '"%s" -- "%s" [arrowhead=both, style=dashed ];\n' % (s, o))
-        dotStr.write ("}")
-        return dotStr.getvalue()
-
-    @property
-    def graphvizDot(self):
-        return self._graphvizDot()
-
-    @property
-    def graphvizShortNameDot(self):
-        return self._graphvizDot(shortName = True)
-
-    @property
-    def makeFileStr(self):
-        """
-        generate a string that has the information of the execution dependency in
-        a "Makefile" like format. It can be written into a "Makefile" and
-        executed by "make".
-        """
-        for URL in self._pypeObjects.keys():
-            URLParseResult = urlparse(URL)
-            if URLParseResult.scheme != "task": continue
-            taskObj = self._pypeObjects[URL]
-            if not hasattr(taskObj, "script"):
-                raise TaskTypeError("can not convert non shell script based workflow to a makefile") 
-        makeStr = StringIO()
-        for URL in self._pypeObjects.keys():
-            URLParseResult = urlparse(URL)
-            if URLParseResult.scheme != "task": continue
-            taskObj = self._pypeObjects[URL]
-            inputFiles = taskObj.inputDataObjs
-            outputFiles = taskObj.outputDataObjs
-            #for oStr in [o.localFileName for o in outputFiles.values()]:
-            if 1:
-                oStr = " ".join( [o.localFileName for o in outputFiles.values()])
-
-                iStr = " ".join([i.localFileName for i in inputFiles.values()])
-                makeStr.write( "%s:%s\n" % ( oStr, iStr ) )
-                makeStr.write( "\t%s\n\n" % taskObj.script )
-        makeStr.write("all: %s" %  " ".join([o.localFileName for o in outputFiles.values()]) )
-        return makeStr.getvalue()
-
-    @staticmethod
-    def getSortedURLs(rdfGraph, objs):
-        if len(objs) != 0:
-            connectedPypeNodes = set()
-            for obj in objs:
-                if isinstance(obj, PypeSplittableLocalFile):
-                    obj = obj._completeFile
-                for x in rdfGraph.transitive_objects(URIRef(obj.URL), pypeNS["prereq"]):
-                    connectedPypeNodes.add(x)
-            tSortedURLs = PypeGraph(rdfGraph, connectedPypeNodes).tSort( )
-        else:
-            tSortedURLs = PypeGraph(rdfGraph).tSort( )
-        return tSortedURLs
-
-    def refreshTargets(self, objs = [], callback = (None, None, None) ):
-        """
-        Execute the DAG to reach all objects in the "objs" argument.
-        """
-        tSortedURLs = self.getSortedURLs(self._RDFGraph, objs)
-        for URL in tSortedURLs:
-            obj = self._pypeObjects[URL]
-            if not isinstance(obj, PypeTaskBase):
-                continue
-            else:
-                obj()
-                obj.finalize()
-        self._runCallback(callback)
-        return True
-
-    def _runCallback(self, callback = (None, None, None ) ):
-        if callback[0] != None and callable(callback[0]):
-            argv = []
-            kwargv = {}
-            if callback[1] != None and isinstance( callback[1], type(list()) ):
-                argv = callback[1]
-            else:
-                raise TaskExecutionError( "callback argument type error") 
-
-            if callback[2] != None and isinstance( callback[1], type(dict()) ):
-                kwargv = callback[2]
-            else:
-                raise TaskExecutionError( "callback argument type error") 
-
-            callback[0](*argv, **kwargv)
-
-        elif callback[0] != None:
-            raise TaskExecutionError( "callback is not callable") 
-    
-    @property
-    def dataObjects( self ):
-        return [ o for o in self._pypeObjects.values( ) if isinstance( o, PypeDataObjectBase )]
-    
-    @property
-    def tasks( self ):
-        return [ o for o in self._pypeObjects.values( ) if isinstance( o, PypeTaskBase )]
-
-    @property
-    def inputDataObjects(self):
-        graph = self._RDFGraph
-        inputObjs = []
-        for obj in self.dataObjects:
-            r = graph.query('SELECT ?o WHERE {<%s> pype:prereq ?o .  }' % obj.URL, initNs=dict(pype=pypeNS))
-            if len(r) == 0:
-                inputObjs.append(obj)
-        return inputObjs
-     
-    @property
-    def outputDataObjects(self):
-        graph = self._RDFGraph
-        outputObjs = []
-        for obj in self.dataObjects:
-            r = graph.query('SELECT ?s WHERE {?s pype:prereq <%s> .  }' % obj.URL, initNs=dict(pype=pypeNS))
-            if len(r) == 0:
-                outputObjs.append(obj)
-        return outputObjs
-
-def PypeMPWorkflow(URL = None, **attributes):
-    """Factory for the workflow using multiprocessing.
-    """
-    th = _PypeProcsHandler()
-    mq = multiprocessing.Queue()
-    se = multiprocessing.Event()
-    return _PypeConcurrentWorkflow(URL=URL, thread_handler=th, messageQueue=mq, shutdown_event=se,
-            attributes=attributes)
-
-def PypeThreadWorkflow(URL = None, **attributes):
-    """Factory for the workflow using threading.
-    """
-    th = _PypeThreadsHandler()
-    mq = Queue.Queue()
-    se = threading.Event()
-    return _PypeConcurrentWorkflow(URL=URL, thread_handler=th, messageQueue=mq, shutdown_event=se,
-            attributes=attributes)
-
-class _PypeConcurrentWorkflow(PypeWorkflow):
-    """ 
-    Representing a PypeWorkflow that can excute tasks concurrently using threads. It
-    assume all tasks block until they finish. PypeTask and PypeDataObjects can be added
-    into the workflow and executed through the instance methods.
-    """
-
-    CONCURRENT_THREAD_ALLOWED = 16
-    MAX_NUMBER_TASK_SLOT = CONCURRENT_THREAD_ALLOWED
-
-    @classmethod
-    def setNumThreadAllowed(cls, nT, nS):
-        """
-        Override the default number of threads used to run the tasks with this method.
-        """
-        cls.CONCURRENT_THREAD_ALLOWED = nT
-        cls.MAX_NUMBER_TASK_SLOT = nS
-
-    def __init__(self, URL, thread_handler, messageQueue, shutdown_event, attributes):
-        PypeWorkflow.__init__(self, URL, **attributes )
-        self.thread_handler = thread_handler
-        self.messageQueue = messageQueue
-        self.shutdown_event = shutdown_event
-        self.jobStatusMap = dict()
-
-    def addTasks(self, taskObjs):
-        """
-        Add tasks into the workflow. The dependent input and output data objects are added automatically too. 
-        It sets the message queue used for communicating between the task thread and the main thread. One has
-        to use addTasks() or addTask() to add task objects to a threaded workflow.
-        """
-        for taskObj in taskObjs:
-            if isinstance(taskObj, PypeTaskCollection):
-                for subTaskObj in taskObj.getTasks() + taskObj.getScatterGatherTasks():
-                    if not isinstance(subTaskObj, PypeThreadTaskBase):
-                        raise TaskTypeError("Only PypeThreadTask can be added into a PypeThreadWorkflow. The task object %s has type %s " % (subTaskObj.URL, repr(type(subTaskObj))))
-                    subTaskObj.setMessageQueue(self.messageQueue)
-                    subTaskObj.setShutdownEvent(self.shutdown_event)
-            else:
-                if not isinstance(taskObj, PypeThreadTaskBase):
-                    raise TaskTypeError("Only a PypeThreadTask can be added to a PypeThreadWorkflow. The task object has type %s." % repr(type(taskObj)))
-                taskObj.setMessageQueue(self.messageQueue)
-                taskObj.setShutdownEvent(self.shutdown_event)
-
-        PypeWorkflow.addTasks(self, taskObjs)
-
-    def refreshTargets(self, objs=None,
-                       callback=(None, None, None),
-                       updateFreq=None,
-                       exitOnFailure=True):
-        if objs is None:
-            objs = []
-        task2thread = {}
-        try:
-            rtn = self._refreshTargets(task2thread, objs = objs, callback = callback, updateFreq = updateFreq, exitOnFailure = exitOnFailure)
-            return rtn
-        except:
-            tb = traceback.format_exc()
-            self.shutdown_event.set()
-            logger.critical("Any exception caught in refreshTargets() indicates an unrecoverable error. Shutting down...")
-            shutdown_msg = """
-            "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
-            "! Please wait for all threads / processes to terminate !"
-            "! Also, maybe use 'ps' or 'qstat' to check all threads,!"
-            "! processes and/or jobs are terminated cleanly.        !"
-            "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
-            """
-            import warnings
-            warnings.warn(shutdown_msg)
-            th = self.thread_handler
-            threads = list(task2thread.values())
-            logger.warning("#tasks=%d, #alive=%d" %(len(threads), th.alive(threads)))
-            try:
-                while th.alive(threads):
-                    th.join(threads, 2)
-                    logger.warning("Now, #tasks=%d, #alive=%d" %(len(threads), th.alive(threads)))
-            except (KeyboardInterrupt, SystemExit) as e:
-                logger.debug("Interrupted while joining threads (while handling exception from RefreshTargets()). Trying to terminate any working processes before final exit.")
-                th.notifyTerminate(threads)
-            raise Exception('Caused by:\n' + tb)
-
-
-    def _refreshTargets(self, task2thread, objs,
-                        callback,
-                        updateFreq,
-                        exitOnFailure):
-        thread = self.thread_handler.create
-
-        rdfGraph = self._RDFGraph # expensive to recompute, should not change during execution
-        tSortedURLs = self.getSortedURLs(rdfGraph, objs)
-
-        sortedTaskList = [ (str(u), self._pypeObjects[u], self._pypeObjects[u].getStatus()) for u in tSortedURLs
-                            if isinstance(self._pypeObjects[u], PypeTaskBase) ]
-        self.jobStatusMap = dict( ( (t[0], t[2]) for t in sortedTaskList ) )
-        logger.info("# of tasks in complete graph: %d" %(
-            len(sortedTaskList),
-            ))
-
-        prereqJobURLMap = {}
-
-        for URL, taskObj, tStatus in sortedTaskList:
-            prereqJobURLs = [str(u) for u in rdfGraph.transitive_objects(URIRef(URL), pypeNS["prereq"])
-                                    if isinstance(self._pypeObjects[str(u)], PypeTaskBase) and str(u) != URL ]
-
-            prereqJobURLMap[URL] = prereqJobURLs
-
-            logger.debug("Determined prereqs for %r to be %r" % (URL, ", ".join(prereqJobURLs)))
-
-            if taskObj.nSlots > self.MAX_NUMBER_TASK_SLOT:
-                raise TaskExecutionError("%s requests %s task slots, which is more than the %d task slots allowed" %
-                                          (str(URL), taskObj.nSlots, self.MAX_NUMBER_TASK_SLOT) )
-
-        sleep_time = 0
-        nSubmittedJob = 0
-        usedTaskSlots = 0
-        loopN = 0
-        lastUpdate = None
-        activeDataObjs = set() #keep a set of output data object. repeats are illegal.
-        mutableDataObjs = set() #keep a set of mutable data object. a task will be delayed if a running task has the same output.
-        updatedTaskURLs = set() #to avoid extra stat-calls
-        failedJobCount = 0
-        succeededJobCount = 0
-        jobsReadyToBeSubmitted = []
-
-        while 1:
-
-            loopN += 1
-            if not ((loopN - 1) & loopN):
-                # exponential back-off for logging
-                logger.info("tick: %d, #updatedTasks: %d, sleep_time=%f" %(loopN, len(updatedTaskURLs), sleep_time))
-
-            for URL, taskObj, tStatus in sortedTaskList:
-                if self.jobStatusMap[URL] != TaskInitialized:
-                    continue
-                logger.debug(" #outputDataObjs: %d; #mutableDataObjs: %d" %(
-                    len(taskObj.outputDataObjs.values()),
-                    len(taskObj.mutableDataObjs.values()),
-                    ))
-                prereqJobURLs = prereqJobURLMap[URL]
-
-                logger.debug(' prereqs of %s:' %URL)
-                for u in prereqJobURLs:
-                    logger.debug('  %s: %s' %(self.jobStatusMap[u], u))
-                if any(self.jobStatusMap[u] != "done" for u in prereqJobURLs):
-                    # Note: If self.jobStatusMap[u] raises, then the sorting was wrong.
-                    #logger.debug('Prereqs not done! %s' %URL)
-                    continue
-                # Check for mutable collisions; delay task if any.
-                outputCollision = False
-                for dataObj in taskObj.mutableDataObjs.values():
-                    for fromTaskObjURL, mutableDataObjURL in mutableDataObjs:
-                        if dataObj.URL == mutableDataObjURL and taskObj.URL != fromTaskObjURL:
-                            logger.debug("mutable output collision detected for data object %r betw %r and %r" %(
-                                dataObj, dataObj.URL, mutableDataObjURL))
-                            outputCollision = True
-                            break
-                if outputCollision:
-                    continue
-                # Check for illegal collisions.
-                if len(activeDataObjs) < 100:
-                    # O(n^2) on active tasks, but pretty fast.
-                    for dataObj in taskObj.outputDataObjs.values():
-                        for fromTaskObjURL, activeDataObjURL in activeDataObjs:
-                            if dataObj.URL == activeDataObjURL and taskObj.URL != fromTaskObjURL:
-                                raise Exception("output collision detected for data object %r betw %r and %r" %(
-                                    dataObj, dataObj.URL, activeDataObjURL))
-                # We use 'updatedTaskURLs' to short-circuit 'isSatisfied()', to avoid many stat-calls.
-                # Note: Sorting should prevent FileNotExistError in isSatisfied().
-                if not (set(prereqJobURLs) & updatedTaskURLs) and taskObj.isSatisfied():
-                    #taskObj.setStatus(pypeflow.task.TaskDone) # Safe b/c thread is not running yet, and all prereqs are done.
-                    logger.info(' Skipping already done task: %s' %(URL,))
-                    logger.debug(' (Status was %s)' %(self.jobStatusMap[URL],))
-                    taskObj.setStatus(TaskDone) # to avoid re-stat on subsequent call to refreshTargets()
-                    self.jobStatusMap[str(URL)] = TaskDone # to avoid re-stat on *this* call
-                    successfulTask = self._pypeObjects[URL]
-                    successfulTask.finalize()
-                    continue
-                self.jobStatusMap[str(URL)] = "ready" # in case not all ready jobs are given threads immediately, to avoid re-stat
-                jobsReadyToBeSubmitted.append( (URL, taskObj) )
-                for dataObj in taskObj.outputDataObjs.values():
-                    logger.debug( "add active data obj: %s" %(dataObj,))
-                    activeDataObjs.add( (taskObj.URL, dataObj.URL) )
-                for dataObj in taskObj.mutableDataObjs.values():
-                    logger.debug( "add mutable data obj: %s" %(dataObj,))
-                    mutableDataObjs.add( (taskObj.URL, dataObj.URL) )
-
-            logger.debug( "#jobsReadyToBeSubmitted: %d" % len(jobsReadyToBeSubmitted) )
-
-            numAliveThreads = self.thread_handler.alive(task2thread.values())
-            logger.debug( "Total # of running threads: %d; alive tasks: %d; sleep=%f, loopN=%d" % (
-                threading.activeCount(), numAliveThreads, sleep_time, loopN) )
-            if numAliveThreads == 0 and len(jobsReadyToBeSubmitted) == 0 and self.messageQueue.empty():
-                #TODO: better job-status detection. messageQueue should be empty, and all return conditions should be "done" or "fail".
-                logger.info( "_refreshTargets() finished with no thread running and no new job to submit" )
-                for URL in task2thread:
-                    assert self.jobStatusMap[str(URL)] in ("done", "fail"), "status(%s)==%r" %(
-                            URL, self.jobStatusMap[str(URL)])
-                break # End of loop!
-
-            while jobsReadyToBeSubmitted:
-                URL, taskObj  = jobsReadyToBeSubmitted[0]
-                numberOfEmptySlot = self.MAX_NUMBER_TASK_SLOT - usedTaskSlots 
-                logger.debug( "#empty_slots = %d/%d; #jobs_ready=%d" % (numberOfEmptySlot, self.MAX_NUMBER_TASK_SLOT, len(jobsReadyToBeSubmitted)))
-                if numberOfEmptySlot >= taskObj.nSlots and numAliveThreads < self.CONCURRENT_THREAD_ALLOWED:
-                    t = thread(target = taskObj)
-                    t.start()
-                    task2thread[URL] = t
-                    nSubmittedJob += 1
-                    usedTaskSlots += taskObj.nSlots
-                    numAliveThreads += 1
-                    self.jobStatusMap[URL] = "submitted"
-                    # Note that we re-submit completed tasks whenever refreshTargets() is called.
-                    logger.debug("Submitted %r" %URL)
-                    logger.debug(" Details: %r" %taskObj)
-                    jobsReadyToBeSubmitted.pop(0)
-                else:
-                    break
-
-            time.sleep(sleep_time)
-            if updateFreq is not None:
-                elapsedSeconds = updateFreq if lastUpdate is None else (datetime.datetime.now()-lastUpdate).seconds
-                if elapsedSeconds >= updateFreq:
-                    self._update( elapsedSeconds )
-                    lastUpdate = datetime.datetime.now( )
-
-            sleep_time = sleep_time + 0.1 if (sleep_time < 1) else 1
-            while not self.messageQueue.empty():
-                sleep_time = 0 # Wait very briefly while messages are coming in.
-                URL, message = self.messageQueue.get()
-                updatedTaskURLs.add(URL)
-                self.jobStatusMap[str(URL)] = message
-                logger.debug("message for %s: %r" %(URL, message))
-
-                if message in ["done"]:
-                    successfulTask = self._pypeObjects[str(URL)]
-                    nSubmittedJob -= 1
-                    usedTaskSlots -= successfulTask.nSlots
-                    logger.info("Success (%r). Joining %r..." %(message, URL))
-                    task2thread[URL].join(timeout=10)
-                    #del task2thread[URL]
-                    succeededJobCount += 1
-                    successfulTask.finalize()
-                    for o in successfulTask.outputDataObjs.values():
-                        activeDataObjs.remove( (successfulTask.URL, o.URL) )
-                    for o in successfulTask.mutableDataObjs.values():
-                        mutableDataObjs.remove( (successfulTask.URL, o.URL) )
-                elif message in ["fail"]:
-                    failedTask = self._pypeObjects[str(URL)]
-                    nSubmittedJob -= 1
-                    usedTaskSlots -= failedTask.nSlots
-                    logger.info("Failure (%r). Joining %r..." %(message, URL))
-                    task2thread[URL].join(timeout=10)
-                    #del task2thread[URL]
-                    failedJobCount += 1
-                    failedTask.finalize()
-                    for o in failedTask.outputDataObjs.values():
-                        activeDataObjs.remove( (failedTask.URL, o.URL) )
-                    for o in failedTask.mutableDataObjs.values():
-                        mutableDataObjs.remove( (failedTask.URL, o.URL) )
-                elif message in ["started, runflag: 1"]:
-                    logger.info("Queued %s ..." %repr(URL))
-                elif message in ["started, runflag: 0"]:
-                    logger.debug("Queued %s (already completed) ..." %repr(URL))
-                    raise Exception('It should not be possible to start an already completed task.')
-                else:
-                    logger.warning("Got unexpected message %r from URL %r." %(message, URL))
-
-            for u,s in sorted(self.jobStatusMap.items()):
-                logger.debug("task status: %r, %r, used slots: %d" % (str(u),str(s), self._pypeObjects[str(u)].nSlots))
-
-            if failedJobCount != 0 and (exitOnFailure or succeededJobCount == 0):
-                raise TaskFailureError("Counted %d failure(s) with 0 successes so far." %failedJobCount)
-
-
-        for u,s in sorted(self.jobStatusMap.items()):
-            logger.debug("task status: %s, %r" % (u, s))
-
-        self._runCallback(callback)
-        if failedJobCount != 0:
-            # Slightly different exception when !exitOnFailure.
-            raise LateTaskFailureError("Counted a total of %d failure(s) and %d success(es)." %(
-                failedJobCount, succeededJobCount))
-        return True #TODO: There is no reason to return anything anymore.
-    
-    def _update(self, elapsed):
-        """Can be overridden to provide timed updates during execution"""
-        pass
-
-    def _graphvizDot(self, shortName=False):
-
-        graph = self._RDFGraph
-        dotStr = StringIO()
-        shapeMap = {"file":"box", "state":"box", "task":"component"}
-        colorMap = {"file":"yellow", "state":"cyan", "task":"green"}
-        dotStr.write( 'digraph "%s" {\n rankdir=LR;' % self.URL)
-
-
-        for URL in self._pypeObjects.keys():
-            URLParseResult = urlparse(URL)
-            if URLParseResult.scheme not in shapeMap:
-                continue
-            else:
-                shape = shapeMap[URLParseResult.scheme]
-                color = colorMap[URLParseResult.scheme]
-
-                s = URL
-                if shortName:
-                    s = URLParseResult.scheme + "://..." + URLParseResult.path.split("/")[-1] 
-
-                if URLParseResult.scheme == "task":
-                    jobStatus = self.jobStatusMap.get(URL, None)
-                    if jobStatus is not None:
-                        if jobStatus == "fail":
-                            color = 'red'
-                        elif jobStatus == "done":
-                            color = 'green'
-                    else:
-                        color = 'white'
-                    
-                dotStr.write( '"%s" [shape=%s, fillcolor=%s, style=filled];\n' % (s, shape, color))
-
-        for row in graph.query('SELECT ?s ?o WHERE {?s pype:prereq ?o . }', initNs=dict(pype=pypeNS)):
-            s, o = row
-            if shortName:
-                s = urlparse(s).scheme + "://..." + urlparse(s).path.split("/")[-1] 
-                o = urlparse(o).scheme + "://..." + urlparse(o).path.split("/")[-1] 
-            dotStr.write( '"%s" -> "%s";\n' % (o, s))
-        for row in graph.query('SELECT ?s ?o WHERE {?s pype:hasMutable ?o . }', initNs=dict(pype=pypeNS)):
-            s, o = row
-            if shortName:
-                s = urlparse(s).scheme + "://..." + urlparse(s).path.split("/")[-1]
-                o = urlparse(o).scheme + "://..." + urlparse(o).path.split("/")[-1]
-            dotStr.write( '"%s" -- "%s" [arrowhead=both, style=dashed ];\n' % (s, o))
-        dotStr.write ("}")
-        return dotStr.getvalue()
-
-# For a class-method:
-PypeThreadWorkflow.setNumThreadAllowed = _PypeConcurrentWorkflow.setNumThreadAllowed
-PypeMPWorkflow.setNumThreadAllowed = _PypeConcurrentWorkflow.setNumThreadAllowed
-
-class _PypeThreadsHandler(object):
-    """Stateless method delegator, for injection.
-    """
-    def create(self, target):
-        thread = threading.Thread(target=target)
-        thread.daemon = True  # so it will terminate on exit
-        return thread
-    def alive(self, threads):
-        return sum(thread.is_alive() for thread in threads)
-    def join(self, threads, timeout):
-        then = datetime.datetime.now()
-        for thread in threads:
-            assert thread is not threading.current_thread()
-            if thread.is_alive():
-                to = max(0, timeout - (datetime.datetime.now() - then).seconds)
-                thread.join(to)
-    def notifyTerminate(self, threads):
-        """Assume these are daemon threads.
-        We will attempt to join them all quickly, but non-daemon threads may
-        eventually block the program from quitting.
-        """
-        self.join(threads, 1)
-
-class _PypeProcsHandler(object):
-    """Stateless method delegator, for injection.
-    """
-    def create(self, target):
-        proc = multiprocessing.Process(target=target)
-        return proc
-    def alive(self, procs):
-        return sum(proc.is_alive() for proc in procs)
-    def join(self, procs, timeout):
-        then = datetime.datetime.now()
-        for proc in procs:
-            if proc.is_alive():
-                proc.join(max(0, timeout - (datetime.datetime.now() - then).seconds))
-    def notifyTerminate(self, procs):
-        """This can orphan sub-processes.
-        """
-        for proc in procs:
-            if proc.is_alive():
-                proc.terminate()
-
-
-def defaultOutputTemplate(fn):
-    return fn + ".out"
-
-def applyFOFN( task_fun = None,
-               fofnFileName = None,
-               outTemplateFunc = defaultOutputTemplate,
-               nproc = 8 ):
-
-    tasks = getFOFNMapTasks( FOFNFileName = fofnFileName,
-                             outTemplateFunc = outTemplateFunc,
-                             TaskType=PypeThreadTaskBase,
-                             parameters = dict(nSlots = 1))( task_fun )
-
-    wf = PypeThreadWorkflow()
-    wf.CONCURRENT_THREAD_ALLOWED = nproc 
-    wf.MAX_NUMBER_TASK_SLOT = nproc
-    wf.addTasks(tasks)
-    wf.refreshTargets(exitOnFailure=False)
-
-
-if __name__ == "__main__":
-    import doctest
-    doctest.testmod()
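
For reference, the deleted controller drove tasks through an injected
thread/process handler and a shared message queue, with each task reporting
"done" or "fail" back to the single scheduling loop. A minimal standalone
sketch of that pattern (illustrative only, not FALCON code; Python 2, as in
the module above):

    import threading
    import Queue

    class ThreadsHandler(object):
        # Stateless delegator, mirroring _PypeThreadsHandler above.
        def create(self, target):
            t = threading.Thread(target=target)
            t.daemon = True  # daemon threads terminate with the main program
            return t
        def alive(self, threads):
            return sum(t.is_alive() for t in threads)

    def make_task(q, url):
        def run():
            q.put((url, "done"))  # a real task would do work, then report status
        return run

    q = Queue.Queue()
    th = ThreadsHandler()
    threads = [th.create(make_task(q, "task://demo/%d" % i)) for i in range(3)]
    for t in threads:
        t.start()
    for _ in threads:
        print q.get()  # e.g. ('task://demo/0', 'done')
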
diff --git a/pypeFLOW/pypeflow/data.py b/pypeFLOW/pypeflow/data.py
deleted file mode 100644
index 43a5dc2..0000000
--- a/pypeFLOW/pypeflow/data.py
+++ /dev/null
@@ -1,315 +0,0 @@
-
-# @author Jason Chin
-#
-# Copyright (C) 2010 by Jason Chin 
-# Copyright (C) 2011 by Jason Chin, Pacific Biosciences
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""
-
-PypeData: This module defines the general interface and classes for PypeData objects.
-
-"""
-
-
-from urlparse import urlparse, urljoin
-import platform
-import os, shutil
-from common import pypeNS, PypeObject, PypeError, NotImplementedError
-import logging
-    
-logger = logging.getLogger(__name__)
-
-class FileNotExistError(PypeError):
-    pass
-
-class TypeMismatchError(PypeError):
-    pass
-
-def fn(obj):
-    return obj.localFileName
-
-class PypeDataObjectBase(PypeObject):
-    
-    """ 
-    Represent the common interface for a PypeData object.
-    """
-
-    def __init__(self, URL, **attributes):
-        PypeObject.__init__(self, URL, **attributes)
-        self.verification = []
-        self._mutable = False
-
-    @property
-    def timeStamp(self):
-        raise NotImplementedError, repr(self)
-
-    @property
-    def isMutable(self):
-        return self._mutable
-
-    @property
-    def exists(self):
-        raise NotImplementedError
-
-    def addVerifyFunction( self, verifyFunction ):
-        self.verification.append( verifyFunction )
-
-    def __str__( self ):
-        return self.URL
-
-    def _updateURL(self, newURL):
-        super(PypeDataObjectBase, self)._updateURL(newURL)
-        self._updatePath()
-
-    def _updatePath(self):
-        URLParseResult = urlparse(self.URL)
-        self.localFileName = URLParseResult.path
-        self._path = self.localFileName #for local file, _path is the same as full local file name
-
-class PypeLocalFile(PypeDataObjectBase):
-
-    """ 
-    Represent a PypeData object that can be accessed as a file in a local
-    filesystem.
-
-    >>> f = PypeLocalFile("file://localhost/test.txt")
-    >>> f.localFileName == "/test.txt"
-    True
-    >>> fn(f)
-    '/test.txt'
-    >>> f = PypeLocalFile("file://localhost/test.txt", False, isFasta = True)
-    >>> f.isFasta == True
-    True
-    """
-    def __repr__(self): return "PypeLocalFile(%r, %r)" %(self.URL, self._path)
-    supportedURLScheme = ["file", "state"]
-    def __init__(self, URL, readOnly = False, **attributes):
-        PypeDataObjectBase.__init__(self, URL, **attributes)
-        self._updatePath()
-        self.readOnly = readOnly
-        self._mutable = attributes.get("mutable", False)
-
-    @property
-    def timeStamp(self):
-        if not os.path.exists(self.localFileName):
-            logger.warning('failed: os.path.exists("%s")\n%s'%(
-                self.localFileName,repr(os.listdir(os.path.dirname(self.localFileName)))))
-            raise FileNotExistError("No such file:%r on %r" % (self.localFileName, platform.node()) )
-        return os.stat(self.localFileName).st_mtime 
-
-    @property
-    def exists(self):
-        return os.path.exists(self.localFileName)
-    
-    def verify(self):
-        logger.debug("Verifying contents of %s" % self.URL)
-        # Get around the NFS problem
-        os.listdir(os.path.dirname(self.path)) 
-        
-        errors = [ ]
-        for verifyFn in self.verification:
-            try:
-                errors.extend( verifyFn(self.path) )
-            except Exception, e:
-                errors.append( str(e) )
-        if len(errors) > 0:
-            for e in errors:
-                logger.error(e)
-        return errors
-    
-    @property
-    def path(self):
-        if self._path is None:
-            raise IOError, ("Must resolve this file (%s) with a context "
-                            "before you can access .path") % self.URL
-        return self._path
-    
-    def clean(self):
-        if os.path.exists( self.path ):
-            logger.info("Removing %s" % self.path )
-            if os.path.isdir( self.path ):
-                shutil.rmtree( self.path )
-            else:
-                os.remove( self.path )
-
-class PypeHDF5Dataset(PypeDataObjectBase):  #stub for now Mar 17, 2010
-
-    """ 
-    Represent a PypeData object that is an HDF5 dataset.
-    Not implemented yet.
-    """
-
-    supportedURLScheme = ["hdf5ds"]
-    def __init__(self, URL, readOnly = False, **attributes):
-        PypeDataObjectBase.__init__(self, URL, **attributes)
-        URLParseResult = urlparse(URL)
-        self.localFileName = URLParseResult.path
-        #the rest of the URL goes to HDF5 DS
-
-
-class PypeLocalFileCollection(PypeDataObjectBase):  #stub for now Mar 17, 2010
-
-    """ 
-    Represent a PypeData object that is a composition of multiple files.
-    It provides a container that allows the tasks to choose one or all of the
-    files to process.
-    """
-
-    supportedURLScheme = ["files"]
-    def __init__(self, URL, readOnly = False, select = 1, **attributes):
-        """
-           Currently we only support select = 1;
-           namely, we only pass the first file added to the collection to the tasks.
-        """
-        PypeDataObjectBase.__init__(self, URL, **attributes)
-        URLParseResult = urlparse(URL)
-        self.compositedDataObjName = URLParseResult.path
-        self.localFileName =  None
-        self._path = None
-        self.readOnly = readOnly
-        self.verification = []
-        self.localFiles = [] # a list of all files within the obj
-        self.select = select
-
-    def addLocalFile(self, pLocalFile):
-        if not isinstance(pLocalFile, PypeLocalFile):
-            raise TypeMismatchError, "only a PypeLocalFile object can be added to a PypeLocalFileCollection"
-        self.localFiles.append(pLocalFile)
-        if self.select == 1:
-            self.localFileName = self.localFiles[0].localFileName
-            self._path = self.localFileName
-
-    @property
-    def timeStamp(self):
-        if self.localFileName is None:
-            raise PypeError, "No PypeLocalFile has been added to the PypeLocalFileCollection yet"
-        if not os.path.exists(self.localFileName):
-            raise FileNotExistError("No such file: %s on %s" % (self.localFileName, platform.node()) )
-        return os.stat(self.localFileName).st_mtime 
-
-    @property
-    def exists(self):
-        if self.localFileName is None:
-            raise PypeError, "No PypeLocalFile has been added to the PypeLocalFileCollection yet"
-        return os.path.exists(self.localFileName)
-        
-
-class PypeSplittableLocalFile(PypeDataObjectBase):
-    """ 
-    Represent a PypeData object that has two different local file
-      (1) the whole file (could be a virtual one)
-      (2) the split files
-
-    * Such data object can have either a scatter task attached or a gather task attached.
-    * If a scatter task is attached, the task will be inserted to generate the scattered files.
-    * If a gather task is attached, the task will be inserted to generate the whole file.
-    * If neither scatter task nor gather task is specified, then the file is mostly like interme
-      Namely, the whole file representation is not used any place else.
-    * One can not specify scatter task and gather task for the same object since it will create
-    """
-    supportedURLScheme = ["splittablefile"]
-
-    def __init__(self, URL, readOnly = False, nChunk = 1, **attributes):
-        PypeDataObjectBase.__init__(self, URL, **attributes)
-        self._updatePath()
-        self.readOnly = readOnly
-        self.verification = []
-        self._scatterTask = None
-        self._gatherTask = None
-        self._splittedFiles = []
-        self.nChunk = nChunk
-
-        URLParseResult = urlparse(self.URL)
-        cfURL = "file://%s%s" % (URLParseResult.netloc, URLParseResult.path) 
-
-        self._completeFile = PypeLocalFile(cfURL, readOnly, **attributes)
-
-        dirname, basename = os.path.split(self._path)
-
-        for i in range(nChunk):
-            chunkBasename = "%03d_%s" % (i, basename)
-            if dirname != "":
-                chunkURL = "file://%s%s/%s" % (URLParseResult.netloc, dirname, chunkBasename) 
-            else:
-                chunkURL = "file://%s/%s" % (URLParseResult.netloc, chunkBasename) 
-
-            subfile = PypeLocalFile(chunkURL, readOnly, **attributes)
-            self._splittedFiles.append(subfile) 
-
-    def setGatherTask(self, TaskCreator, TaskType, function):
-        assert self._scatterTask is None
-        inputDataObjs = dict( ( ("subfile%03d" % c[0], c[1]) 
-                                for c in enumerate(self._splittedFiles) ) )
-        outputDataObjs = {"completeFile": self._completeFile}
-        gatherTask = TaskCreator( inputDataObjs = inputDataObjs,
-                                  outputDataObjs = outputDataObjs,
-                                  URL = "task://gather/%s" % self._path ,
-                                  TaskType=TaskType) ( function )
-        self._gatherTask = gatherTask
-
-    def setScatterTask(self, TaskCreator, TaskType, function):
-        assert self._gatherTask is None
-        outputDataObjs = dict( ( ("subfile%03d" % c[0], c[1]) 
-                                for c in enumerate(self._splittedFiles) ) )
-        inputDataObjs = {"completeFile": self._completeFile}
-        scatterTask = TaskCreator( inputDataObjs = inputDataObjs,
-                                   outputDataObjs = outputDataObjs,
-                                   URL = "task://scatter/%s" % self._path ,
-                                   TaskType=TaskType) ( function )
-        self._scatterTask = scatterTask
-
-    def getGatherTask(self):
-        return self._gatherTask
-
-    def getScatterTask(self):
-        return self._scatterTask
-
-    def getSplittedFiles(self):
-        return self._splittedFiles
-
-    @property
-    def timeStamp(self):
-        return self._completeFile.timeStamp
-
-def makePypeLocalFile(aLocalFileName, readOnly = False, scheme="file", **attributes):
-    """
-    >>> f = makePypeLocalFile("/tmp/test.txt")
-    >>> f.localFileName == "/tmp/test.txt"
-    True
-    >>> fn(f)
-    '/tmp/test.txt'
-    """
-    aLocalFileName = os.path.abspath(aLocalFileName)
-
-    #if aLocalFileName.startswith("/"):
-    #    aLocalFileName.lstrip("/")
-    
-    return PypeLocalFile("%s://localhost%s" % (scheme, aLocalFileName), readOnly, **attributes)
-
-def makePypeLocalStateFile(stateName, readOnly = False, **attributes):
-    dirname, basename  = os.path.split(stateName)
-    stateFileName = os.path.join(dirname, "."+basename)
-    return makePypeLocalFile( stateFileName, readOnly = readOnly, scheme = "state", **attributes)
-
-if __name__ == "__main__":
-    import doctest
-    doctest.testmod()
-    
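
For reference, the path convention the deleted module implemented is plain
urlparse: a data object's local filename is simply the path component of its
"file://" URL, as the doctests above show. A standalone illustration using
only the standard library:

    from urlparse import urlparse  # Python 2 module, as in data.py

    url = "file://localhost/tmp/test.txt"
    local_file_name = urlparse(url).path
    assert local_file_name == "/tmp/test.txt"
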
diff --git a/pypeFLOW/pypeflow/pwatcher_bridge.py b/pypeFLOW/pypeflow/pwatcher_bridge.py
deleted file mode 100644
index d8b78c8..0000000
--- a/pypeFLOW/pypeflow/pwatcher_bridge.py
+++ /dev/null
@@ -1,391 +0,0 @@
-"""Bridge pattern to adapt pypeFLOW with pwatcher.
-
-This is a bit messy, but it avoids re-writing the useful bits
-of pypeFLOW.
-
-With PypeProcWatcherWorkflow, the refreshTargets() loop will
-be single-threaded!
-"""
-from pypeflow.task import PypeTask, PypeThreadTaskBase, PypeTaskBase, TaskFunctionError
-from pypeflow.data import fn
-import pwatcher.blocking
-import pwatcher.fs_based
-import pwatcher.network_based
-import pypeflow.controller
-import pypeflow.task
-import collections
-import datetime
-import glob
-import hashlib
-import json
-import logging
-import os
-import pprint
-import re
-import sys
-import time
-import traceback
-
-log = logging.getLogger(__name__)
-
-def PypeProcWatcherWorkflow(
-        URL = None,
-        job_type='local',
-        job_queue='UNSPECIFIED_QUEUE',
-        watcher_type='fs_based',
-        watcher_directory='mypwatcher',
-        max_jobs=None, # ignore here for now
-        **attributes):
-    """Factory for the workflow using our new
-    filesystem process watcher.
-    """
-    if watcher_type == 'blocking':
-        pwatcher_impl = pwatcher.blocking
-    elif watcher_type == 'network_based':
-        pwatcher_impl = pwatcher.network_based
-    else:
-        pwatcher_impl = pwatcher.fs_based
-    log.warning('In pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl))
-    log.info('In pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl))
-    watcher = pwatcher_impl.get_process_watcher(watcher_directory)
-    log.info('job_type={!r}, job_queue={!r}'.format(job_type, job_queue))
-    th = MyPypeFakeThreadsHandler(watcher, job_type, job_queue)
-    mq = MyMessageQueue()
-    se = MyFakeShutdownEvent() # TODO: Save pwatcher state on ShutdownEvent. (Not needed for blocking pwatcher. Mildly useful for fs_based.)
-    return pypeflow.controller._PypeConcurrentWorkflow(URL=URL, thread_handler=th, messageQueue=mq, shutdown_event=se,
-            attributes=attributes)
-
-PypeProcWatcherWorkflow.setNumThreadAllowed = pypeflow.controller._PypeConcurrentWorkflow.setNumThreadAllowed
-
-class Fred(object):
-    """Fake thread.
-    """
-    INITIALIZED = 10
-    STARTED = 20
-    RUNNING = 30
-    JOINED = 40
-    def is_alive(self):
-        return self.__status in (Fred.STARTED, Fred.RUNNING)
-    def start(self):
-        self.__status = Fred.STARTED
-        self.__th.enqueue(self)
-    def join(self, timeout=None):
-        # Maybe we should wait until the sentinel is visible in the filesystem?
-        # Also, if we were STARTED but not RUNNING, then we never did anything!
-
-        #assert self.__status is not Fred.JOINED # Nope. Might be called a 2nd time by notifyTerminate().
-        self.__status = Fred.JOINED
-    # And our own special methods.
-    def task(self):
-        return self.__target
-    def generate(self):
-        self.__target()
-    def setTargetStatus(self, status):
-        self.__target.setStatus(status)
-    def endrun(self, status):
-        """By convention for now, status is one of:
-            'DEAD'
-            'UNSUBMITTED' (a pseudo-status defined in the ready-loop of alive())
-            'EXIT rc'
-        """
-        name = status.split()[0]
-        if name == 'DEAD':
-            log.warning(''.join(traceback.format_stack()))
-            log.error('Task {}\n is DEAD, meaning no HEARTBEAT, but this can be a race-condition. If it was not killed, then restarting might suffice. Otherwise, you might have excessive clock-skew.'.format(self.brief()))
-            self.setTargetStatus(pypeflow.task.TaskFail) # for lack of anything better
-        elif name == 'UNSUBMITTED':
-            log.warning(''.join(traceback.format_stack()))
-            log.error('Task {}\n is UNSUBMITTED, meaning job-submission somehow failed. Possibly a re-start would work. Otherwise, you need to investigate.'.format(self.brief()))
-            self.setTargetStatus(pypeflow.task.TaskFail) # for lack of anything better
-        elif name != 'EXIT':
-            raise Exception('Unexpected status {!r}'.format(name))
-        else:
-            code = int(status.split()[1])
-            if 0 == code:
-                self.__target.check_missing()
-                # TODO: If missing, just leave the status as TaskInitialized?
-            else:
-                log.error('Task {} failed with exit-code={}'.format(self.brief(), code))
-                self.setTargetStatus(pypeflow.task.TaskFail) # for lack of anything better
-        self.__target.finish()
-    def brief(self):
-        return 'Fred{}'.format(self.__target.brief())
-    def __repr__(self):
-        return 'FRED with taskObj={!r}'.format(self.__target)
-    def __init__(self, target, th):
-        assert isinstance(target, MyFakePypeThreadTaskBase)
-        self.__target = target # taskObj?
-        self.__th = th # thread handler
-        self.__status = Fred.INITIALIZED
-
-class MyMessageQueue(object):
-    def empty(self):
-        return not self.__msgq
-    def get(self):
-        return self.__msgq.popleft()
-    def put(self, msg):
-        self.__msgq.append(msg)
-    def __init__(self):
-        self.__msgq = collections.deque()
-
-class MyFakeShutdownEvent(object):
-    """I do not see any code which actually uses the
-    shutdown_event, but if needed someday, we can use this.
-    """
-    def set(self):
-        pass
-
-_prev_q = {} # To avoid excessive log output.
-
-class MyPypeFakeThreadsHandler(object):
-    """Stateless method delegator, for injection.
-    """
-    def create(self, target):
-        thread = Fred(target=target, th=self)
-        return thread
-    def alive(self, threads):
-        ready = dict()
-        while self.__jobq:
-            fred = self.__jobq.popleft()
-            taskObj = fred.task()
-            fred.generate() # -> taskObj->generated_script_fn by convention
-            #log.info('param:\n%s' %pprint.pformat(taskObj.parameters)) # I do not think these change.
-            try:
-                script_fn = taskObj.generated_script_fn # BY CONVENTION
-            except AttributeError:
-                log.warning('Missing taskObj.generated_script_fn for task. Maybe we did not need it? Skipping and continuing.')
-                fred.endrun('EXIT 0')
-                continue
-            log.info('script_fn:%s' %repr(script_fn))
-            content = open(script_fn).read()
-            digest = hashlib.sha256(content).hexdigest()
-            jobid = 'J{}'.format(digest)
-            log.info('jobid=%s' %jobid)
-            taskObj.jobid = jobid
-            ready[jobid] = fred
-            self.__known[jobid] = fred
-        if ready:
-            # Start anything in the 'ready' queue.
-            # Note: It is safe to run this block always, but we save a
-            # call to pwatcher with 'if ready'.
-            log.debug('ready dict keys:\n%s' %pprint.pformat(ready.keys()))
-            jobids = dict()
-            #sge_option='-pe smp 8 -q default'
-            for jobid, fred in ready.iteritems():
-                generated_script_fn = fred.task().generated_script_fn
-                rundir, basename = os.path.split(os.path.abspath(generated_script_fn))
-                cmd = '/bin/bash {}'.format(basename)
-                sge_option = fred.task().parameters.get('sge_option', None)
-                job_type = fred.task().parameters.get('job_type', None)
-                job_queue = fred.task().parameters.get('job_queue', None)
-                job_nprocs = fred.task().parameters.get('job_nprocs', None)
-                jobids[jobid] = {
-                    'cmd': cmd,
-                    'rundir': rundir,
-                    # These are optional:
-                    'job_type': job_type,
-                    'job_queue': job_queue,
-                    'job_nprocs': job_nprocs,
-                    'sge_option': sge_option,
-                }
-            # Also send the default type and queue-name.
-            watcher_args = {
-                    'jobids': jobids,
-                    'job_type': self.__job_type,
-                    'job_queue': self.__job_queue,
-            }
-            result = self.watcher.run(**watcher_args)
-            log.debug('Result of watcher.run()={}'.format(repr(result)))
-            submitted = result['submitted']
-            self.__running.update(submitted)
-            #log.info("QQQ ADDED: {}".format(jobid))
-            for jobid in set(jobids.keys()) - set(submitted):
-                fred = ready[jobid]
-                fred.endrun('UNSUBMITTED')
-
-        watcher_args = {
-            'jobids': list(self.__running),
-            'which': 'list',
-        }
-        q = self.watcher.query(**watcher_args)
-        #log.debug('In alive(), result of query:%s' %repr(q))
-        global _prev_q
-        if q != _prev_q:
-            #log.debug('In alive(), updated result of query:%s' %repr(q))
-            _prev_q = q
-            _prev_q = None
-        for jobid, status in q['jobids'].iteritems():
-            if status.startswith('EXIT') or status.startswith('DEAD'):
-                self.__running.remove(jobid)
-                #log.info("QQQ REMOVED: {}".format(jobid))
-                fred = self.__known[jobid]
-                try:
-                    fred.endrun(status)
-                except Exception as e:
-                    msg = 'Failed to clean-up FakeThread: jobid={} status={}'.format(jobid, repr(status))
-                    log.exception(msg)
-                    raise
-        #log.info('len(jobq)==%d' %len(self.__jobq))
-        #log.info(''.join(traceback.format_stack()))
-        return sum(thread.is_alive() for thread in threads)
-    def join(self, threads, timeout):
-        # This is called only in the refreshTargets() catch, so
-        # it can simply terminate all threads.
-        then = datetime.datetime.now()
-        for thread in threads:
-            #assert thread is not threading.current_thread()
-            if thread.is_alive():
-                to = max(0, timeout - (datetime.datetime.now() - then).seconds)
-                thread.join(to)
-        self.notifyTerminate(threads)
-    def notifyTerminate(self, threads):
-        """Assume these are daemon threads.
-        We will attempt to join them all quickly, but non-daemon threads may
-        eventually block the program from quitting.
-        """
-        pass #self.join(threads, 1)
-        # TODO: Terminate only the jobs for 'threads'.
-        # For now, use 'known' instead of 'infer' b/c we
-        # also want to kill merely queued jobs, though that is currently difficult.
-        watcher_args = {
-            'jobids': list(self.__running),
-            'which': 'known',
-        }
-        q = self.watcher.delete(**watcher_args)
-        log.debug('In notifyTerminate(), result of delete:%s' %repr(q))
-
-
-    # And our special methods.
-    def enqueue(self, fred):
-        self.__jobq.append(fred)
-    def __init__(self, watcher, job_type, job_queue=None):
-        """
-        job_type and job_queue are defaults, possibly over-ridden for specific jobs.
-        Note: job_queue is a string, not a collection. If None, then it would need to
-        come via per-job settings.
-        """
-        self.watcher = watcher
-        self.__job_type = job_type
-        self.__job_queue = job_queue
-        self.__jobq = collections.deque()
-        self.__running = set()
-        self.__known = dict()
-
-def makedirs(path):
-    if not os.path.isdir(path):
-        log.debug('makedirs {!r}'.format(path))
-        os.makedirs(path)
-
-class MyFakePypeThreadTaskBase(PypeThreadTaskBase):
-    """Fake for PypeConcurrentWorkflow.
-    It demands a subclass, even though we do not use threads at all.
-    Here, we override everything that it overrides. PypeTaskBase defaults are fine.
-    """
-    @property
-    def nSlots(self):
-        """(I am not sure what happend if > 1, but we will not need that. ~cdunn)
-        Return the required number of slots to run, total number of slots is determined by
-        PypeThreadWorkflow.MAX_NUMBER_TASK_SLOT, increase this number by passing desired number
-        through the "parameters" argument (e.g parameters={"nSlots":2}) to avoid high computationa
-        intensive job running concurrently in local machine One can set the max number of thread
-        of a workflow by PypeThreadWorkflow.setNumThreadAllowed()
-        """
-        try:
-            nSlots = self.parameters["nSlots"]
-        except AttributeError:
-            nSlots = 1
-        except KeyError:
-            nSlots = 1
-        return nSlots
-
-    def setMessageQueue(self, q):
-        self._queue = q
-
-    def setShutdownEvent(self, e):
-        self.shutdown_event = e
-
-    def __call__(self, *argv, **kwargv):
-        """Trap all exceptions, set fail flag, SEND MESSAGE, log, and re-raise.
-        """
-        try:
-            return self.runInThisThread(*argv, **kwargv)
-        except: # and re-raise
-            #log.exception('%s __call__ failed:\n%r' %(type(self).__name__, self))
-            self._status = pypeflow.task.TaskFail  # TODO: Do not touch internals of base class.
-            self._queue.put( (self.URL, pypeflow.task.TaskFail) )
-            raise
-
-    def runInThisThread(self, *argv, **kwargv):
-        """
-        Similar to PypeTaskBase.run(), but it provides some machinery to pass information
-        back to the main thread (which runs this task in a separate thread) through a
-        standard Python queue from the Queue module.
-
-        We expect this to be used *only* for tasks which generate run-scripts.
-        Our version does not actually run the script. Instead, we expect the script-filename to be returned
-        by run().
-        """
-        if self._queue is None:
-            raise Exception('There seems to be a case when self.queue==None, so we need to let this block simply return.')
-        self._queue.put( (self.URL, "started, runflag: %d" % True) )
-        return self.run(*argv, **kwargv)
-
-    # We must override this from PypeTaskBase b/c we do *not* produce outputs
-    # immediately.
-    def run(self, *argv, **kwargv):
-        argv = list(argv)
-        argv.extend(self._argv)
-        kwargv.update(self._kwargv)
-
-        inputDataObjs = self.inputDataObjs
-        self.syncDirectories([o.localFileName for o in inputDataObjs.values()])
-
-        outputDataObjs = self.outputDataObjs
-        for datao in outputDataObjs.values():
-            makedirs(os.path.dirname(fn(datao)))
-        parameters = self.parameters
-
-        log.info('Running task from function %s()' %(self._taskFun.__name__))
-        rtn = self._runTask(self, *argv, **kwargv)
-
-        if self.inputDataObjs != inputDataObjs or self.parameters != parameters:
-            raise TaskFunctionError("The 'inputDataObjs' and 'parameters' should not be modified in %s" % self.URL)
-            # Jason, note that this only tests whether self.parameters was rebound.
-            # If it is altered, then so is parameters, so the check would pass.
-            # TODO(CD): What is the importance of this test? Should it be fixed or deleted?
-
-        return True # to indicate that it ran, since we no longer rely on runFlag
-
-    def check_missing(self):
-        timeout_s = 30
-        sleep_s = .1
-        self.syncDirectories([o.localFileName for o in self.outputDataObjs.values()]) # Sometimes helps in NFS.
-        lastUpdate = datetime.datetime.now()
-        while timeout_s > (datetime.datetime.now()-lastUpdate).seconds:
-            missing = [(k,o) for (k,o) in self.outputDataObjs.iteritems() if not o.exists]
-            if missing:
-                log.debug("%s failed to generate all outputs; %s; missing:\n%s" %(
-                    self.URL, repr(self._status),
-                    pprint.pformat(missing),
-                ))
-                #import commands
-                #cmd = 'pstree -pg -u cdunn'
-                #output = commands.getoutput(cmd)
-                #log.debug('`%s`:\n%s' %(cmd, output))
-                dirs = set(os.path.dirname(o.localFileName) for o in self.outputDataObjs.itervalues())
-                for d in dirs:
-                    log.debug('listdir(%s): %s' %(d, repr(os.listdir(d))))
-                #self._status = pypeflow.task.TaskFail
-                time.sleep(sleep_s)
-                sleep_s *= 2.0
-            else:
-                self._status = pypeflow.task.TaskDone
-                break
-        else:
-            log.info('timeout(%ss) in check_missing()' %timeout_s)
-            self._status = pypeflow.task.TaskFail
-
-    # And our own special methods.
-    def finish(self):
-        self.syncDirectories([o.localFileName for o in self.outputDataObjs.values()])
-        self._queue.put( (self.URL, self._status) )
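
For reference, the deleted bridge identified jobs by a content digest of the
generated run-script ('J' plus the sha256 hex digest, as in alive() above),
so identical scripts always map to the same jobid. A minimal reproduction of
just that naming scheme:

    import hashlib

    content = "#!/bin/bash\necho hello\n"  # stands in for open(script_fn).read()
    jobid = 'J{}'.format(hashlib.sha256(content).hexdigest())
    print jobid  # the same script content always yields the same jobid
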
diff --git a/pypeFLOW/pypeflow/pwatcher_workflow.py b/pypeFLOW/pypeflow/pwatcher_workflow.py
new file mode 100644
index 0000000..0ca0384
--- /dev/null
+++ b/pypeFLOW/pypeflow/pwatcher_workflow.py
@@ -0,0 +1,9 @@
+from .simple_pwatcher_bridge import (
+        PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
+        makePypeLocalFile, fn, PypeTask)
+PypeThreadTaskBase = MyFakePypeThreadTaskBase
+
+__all__ = [
+        'PypeProcWatcherWorkflow', 'PypeThreadTaskBase',
+        'makePypeLocalFile', 'fn', 'PypeTask',
+]
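
A hedged sketch of how client code can consume this new facade; the names are
the re-exports above, while the argument value and the task wiring are
illustrative only:

    from pypeflow.pwatcher_workflow import (
            PypeProcWatcherWorkflow, PypeTask, PypeThreadTaskBase,
            makePypeLocalFile, fn)

    wf = PypeProcWatcherWorkflow(job_type='local')  # other kwargs as in simple_pwatcher_bridge
    # Build PypeTask objects over makePypeLocalFile() inputs/outputs, add them
    # to wf, then refresh the targets to run the graph (see simple_pwatcher_bridge.py).
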
diff --git a/pypeFLOW/pypeflow/simple_pwatcher_bridge.py b/pypeFLOW/pypeflow/simple_pwatcher_bridge.py
index d1c7467..3b2fe59 100644
--- a/pypeFLOW/pypeflow/simple_pwatcher_bridge.py
+++ b/pypeFLOW/pypeflow/simple_pwatcher_bridge.py
@@ -77,6 +77,9 @@ class PwatcherTaskQueue(object):
 
             rundir, basename = os.path.split(os.path.abspath(generated_script_fn))
             cmd = '/bin/bash {}'.format(basename)
+            LOG.debug('In rundir={!r}, sge_option={!r}, __sge_option={!r}'.format(
+                rundir,
+                node.pypetask.parameters.get('sge_option'), self.__sge_option))
             sge_option = node.pypetask.parameters.get('sge_option', self.__sge_option)
             job_type = node.pypetask.parameters.get('job_type', None)
             job_queue = node.pypetask.parameters.get('job_queue', None)
@@ -241,13 +244,13 @@ class Workflow(object):
                         len(unsubmitted), len(to_submit), unsubmitted))
                     #ready.update(unsubmitted) # Resubmit only in pwatcher, if at all.
                 submitted.update(to_submit - unsubmitted)
-            LOG.debug('N in queue: {}'.format(len(submitted)))
+            LOG.debug('N in queue: {} (max_jobs={})'.format(len(submitted), self.max_jobs))
             recently_done = set(self.tq.check_done())
             if not recently_done:
                 if not submitted:
                     LOG.warning('Nothing is happening, and we had {} failures. Should we quit? Instead, we will just sleep.'.format(failures))
                     #break
-                LOG.info('sleep {}'.format(sleep_time))
+                LOG.info('sleep {}s'.format(sleep_time))
                 time.sleep(sleep_time)
                 sleep_time = sleep_time + 0.1 if (sleep_time < updateFreq) else updateFreq
                 continue
@@ -430,8 +433,8 @@ def find_work_dir(paths):
     """
     dirnames = set(os.path.dirname(os.path.normpath(p)) for p in paths)
     if len(dirnames) != 1:
-        raise Exception('Cannot find work-dir for paths in multiple (or zero) dirs: {!r}'.format(
-            paths))
+        raise Exception('Cannot find work-dir for paths in multiple (or zero) dirs: {} in {}'.format(
+            pprint.pformat(paths), pprint.pformat(dirnames)))
     d = dirnames.pop()
     return os.path.abspath(d)
 class PypeLocalFile(object):
@@ -554,13 +557,12 @@ def PypeProcWatcherWorkflow(
     LOG.warning('In simple_pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl))
     LOG.info('In simple_pwatcher_bridge, pwatcher_impl={!r}'.format(pwatcher_impl))
     watcher = pwatcher_impl.get_process_watcher(watcher_directory)
-    LOG.info('job_type={!r}, job_queue={!r}'.format(job_type, job_queue))
+    LOG.info('job_type={!r}, job_queue={!r}, sge_option={!r}'.format(job_type, job_queue, sge_option))
     return Workflow(watcher, job_type=job_type, job_queue=job_queue, max_jobs=max_jobs, sge_option=sge_option)
     #th = MyPypeFakeThreadsHandler('mypwatcher', job_type, job_queue)
     #mq = MyMessageQueue()
     #se = MyFakeShutdownEvent() # TODO: Save pwatcher state on ShutdownEvent. (Not needed for blocking pwatcher. Mildly useful for fs_based.)
     #return pypeflow.controller._PypeConcurrentWorkflow(URL=URL, thread_handler=th, messageQueue=mq, shutdown_event=se,
     #        attributes=attributes)
-PypeProcWatcherWorkflow.setNumThreadAllowed = lambda x, y: None
 
 __all__ = ['PypeProcWatcherWorkflow', 'fn', 'makePypeLocalFile', 'MyFakePypeThreadTaskBase', 'PypeTask']
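
The modified polling loop above backs off linearly by 0.1s per idle tick and
caps the sleep at updateFreq. A standalone illustration of that schedule
(values are arbitrary):

    def next_sleep(sleep_time, update_freq):
        return sleep_time + 0.1 if (sleep_time < update_freq) else update_freq

    s = 0.0
    for _ in range(5):
        s = next_sleep(s, 0.3)
        print s  # roughly 0.1, 0.2, 0.3, then pinned at 0.3
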
diff --git a/pypeFLOW/pypeflow/task.py b/pypeFLOW/pypeflow/task.py
deleted file mode 100644
index 32564ab..0000000
--- a/pypeFLOW/pypeflow/task.py
+++ /dev/null
@@ -1,892 +0,0 @@
-
-# @author Jason Chin
-#
-# Copyright (C) 2010 by Jason Chin 
-# Copyright (C) 2011 by Jason Chin, Pacific Biosciences
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-"""
-
-PypeTask: This module provides the PypeTask class and the decorators that can convert
-a regular Python function into a PypeTask instance.
-
-"""
-
-import pprint
-import inspect
-import hashlib
-import logging
-import copy
-import sys
-import time
-
-PYTHONVERSION = sys.version_info[:2]
-if PYTHONVERSION == (2,5):
-    import simplejson as json
-else:
-    import json
-
-import os
-import shlex
-
-from common import PypeError, PypeObject, pypeNS, runShellCmd, Graph, URIRef, Literal
-from data import FileNotExistError, PypeSplittableLocalFile, makePypeLocalFile
-
-logger = logging.getLogger(__name__)
-
-class TaskFunctionError(PypeError):
-    pass
-
-# These must be strings.
-TaskInitialized = "TaskInitialized"
-TaskDone = "done"
-TaskFail = "fail"
-# TODO(CD): Make user-code compare by variable name.
-
-class PypeTaskBase(PypeObject):
-    """
-    Represent a PypeTask. Subclass it for different kinds of
-    tasks.
-    """
-
-    supportedURLScheme = ["task"]
-
-    def __init__(self, URL, *argv, **kwargv):
-
-        """
-        Constructor of a PypeTask.
-        """
-
-        PypeObject.__init__(self, URL, **kwargv)
-
-        self._argv = argv
-        self._kwargv = kwargv
-        self._taskFun = kwargv['_taskFun']
-        self._referenceMD5 = None
-        self._status = TaskInitialized
-        self._queue = None
-        self.shutdown_event = None
-        
-
-        for defaultAttr in ["inputDataObjs", "outputDataObjs", "parameters", "mutableDataObjs"]:
-            if defaultAttr not in self.__dict__:
-                self.__dict__[defaultAttr] = {}
-
-        # "input" and "output" short cut
-        if "inputs" in kwargv:
-            self.inputDataObjs.update(kwargv["inputs"])
-            del kwargv["inputs"]
-
-        if "outputs" in kwargv:
-            self.outputDataObjs.update(kwargv["outputs"])
-            del kwargv["outputs"]
-
-        if "mutables" in kwargv:
-            self.mutableDataObjs.update(kwargv["mutables"])
-            del kwargv["mutables"]
-
-        #the keys in inputDataObjs/outputDataObjs/mutableDataObjs/parameters will become task attributes
-        for defaultAttr in ["inputDataObjs", "outputDataObjs", "mutableDataObjs", "parameters"]:
-            vars(self).update(self.__dict__[defaultAttr]) 
-
-        if "chunk_id" in kwargv:
-            self.chunk_id = kwargv["chunk_id"]
-
-        self._codeMD5digest = kwargv.get("_codeMD5digest", "")
-        self._paramMD5digest = kwargv.get("_paramMD5digest", "")
-        self._compareFunctions = kwargv.get("_compareFunctions", [ timeStampCompare ])
-
-        for o in self.outputDataObjs.values():
-            if o.readOnly:
-                raise PypeError, "Cannot assign read-only data object %s for task %s" % (o.URL, self.URL)
-        
-    @property
-    def status(self):
-        return self._status
-        
-    def setInputs( self, inputDataObjs ):
-        self.inputDataObjs = inputDataObjs
-        vars(self).update( inputDataObjs )
-        
-    def setOutputs( self, outputDataObjs ):
-        self.outputDataObjs = outputDataObjs
-        vars(self).update( outputDataObjs )
-        
-    def setReferenceMD5(self, md5Str):
-        self._referenceMD5 = md5Str
-
-    def _getRunFlag(self):
-        """Determine whether the PypeTask should be run. It can be overridden in
-        a subclass to allow more flexible rules.
-        """
-        runFlag = False
-        if self._referenceMD5 is not None and self._referenceMD5 != self._codeMD5digest:
-            self._referenceMD5 = self._codeMD5digest
-            # Code has changed.
-            return True
-        return any( [ f(self.inputDataObjs, self.outputDataObjs, self.parameters) for f in self._compareFunctions] )
-
-    def isSatisfied(self):
-        """Compare dependencies. (Kinda expensive.)
-        Note: Do not call this while the task is actually running!
-        """
-        return not self._getRunFlag()
-
-    def getStatus(self):
-        """
-        Note: Do not call this while the task is actually running!
-        """
-        return self._status
-
-    def setStatus(self, status):
-        """
-        Note: Do not call this while the task is actually running!
-        """
-        assert status in (TaskInitialized, TaskDone, TaskFail)
-        self._status = status
-
-    def _runTask(self, *argv, **kwargv):
-        """ 
-        The method to run the decorated function _taskFun(). It is called through run() of
-        the PypeTask object and it should never be called directly
-
-        TODO: the arg porcessing is still a mess, need to find a better way to do this 
-        """
-        if PYTHONVERSION == (2,5): #TODO(CD): Does this even work anymore?
-            (args, varargs, varkw, defaults)  = inspect.getargspec(self._taskFun)
-            #print  (args, varargs, varkw, defaults)
-        else:
-            argspec = inspect.getargspec(self._taskFun)
-            (args, varargs, varkw, defaults) = argspec.args, argspec.varargs, argspec.keywords, argspec.defaults
-
-        if varkw != None:
-            return self._taskFun(self, *argv, **kwargv)
-        elif varargs != None:
-            return self._taskFun(self, *argv)
-        elif len(args) != 0:
-            nkwarg = {}
-            if defaults != None:
-                defaultArg = args[-len(defaults):]
-                for a in defaultArg:
-                    nkwarg[a] = kwargv[a]
-                return self._taskFun(self, *argv, **nkwarg)
-            else:
-                return self._taskFun(self)
-        else:
-            return self._taskFun(self)
-
-    @property
-    def _RDFGraph(self):
-        graph = Graph()
-        for k,v in self.__dict__.iteritems():
-            if k == "URL": continue
-            if k[0] == "_": continue
-            if k in ["inputDataObjs", "outputDataObjs", "mutableDataObjs", "parameters"]:
-                if k == "inputDataObjs":
-                    for ft, f in v.iteritems():
-                        graph.add( (URIRef(self.URL), pypeNS["prereq"], URIRef(f.URL) ) )
-                elif k == "outputDataObjs":
-                    for ft, f in v.iteritems():
-                        graph.add( (URIRef(f.URL), pypeNS["prereq"], URIRef(self.URL) ) )
-                elif k == "mutableDataObjs":
-                    for ft, f in v.iteritems():
-                        graph.add( (URIRef(self.URL), pypeNS["hasMutable"], URIRef(f.URL)   ) )
-                elif k == "parameters":
-                    graph.add( (URIRef(self.URL), pypeNS["hasParameters"], Literal(json.dumps(v)) ) )
-            
-                continue
-
-            if k in self.inputDataObjs:
-                graph.add( ( URIRef(self.URL), pypeNS["inputDataObject"], URIRef(v.URL) ) )
-                continue
-
-            if k in self.outputDataObjs:
-                graph.add( ( URIRef(self.URL), pypeNS["outputDataObject"], URIRef(v.URL) ) )
-                continue
-
-            if k in self.mutableDataObjs:
-                graph.add( ( URIRef(self.URL), pypeNS["mutableDataObject"], URIRef(v.URL) ) )
-                continue
-
-            if hasattr(v, "URL"):
-                graph.add( ( URIRef(self.URL), pypeNS[k], URIRef(v.URL) ) )
-
-            graph.add(  ( URIRef(self.URL), pypeNS["codeMD5digest"], Literal(self._codeMD5digest) ) )
-            graph.add(  ( URIRef(self.URL), pypeNS["parameterMD5digest"], Literal(self._paramMD5digest) ) )
-
-        return graph
-
-    def __call__(self, *argv, **kwargv):
-        """Trap all exceptions, set fail flag, log, and re-raise.
-        If you need to do more, then over-ride this method.
-        """
-        try:
-            return self.run(*argv, **kwargv)
-        except: # and re-raise
-            logger.exception('PypeTaskBase failed unexpectedly:\n%r' %self)
-            self._status = TaskFail
-            raise
-
-    @staticmethod
-    def syncDirectories(fns):
-        # We need the following loop to force Isilon to update the metadata of the directory;
-        # otherwise, the file can appear not to exist... sigh, this is a hard-earned hack (>5 hours).
-        # Yes, a friend at AMD had this problem too. Painful. ~cd
-        for d in set(os.path.dirname(fn) for fn in fns):
-            try:
-                os.listdir(d)
-            except OSError:
-                pass
-
-    def run(self, *argv, **kwargv):
-        """Determine whether a task should be run when called.
-        If the dependencies are not satisfied,
-        then _taskFun() will be called to generate the output data objects.
-
-        A derived class can override this method, but if __call__ is overridden,
-        then the derived class must call this explicitly.
-        """
-        argv = list(argv)
-        argv.extend(self._argv)
-        kwargv.update(self._kwargv)
-
-        inputDataObjs = self.inputDataObjs
-        self.syncDirectories([o.localFileName for o in inputDataObjs.values()])
-
-        outputDataObjs = self.outputDataObjs
-        parameters = self.parameters
-
-        logger.info('Running task from function %s()' %(self._taskFun.__name__))
-        rtn = self._runTask(self, *argv, **kwargv)
-
-        if self.inputDataObjs != inputDataObjs or self.parameters != parameters:
-            raise TaskFunctionError("The 'inputDataObjs' and 'parameters' should not be modified in %s" % self.URL)
-        missing = [(k,o) for (k,o) in self.outputDataObjs.iteritems() if not o.exists]
-        if missing:
-            logger.debug("%s fails to generate all outputs; missing:\n%s" %(self.URL, pprint.pformat(missing)))
-            self._status = TaskFail
-        else:
-            self._status = TaskDone
-
-        return True # to indicate that it ran, since we no longer rely on runFlag
-
-    def __repr__(self):
-        r = dict()
-        r['_status'] = self._status
-        r['inputDataObjs'] = self.inputDataObjs
-        r['outputDataObjs'] = self.outputDataObjs
-        r['mutableDataObjs'] = self.mutableDataObjs
-        r['parameters'] = self.parameters
-        r['URL'] = getattr(self, 'URL', 'No URL?')
-        r['__class__.__name__'] = self.__class__.__name__
-        return pprint.pformat(r)
-    def brief(self):
-        r = dict()
-        r['URL'] = self.URL
-        return pprint.pformat(r)
-
-    def finalize(self): 
-        """ 
-        This method is intended to be overriden by subclass to provide extra processing that is not 
-        directed related to the processing the input and output data. For the thread workflow, this
-        method will be called in the main thread after a take is finished regardless the job status.
-        """
-        pass
-
-class PypeThreadTaskBase(PypeTaskBase):
-
-    """
-    Represent a PypeTask that can be run within a thread.
-    Subclass it for different kinds of tasks.
-    """
-
-    @property
-    def nSlots(self):
-        """
-        Return the number of slots required to run this task. The total number of slots is
-        determined by PypeThreadWorkflow.MAX_NUMBER_TASK_SLOT. Increase this number by passing
-        the desired value through the "parameters" argument (e.g. parameters={"nSlots":2}) to
-        keep computationally intensive jobs from running concurrently on the local machine.
-        One can set the maximum number of threads of a workflow via PypeThreadWorkflow.setNumThreadAllowed().
-        """
-        try:
-            nSlots = self.parameters["nSlots"]
-        except AttributeError:
-            nSlots = 1
-        except KeyError:
-            nSlots = 1
-        return nSlots
-
-
-    def setMessageQueue(self, q):
-        self._queue = q
-
-    def setShutdownEvent(self, e):
-        self.shutdown_event = e
-
-    def __call__(self, *argv, **kwargv):
-        """Trap all exceptions, set fail flag, SEND MESSAGE, log, and re-raise.
-        """
-        try:
-            return self.runInThisThread(*argv, **kwargv)
-        except: # and re-raise
-            logger.exception('PypeTaskBase failed:\n%r' %self)
-            self._status = TaskFail  # TODO: Do not touch internals of base class.
-            self._queue.put( (self.URL, TaskFail) )
-            raise
-
-    def runInThisThread(self, *argv, **kwargv):
-        """
-        Similar to PypeTaskBase.run(), but it provides some machinery to pass information
-        back to the main thread that runs this task in a separate thread, via a standard
-        Python queue from the Queue module.
-        """
-        if self._queue == None:
-            logger.debug('Testing threads w/out queue?')
-            self.run(*argv, **kwargv)
-            # return
-            # raise until we know what this should do.
-            raise Exception('There seems to be a case when self.queue==None, so we need to let this block simply return.')
-
-        self._queue.put( (self.URL, "started, runflag: %d" % True) )
-        self.run(*argv, **kwargv)
-
-        self.syncDirectories([o.localFileName for o in self.outputDataObjs.values()])
-
-        self._queue.put( (self.URL, self._status) )
-
-class PypeDistributiableTaskBase(PypeThreadTaskBase):
-
-    """
-    Represent a PypeTask that can be run within a thread or submitted to
-    a grid-engine-like job scheduling system.
-    Subclass it for different kinds of tasks.
-    """
-
-    def __init__(self, URL, *argv, **kwargv):
-        PypeTaskBase.__init__(self, URL, *argv, **kwargv)
-        self.distributed = True
-
-
-class PypeTaskCollection(PypeObject):
-
-    """
-    Represent an object that encapsulates a number of tasks.
-    """
-
-    supportedURLScheme = ["tasks"]
-    def __init__(self, URL, tasks = [], scatterGatherTasks = [], **kwargv):
-        PypeObject.__init__(self, URL, **kwargv)
-        self._tasks = tasks[:]
-        self._scatterGatherTasks = scatterGatherTasks[:]
-
-    def addTask(self, task):
-        self._tasks.append(task)
-
-    def getTasks(self):
-        return self._tasks
-
-    def addScatterGatherTask(self, task):
-        self._scatterGatherTasks.append(task)
-
-    def getScatterGatherTasks(self):
-        return self._scatterGatherTasks
-
-    def __getitem__(self, k):
-        return self._tasks[k]
-
-_auto_names = set()
-def _unique_name(name):
-    """
-    >>> def foo(): pass
-    >>> _unique_name('foo')
-    'foo'
-    >>> _unique_name('foo')
-    'foo.01'
-    >>> _unique_name('foo')
-    'foo.02'
-    """
-    if name in _auto_names:
-        n = 0
-        while True:
-            n += 1
-            try_name = '%s.%02d' %(name, n)
-            if try_name not in _auto_names:
-                break
-        name = try_name
-    _auto_names.add(name)
-    return name
-def _auto_task_url(taskFun):
-    # Note: in doctest, the filename would be weird.
-    return "task://" + inspect.getfile(taskFun) + "/"+ _unique_name(taskFun.func_name)
-
-def PypeTask(**kwargv):
-
-    """
-    A decorator that converts a function into a PypeTaskBase object.
-
-    >>> import os, time 
-    >>> from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn
-    >>> from pypeflow.task import *
-    >>> try:
-    ...     os.makedirs("/tmp/pypetest")
-    ...     _ = os.system("rm -f /tmp/pypetest/*")   
-    ... except Exception:
-    ...     pass
-    >>> time.sleep(.1)
-    >>> fin = makePypeLocalFile("/tmp/pypetest/testfile_in", readOnly=False)
-    >>> fout = makePypeLocalFile("/tmp/pypetest/testfile_out", readOnly=False)
-    >>> @PypeTask(outputs={"test_out":fout},
-    ...           inputs={"test_in":fin},
-    ...           parameters={"a":'I am "a"'}, **{"b":'I am "b"'})
-    ... def test(self):
-    ...     print test.test_in.localFileName
-    ...     print test.test_out.localFileName
-    ...     os.system( "touch %s" % fn(test.test_out) )
-    ...     print self.test_in.localFileName
-    ...     print self.test_out.localFileName
-    ...     pass
-    >>> type(test) 
-    <class 'pypeflow.task.PypeTaskBase'>
-    >>> test.test_in.localFileName
-    '/tmp/pypetest/testfile_in'
-    >>> test.test_out.localFileName
-    '/tmp/pypetest/testfile_out'
-    >>> os.system( "touch %s" %  ( fn(fin))  )
-    0
-    >>> timeStampCompare(test.inputDataObjs, test.outputDataObjs, test.parameters)
-    True
-    >>> print test._getRunFlag()
-    True
-    >>> test()
-    /tmp/pypetest/testfile_in
-    /tmp/pypetest/testfile_out
-    /tmp/pypetest/testfile_in
-    /tmp/pypetest/testfile_out
-    True
-    >>> timeStampCompare(test.inputDataObjs, test.outputDataObjs, test.parameters)
-    False
-    >>> print test._getRunFlag()
-    False
-    >>> print test.a
-    I am "a"
-    >>> print test.b
-    I am "b"
-    >>> os.system( "touch %s" %  (fn(fin))  )
-    0
-    >>> # test PypeTask.finalize()
-    >>> from controller import PypeWorkflow
-    >>> wf = PypeWorkflow()
-    >>> wf.addTask(test)
-    >>> def finalize(self):
-    ...     def f():
-    ...         print "in finalize:", self._status
-    ...     return f
-    >>> test.finalize = finalize(test)  # For testing only. Please don't do this in your code. PypeTask.finalize() is intended to be overridden by subclasses.
-    >>> wf.refreshTargets( objs = [fout] )
-    /tmp/pypetest/testfile_in
-    /tmp/pypetest/testfile_out
-    /tmp/pypetest/testfile_in
-    /tmp/pypetest/testfile_out
-    in finalize: done
-    True
-    >>> # The following code shows how to set up a task with a PypeThreadWorkflow that allows running multiple tasks in parallel.
-    >>> from pypeflow.controller import PypeThreadWorkflow
-    >>> wf = PypeThreadWorkflow()
-    >>> @PypeTask(outputDataObjs={"test_out":fout},
-    ...           inputDataObjs={"test_in":fin},
-    ...           TaskType=PypeThreadTaskBase,
-    ...           parameters={"a":'I am "a"'}, **{"b":'I am "b"'})
-    ... def test(self):
-    ...     print test.test_in.localFileName
-    ...     print test.test_out.localFileName
-    ...     os.system( "touch %s" % fn(test.test_out) )
-    ...     print self.test_in.localFileName
-    ...     print self.test_out.localFileName
-    >>> wf.addTask(test)
-    >>> def finalize(self):
-    ...     def f():
-    ...         print "in finalize:", self._status
-    ...     return f
-    >>> test.finalize = finalize(test)  # For testing only. Please don't do this in your code. PypeTask.finalize() is intended to be overridden by subclasses.
-    >>> wf.refreshTargets( objs = [fout] ) #doctest: +SKIP
-    """
-
-    def f(taskFun):
-
-        TaskType = kwargv.get("TaskType", PypeTaskBase)
-
-        if "TaskType" in kwargv:
-            del kwargv["TaskType"]
-
-        kwargv["_taskFun"] = taskFun
-
-        if kwargv.get("URL",None) == None:
-            kwargv["URL"] = _auto_task_url(taskFun)
-        try:
-            kwargv["_codeMD5digest"] = hashlib.md5(inspect.getsource(taskFun)).hexdigest()
-        except IOError: # Python 2.7 seems to have trouble getting source code defined within a doctest; this is a workaround to keep the docstring tests working
-            kwargv["_codeMD5digest"] = ""
-        kwargv["_paramMD5digest"] = hashlib.md5(repr(kwargv)).hexdigest()
-
-        newKwargv = copy.copy(kwargv)
-        inputDataObjs = kwargv.get("inputDataObjs",{}) 
-        inputDataObjs.update(kwargv.get("inputs", {}))
-        outputDataObjs = kwargv.get("outputDataObjs",{}) 
-        outputDataObjs.update(kwargv.get("outputs", {}))
-        newInputs = {}
-        for inputKey, inputDO in inputDataObjs.items():
-            if isinstance(inputDO, PypeSplittableLocalFile):
-                newInputs[inputKey] = inputDO._completeFile
-            else:
-                newInputs[inputKey] = inputDO
-
-        newOutputs = {}
-        for outputKey, outputDO in outputDataObjs.items():
-            if isinstance(outputDO, PypeSplittableLocalFile):
-                newOutputs[outputKey] = outputDO._completeFile
-            else:
-                newOutputs[outputKey] = outputDO
-
-        newKwargv["inputDataObjs"] = newInputs
-        newKwargv["outputDataObjs"] = newOutputs
-        task = TaskType(**newKwargv)
-        task.__doc__ = taskFun.__doc__
-        return task
-
-    return f
-
-def PypeShellTask(**kwargv):
-
-    """
-    A function that converts a shell script into a PypeTaskBase object.
-
-    >>> import os, time 
-    >>> from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn
-    >>> from pypeflow.task import *
-    >>> try:
-    ...     os.makedirs("/tmp/pypetest")
-    ...     _ = os.system("rm -f /tmp/pypetest/*")
-    ... except Exception:
-    ...     pass
-    >>> time.sleep(.1)
-    >>> fin = makePypeLocalFile("/tmp/pypetest/testfile_in", readOnly=False)
-    >>> fout = makePypeLocalFile("/tmp/pypetest/testfile_out", readOnly=False)
-    >>> f = open("/tmp/pypetest/shellTask.sh","w")
-    >>> f.write( "touch %s" % (fn(fout)))
-    >>> f.close()
-    >>> shellTask = PypeShellTask(outputDataObjs={"test_out":fout},
-    ...                           inputDataObjs={"test_in":fin},
-    ...                           parameters={"a":'I am "a"'}, **{"b":'I am "b"'}) 
-    >>> shellTask = shellTask("/tmp/pypetest/shellTask.sh")
-    >>> type(shellTask) 
-    <class 'pypeflow.task.PypeTaskBase'>
-    >>> print fn(shellTask.test_in)
-    /tmp/pypetest/testfile_in
-    >>> os.system( "touch %s" %  fn(fin)  ) 
-    0
-    >>> timeStampCompare(shellTask.inputDataObjs, shellTask.outputDataObjs, shellTask.parameters)
-    True
-    >>> print shellTask._getRunFlag()
-    True
-    >>> shellTask() # run task
-    True
-    >>> timeStampCompare(shellTask.inputDataObjs, shellTask.outputDataObjs, shellTask.parameters)
-    False
-    >>> print shellTask._getRunFlag()
-    False
-    >>> shellTask()
-    True
-    """
-
-    def f(scriptToRun):
-        def taskFun(self):
-            """make shell script using a template"""
-            """run shell command"""
-            shellCmd = "/bin/bash %s" % scriptToRun
-            runShellCmd(shlex.split(shellCmd))
-
-        kwargv["script"] = scriptToRun
-        return PypeTask(**kwargv)(taskFun)
-
-    return f
-
-
-def PypeSGETask(**kwargv):
-
-    """
-    Similar to PypeShellTask, but the shell script job will be executed through SGE.
-    """
-
-    def f(scriptToRun):
-
-        def taskFun(self):
-            """Run the shell script through SGE."""
-            shellCmd = "qsub -sync y -S /bin/bash %s" % scriptToRun
-            runShellCmd(shlex.split(shellCmd))
-
-        kwargv["script"] = scriptToRun
-
-        return PypeTask(**kwargv)(taskFun)
-
-    return f
-
-def PypeDistributibleTask(**kwargv):
-
-    """
-    Similar to PypeShellTask and PypeSGETask, with an additional argument "distributed" that decides
-    whether a job is run through the local shell or SGE.
-    """
-
-    distributed = kwargv.get("distributed", False)
-    def f(scriptToRun):
-        def taskFun(self):
-            """make shell script using the template"""
-            """run shell command"""
-            if distributed == True:
-                shellCmd = "qsub -sync y -S /bin/bash %s" % scriptToRun
-            else:
-                shellCmd = "/bin/bash %s" % scriptToRun
-
-            runShellCmd(shlex.split(shellCmd))
-
-        kwargv["script"] = scriptToRun
-        return PypeTask(**kwargv)(taskFun)
-
-    return f
-
-
-def PypeScatteredTasks(**kwargv):
-
-    def f(taskFun):
-
-        TaskType = kwargv.get("TaskType", PypeTaskBase)
-
-        if "TaskType" in kwargv:
-            del kwargv["TaskType"]
-
-        kwargv["_taskFun"] = taskFun
-
-        inputDataObjs = kwargv["inputDataObjs"]
-        outputDataObjs = kwargv["outputDataObjs"]
-        nChunk = None
-        scatteredInput  = []
-
-        if kwargv.get("URL", None) == None:
-            kwargv["URL"] = "tasks://" + inspect.getfile(taskFun) + "/"+ taskFun.func_name
-
-        tasks = PypeTaskCollection(kwargv["URL"])
-
-        for inputKey, inputDO in inputDataObjs.items():
-            if hasattr(inputDO, "nChunk"):
-                if nChunk != None:
-                    assert inputDO.nChunk == nChunk
-                else:
-                    nChunk = inputDO.nChunk
-                    if inputDO.getScatterTask() != None:
-                        tasks.addScatterGatherTask( inputDO.getScatterTask() )
-
-                scatteredInput.append( inputKey )
-
-        for outputKey, outputDO in outputDataObjs.items():
-            if hasattr(outputDO, "nChunk"):
-                if nChunk != None:
-                    assert outputDO.nChunk == nChunk
-                    if outputDO.getGatherTask() != None:
-                        tasks.addScatterGatherTask( outputDO.getGatherTask() )
-                else:
-                    nChunk = outputDO.nChunk
-
-
-        for i in range(nChunk):
-
-            newKwargv = copy.copy(kwargv)
-
-            subTaskInput = {}
-            for inputKey, inputDO in inputDataObjs.items():
-                if inputKey in scatteredInput:
-                    subTaskInput[inputKey] = inputDO.getSplittedFiles()[i]
-                else:
-                    subTaskInput[inputKey] = inputDO
-
-            subTaskOutput = {}
-            for outputKey, outputDO in outputDataObjs.items():
-                subTaskOutput[outputKey] = outputDO.getSplittedFiles()[i]
-
-            newKwargv["inputDataObjs"] = subTaskInput
-            newKwargv["outputDataObjs"] = subTaskOutput
-
-            #newKwargv["URL"] = "task://" + inspect.getfile(taskFun) + "/"+ taskFun.func_name + "/%03d" % i
-            newKwargv["URL"] = kwargv["URL"].replace("tasks","task") + "/%03d" % i
-
-            try:
-                newKwargv["_codeMD5digest"] = hashlib.md5(inspect.getsource(taskFun)).hexdigest()
-            except IOError:
-                # Python 2.7 seems to have trouble getting source code defined within a doctest;
-                # this is a workaround to keep the docstring tests working.
-                newKwargv["_codeMD5digest"] = ""
-
-            newKwargv["_paramMD5digest"] = hashlib.md5(repr(kwargv)).hexdigest()
-            newKwargv["chunk_id"] = i
-
-            
-            tasks.addTask( TaskType(**newKwargv) )
-        return tasks
-    return f
-
-getPypeScatteredTasks = PypeScatteredTasks
-
-def PypeFOFNMapTasks(**kwargv):
-    """
-    A special decorator that takes a FOFN (file of file names) as the main
-    input and generates tasks whose inputs are the files specified in
-    the FOFN.
-
-    Example:
-
-        def outTemplate(fn):
-            return fn + ".out"
-
-        def task(self, **kwargv):
-            in_f = self.in_f
-            out_f = self.out_f
-            #do something with in_f, and write something to out_f
-
-        tasks = PypeFOFNMapTasks(FOFNFileName = "./file.fofn", 
-                outTemplateFunc = outTemplate, 
-                TaskType = PypeThreadTaskBase,
-                parameters = dict(nSlots = 8))( task )
-    """
-
-    def f(taskFun):
-
-        TaskType = kwargv.get("TaskType", PypeTaskBase)
-
-        if "TaskType" in kwargv:
-            del kwargv["TaskType"]
-
-        kwargv["_taskFun"] = taskFun
-
-        FOFNFileName = kwargv["FOFNFileName"]
-        outTemplateFunc = kwargv["outTemplateFunc"]
-
-        if kwargv.get("URL", None) == None:
-            kwargv["URL"] = "tasks://" + inspect.getfile(taskFun) + "/"+ taskFun.func_name
-
-        tasks = PypeTaskCollection(kwargv["URL"])
-
-        with open(FOFNFileName,"r") as FOFN:
-
-            newKwargv = copy.copy(kwargv)
-            
-            for fn in FOFN:
-
-                fn = fn.strip()
-
-                if len(fn) == 0:
-                    continue
-
-                newKwargv["inputDataObjs"] = {"in_f": makePypeLocalFile(fn) } 
-                outfileName = outTemplateFunc(fn)
-                newKwargv["outputDataObjs"] = {"out_f": makePypeLocalFile(outfileName) } 
-                newKwargv["URL"] = kwargv["URL"].replace("tasks","task") + "/%s" % hashlib.md5(fn).hexdigest() 
-
-                try:
-                    newKwargv["_codeMD5digest"] = hashlib.md5(inspect.getsource(taskFun)).hexdigest()
-                except IOError:
-                    # Python 2.7 seems to have trouble getting source code defined within a doctest;
-                    # this is a workaround to keep the docstring tests working.
-                    newKwargv["_codeMD5digest"] = ""
-
-
-                newKwargv["_paramMD5digest"] = hashlib.md5(repr(kwargv)).hexdigest()
-
-                tasks.addTask( TaskType(**newKwargv) )
-
-            allFOFNOutDataObjs = dict( [ ("FOFNout%03d" % t[0], t[1].in_f) for t in enumerate(tasks) ] )
-
-            def pseudoScatterTask(**kwargv):
-                pass
-
-            newKwargv = dict( inputDataObjs = {"FOFNin": makePypeLocalFile(FOFNFileName)}, 
-                              outputDataObjs = allFOFNOutDataObjs,
-                              _taskFun = pseudoScatterTask,
-                              _compareFunctions = [lambda inObjs, outObj, params: False], #this task is never meant to be run
-                              URL = "task://pseudoScatterTask/%s" % FOFNFileName)
-
-            tasks.addTask( TaskType(**newKwargv) )
-
-        return tasks
-
-    return f
-
-getFOFNMapTasks = PypeFOFNMapTasks
-
-def timeStampCompare( inputDataObjs, outputDataObjs, parameters) :
-
-    """
-    Given the inputDataObjs and the outputDataObjs, determine whether any
-    object in inputDataObjs was created or modified later than any object
-    in outputDataObjs.
-    """
-
-    runFlag = False
-
-    inputDataObjsTS = []
-    for ft, f in inputDataObjs.iteritems():
-        if not f.exists:
-            wait_s = 60
-            logger.warning('input does not exist yet (in this filesystem): %r - waiting up to %ds' %(f, wait_s))
-            for i in range(wait_s):
-                time.sleep(1)
-                if f.exists:
-                    logger.warning('now exists: %r (after only %ds)' %(f, i))
-                    break
-            # At this point, if it still does not exist, the entire workflow will fail.
-            # What else can we do? The user's filesystem has too much latency.
-        inputDataObjsTS.append((f.timeStamp, 'A', f))
-
-    outputDataObjsTS = []
-    for ft, f in outputDataObjs.iteritems():
-        if not f.exists:
-            logger.debug('output does not exist yet: %r'%f)
-            runFlag = True
-            break
-        else:
-            # 'A' < 'B', so outputs are 'later' if timestamps match.
-            outputDataObjsTS.append((f.timeStamp, 'B', f))
-
-    if not outputDataObjs:
-        # 0 outputs => always run
-        runFlag = True
-
-    if not runFlag and inputDataObjs: # 0 inputs would imply that existence of outputs is enough.
-        minOut = min(outputDataObjsTS)
-        maxIn = max(inputDataObjsTS)
-        if minOut < maxIn:
-            logger.debug('timestamp of output < input: %r < %r'%(minOut, maxIn))
-            runFlag = True
-
-    return runFlag
-
-if __name__ == "__main__":
-    import doctest
-    doctest.testmod()
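
For context on the removal above: the scheduling core of the deleted task.py is the make-style check in timeStampCompare(), where a task must run if any output is missing or if the newest input is newer than the oldest output. A minimal standalone sketch of that rule, with hypothetical file paths and no dependence on the deleted pypeflow classes, could look like:

    import os

    def needs_rerun(inputs, outputs):
        # No outputs, or a missing output, always means "run".
        if not outputs or any(not os.path.exists(p) for p in outputs):
            return True
        # With no inputs, the existence of the outputs is enough.
        if not inputs:
            return False
        # Otherwise rerun only when the newest input is newer than the
        # oldest output, as in the deleted timeStampCompare() above.
        newest_in = max(os.path.getmtime(p) for p in inputs)
        oldest_out = min(os.path.getmtime(p) for p in outputs)
        return oldest_out < newest_in

    # e.g. needs_rerun(["input.fofn"], ["output.fasta"])  # hypothetical paths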
diff --git a/pypeFLOW/setup.py b/pypeFLOW/setup.py
index df77bec..1397b35 100644
--- a/pypeFLOW/setup.py
+++ b/pypeFLOW/setup.py
@@ -2,22 +2,19 @@ from setuptools import setup, Extension, find_packages
 
 setup(
     name = 'pypeflow',
-    version='0.1.1',
+    version='1.0.0',
     author='J. Chin',
     author_email='cschin at infoecho.net',
     license='LICENSE.txt',
     packages = [
         'pypeflow',
-        'pwatcher', # a separate package for here for convenience, for now
+        'pwatcher', # a separate package here for convenience, for now
         'pwatcher.mains',
     ],
     package_dir = {'':'.'},
     zip_safe = False,
     install_requires=[
-        'rdflib == 3.4.0',
-        'rdfextras >= 0.1',
-        'html5lib == 0.999999',
-        'networkx >=1.7, <=1.10',
+        'networkx >=1.7, <=1.11',
     ],
     entry_points = {'console_scripts': [
             'pwatcher-main=pwatcher.mains.pwatcher:main',

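The setup.py hunk above drops rdflib, rdfextras, and html5lib, dependencies that were evidently only needed by the RDF-graph machinery (Graph/URIRef/Literal) in the deleted modules, and widens the networkx pin to <=1.11. A quick way to confirm an environment satisfies the new range (the printed version will vary):

    python -c "import networkx; print networkx.__version__"  # expect a version in 1.7 .. 1.11
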
-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/falcon.git


