[med-svn] [kineticstools] 01/05: Imported Upstream version 0.6.0+dfsg

Afif Elghraoui afif at moszumanska.debian.org
Sun Dec 18 08:04:18 UTC 2016


This is an automated email from the git hooks/post-receive script.

afif pushed a commit to branch master
in repository kineticstools.

commit 9b27a33cc63f195f78ad11e87de794f861bf6ce9
Author: Afif Elghraoui <afif at ghraoui.name>
Date:   Mon Aug 22 23:11:01 2016 -0700

    Imported Upstream version 0.6.0+dfsg
---
 README.md                                 |   3 +-
 bin/__init__.py                           |  29 ----
 circle.yml                                |   3 +-
 kineticsTools/ReferenceUtils.py           |  11 +-
 kineticsTools/ResultWriter.py             | 241 ++++++++++++++++++++----------
 kineticsTools/ipdSummary.py               |  91 ++++++-----
 kineticsTools/summarizeModifications.py   |   2 +-
 kineticsTools/tasks/__init__.py           |   0
 kineticsTools/tasks/gather_kinetics_h5.py | 172 +++++++++++++++++++++
 requirements-ci.txt                       |   3 +-
 requirements-dev.txt                      |   1 -
 setup.py                                  |   6 +-
 test/cram/extra/bigwig.t                  |  15 ++
 test/cram/version.t                       |  19 +--
 test/test_gather_h5.py                    |  77 ++++++++++
 test/test_outputs.py                      | 105 +++++++++++++
 test/test_tool_contract.py                |  28 ++--
 17 files changed, 624 insertions(+), 182 deletions(-)

diff --git a/README.md b/README.md
index 1e95e30..ec8a88b 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,4 @@
-kineticsTools
-=============
+ kineticsTools [![Circle CI](https://circleci.com/gh/PacificBiosciences/kineticsTools.svg?style=svg)](https://circleci.com/gh/PacificBiosciences/kineticsTools)
 
 Tools for detecting DNA modifications from single molecule, real-time (SMRT®) sequencing data. This tool implements the P_ModificationDetection module in SMRT® Portal, used by the RS_Modification_Detection and RS_Modifications_and_Motif_Detection protocols. Researchers interested in understanding or extending the modification detection algorithms can use these tools as a starting point.
 
diff --git a/bin/__init__.py b/bin/__init__.py
deleted file mode 100755
index 402d7a8..0000000
--- a/bin/__init__.py
+++ /dev/null
@@ -1,29 +0,0 @@
-#################################################################################
-# Copyright (c) 2011-2013, Pacific Biosciences of California, Inc.
-#
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-# * Redistributions of source code must retain the above copyright
-#   notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above copyright
-#   notice, this list of conditions and the following disclaimer in the
-#   documentation and/or other materials provided with the distribution.
-# * Neither the name of Pacific Biosciences nor the names of its
-#   contributors may be used to endorse or promote products derived from
-#   this software without specific prior written permission.
-#
-# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
-# THIS LICENSE.  THIS SOFTWARE IS PROVIDED BY PACIFIC BIOSCIENCES AND ITS
-# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL PACIFIC BIOSCIENCES OR
-# ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
-# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
-# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-#################################################################################
diff --git a/circle.yml b/circle.yml
index a1e6411..01a3209 100644
--- a/circle.yml
+++ b/circle.yml
@@ -12,7 +12,8 @@ dependencies:
     - (cd .circleci && bash installHDF5.sh)
     - HDF5_DIR=$PWD/.circleci/prefix pip install -r requirements-ci.txt
     - HDF5_DIR=$PWD/.circleci/prefix pip install -r requirements-dev.txt
+    - pip install cram nose
     
 test:
   override:
-    - make test # Run doctests in addition to the usual unit tests
+    - make unit-tests
diff --git a/kineticsTools/ReferenceUtils.py b/kineticsTools/ReferenceUtils.py
index a48ca8d..e63758c 100755
--- a/kineticsTools/ReferenceUtils.py
+++ b/kineticsTools/ReferenceUtils.py
@@ -46,7 +46,7 @@ ReferenceWindow = namedtuple("ReferenceWindow", ["refId", "refName", "start", "e
 class ReferenceUtils():
 
     @staticmethod
-    def loadReferenceContigs(referencePath, alignmentSet):
+    def loadReferenceContigs(referencePath, alignmentSet, windows=None):
         # FIXME we should get rid of this entirely, but I think it requires
         # fixing the inconsistency in how contigs are referenced here versus in
         # pbcore.io
@@ -58,7 +58,14 @@ class ReferenceUtils():
 
         # Read contigs from FASTA file (or XML dataset)
         refReader = ReferenceSet(referencePath)
-        contigs = [x for x in refReader]
+        contigs = []
+        if windows is not None:
+            refNames = set([rw.refName for rw in windows])
+            for contig in refReader:
+                if contig.id in refNames:
+                    contigs.append(contig)
+        else:
+            contigs.extend([x for x in refReader])
         contigDict = dict([(x.id, x) for x in contigs])
 
         # initially each contig has an id of None -- this will be overwritten with the id from the cmp.h5, if there are any
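Note on the loadReferenceContigs() change above: the new optional windows argument lets the caller restrict contig loading to the references actually being processed (ipdSummary now passes its resolved reference windows through, see its diff below). A minimal usage sketch with placeholder file and contig names:

    from pbcore.io import AlignmentSet
    from kineticsTools.ReferenceUtils import ReferenceUtils, ReferenceWindow

    # Placeholder inputs; only contigs whose id matches a window's refName are loaded
    aln = AlignmentSet("aligned.subreads.xml", referenceFastaFname="reference.fasta")
    windows = [ReferenceWindow(refId=0, refName="gi|12057207|gb|AE001439.1|",
                               start=0, end=5000)]
    contigs = ReferenceUtils.loadReferenceContigs("reference.fasta",
                                                  alignmentSet=aln, windows=windows)
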
diff --git a/kineticsTools/ResultWriter.py b/kineticsTools/ResultWriter.py
index d5e74e2..0671fcd 100755
--- a/kineticsTools/ResultWriter.py
+++ b/kineticsTools/ResultWriter.py
@@ -28,6 +28,7 @@
 # POSSIBILITY OF SUCH DAMAGE.
 #################################################################################
 
+from collections import namedtuple, defaultdict
 import cProfile
 import logging
 import os.path
@@ -47,6 +48,8 @@ FRAC = 'frac'
 FRAClow = 'fracLow'
 FRACup = 'fracUp'
 
+log = logging.getLogger(__name__)
+
 
 class ResultCollectorProcess(Process):
 
@@ -61,7 +64,7 @@ class ResultCollectorProcess(Process):
         self._resultsQueue = resultsQueue
 
     def _run(self):
-        logging.info("Process %s (PID=%d) started running" % (self.name, self.pid))
+        log.info("Process %s (PID=%d) started running" % (self.name, self.pid))
 
         self.onStart()
 
@@ -94,7 +97,7 @@ class ResultCollectorProcess(Process):
 
                     nextChunkId += 1
 
-        logging.info("Result thread shutting down...")
+        log.info("Result thread shutting down...")
         self.onFinish()
 
     def run(self):
@@ -192,23 +195,6 @@ class KineticsWriter(ResultCollectorProcess):
         except Exception as e:
             print e
 
-    @consumer
-    def hdf5CsvConsumer(self, filename):
-
-        grp = h5py.File(filename, "w")
-
-        y = [int(ref.Length) for ref in self.refInfo]
-        dataLength = sum(y)
-        y.append(8192)
-        chunkSize = min(dataLength, 8192 * 2)
-        # print "dataLength = ", dataLength, " chunkSize = ", chunkSize, " y = ", y
-
-        refIdDataset = grp.create_dataset('refId', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,), compression_opts=2)
-        tplDataset = grp.create_dataset('tpl', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,), compression_opts=2)
-        strandDataset = grp.create_dataset('strand', (dataLength,), dtype="u1", compression="gzip", chunks=(chunkSize,), compression_opts=2)
-
-
-
 
     @consumer
     def csvConsumer(self, filename):
@@ -293,12 +279,85 @@ class KineticsWriter(ResultCollectorProcess):
             print e
 
     @consumer
-    def hdf5CsvConsumer(self, filename):
+    def bigWigConsumer(self, filename):
+        import pyBigWig
+        records = []
+        records_by_pos = defaultdict(list)
+        ranges = {}
+        BaseInfo = namedtuple("BaseInfo", ("seqid", "pos", "sense", "ipd"))
+        try:
+            while True:
+                chunk = (yield)
+                if len(chunk) == 0:
+                    continue
+                # Fill out the ipd observations into the dataset
+                for x in chunk:
+                    pos = int(x['tpl']) + 1
+                    seqid = x['refName']
+                    ranges.setdefault(seqid, (sys.maxint, 0))
+                    ranges[seqid] = (min(ranges[seqid][0], pos),
+                                     max(ranges[seqid][1], pos+1))
+                    rec = BaseInfo(
+                        seqid=seqid,
+                        pos=pos,
+                        sense=int(x['strand']),
+                        ipd=float(x['ipdRatio']))
+                    records.append(rec)
+                    records_by_pos[(rec.seqid, rec.pos)].append(rec)
+        except GeneratorExit:
+            records.sort(lambda a, b: cmp(a.pos, b.pos))
+            records.sort(lambda a, b: cmp(a.seqid, b.seqid))
+            regions = [(s, ranges[s][1]-1) for s in sorted(ranges.keys())]
+            if len(regions) == 0:
+                with open(filename, "wb") as _:
+                    return
+            bw = pyBigWig.open(filename, "w")
+            bw.addHeader(regions)
+            k = 0
+            seqids = []
+            starts = []
+            ends = []
+            ipd_enc = []
+            # records are not necessarily consecutive or two per base!
+            have_pos = set()
+            def encode_ipds(plus, minus):
+                enc = lambda x: min(65535, int(round(100*x)))
+                return float(enc(minus) + 65536*enc(plus))
+            for rec in records:
+                if (rec.seqid, rec.pos) in have_pos:
+                    continue
+                have_pos.add((rec.seqid, rec.pos))
+                strand_records = records_by_pos[(rec.seqid, rec.pos)]
+                if len(strand_records) == 2:
+                    rec_minus = strand_records[k] if strand_records[k].sense else strand_records[k + 1]
+                    rec_plus = strand_records[k + 1] if strand_records[k].sense else strand_records[k]
+                    assert rec_plus.pos == rec_minus.pos, (rec_plus, rec_minus)
+                    seqids.append(rec_plus.seqid)
+                    starts.append(rec_plus.pos-1)
+                    ends.append(rec_plus.pos)
+                    ipd_enc.append(encode_ipds(rec_plus.ipd, rec_minus.ipd))
+                else:
+                    seqids.append(rec.seqid)
+                    starts.append(rec.pos-1)
+                    ends.append(rec.pos)
+                    if rec.sense == 0:
+                        ipd_enc.append(encode_ipds(rec.ipd, 0))
+                    else:
+                        ipd_enc.append(encode_ipds(0, rec.ipd))
+            log.info("Writing records for {n} bases".format(n=len(seqids)))
+            bw.addEntries(seqids, starts, ends=ends, values=ipd_enc)
+            bw.close()
+            return
 
+    @consumer
+    def hdf5CsvConsumer(self, filename):
+        """
+        Write records to an HDF5 file equivalent to the CSV output, with +/-
+        bases adjacent in the arrays.
+        """
         grp = h5py.File(filename, "w")
-
         y = [int(ref.Length) for ref in self.refInfo]
-        dataLength = sum(y)
+        dataLength = 2 * sum(y)
         y.append(8192)
         chunkSize = min(dataLength, 8192 * 2)
         # print "dataLength = ", dataLength, " chunkSize = ", chunkSize, " y = ", y
@@ -345,10 +404,9 @@ class KineticsWriter(ResultCollectorProcess):
                     fracUpDataset = grp[FRACup]
                 '''
 
-                start = min(x['tpl'] for x in chunk)
-                end = min(max(x['tpl'] for x in chunk), tplDataset.shape[0] - 1)
-
-                arrLen = end - start + 1
+                start = min(2*x['tpl'] for x in chunk)
+                end = min(max(2*x['tpl'] for x in chunk), tplDataset.shape[0] - 1)
+                arrLen = end - start + 2
 
                 refId = np.empty(arrLen, dtype="u4")
                 tpl = np.zeros(arrLen, dtype="u4")
@@ -368,13 +426,15 @@ class KineticsWriter(ResultCollectorProcess):
                 # Fill out the ipd observations into the dataset
                 for x in chunk:
                     # offset into the current chunk
-                    idx = x['tpl'] - start
+                    _strand = int(x['strand'])
+                    idx = (2 * x['tpl']) - start + _strand
 
-                    # Data points past the end of the reference can make it through -- filter them out here
+                    # Data points past the end of the reference can make it
+                    # through -- filter them out here
                     if idx < arrLen:
                         refId[idx] = int(x['refId'])
-                        tpl[idx] = int(x['tpl'])
-                        strand[idx] = int(x['strand'])
+                        tpl[idx] = int(x['tpl']) + 1 # 'tpl' is 0-based
+                        strand[idx] = _strand
                         base[idx] = x['base']
                         score[idx] = int(x['score'])
                         tMean[idx] = float(x['tMean'])
@@ -392,31 +452,41 @@ class KineticsWriter(ResultCollectorProcess):
                                 fracLow[idx] = np.nan
                                 fracUp[idx] = np.nan
 
-                refIdDataset[start:(end + 1)] = refId
-                tplDataset[start:(end + 1)] = tpl
-                strandDataset[start:(end + 1)] = strand
-                baseDataset[start:(end + 1)] = base
-                scoreDataset[start:(end + 1)] = score
-                tMeanDataset[start:(end + 1)] = tMean
-                tErrDataset[start:(end + 1)] = tErr
-                modelPredictionDataset[start:(end + 1)] = modelPrediction
-                ipdRatioDataset[start:(end + 1)] = ipdRatio
-                coverageDataset[start:(end + 1)] = coverage
+                refIdDataset[start:(end + 2)] = refId
+                tplDataset[start:(end + 2)] = tpl
+                strandDataset[start:(end + 2)] = strand
+                baseDataset[start:(end + 2)] = base
+                scoreDataset[start:(end + 2)] = score
+                tMeanDataset[start:(end + 2)] = tMean
+                tErrDataset[start:(end + 2)] = tErr
+                modelPredictionDataset[start:(end + 2)] = modelPrediction
+                ipdRatioDataset[start:(end + 2)] = ipdRatio
+                coverageDataset[start:(end + 2)] = coverage
                 if self.options.methylFraction:
-                    fracDataset[start:(end + 1)] = frac
-                    fracLowDataset[start:(end + 1)] = fracLow
-                    fracUpDataset[start:(end + 1)] = fracUp
-
+                    fracDataset[start:(end + 2)] = frac
+                    fracLowDataset[start:(end + 2)] = fracLow
+                    fracUpDataset[start:(end + 2)] = fracUp
+                grp.flush()
+                del refId
+                del tpl
+                del strand
+                del base
+                del score
+                del tMean
+                del tErr
+                del modelPrediction
+                del ipdRatio
+                del coverage
         except GeneratorExit:
             # Close down the h5 file
             grp.close()
             return
 
-    # an alternative version that collects data into groups according to reference:
     @consumer
     def alt_hdf5CsvConsumer(self, filename):
         """
-        Similar to csv consumer but writing to hdf5 format.
+        Alternative HDF5 output that collects data into groups according to
+        reference (not currently enabled).
         """
 
         f = h5py.File(filename, "w")
@@ -424,25 +494,26 @@ class KineticsWriter(ResultCollectorProcess):
 
         for ref in self.refInfo:
             # Each reference group will house a collection of datasets:
-            chunkSize = min(ref.Length, 8192)
+            dataLength = ref.Length * 2
+            chunkSize = min(dataLength, 8192)
 
             # Create a group for each reference:
             grp = f.create_group(str(ref.Name))
 
-            ds = grp.create_dataset('tpl', (ref.Length,), dtype="u4", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('strand', (ref.Length,), dtype="u1", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('base', (ref.Length,), dtype="a1", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('score', (ref.Length,), dtype="u4", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('tMean', (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('tErr', (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('modelPrediction', (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('ipdRatio', (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
-            ds = grp.create_dataset('coverage', (ref.Length,), dtype="u4", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('tpl', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('strand', (dataLength,), dtype="u1", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('base', (dataLength,), dtype="a1", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('score', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('tMean', (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('tErr', (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('modelPrediction', (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('ipdRatio', (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+            ds = grp.create_dataset('coverage', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,))
 
             if self.options.methylFraction:
-                ds = grp.create_dataset(FRAC, (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
-                ds = grp.create_dataset(FRAClow, (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
-                ds = grp.create_dataset(FRACup, (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+                ds = grp.create_dataset(FRAC, (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+                ds = grp.create_dataset(FRAClow, (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+                ds = grp.create_dataset(FRACup, (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
 
             # Maintain a dictionary of group paths?
             dsDict[ref.ID] = grp
@@ -473,10 +544,10 @@ class KineticsWriter(ResultCollectorProcess):
                     fracLowDataset = grp[FRAClow]
                     fracUpDataset = grp[FRACup]
 
-                start = min(x['tpl'] for x in chunk)
-                end = min(max(x['tpl'] for x in chunk), tplDataset.shape[0] - 1)
+                start = min(2*x['tpl'] for x in chunk)
+                end = min(max(2*x['tpl'] for x in chunk), tplDataset.shape[0] - 1)
 
-                arrLen = end - start + 1
+                arrLen = end - start + 2
 
                 tpl = np.zeros(arrLen, dtype="u4")
                 strand = np.zeros(arrLen, dtype="u1")
@@ -495,11 +566,12 @@ class KineticsWriter(ResultCollectorProcess):
                 # Fill out the ipd observations into the dataset
                 for x in chunk:
                     # offset into the current chunk
-                    idx = x['tpl'] - start
+                    _strand = int(x['strand'])
+                    idx = (2 * x['tpl']) - start + _strand
 
                     # Data points past the end of the reference can make it through -- filter them out here
                     if idx < arrLen:
-                        tpl[idx] += int(x['tpl'])
+                        tpl[idx] += int(x['tpl']) + 1
                         strand[idx] += int(x['strand'])
                         base[idx] = x['base']
                         score[idx] += int(x['score'])
@@ -519,20 +591,20 @@ class KineticsWriter(ResultCollectorProcess):
                                 fracUp[idx] = np.nan
 
                 # Write our chunk into the main dataset
-                tplDataset[start:(end + 1)] = tpl
-                strandDataset[start:(end + 1)] = strand
-                baseDataset[start:(end + 1)] = base
-                scoreDataset[start:(end + 1)] = score
-                tMeanDataset[start:(end + 1)] = tMean
-                tErrDataset[start:(end + 1)] = tErr
-                modelPredictionDataset[start:(end + 1)] = modelPrediction
-                ipdRatioDataset[start:(end + 1)] = ipdRatio
-                coverageDataset[start:(end + 1)] = coverage
+                tplDataset[start:(end + 2)] = tpl
+                strandDataset[start:(end + 2)] = strand
+                baseDataset[start:(end + 2)] = base
+                scoreDataset[start:(end + 2)] = score
+                tMeanDataset[start:(end + 2)] = tMean
+                tErrDataset[start:(end + 2)] = tErr
+                modelPredictionDataset[start:(end + 2)] = modelPrediction
+                ipdRatioDataset[start:(end + 2)] = ipdRatio
+                coverageDataset[start:(end + 2)] = coverage
                 if self.options.methylFraction:
-                    fracDataset[start:(end + 1)] = frac
-                    fracLowDataset[start:(end + 1)] = fracLow
-                    fracUpDataset[start:(end + 1)] = fracUp
-
+                    fracDataset[start:(end + 2)] = frac
+                    fracLowDataset[start:(end + 2)] = fracLow
+                    fracUpDataset[start:(end + 2)] = fracUp
+                f.flush()
         except GeneratorExit:
             # Close down the h5 file
             f.close()
@@ -559,7 +631,7 @@ class KineticsWriter(ResultCollectorProcess):
 
         for ref in self.refInfo:
             # FIXME -- create with good chunk parameters, activate compression
-            logging.info("Creating IpdRatio dataset w/ name: %s, Size: %d" % (str(ref.Name), ref.Length))
+            log.info("Creating IpdRatio dataset w/ name: %s, Size: %d" % (str(ref.Name), ref.Length))
 
             chunkSize = min(ref.Length, 8192)
 
@@ -835,10 +907,11 @@ class KineticsWriter(ResultCollectorProcess):
             ('m5Cgff', 'm5C.gff', self.m5CgffConsumer),
             ('gff', 'gff', self.gffConsumer),
             ('csv', 'csv', self.csvConsumer),
+            ('bigwig', 'bw', self.bigWigConsumer),
             ('ms_csv', 'ms.csv', self.msCsvConsumer),
             ('pickle', 'pickle', self.csvConsumer),
             ('summary_h5', 'summary.h5', self.ipdRatioH5Consumer),
-            ('csv_h5', 'h5', self.hdf5CsvConsumer)
+            ('csv_h5', 'h5', self.alt_hdf5CsvConsumer)
         ]
 
         sinkList = []
@@ -850,7 +923,15 @@ class KineticsWriter(ResultCollectorProcess):
 
             # The 'outfile' argument causes all outputs to be generated
             if self.options.outfile:
-                name = self.options.outfile + '.' + ext
+                if ext == "bw":
+                    try:
+                        import pyBigWig
+                    except ImportError:
+                        pass
+                    else:
+                        name = self.options.outfile + '.' + ext
+                else:
+                    name = self.options.outfile + '.' + ext
 
             # Individual outputs can be specified - these filenames override the default
             if self.options.__getattribute__(fileType):
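
For orientation, the encode_ipds() helper in the new bigWigConsumer packs both strands' IpdRatio values for a template position into a single bigWig value: the minus-strand ratio occupies the low 16 bits and the plus-strand ratio the high 16 bits, each scaled by 100 and capped at 65535 (ratios above 655.35 saturate). A minimal decoding sketch, mirroring what test/test_outputs.py does; the file name and coordinates are placeholders:

    import pyBigWig

    def decode_ipds(value):
        # Unpack the IpdRatio pair written by bigWigConsumer
        v = int(value)
        ipd_minus = (v % 65536) / 100.0  # low 16 bits: minus strand
        ipd_plus = (v >> 16) / 100.0     # high 16 bits: plus strand
        return ipd_plus, ipd_minus

    bw = pyBigWig.open("basemods.bw")  # hypothetical --bigwig output
    # 0-based interval; positions never written come back as nan
    value = bw.values("gi|12057207|gb|AE001439.1|", 0, 1)[0]
    print decode_ipds(value)
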
diff --git a/kineticsTools/ipdSummary.py b/kineticsTools/ipdSummary.py
index 5c52c54..6bfd088 100755
--- a/kineticsTools/ipdSummary.py
+++ b/kineticsTools/ipdSummary.py
@@ -61,12 +61,9 @@ from kineticsTools.ResultWriter import KineticsWriter
 from kineticsTools.ipdModel import IpdModel
 from kineticsTools.ReferenceUtils import ReferenceUtils
 
-# Version info
-__p4revision__ = "$Revision: #1 $"
-__p4change__ = "$Change: 100972 $"
-revNum = int(__p4revision__.strip("$").split(" ")[1].strip("#"))
-changeNum = int(__p4change__.strip("$").split(":")[-1])
-__version__ = "2.2"
+__version__ = "2.3"
+
+log = logging.getLogger(__name__)
 
 class Constants(object):
     TOOL_ID = "kinetics_tools.tasks.ipd_summary"
@@ -129,17 +126,23 @@ def get_parser():
         type=validateFile, help="Fasta or Reference DataSet")
     # XXX GFF and CSV are "option" for arg parser, not tool contract
     tcp.add_output_file_type(FileTypes.GFF, "gff",
-        name="GFF file",
+        name="Modified bases GFF",
         description="GFF file of modified bases",
         default_name="basemods")
-    tcp.add_output_file_type(FileTypes.CSV, "csv",
-        name="CSV file",
-        description="CSV file of per-nucleotide information",
+    tcp.add_output_file_type(FileTypes.BIGWIG, "bigwig",
+        name="BigWig file encoding base IpdRatios",
+        description="Compressed binary format containing the IpdRatios for every base (both strands)",
+        default_name="basemods")
+    tcp.add_output_file_type(FileTypes.H5, "csv_h5",
+        name="HDF5 file containing per-base information",
+        description="HDF5 equivalent of CSV output",
         default_name="basemods")
     argp.add_argument("--gff", action="store", default=None,
         help="Output GFF file of modified bases")
     argp.add_argument("--csv", action="store", default=None,
         help="Output CSV file out per-nucleotide information")
+    argp.add_argument("--bigwig", action="store", default=None,
+        help="Output BigWig file encoding IpdRatio for both strands")
     # FIXME use central --nproc option
     argp.add_argument('--numWorkers', '-j',
         dest='numWorkers',
@@ -372,13 +375,18 @@ def _get_more_options(parser):
                         default=None,
                         help="Random seed (for development and debugging purposes only)")
 
+    parser.add_argument("--referenceStride", action="store", type=int,
+                        default=1000,
+                        help="Size of reference window in internal "+
+                             "parallelization.  For testing purposes only.")
+
     return parser
 
 
 class KineticsToolsRunner(object):
     def __init__(self, args):
         self.args = args
-        self._sharedAlignmentSet = None
+        self.alignments = None
 
     def start(self):
         self.validateArgs()
@@ -508,7 +516,7 @@ class KineticsToolsRunner(object):
         for i in xrange(self.options.numWorkers):
             p = WorkerType(self.options, self._workQueue, self._resultsQueue,
                 self.ipdModel,
-                sharedAlignmentSet=self._sharedAlignmentSet)
+                sharedAlignmentSet=self.alignments)
             self._workers.append(p)
             p.start()
         logging.info("Launched worker processes.")
@@ -532,11 +540,11 @@ class KineticsToolsRunner(object):
         pass
 
     def loadReferenceAndModel(self, referencePath):
-        assert self._sharedAlignmentSet is not None
+        assert self.alignments is not None and self.referenceWindows is not None
         # Load the reference contigs - annotated with their refID from the cmp.h5
         logging.info("Loading reference contigs %s" % referencePath)
         contigs = ReferenceUtils.loadReferenceContigs(referencePath,
-            alignmentSet=self._sharedAlignmentSet)
+            alignmentSet=self.alignments, windows=self.referenceWindows)
 
         # There are three different ways the ipdModel can be loaded.
         # In order of precedence they are:
@@ -558,8 +566,7 @@ class KineticsToolsRunner(object):
                 logging.error("Params path doesn't exist: %s" % self.args.paramsPath)
                 sys.exit(1)
 
-            majorityChem = ReferenceUtils.loadAlignmentChemistry(
-                self._sharedAlignmentSet)
+            majorityChem = ReferenceUtils.loadAlignmentChemistry(self.alignments)
             ipdModel = os.path.join(self.args.paramsPath, majorityChem + ".h5")
             if majorityChem == 'unknown':
                 logging.error("Chemistry cannot be identified---cannot perform kinetic analysis")
@@ -580,11 +587,11 @@ class KineticsToolsRunner(object):
         """
         logging.info("Reading AlignmentSet: %s" % cmpH5Filename)
         logging.info("           reference: %s" % self.args.reference)
-        self._sharedAlignmentSet = AlignmentSet(cmpH5Filename,
-            referenceFastaFname=self.args.reference)
+        self.alignments = AlignmentSet(cmpH5Filename,
+                                       referenceFastaFname=self.args.reference)
         # XXX this should ensure that the file(s) get opened, including any
         # .pbi indices - but need to confirm this
-        self.refInfo = self._sharedAlignmentSet.referenceInfoTable
+        self.refInfo = self.alignments.referenceInfoTable
 
     def _mainLoop(self):
         """
@@ -603,31 +610,16 @@ class KineticsToolsRunner(object):
         # interpreter crashes sometimes.  See Bug 19704.  Since we
         # don't leak garbage cycles, disabling the cyclic GC is
         # essentially harmless.
-        gc.disable()
+        #gc.disable()
 
-        # Load a copy of the cmpH5 alignment index to share with the slaves
         self.loadSharedAlignmentSet(self.args.alignment_set)
 
-        # Load reference and IpdModel
-        self.loadReferenceAndModel(self.args.reference)
-        
-        # Spawn workers
-        self._launchSlaveProcesses()
-
-        # WARNING -- cmp.h5 file must be opened AFTER worker processes have been spawned
-        # cmp.h5 we're using -- use this to orchestrate the work
-        self.cmph5 = self._sharedAlignmentSet
-        logging.info('Generating kinetics summary for [%s]' % self.args.alignment_set)
-
-        #self.referenceMap = self.cmph5['/RefGroup'].asDict('RefInfoID', 'ID')
-        #self.alnInfo = self.cmph5['/AlnInfo'].asRecArray()
-
         # Resolve the windows that will be visited.
         if self.args.referenceWindowsAsString is not None:
             self.referenceWindows = []
             for s in self.args.referenceWindowsAsString.split(","):
                 try:
-                    win = ReferenceUtils.parseReferenceWindow(s, self.cmph5.referenceInfo)
+                    win = ReferenceUtils.parseReferenceWindow(s, self.alignments.referenceInfo)
                     self.referenceWindows.append(win)
                 except:
                     if self.args.skipUnrecognizedContigs:
@@ -635,11 +627,25 @@ class KineticsToolsRunner(object):
                     else:
                         raise Exception, "Unrecognized contig!"
         elif self.args.referenceWindowsFromAlignment:
-            self.referenceWindows = ReferenceUtils.referenceWindowsFromAlignment(self._sharedAlignmentSet, self.cmph5.referenceInfo)
+            self.referenceWindows = ReferenceUtils.referenceWindowsFromAlignment(self.alignments, self.alignments.referenceInfo)
+            refNames = set([rw.refName for rw in self.referenceWindows])
+            # limit output to contigs that overlap with reference windows
+            self.refInfo = [r for r in self.refInfo if r.Name in refNames]
         else:
             self.referenceWindows = ReferenceUtils.createReferenceWindows(
                 self.refInfo)
 
+        # Load reference and IpdModel
+        self.loadReferenceAndModel(self.args.reference)
+
+        # Spawn workers
+        self._launchSlaveProcesses()
+
+        logging.info('Generating kinetics summary for [%s]' % self.args.alignment_set)
+
+        #self.referenceMap = self.alignments['/RefGroup'].asDict('RefInfoID', 'ID')
+        #self.alnInfo = self.alignments['/AlnInfo'].asRecArray()
+
         # Main loop -- we loop over ReferenceGroups in the cmp.h5.  For each contig we will:
         # 1. Load the sequence into the main memory of the parent process
         # 2. Fork the workers
@@ -650,7 +656,7 @@ class KineticsToolsRunner(object):
         # Iterate over references
         for window in self.referenceWindows:
             logging.info('Processing window/contig: %s' % (window,))
-            for chunk in ReferenceUtils.enumerateChunks(1000, window):
+            for chunk in ReferenceUtils.enumerateChunks(self.args.referenceStride, window):
                 self._workQueue.put((self.workChunkCounter, chunk))
                 self.workChunkCounter += 1
 
@@ -667,7 +673,7 @@ class KineticsToolsRunner(object):
         self._resultsQueue.join()
         self._resultCollectorProcess.join()
         logging.info("ipdSummary.py finished. Exiting.")
-        del self.cmph5
+        self.alignments.close()
         return 0
 
 
@@ -715,12 +721,14 @@ def resolved_tool_contract_runner(resolved_contract):
     alignment_path = rc.task.input_files[0]
     reference_path = rc.task.input_files[1]
     gff_path = rc.task.output_files[0]
-    csv_path = rc.task.output_files[1]
+    bigwig_path = rc.task.output_files[1]
+    h5_path = rc.task.output_files[2]
     args = [
         alignment_path,
         "--reference", reference_path,
         "--gff", gff_path,
-        "--csv", csv_path,
+        "--bigwig", bigwig_path,
+        "--csv_h5", h5_path,
         "--numWorkers", str(rc.task.nproc),
         "--pvalue", str(rc.task.options[Constants.PVALUE_ID]),
         "--alignmentSetRefWindows",
@@ -737,6 +745,7 @@ def resolved_tool_contract_runner(resolved_contract):
         args.extend([
             "--identify", rc.task.options[Constants.IDENTIFY_ID],
         ])
+    log.info("Converted arguments: ipdSummary {c}".format(c=" ".join(args)))
     args_ = get_parser().arg_parser.parser.parse_args(args)
     return args_runner(args_)
 
diff --git a/kineticsTools/summarizeModifications.py b/kineticsTools/summarizeModifications.py
index deb2768..2f2c669 100755
--- a/kineticsTools/summarizeModifications.py
+++ b/kineticsTools/summarizeModifications.py
@@ -178,7 +178,7 @@ def get_parser():
         name="GFF file",
         description="Alignment summary GFF")
     p.add_output_file_type(FileTypes.GFF, "gff_out",
-        name="GFF file",
+        name="Alignment summary with basemods",
         description="Modified alignment summary file",
         default_name="alignment_summary_with_basemods")
     return p
diff --git a/kineticsTools/tasks/__init__.py b/kineticsTools/tasks/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/kineticsTools/tasks/gather_kinetics_h5.py b/kineticsTools/tasks/gather_kinetics_h5.py
new file mode 100644
index 0000000..76703f5
--- /dev/null
+++ b/kineticsTools/tasks/gather_kinetics_h5.py
@@ -0,0 +1,172 @@
+
+"""
+pbsmrtpipe task to gather chunked hdf5 files containing raw kinetics data.
+"""
+
+
+from collections import defaultdict
+import logging
+import sys
+
+import h5py
+
+from pbcommand.pb_io.common import load_pipeline_chunks_from_json
+from pbcommand.cli import pbparser_runner
+from pbcommand.models import get_gather_pbparser, FileTypes
+from pbcommand.utils import setup_log
+
+log = logging.getLogger(__name__)
+
+
+class Constants(object):
+    TASK_ID = "kinetics_tools.tasks.gather_kinetics_h5"
+    VERSION = "0.1.0"
+    DRIVER_EXE = "python -m kineticsTools.tasks.gather_kinetics_h5 --resolved-tool-contract "
+    CHUNK_KEY = "$chunk.h5_id"
+    OPT_CHUNK_KEY = "kinetics_tools.tasks.gather_h5_chunk_key"
+
+
+def get_parser():
+    p = get_gather_pbparser(Constants.TASK_ID,
+                            Constants.VERSION,
+                            "Dev Kinetics HDF5 Gather",
+                            "General Chunk Kinetics HDF5 Gather",
+                            Constants.DRIVER_EXE,
+                            is_distributed=True,
+                            default_level=logging.INFO)
+    p.add_input_file_type(FileTypes.CHUNK, "cjson_in", "GCHUNK Json",
+                          "Gathered CHUNK Json with HDF5 chunk key")
+
+    p.add_output_file_type(FileTypes.H5, "h5_out",
+                           "Kinetics HDF5 file",
+                           "Gathered HDF5 file", "gathered")
+
+    # Only need to add to argparse layer for the commandline
+    p.arg_parser.add_str(Constants.OPT_CHUNK_KEY,
+                         "chunk_key",
+                         Constants.CHUNK_KEY,
+                         "Chunk key",
+                         "Chunk key to use (format $chunk.{chunk-key})")
+
+    return p
+
+
+def gather_kinetics_h5(chunked_files, output_file):
+    """
+    Gather a set of 'flat' kinetics hdf5 chunks.  This will not scale well for
+    large (i.e. non-bacterial) genomes.
+    """
+    log.info("creating {f}...".format(f=output_file))
+    out = h5py.File(output_file, "w")
+    first = h5py.File(chunked_files[0])
+    dataLength = len(first[first.keys()[0]])
+    chunkSize = min(dataLength, 8192 * 8)
+    datasets = {}
+    def _add_chunk(chunk):
+        # FIXME this is insanely inefficient
+        log.debug("  getting mask")
+        mask = chunk['base'].__array__() != ''
+        for key in datasets.keys():
+            log.debug("  copying '{k}'".format(k=key))
+            datasets[key][mask] = chunk[key][mask]
+        del mask
+    for key in first.keys():
+        ds = out.create_dataset(key, (dataLength,),
+                                dtype=first[key].dtype,
+                                compression="gzip",
+                                chunks=(chunkSize,),
+                                compression_opts=2)
+        datasets[key] = ds
+    log.info("adding first chunk in {f}".format(f=chunked_files[0]))
+    _add_chunk(first)
+    out.flush()
+    for file_name in chunked_files[1:]:
+        #out = h5py.File(output_file, "r+")
+        chunk = h5py.File(file_name)
+        log.info("adding chunk in {f}".format(f=file_name))
+        _add_chunk(chunk)
+    out.close()
+    return 0
+
+
+def gather_kinetics_h5_byref(chunked_files, output_file):
+    """
+    Gather a set of hdf5 files containing per-reference datasets.  This
+    implementation sacrifices some computational efficiency to limit the
+    memory overhead.
+    """
+    log.info("creating {f}...".format(f=output_file))
+    out = h5py.File(output_file, "w")
+    first = h5py.File(chunked_files[0])
+    refs_by_file = defaultdict(list)
+    ds_types = {}
+    ref_sizes = {}
+    for file_name in chunked_files:
+        chunk = h5py.File(file_name)
+        for rname in chunk.keys():
+            refs_by_file[rname].append(file_name)
+            grp = chunk[rname]
+            for ds_id in grp.keys():
+                if not ds_id in ds_types:
+                    ds_types[ds_id] = grp[ds_id].dtype
+                else:
+                    assert ds_types[ds_id] == grp[ds_id].dtype
+                if not rname in ref_sizes:
+                    ref_sizes[rname] = len(grp[ds_id])
+                else:
+                    assert ref_sizes[rname] == len(grp[ds_id])
+        chunk.close()
+    for ref_name, file_names in refs_by_file.iteritems():
+        log.info("creating group {r} (dataset length {s})".format(
+                 r=ref_name,
+                 s=ref_sizes[ref_name]))
+        grp = out.create_group(ref_name)
+        dataLength = ref_sizes[ref_name]
+        chunkSize = min(dataLength, 8192)
+        for ds_id, ds_type in ds_types.iteritems():
+            log.debug("  dataset {i} ({t})".format(i=ds_id, t=ds_type))
+            ds = grp.create_dataset(ds_id, (dataLength,), dtype=ds_type,
+                                    compression="gzip", chunks=(chunkSize,))
+            for file_name in file_names:
+                log.debug("    reading from {f}".format(f=file_name))
+                chunk = h5py.File(file_name)
+                grp_chk = chunk[ref_name]
+                mask = grp_chk['base'].__array__() != ''
+                ds[mask] = grp_chk[ds_id][mask]
+                del mask
+                chunk.close()
+        out.flush()
+    out.close()
+    return 0
+
+
+def _run_main(chunk_input_json, output_file, chunk_key):
+    chunks = load_pipeline_chunks_from_json(chunk_input_json)
+    chunked_files = []
+    for chunk in chunks:
+        if chunk_key in chunk.chunk_keys:
+            chunked_files.append(chunk.chunk_d[chunk_key])
+        else:
+            raise KeyError("Unable to find chunk key '{i}' in {p}".format(i=chunk_key, p=chunk))
+    return gather_kinetics_h5_byref(chunked_files, output_file)
+
+
+def args_runner(args):
+    return _run_main(args.cjson_in, args.h5_out, args.chunk_key)
+
+
+def rtc_runner(rtc):
+    return _run_main(rtc.task.input_files[0], rtc.task.output_files[0], rtc.task.chunk_key)
+
+
+def main(argv=sys.argv):
+    return pbparser_runner(argv[1:],
+                           get_parser(),
+                           args_runner,
+                           rtc_runner,
+                           log,
+                           setup_log)
+
+
+if __name__ == '__main__':
+    sys.exit(main())
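
The gather task above merges chunked kinetics HDF5 files by copying only the rows each chunk actually filled in (positions where 'base' is non-empty), so same-sized, partially populated datasets collapse into one. A hypothetical usage sketch for the per-reference variant, with placeholder file names:

    from kineticsTools.tasks.gather_kinetics_h5 import gather_kinetics_h5_byref

    # Chunk files as produced by scattered ipdSummary runs with --csv_h5
    chunks = ["basemods.chunk0.h5", "basemods.chunk1.h5"]
    gather_kinetics_h5_byref(chunks, "basemods.gathered.h5")
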
diff --git a/requirements-ci.txt b/requirements-ci.txt
index a43e0c4..002d11d 100644
--- a/requirements-ci.txt
+++ b/requirements-ci.txt
@@ -1,12 +1,13 @@
 cython
 numpy
-h5py
+h5py >= 1.3.0
 jinja2
 networkx
 jsonschema
 xmlbuilder
 functools32
 pyxb
+pyBigWig
 # Install from github
 -e git://github.com/PacificBiosciences/pbcore.git@master#egg=pbcore
 -e git://github.com/PacificBiosciences/pbcommand.git#egg=pbcommand
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 80e9144..0af0380 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,3 +1,2 @@
--r requirements.txt
 sphinx
 nose
diff --git a/setup.py b/setup.py
index bc1f56b..e1eb4bf 100755
--- a/setup.py
+++ b/setup.py
@@ -4,12 +4,11 @@ import sys
 
 setup(
     name='kineticsTools',
-    version='0.5.2',
+    version='0.6.0',
     author='Pacific Biosciences',
     author_email='devnet@pacificbiosciences.com',
     license=open('LICENSES.txt').read(),
-    packages=find_packages('.'),
-    package_dir={'': '.'},
+    packages=find_packages("."),
     package_data={'kineticsTools': ['resources/*.h5']},
     ext_modules=[Extension('kineticsTools/tree_predict', ['kineticsTools/tree_predict.c'],
                            extra_compile_args=["-O3", "-shared", "-std=c99"],
@@ -21,6 +20,7 @@ setup(
         'h5py >= 1.3.0',
         'scipy >= 0.9.0',
         'pbcommand >= 0.3.22',
+        #'pyBigWig'
     ],
     entry_points={'console_scripts': [
         "ipdSummary = kineticsTools.ipdSummary:main",
diff --git a/test/cram/extra/bigwig.t b/test/cram/extra/bigwig.t
new file mode 100644
index 0000000..5742c29
--- /dev/null
+++ b/test/cram/extra/bigwig.t
@@ -0,0 +1,15 @@
+Test support for BigWig output of IpdRatio.
+
+  $ . $TESTDIR/../portability.sh
+
+Load in data:
+
+  $ DATA=/pbi/dept/secondary/siv/testdata/kineticsTools
+  $ INPUT=$DATA/Hpyl_1_5000.xml
+  $ REFERENCE=/pbi/dept/secondary/siv/references/Helicobacter_pylori_J99/sequence/Helicobacter_pylori_J99.fasta
+
+Run basic ipdSummary:
+
+  $ ipdSummary --log-level=WARNING --bigwig tmp1.bw --numWorkers 12 --pvalue 0.001 --identify m6A,m4C --reference $REFERENCE --referenceWindows="gi|12057207|gb|AE001439.1|:0-5000" $INPUT
+  $ ls tmp1.bw
+  tmp1.bw
diff --git a/test/cram/version.t b/test/cram/version.t
index ef00278..91ce3dd 100644
--- a/test/cram/version.t
+++ b/test/cram/version.t
@@ -9,14 +9,15 @@ A simple test of the version and help options:
                     [--log-file LOG_FILE]
                     [--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL} | --debug | --quiet | -v]
                     --reference REFERENCE [--gff GFF] [--csv CSV]
-                    [--numWorkers NUMWORKERS] [--pvalue PVALUE]
-                    [--maxLength MAXLENGTH] [--identify IDENTIFY]
-                    [--methylFraction] [--outfile OUTFILE] [--m5Cgff M5CGFF]
-                    [--m5Cclassifer M5CCLASSIFIER] [--csv_h5 CSV_H5]
-                    [--pickle PICKLE] [--summary_h5 SUMMARY_H5]
-                    [--ms_csv MS_CSV] [--control CONTROL] [--useLDA]
-                    [--paramsPath PARAMSPATH] [--minCoverage MINCOVERAGE]
-                    [--maxQueueSize MAXQUEUESIZE] [--maxCoverage MAXCOVERAGE]
+                    [--bigwig BIGWIG] [--numWorkers NUMWORKERS]
+                    [--pvalue PVALUE] [--maxLength MAXLENGTH]
+                    [--identify IDENTIFY] [--methylFraction] [--outfile OUTFILE]
+                    [--m5Cgff M5CGFF] [--m5Cclassifer M5CCLASSIFIER]
+                    [--csv_h5 CSV_H5] [--pickle PICKLE]
+                    [--summary_h5 SUMMARY_H5] [--ms_csv MS_CSV]
+                    [--control CONTROL] [--useLDA] [--paramsPath PARAMSPATH]
+                    [--minCoverage MINCOVERAGE] [--maxQueueSize MAXQUEUESIZE]
+                    [--maxCoverage MAXCOVERAGE]
                     [--mapQvThreshold MAPQVTHRESHOLD] [--ipdModel IPDMODEL]
                     [--modelIters MODELITERS] [--cap_percentile CAP_PERCENTILE]
                     [--methylMinCov METHYLMINCOV]
@@ -27,7 +28,7 @@ A simple test of the version and help options:
                     [-W REFERENCEWINDOWSASSTRING]
                     [--skipUnrecognizedContigs SKIPUNRECOGNIZEDCONTIGS]
                     [--alignmentSetRefWindows] [--threaded] [--profile] [--pdb]
-                    [--seed RANDOMSEED]
+                    [--seed RANDOMSEED] [--referenceStride REFERENCESTRIDE]
                     alignment_set
   ipdSummary: error: too few arguments
   [2]
diff --git a/test/test_gather_h5.py b/test/test_gather_h5.py
new file mode 100644
index 0000000..954fa26
--- /dev/null
+++ b/test/test_gather_h5.py
@@ -0,0 +1,77 @@
+
+import unittest
+import tempfile
+
+import numpy as np
+import h5py
+
+import pbcommand.testkit.core
+from pbcommand.models import PipelineChunk
+from pbcommand.pb_io.common import write_pipeline_chunks
+
+from kineticsTools.tasks.gather_kinetics_h5 import gather_kinetics_h5
+
+class SetUpHDF5(object):
+    DATALENGTH = 10000
+    CHUNKSIZE = 2048
+    CHUNKED_FILES = [
+        tempfile.NamedTemporaryFile(suffix=".h5").name,
+        tempfile.NamedTemporaryFile(suffix=".h5").name
+    ]
+
+    @classmethod
+    def makeInputs(cls):
+        for k, ifn in enumerate(cls.CHUNKED_FILES):
+            f = h5py.File(cls.CHUNKED_FILES[k], "w")
+            a = f.create_dataset("base", (cls.DATALENGTH,),
+                                 dtype="a1", compression="gzip",
+                                 chunks=(cls.CHUNKSIZE,), compression_opts=2)
+            b = f.create_dataset("score", (cls.DATALENGTH,),
+                                 dtype="u4", compression="gzip",
+                                 chunks=(cls.CHUNKSIZE,), compression_opts=2)
+            c = f.create_dataset("tMean", (cls.DATALENGTH,),
+                                 dtype="f4", compression="gzip",
+                                 chunks=(cls.CHUNKSIZE,), compression_opts=2)
+            start = k * (cls.DATALENGTH / 2)
+            end = (k + 1) * (cls.DATALENGTH / 2)
+            a[start:end] = np.array(['A' for x in range(start, end)])
+            b[start:end] = np.array([2*(k+1) for x in range(start, end)])
+            c[start:end] = np.sqrt(np.array(range(start+1, end+1)))
+            f.close()
+
+
+class TestGatherHDF5(unittest.TestCase, SetUpHDF5):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.makeInputs()
+
+    def test_gather_kinetics_h5(self):
+        ofn = tempfile.NamedTemporaryFile(suffix=".h5").name
+        gather_kinetics_h5(self.CHUNKED_FILES, ofn)
+        f = h5py.File(ofn)
+        self.assertTrue(all(f['base'].__array__() == 'A'))
+        self.assertTrue(all(f['score'].__array__() > 0))
+        self.assertEqual(f['score'].__array__().mean(), 3)
+        self.assertTrue(all(f['tMean'].__array__() > 0))
+        d = np.round(f['tMean'].__array__() ** 2).astype("u4")
+        self.assertTrue(all(d == np.array(range(1, self.DATALENGTH+1))))
+
+
+class TestGatherH5ToolContract(pbcommand.testkit.core.PbTestGatherApp, SetUpHDF5):
+    DRIVER_BASE = "python -m kineticsTools.tasks.gather_kinetics_h5 "
+    INPUT_FILES = [tempfile.NamedTemporaryFile(suffix=".json").name]
+    CHUNK_KEY = "$chunk.h5_id"
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestGatherH5ToolContract, cls).setUpClass()
+        cls.makeInputs()
+        chunks = [PipelineChunk(chunk_id="chunk_data_{i}".format(i=i),
+                                **({cls.CHUNK_KEY:fn}))
+                  for i, fn in enumerate(cls.CHUNKED_FILES)]
+        write_pipeline_chunks(chunks, cls.INPUT_FILES[0], None)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/test/test_outputs.py b/test/test_outputs.py
new file mode 100644
index 0000000..03430de
--- /dev/null
+++ b/test/test_outputs.py
@@ -0,0 +1,105 @@
+
+"""
+Test sanity of various output formats for a minimal real-world example.
+"""
+
+import subprocess
+import tempfile
+import unittest
+import os.path
+import csv
+import re
+
+import h5py
+
+
+os.environ["PACBIO_TEST_ENV"] = "1" # turns off --verbose
+
+DATA_DIR = "/pbi/dept/secondary/siv/testdata/kineticsTools"
+REF_DIR = "/pbi/dept/secondary/siv/references/Helicobacter_pylori_J99"
+ALIGNMENTS = os.path.join(DATA_DIR, "Hpyl_1_5000.xml")
+REFERENCE = os.path.join(REF_DIR, "sequence", "Helicobacter_pylori_J99.fasta")
+
+skip_if_no_data = unittest.skipUnless(
+    os.path.isdir(DATA_DIR) and os.path.isdir(REF_DIR),
+    "%s or %s not available" % (DATA_DIR, REF_DIR))
+
+@skip_if_no_data
+class TestOutputs(unittest.TestCase):
+
+    @classmethod
+    @skip_if_no_data
+    def setUpClass(cls):
+        prefix = tempfile.NamedTemporaryFile().name
+        cls.h5_file = "{p}.h5".format(p=prefix)
+        cls.csv_file = "{p}.csv".format(p=prefix)
+        cls.gff_file = "{p}.gff".format(p=prefix)
+        cls.bw_file = "{p}.bw".format(p=prefix)
+        args = [
+            "ipdSummary", "--log-level", "WARNING",
+            "--csv_h5", cls.h5_file,
+            "--csv", cls.csv_file,
+            "--gff", cls.gff_file,
+            "--bigwig", cls.bw_file,
+            "--numWorkers", "12",
+            "--pvalue", "0.001",
+            "--identify", "m6A,m4C",
+            "--referenceStride", "100",
+            "--referenceWindows", "gi|12057207|gb|AE001439.1|:0-200",
+            "--reference", REFERENCE,
+            ALIGNMENTS
+        ]
+        print " ".join(args)
+        assert subprocess.call(args) == 0
+        with open(cls.csv_file) as f:
+            cls.csv_records = [l.split(",") for l in f.read().splitlines()][1:]
+
+    @classmethod
+    def tearDownClass(cls):
+        return
+        for fn in [cls.h5_file, cls.csv_file, cls.gff_file, cls.bw_file]:
+            if os.path.exists(fn):
+                os.remove(fn)
+
+    def test_h5_output(self):
+        f = h5py.File(self.h5_file)
+        bases = f['base'].__array__()
+        self.assertEqual(bases[0], "A")
+        self.assertEqual(bases[100], "T")
+        self.assertEqual(list(bases[0:400] != "").count(True), 400)
+        seqh_fwd = ''.join([f['base'][x*2] for x in range(200)])
+        seqc_fwd = ''.join([self.csv_records[x*2][3] for x in range(200)])
+        self.assertEqual(seqh_fwd, seqc_fwd)
+        seqh_rev = ''.join([f['base'][(x*2)+1] for x in range(200)])
+        seqc_rev = ''.join([self.csv_records[(x*2)+1][3] for x in range(200)])
+        self.assertEqual(seqh_rev, seqc_rev)
+        tpl_fwd = [f['tpl'][x*2] for x in range(200)]
+        self.assertEqual(tpl_fwd, range(1, 201))
+        f = h5py.File(self.h5_file)
+        for i_rec, rec in enumerate(self.csv_records):
+            self.assertEqual(str(f['tpl'][i_rec]), self.csv_records[i_rec][1])
+            self.assertEqual("%.3f"%f['ipdRatio'][i_rec],
+                             self.csv_records[i_rec][8])
+
+    def test_csv_output(self):
+        self.assertEqual(len(self.csv_records), 400)
+        self.assertEqual(self.csv_records[0][3], "A")
+        self.assertEqual(self.csv_records[100][3], "T")
+
+    def test_bigwig(self):
+        import pyBigWig
+        f = pyBigWig.open(self.bw_file)
+        for i_rec, rec in enumerate(self.csv_records):
+            seqid = re.sub('\"', "", rec[0])
+            tpl = int(rec[1]) - 1
+            s = int(f.values(seqid, tpl, tpl+1)[0])
+            ipd_minus = (s % 65536) / 100.0
+            ipd_plus = (s >> 16) / 100.0
+            if rec[2] == "1":
+                self.assertAlmostEqual(ipd_minus, float(rec[8]), places=1)
+            else:
+                self.assertAlmostEqual(ipd_plus, float(rec[8]), places=1)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/test/test_tool_contract.py b/test/test_tool_contract.py
index 5056b24..023d585 100755
--- a/test/test_tool_contract.py
+++ b/test/test_tool_contract.py
@@ -9,6 +9,8 @@ import logging
 import os.path
 import csv
 
+import h5py
+
 import pbcommand.testkit
 
 os.environ["PACBIO_TEST_ENV"] = "1" # turns off --verbose
@@ -29,6 +31,7 @@ gi|12057207|gb|AE001439.1|\tkinModCall\tm6A\t35\t35\t187\t-\t.\tcoverage=118;con
 gi|12057207|gb|AE001439.1|\tkinModCall\tm4C\t60\t60\t49\t-\t.\tcoverage=112;context=AAAAAGCTCGCTCAAAAACCCTTGATTTAAGGGCGTTTTAT;IPDRatio=2.58;identificationQv=33
 gi|12057207|gb|AE001439.1|\tkinModCall\tm6A\t89\t89\t223\t+\t.\tcoverage=139;context=AGCGAGCTTTTTGCTCAAAGAATCCAAGATAGCGTTTAAAA;IPDRatio=5.69;identificationQv=187"""
 
+# FIXME this is much too slow
 @unittest.skipUnless(os.path.isdir(DATA_DIR) and os.path.isdir(REF_DIR),
     "%s or %s not available" % (DATA_DIR, REF_DIR))
 class TestIpdSummary(pbcommand.testkit.PbTestApp):
@@ -49,27 +52,29 @@ class TestIpdSummary(pbcommand.testkit.PbTestApp):
 
     def run_after(self, rtc, output_dir):
         gff_file = os.path.join(output_dir, rtc.task.output_files[0])
-        csv_file = os.path.join(output_dir, rtc.task.output_files[1])
         def lc(fn): return len(open(fn).readlines())
         self.assertEqual(lc(gff_file), Constants.N_LINES_GFF)
-        self.assertEqual(lc(csv_file), Constants.N_LINES_CSV)
-        def head(fn,n): return "\n".join( open(fn).read().splitlines()[0:n] )
-        self.assertEqual(head(csv_file, 3), Constants.INITIAL_LINES_CSV)
         def head2(fn,n):
             out = []
-            i = 0
             for line in open(fn).read().splitlines():
                 if line[0] != '#':
                     out.append(line)
-                    i += 1
-                if i == n:
+                if len(out) == n:
                     break
             return "\n".join(out)
         self.assertEqual(head2(gff_file, 3), Constants.INITIAL_LINES_GFF)
-
-
-@unittest.skipUnless(os.path.isdir(DATA_DIR) and os.path.isdir(REF_DIR),
-    "%s or %s not available" % (DATA_DIR, REF_DIR))
+        f = h5py.File(rtc.task.output_files[2])
+        seqh_fwd = ''.join([f['base'][x*2] for x in range(5000)])
+        seqh_rev = ''.join([f['base'][(x*2)+1] for x in range(5000)])
+        self.assertEqual(len(seqh_fwd), 5000)
+        self.assertEqual(len(seqh_rev), 5000)
+
+
+# FIXME this is covered by integration tests but the output is twitchy enough
+# that it would be good to test here as well
+#@unittest.skipUnless(os.path.isdir(DATA_DIR) and os.path.isdir(REF_DIR),
+#    "%s or %s not available" % (DATA_DIR, REF_DIR))
+@unittest.skip("FIXME")
 class TestIpdSummaryChunk(TestIpdSummary):
     """
     This test is identical to the above except for using an AlignmentSet with
@@ -84,7 +89,6 @@ class TestIpdSummaryChunk(TestIpdSummary):
 
     def run_after(self, rtc, output_dir):
         gff_file = os.path.join(output_dir, rtc.task.output_files[0])
-        csv_file = os.path.join(output_dir, rtc.task.output_files[1])
         logging.critical(gff_file)
         logging.critical("%s %s" % (csv_file, os.path.getsize(csv_file)))
         with open(csv_file) as f:

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/kineticstools.git


