[med-svn] [kineticstools] 01/05: Imported Upstream version 0.6.0+dfsg
Afif Elghraoui
afif at moszumanska.debian.org
Sun Dec 18 08:04:18 UTC 2016
This is an automated email from the git hooks/post-receive script.
afif pushed a commit to branch master
in repository kineticstools.
commit 9b27a33cc63f195f78ad11e87de794f861bf6ce9
Author: Afif Elghraoui <afif at ghraoui.name>
Date: Mon Aug 22 23:11:01 2016 -0700
Imported Upstream version 0.6.0+dfsg
---
README.md | 3 +-
bin/__init__.py | 29 ----
circle.yml | 3 +-
kineticsTools/ReferenceUtils.py | 11 +-
kineticsTools/ResultWriter.py | 241 ++++++++++++++++++++----------
kineticsTools/ipdSummary.py | 91 ++++++-----
kineticsTools/summarizeModifications.py | 2 +-
kineticsTools/tasks/__init__.py | 0
kineticsTools/tasks/gather_kinetics_h5.py | 172 +++++++++++++++++++++
requirements-ci.txt | 3 +-
requirements-dev.txt | 1 -
setup.py | 6 +-
test/cram/extra/bigwig.t | 15 ++
test/cram/version.t | 19 +--
test/test_gather_h5.py | 77 ++++++++++
test/test_outputs.py | 105 +++++++++++++
test/test_tool_contract.py | 28 ++--
17 files changed, 624 insertions(+), 182 deletions(-)
diff --git a/README.md b/README.md
index 1e95e30..ec8a88b 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,4 @@
-kineticsTools
-=============
+ kineticsTools [![Circle CI](https://circleci.com/gh/PacificBiosciences/kineticsTools.svg?style=svg)](https://circleci.com/gh/PacificBiosciences/kineticsTools)
Tools for detecting DNA modifications from single molecule, real-time (SMRT®) sequencing data. This tool implements the P_ModificationDetection module in SMRT® Portal, used by the RS_Modification_Detection and RS_Modifications_and_Motif_Detection protocol. Researchers interested in understanding or extending the modification detection algorithms can use these tools as a starting point.
diff --git a/bin/__init__.py b/bin/__init__.py
deleted file mode 100755
index 402d7a8..0000000
--- a/bin/__init__.py
+++ /dev/null
@@ -1,29 +0,0 @@
-#################################################################################
-# Copyright (c) 2011-2013, Pacific Biosciences of California, Inc.
-#
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# * Neither the name of Pacific Biosciences nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
-# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY PACIFIC BIOSCIENCES AND ITS
-# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL PACIFIC BIOSCIENCES OR
-# ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
-# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
-# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-#################################################################################
diff --git a/circle.yml b/circle.yml
index a1e6411..01a3209 100644
--- a/circle.yml
+++ b/circle.yml
@@ -12,7 +12,8 @@ dependencies:
- (cd .circleci && bash installHDF5.sh)
- HDF5_DIR=$PWD/.circleci/prefix pip install -r requirements-ci.txt
- HDF5_DIR=$PWD/.circleci/prefix pip install -r requirements-dev.txt
+ - pip install cram nose
test:
override:
- - make test # Run doctests in addition to the usual unit tests
+ - make unit-tests
diff --git a/kineticsTools/ReferenceUtils.py b/kineticsTools/ReferenceUtils.py
index a48ca8d..e63758c 100755
--- a/kineticsTools/ReferenceUtils.py
+++ b/kineticsTools/ReferenceUtils.py
@@ -46,7 +46,7 @@ ReferenceWindow = namedtuple("ReferenceWindow", ["refId", "refName", "start", "e
class ReferenceUtils():
@staticmethod
- def loadReferenceContigs(referencePath, alignmentSet):
+ def loadReferenceContigs(referencePath, alignmentSet, windows=None):
# FIXME we should get rid of this entirely, but I think it requires
# fixing the inconsistency in how contigs are referenced here versus in
# pbcore.io
@@ -58,7 +58,14 @@ class ReferenceUtils():
# Read contigs from FASTA file (or XML dataset)
refReader = ReferenceSet(referencePath)
- contigs = [x for x in refReader]
+ contigs = []
+ if windows is not None:
+ refNames = set([rw.refName for rw in windows])
+ for contig in refReader:
+ if contig.id in refNames:
+ contigs.append(contig)
+ else:
+ contigs.extend([x for x in refReader])
contigDict = dict([(x.id, x) for x in contigs])
# initially each contig has an id of None -- this will be overwritten with the id from the cmp.h5, if there are any
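The new optional `windows` argument to loadReferenceContigs lets callers restrict contig loading to the references actually covered by the requested windows. A minimal sketch of how a caller might use it, with placeholder file names and a single window on the H. pylori contig used elsewhere in this commit; this is illustrative, not part of the diff above:

    from collections import namedtuple
    from pbcore.io import AlignmentSet
    from kineticsTools.ReferenceUtils import ReferenceUtils

    ReferenceWindow = namedtuple("ReferenceWindow",
                                 ["refId", "refName", "start", "end"])

    # placeholder inputs -- real paths depend on the data set in use
    aln = AlignmentSet("aligned_reads.xml",
                       referenceFastaFname="reference.fasta")
    windows = [ReferenceWindow(refId=0,
                               refName="gi|12057207|gb|AE001439.1|",
                               start=0, end=5000)]
    # With windows given, only contigs whose id matches a window's refName
    # are loaded; with windows=None the old behaviour (load every contig)
    # is kept.
    contigs = ReferenceUtils.loadReferenceContigs("reference.fasta",
                                                  alignmentSet=aln,
                                                  windows=windows)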
diff --git a/kineticsTools/ResultWriter.py b/kineticsTools/ResultWriter.py
index d5e74e2..0671fcd 100755
--- a/kineticsTools/ResultWriter.py
+++ b/kineticsTools/ResultWriter.py
@@ -28,6 +28,7 @@
# POSSIBILITY OF SUCH DAMAGE.
#################################################################################
+from collections import namedtuple, defaultdict
import cProfile
import logging
import os.path
@@ -47,6 +48,8 @@ FRAC = 'frac'
FRAClow = 'fracLow'
FRACup = 'fracUp'
+log = logging.getLogger(__name__)
+
class ResultCollectorProcess(Process):
@@ -61,7 +64,7 @@ class ResultCollectorProcess(Process):
self._resultsQueue = resultsQueue
def _run(self):
- logging.info("Process %s (PID=%d) started running" % (self.name, self.pid))
+ log.info("Process %s (PID=%d) started running" % (self.name, self.pid))
self.onStart()
@@ -94,7 +97,7 @@ class ResultCollectorProcess(Process):
nextChunkId += 1
- logging.info("Result thread shutting down...")
+ log.info("Result thread shutting down...")
self.onFinish()
def run(self):
@@ -192,23 +195,6 @@ class KineticsWriter(ResultCollectorProcess):
except Exception as e:
print e
- @consumer
- def hdf5CsvConsumer(self, filename):
-
- grp = h5py.File(filename, "w")
-
- y = [int(ref.Length) for ref in self.refInfo]
- dataLength = sum(y)
- y.append(8192)
- chunkSize = min(dataLength, 8192 * 2)
- # print "dataLength = ", dataLength, " chunkSize = ", chunkSize, " y = ", y
-
- refIdDataset = grp.create_dataset('refId', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,), compression_opts=2)
- tplDataset = grp.create_dataset('tpl', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,), compression_opts=2)
- strandDataset = grp.create_dataset('strand', (dataLength,), dtype="u1", compression="gzip", chunks=(chunkSize,), compression_opts=2)
-
-
-
@consumer
def csvConsumer(self, filename):
@@ -293,12 +279,85 @@ class KineticsWriter(ResultCollectorProcess):
print e
@consumer
- def hdf5CsvConsumer(self, filename):
+ def bigWigConsumer(self, filename):
+ import pyBigWig
+ records = []
+ records_by_pos = defaultdict(list)
+ ranges = {}
+ BaseInfo = namedtuple("BaseInfo", ("seqid", "pos", "sense", "ipd"))
+ try:
+ while True:
+ chunk = (yield)
+ if len(chunk) == 0:
+ continue
+ # Fill out the ipd observations into the dataset
+ for x in chunk:
+ pos = int(x['tpl']) + 1
+ seqid = x['refName']
+ ranges.setdefault(seqid, (sys.maxint, 0))
+ ranges[seqid] = (min(ranges[seqid][0], pos),
+ max(ranges[seqid][1], pos+1))
+ rec = BaseInfo(
+ seqid=seqid,
+ pos=pos,
+ sense=int(x['strand']),
+ ipd=float(x['ipdRatio']))
+ records.append(rec)
+ records_by_pos[(rec.seqid, rec.pos)].append(rec)
+ except GeneratorExit:
+ records.sort(lambda a, b: cmp(a.pos, b.pos))
+ records.sort(lambda a, b: cmp(a.seqid, b.seqid))
+ regions = [(s, ranges[s][1]-1) for s in sorted(ranges.keys())]
+ if len(regions) == 0:
+ with open(filename, "wb") as _:
+ return
+ bw = pyBigWig.open(filename, "w")
+ bw.addHeader(regions)
+ k = 0
+ seqids = []
+ starts = []
+ ends = []
+ ipd_enc = []
+ # records are not necessarily consecutive or two per base!
+ have_pos = set()
+ def encode_ipds(plus, minus):
+ enc = lambda x: min(65535, int(round(100*x)))
+ return float(enc(minus) + 65536*enc(plus))
+ for rec in records:
+ if (rec.seqid, rec.pos) in have_pos:
+ continue
+ have_pos.add((rec.seqid, rec.pos))
+ strand_records = records_by_pos[(rec.seqid, rec.pos)]
+ if len(strand_records) == 2:
+ rec_minus = strand_records[k] if strand_records[k].sense else strand_records[k + 1]
+ rec_plus = strand_records[k + 1] if strand_records[k].sense else strand_records[k]
+ assert rec_plus.pos == rec_minus.pos, (rec_plus, rec_minus)
+ seqids.append(rec_plus.seqid)
+ starts.append(rec_plus.pos-1)
+ ends.append(rec_plus.pos)
+ ipd_enc.append(encode_ipds(rec_plus.ipd, rec_minus.ipd))
+ else:
+ seqids.append(rec.seqid)
+ starts.append(rec.pos-1)
+ ends.append(rec.pos)
+ if rec.sense == 0:
+ ipd_enc.append(encode_ipds(rec.ipd, 0))
+ else:
+ ipd_enc.append(encode_ipds(0, rec.ipd))
+ log.info("Writing records for {n} bases".format(n=len(seqids)))
+ bw.addEntries(seqids, starts, ends=ends, values=ipd_enc)
+ bw.close()
+ return
+ @consumer
+ def hdf5CsvConsumer(self, filename):
+ """
+ Write records to an HDF5 file equivalent to the CSV output, with +/-
+ bases adjacent in the arrays.
+ """
grp = h5py.File(filename, "w")
-
y = [int(ref.Length) for ref in self.refInfo]
- dataLength = sum(y)
+ dataLength = 2 * sum(y)
y.append(8192)
chunkSize = min(dataLength, 8192 * 2)
# print "dataLength = ", dataLength, " chunkSize = ", chunkSize, " y = ", y
@@ -345,10 +404,9 @@ class KineticsWriter(ResultCollectorProcess):
fracUpDataset = grp[FRACup]
'''
- start = min(x['tpl'] for x in chunk)
- end = min(max(x['tpl'] for x in chunk), tplDataset.shape[0] - 1)
-
- arrLen = end - start + 1
+ start = min(2*x['tpl'] for x in chunk)
+ end = min(max(2*x['tpl'] for x in chunk), tplDataset.shape[0] - 1)
+ arrLen = end - start + 2
refId = np.empty(arrLen, dtype="u4")
tpl = np.zeros(arrLen, dtype="u4")
@@ -368,13 +426,15 @@ class KineticsWriter(ResultCollectorProcess):
# Fill out the ipd observations into the dataset
for x in chunk:
# offset into the current chunk
- idx = x['tpl'] - start
+ _strand = int(x['strand'])
+ idx = (2 * x['tpl']) - start + _strand
- # Data points past the end of the reference can make it through -- filter them out here
+ # Data points past the end of the reference can make it
+ # through -- filter them out here
if idx < arrLen:
refId[idx] = int(x['refId'])
- tpl[idx] = int(x['tpl'])
- strand[idx] = int(x['strand'])
+ tpl[idx] = int(x['tpl']) + 1 # 'tpl' is 0-based
+ strand[idx] = _strand
base[idx] = x['base']
score[idx] = int(x['score'])
tMean[idx] = float(x['tMean'])
@@ -392,31 +452,41 @@ class KineticsWriter(ResultCollectorProcess):
fracLow[idx] = np.nan
fracUp[idx] = np.nan
- refIdDataset[start:(end + 1)] = refId
- tplDataset[start:(end + 1)] = tpl
- strandDataset[start:(end + 1)] = strand
- baseDataset[start:(end + 1)] = base
- scoreDataset[start:(end + 1)] = score
- tMeanDataset[start:(end + 1)] = tMean
- tErrDataset[start:(end + 1)] = tErr
- modelPredictionDataset[start:(end + 1)] = modelPrediction
- ipdRatioDataset[start:(end + 1)] = ipdRatio
- coverageDataset[start:(end + 1)] = coverage
+ refIdDataset[start:(end + 2)] = refId
+ tplDataset[start:(end + 2)] = tpl
+ strandDataset[start:(end + 2)] = strand
+ baseDataset[start:(end + 2)] = base
+ scoreDataset[start:(end + 2)] = score
+ tMeanDataset[start:(end + 2)] = tMean
+ tErrDataset[start:(end + 2)] = tErr
+ modelPredictionDataset[start:(end + 2)] = modelPrediction
+ ipdRatioDataset[start:(end + 2)] = ipdRatio
+ coverageDataset[start:(end + 2)] = coverage
if self.options.methylFraction:
- fracDataset[start:(end + 1)] = frac
- fracLowDataset[start:(end + 1)] = fracLow
- fracUpDataset[start:(end + 1)] = fracUp
-
+ fracDataset[start:(end + 2)] = frac
+ fracLowDataset[start:(end + 2)] = fracLow
+ fracUpDataset[start:(end + 2)] = fracUp
+ grp.flush()
+ del refId
+ del tpl
+ del strand
+ del base
+ del score
+ del tMean
+ del tErr
+ del modelPrediction
+ del ipdRatio
+ del coverage
except GeneratorExit:
# Close down the h5 file
grp.close()
return
- # an alternative version that collects data into groups according to reference:
@consumer
def alt_hdf5CsvConsumer(self, filename):
"""
- Similar to csv consumer but writing to hdf5 format.
+ Alternative HDF5 output that collects data into groups according to
+ reference (not currently enabled).
"""
f = h5py.File(filename, "w")
@@ -424,25 +494,26 @@ class KineticsWriter(ResultCollectorProcess):
for ref in self.refInfo:
# Each reference group will house a collection of datasets:
- chunkSize = min(ref.Length, 8192)
+ dataLength = ref.Length * 2
+ chunkSize = min(dataLength, 8192)
# Create a group for each reference:
grp = f.create_group(str(ref.Name))
- ds = grp.create_dataset('tpl', (ref.Length,), dtype="u4", compression="gzip", chunks=(chunkSize,))
- ds = grp.create_dataset('strand', (ref.Length,), dtype="u1", compression="gzip", chunks=(chunkSize,))
- ds = grp.create_dataset('base', (ref.Length,), dtype="a1", compression="gzip", chunks=(chunkSize,))
- ds = grp.create_dataset('score', (ref.Length,), dtype="u4", compression="gzip", chunks=(chunkSize,))
- ds = grp.create_dataset('tMean', (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
- ds = grp.create_dataset('tErr', (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
- ds = grp.create_dataset('modelPrediction', (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
- ds = grp.create_dataset('ipdRatio', (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
- ds = grp.create_dataset('coverage', (ref.Length,), dtype="u4", compression="gzip", chunks=(chunkSize,))
+ ds = grp.create_dataset('tpl', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,))
+ ds = grp.create_dataset('strand', (dataLength,), dtype="u1", compression="gzip", chunks=(chunkSize,))
+ ds = grp.create_dataset('base', (dataLength,), dtype="a1", compression="gzip", chunks=(chunkSize,))
+ ds = grp.create_dataset('score', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,))
+ ds = grp.create_dataset('tMean', (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+ ds = grp.create_dataset('tErr', (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+ ds = grp.create_dataset('modelPrediction', (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+ ds = grp.create_dataset('ipdRatio', (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+ ds = grp.create_dataset('coverage', (dataLength,), dtype="u4", compression="gzip", chunks=(chunkSize,))
if self.options.methylFraction:
- ds = grp.create_dataset(FRAC, (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
- ds = grp.create_dataset(FRAClow, (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
- ds = grp.create_dataset(FRACup, (ref.Length,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+ ds = grp.create_dataset(FRAC, (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+ ds = grp.create_dataset(FRAClow, (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
+ ds = grp.create_dataset(FRACup, (dataLength,), dtype="f4", compression="gzip", chunks=(chunkSize,))
# Maintain a dictionary of group paths?
dsDict[ref.ID] = grp
@@ -473,10 +544,10 @@ class KineticsWriter(ResultCollectorProcess):
fracLowDataset = grp[FRAClow]
fracUpDataset = grp[FRACup]
- start = min(x['tpl'] for x in chunk)
- end = min(max(x['tpl'] for x in chunk), tplDataset.shape[0] - 1)
+ start = min(2*x['tpl'] for x in chunk)
+ end = min(max(2*x['tpl'] for x in chunk), tplDataset.shape[0] - 1)
- arrLen = end - start + 1
+ arrLen = end - start + 2
tpl = np.zeros(arrLen, dtype="u4")
strand = np.zeros(arrLen, dtype="u1")
@@ -495,11 +566,12 @@ class KineticsWriter(ResultCollectorProcess):
# Fill out the ipd observations into the dataset
for x in chunk:
# offset into the current chunk
- idx = x['tpl'] - start
+ _strand = int(x['strand'])
+ idx = (2 * x['tpl']) - start + _strand
# Data points past the end of the reference can make it through -- filter them out here
if idx < arrLen:
- tpl[idx] += int(x['tpl'])
+ tpl[idx] += int(x['tpl']) + 1
strand[idx] += int(x['strand'])
base[idx] = x['base']
score[idx] += int(x['score'])
@@ -519,20 +591,20 @@ class KineticsWriter(ResultCollectorProcess):
fracUp[idx] = np.nan
# Write our chunk into the main dataset
- tplDataset[start:(end + 1)] = tpl
- strandDataset[start:(end + 1)] = strand
- baseDataset[start:(end + 1)] = base
- scoreDataset[start:(end + 1)] = score
- tMeanDataset[start:(end + 1)] = tMean
- tErrDataset[start:(end + 1)] = tErr
- modelPredictionDataset[start:(end + 1)] = modelPrediction
- ipdRatioDataset[start:(end + 1)] = ipdRatio
- coverageDataset[start:(end + 1)] = coverage
+ tplDataset[start:(end + 2)] = tpl
+ strandDataset[start:(end + 2)] = strand
+ baseDataset[start:(end + 2)] = base
+ scoreDataset[start:(end + 2)] = score
+ tMeanDataset[start:(end + 2)] = tMean
+ tErrDataset[start:(end + 2)] = tErr
+ modelPredictionDataset[start:(end + 2)] = modelPrediction
+ ipdRatioDataset[start:(end + 2)] = ipdRatio
+ coverageDataset[start:(end + 2)] = coverage
if self.options.methylFraction:
- fracDataset[start:(end + 1)] = frac
- fracLowDataset[start:(end + 1)] = fracLow
- fracUpDataset[start:(end + 1)] = fracUp
-
+ fracDataset[start:(end + 2)] = frac
+ fracLowDataset[start:(end + 2)] = fracLow
+ fracUpDataset[start:(end + 2)] = fracUp
+ f.flush()
except GeneratorExit:
# Close down the h5 file
f.close()
@@ -559,7 +631,7 @@ class KineticsWriter(ResultCollectorProcess):
for ref in self.refInfo:
# FIXME -- create with good chunk parameters, activate compression
- logging.info("Creating IpdRatio dataset w/ name: %s, Size: %d" % (str(ref.Name), ref.Length))
+ log.info("Creating IpdRatio dataset w/ name: %s, Size: %d" % (str(ref.Name), ref.Length))
chunkSize = min(ref.Length, 8192)
@@ -835,10 +907,11 @@ class KineticsWriter(ResultCollectorProcess):
('m5Cgff', 'm5C.gff', self.m5CgffConsumer),
('gff', 'gff', self.gffConsumer),
('csv', 'csv', self.csvConsumer),
+ ('bigwig', 'bw', self.bigWigConsumer),
('ms_csv', 'ms.csv', self.msCsvConsumer),
('pickle', 'pickle', self.csvConsumer),
('summary_h5', 'summary.h5', self.ipdRatioH5Consumer),
- ('csv_h5', 'h5', self.hdf5CsvConsumer)
+ ('csv_h5', 'h5', self.alt_hdf5CsvConsumer)
]
sinkList = []
@@ -850,7 +923,15 @@ class KineticsWriter(ResultCollectorProcess):
# The 'outfile argument causes all outputs to be generated
if self.options.outfile:
- name = self.options.outfile + '.' + ext
+ if ext == "bw":
+ try:
+ import pyBigWig
+ except ImportError:
+ pass
+ else:
+ name = self.options.outfile + '.' + ext
+ else:
+ name = self.options.outfile + '.' + ext
# Individual outputs can specified - these filename override the default
if self.options.__getattribute__(fileType):
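The new bigWigConsumer stores one value per reference position and packs both strands' IpdRatio into it: each ratio is scaled by 100 and clamped to 16 bits, with the minus-strand value in the low half-word and the plus-strand value in the high half-word (see encode_ipds above). A minimal round-trip sketch, with the decoding mirroring what the new test/test_outputs.py does; the function names here are illustrative only:

    def encode_ipds(plus, minus):
        # same packing as bigWigConsumer's encode_ipds: scale by 100,
        # clamp to 16 bits, minus strand low, plus strand high
        enc = lambda x: min(65535, int(round(100 * x)))
        return float(enc(minus) + 65536 * enc(plus))

    def decode_ipds(value):
        # inverse of the packing above
        s = int(value)
        ipd_minus = (s % 65536) / 100.0
        ipd_plus = (s >> 16) / 100.0
        return ipd_plus, ipd_minus

    # example: IpdRatio 5.69 on the plus strand, 2.58 on the minus strand
    packed = encode_ipds(5.69, 2.58)
    plus, minus = decode_ipds(packed)   # -> (5.69, 2.58)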
diff --git a/kineticsTools/ipdSummary.py b/kineticsTools/ipdSummary.py
index 5c52c54..6bfd088 100755
--- a/kineticsTools/ipdSummary.py
+++ b/kineticsTools/ipdSummary.py
@@ -61,12 +61,9 @@ from kineticsTools.ResultWriter import KineticsWriter
from kineticsTools.ipdModel import IpdModel
from kineticsTools.ReferenceUtils import ReferenceUtils
-# Version info
-__p4revision__ = "$Revision: #1 $"
-__p4change__ = "$Change: 100972 $"
-revNum = int(__p4revision__.strip("$").split(" ")[1].strip("#"))
-changeNum = int(__p4change__.strip("$").split(":")[-1])
-__version__ = "2.2"
+__version__ = "2.3"
+
+log = logging.getLogger(__name__)
class Constants(object):
TOOL_ID = "kinetics_tools.tasks.ipd_summary"
@@ -129,17 +126,23 @@ def get_parser():
type=validateFile, help="Fasta or Reference DataSet")
# XXX GFF and CSV are "option" for arg parser, not tool contract
tcp.add_output_file_type(FileTypes.GFF, "gff",
- name="GFF file",
+ name="Modified bases GFF",
description="GFF file of modified bases",
default_name="basemods")
- tcp.add_output_file_type(FileTypes.CSV, "csv",
- name="CSV file",
- description="CSV file of per-nucleotide information",
+ tcp.add_output_file_type(FileTypes.BIGWIG, "bigwig",
+ name="BigWig file encoding base IpdRatios",
+ description="Compressed binary format containing the IpdRatios for every base (both strands)",
+ default_name="basemods")
+ tcp.add_output_file_type(FileTypes.H5, "csv_h5",
+ name="HDF5 file containing per-base information",
+ description="HDF5 equivalent of CSV output",
default_name="basemods")
argp.add_argument("--gff", action="store", default=None,
help="Output GFF file of modified bases")
argp.add_argument("--csv", action="store", default=None,
help="Output CSV file out per-nucleotide information")
+ argp.add_argument("--bigwig", action="store", default=None,
+ help="Output BigWig file encoding IpdRatio for both strands")
# FIXME use central --nproc option
argp.add_argument('--numWorkers', '-j',
dest='numWorkers',
@@ -372,13 +375,18 @@ def _get_more_options(parser):
default=None,
help="Random seed (for development and debugging purposes only)")
+ parser.add_argument("--referenceStride", action="store", type=int,
+ default=1000,
+ help="Size of reference window in internal "+
+ "parallelization. For testing purposes only.")
+
return parser
class KineticsToolsRunner(object):
def __init__(self, args):
self.args = args
- self._sharedAlignmentSet = None
+ self.alignments = None
def start(self):
self.validateArgs()
@@ -508,7 +516,7 @@ class KineticsToolsRunner(object):
for i in xrange(self.options.numWorkers):
p = WorkerType(self.options, self._workQueue, self._resultsQueue,
self.ipdModel,
- sharedAlignmentSet=self._sharedAlignmentSet)
+ sharedAlignmentSet=self.alignments)
self._workers.append(p)
p.start()
logging.info("Launched worker processes.")
@@ -532,11 +540,11 @@ class KineticsToolsRunner(object):
pass
def loadReferenceAndModel(self, referencePath):
- assert self._sharedAlignmentSet is not None
+ assert self.alignments is not None and self.referenceWindows is not None
# Load the reference contigs - annotated with their refID from the cmp.h5
logging.info("Loading reference contigs %s" % referencePath)
contigs = ReferenceUtils.loadReferenceContigs(referencePath,
- alignmentSet=self._sharedAlignmentSet)
+ alignmentSet=self.alignments, windows=self.referenceWindows)
# There are three different ways the ipdModel can be loaded.
# In order of precedence they are:
@@ -558,8 +566,7 @@ class KineticsToolsRunner(object):
logging.error("Params path doesn't exist: %s" % self.args.paramsPath)
sys.exit(1)
- majorityChem = ReferenceUtils.loadAlignmentChemistry(
- self._sharedAlignmentSet)
+ majorityChem = ReferenceUtils.loadAlignmentChemistry(self.alignments)
ipdModel = os.path.join(self.args.paramsPath, majorityChem + ".h5")
if majorityChem == 'unknown':
logging.error("Chemistry cannot be identified---cannot perform kinetic analysis")
@@ -580,11 +587,11 @@ class KineticsToolsRunner(object):
"""
logging.info("Reading AlignmentSet: %s" % cmpH5Filename)
logging.info(" reference: %s" % self.args.reference)
- self._sharedAlignmentSet = AlignmentSet(cmpH5Filename,
- referenceFastaFname=self.args.reference)
+ self.alignments = AlignmentSet(cmpH5Filename,
+ referenceFastaFname=self.args.reference)
# XXX this should ensure that the file(s) get opened, including any
# .pbi indices - but need to confirm this
- self.refInfo = self._sharedAlignmentSet.referenceInfoTable
+ self.refInfo = self.alignments.referenceInfoTable
def _mainLoop(self):
"""
@@ -603,31 +610,16 @@ class KineticsToolsRunner(object):
# interpreter crashes sometimes. See Bug 19704. Since we
# don't leak garbage cycles, disabling the cyclic GC is
# essentially harmless.
- gc.disable()
+ #gc.disable()
- # Load a copy of the cmpH5 alignment index to share with the slaves
self.loadSharedAlignmentSet(self.args.alignment_set)
- # Load reference and IpdModel
- self.loadReferenceAndModel(self.args.reference)
-
- # Spawn workers
- self._launchSlaveProcesses()
-
- # WARNING -- cmp.h5 file must be opened AFTER worker processes have been spawned
- # cmp.h5 we're using -- use this to orchestrate the work
- self.cmph5 = self._sharedAlignmentSet
- logging.info('Generating kinetics summary for [%s]' % self.args.alignment_set)
-
- #self.referenceMap = self.cmph5['/RefGroup'].asDict('RefInfoID', 'ID')
- #self.alnInfo = self.cmph5['/AlnInfo'].asRecArray()
-
# Resolve the windows that will be visited.
if self.args.referenceWindowsAsString is not None:
self.referenceWindows = []
for s in self.args.referenceWindowsAsString.split(","):
try:
- win = ReferenceUtils.parseReferenceWindow(s, self.cmph5.referenceInfo)
+ win = ReferenceUtils.parseReferenceWindow(s, self.alignments.referenceInfo)
self.referenceWindows.append(win)
except:
if self.args.skipUnrecognizedContigs:
@@ -635,11 +627,25 @@ class KineticsToolsRunner(object):
else:
raise Exception, "Unrecognized contig!"
elif self.args.referenceWindowsFromAlignment:
- self.referenceWindows = ReferenceUtils.referenceWindowsFromAlignment(self._sharedAlignmentSet, self.cmph5.referenceInfo)
+ self.referenceWindows = ReferenceUtils.referenceWindowsFromAlignment(self.alignments, self.alignments.referenceInfo)
+ refNames = set([rw.refName for rw in self.referenceWindows])
+ # limit output to contigs that overlap with reference windows
+ self.refInfo = [r for r in self.refInfo if r.Name in refNames]
else:
self.referenceWindows = ReferenceUtils.createReferenceWindows(
self.refInfo)
+ # Load reference and IpdModel
+ self.loadReferenceAndModel(self.args.reference)
+
+ # Spawn workers
+ self._launchSlaveProcesses()
+
+ logging.info('Generating kinetics summary for [%s]' % self.args.alignment_set)
+
+ #self.referenceMap = self.alignments['/RefGroup'].asDict('RefInfoID', 'ID')
+ #self.alnInfo = self.alignments['/AlnInfo'].asRecArray()
+
# Main loop -- we loop over ReferenceGroups in the cmp.h5. For each contig we will:
# 1. Load the sequence into the main memory of the parent process
# 2. Fork the workers
@@ -650,7 +656,7 @@ class KineticsToolsRunner(object):
# Iterate over references
for window in self.referenceWindows:
logging.info('Processing window/contig: %s' % (window,))
- for chunk in ReferenceUtils.enumerateChunks(1000, window):
+ for chunk in ReferenceUtils.enumerateChunks(self.args.referenceStride, window):
self._workQueue.put((self.workChunkCounter, chunk))
self.workChunkCounter += 1
@@ -667,7 +673,7 @@ class KineticsToolsRunner(object):
self._resultsQueue.join()
self._resultCollectorProcess.join()
logging.info("ipdSummary.py finished. Exiting.")
- del self.cmph5
+ self.alignments.close()
return 0
@@ -715,12 +721,14 @@ def resolved_tool_contract_runner(resolved_contract):
alignment_path = rc.task.input_files[0]
reference_path = rc.task.input_files[1]
gff_path = rc.task.output_files[0]
- csv_path = rc.task.output_files[1]
+ bigwig_path = rc.task.output_files[1]
+ h5_path = rc.task.output_files[2]
args = [
alignment_path,
"--reference", reference_path,
"--gff", gff_path,
- "--csv", csv_path,
+ "--bigwig", bigwig_path,
+ "--csv_h5", h5_path,
"--numWorkers", str(rc.task.nproc),
"--pvalue", str(rc.task.options[Constants.PVALUE_ID]),
"--alignmentSetRefWindows",
@@ -737,6 +745,7 @@ def resolved_tool_contract_runner(resolved_contract):
args.extend([
"--identify", rc.task.options[Constants.IDENTIFY_ID],
])
+ log.info("Converted arguments: ipdSummary {c}".format(c=" ".join(args)))
args_ = get_parser().arg_parser.parser.parse_args(args)
return args_runner(args_)
diff --git a/kineticsTools/summarizeModifications.py b/kineticsTools/summarizeModifications.py
index deb2768..2f2c669 100755
--- a/kineticsTools/summarizeModifications.py
+++ b/kineticsTools/summarizeModifications.py
@@ -178,7 +178,7 @@ def get_parser():
name="GFF file",
description="Alignment summary GFF")
p.add_output_file_type(FileTypes.GFF, "gff_out",
- name="GFF file",
+ name="Alignment summary with basemods",
description="Modified alignment summary file",
default_name="alignment_summary_with_basemods")
return p
diff --git a/kineticsTools/tasks/__init__.py b/kineticsTools/tasks/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/kineticsTools/tasks/gather_kinetics_h5.py b/kineticsTools/tasks/gather_kinetics_h5.py
new file mode 100644
index 0000000..76703f5
--- /dev/null
+++ b/kineticsTools/tasks/gather_kinetics_h5.py
@@ -0,0 +1,172 @@
+
+"""
+pbsmrtpipe task to gather chunked hdf5 files containing raw kinetics data.
+"""
+
+
+from collections import defaultdict
+import logging
+import sys
+
+import h5py
+
+from pbcommand.pb_io.common import load_pipeline_chunks_from_json
+from pbcommand.cli import pbparser_runner
+from pbcommand.models import get_gather_pbparser, FileTypes
+from pbcommand.utils import setup_log
+
+log = logging.getLogger(__name__)
+
+
+class Constants(object):
+ TASK_ID = "kinetics_tools.tasks.gather_kinetics_h5"
+ VERSION = "0.1.0"
+ DRIVER_EXE = "python -m kineticsTools.tasks.gather_kinetics_h5 --resolved-tool-contract "
+ CHUNK_KEY = "$chunk.h5_id"
+ OPT_CHUNK_KEY = "kinetics_tools.tasks.gather_h5_chunk_key"
+
+
+def get_parser():
+ p = get_gather_pbparser(Constants.TASK_ID,
+ Constants.VERSION,
+ "Dev Kinetics HDF5 Gather",
+ "General Chunk Kinetics HDF5 Gather",
+ Constants.DRIVER_EXE,
+ is_distributed=True,
+ default_level=logging.INFO)
+ p.add_input_file_type(FileTypes.CHUNK, "cjson_in", "GCHUNK Json",
+ "Gathered CHUNK Json with BigWig chunk key")
+
+ p.add_output_file_type(FileTypes.H5, "h5_out",
+ "Kinetics HDF5 file",
+ "Gathered HDF5 file", "gathered")
+
+ # Only need to add to argparse layer for the commandline
+ p.arg_parser.add_str(Constants.OPT_CHUNK_KEY,
+ "chunk_key",
+ Constants.CHUNK_KEY,
+ "Chunk key",
+ "Chunk key to use (format $chunk.{chunk-key}")
+
+ return p
+
+
+def gather_kinetics_h5(chunked_files, output_file):
+ """
+ Gather a set of 'flat' kinetics hdf5 chunks. This will not scale well for
+ large (i.e. non-bacterial) genomes.
+ """
+ log.info("creating {f}...".format(f=output_file))
+ out = h5py.File(output_file, "w")
+ first = h5py.File(chunked_files[0])
+ dataLength = len(first[first.keys()[0]])
+ chunkSize = min(dataLength, 8192 * 8)
+ datasets = {}
+ def _add_chunk(chunk):
+ # FIXME this is insanely inefficient
+ log.debug(" getting mask")
+ mask = chunk['base'].__array__() != ''
+ for key in datasets.keys():
+ log.debug(" copying '{k}'".format(k=key))
+ datasets[key][mask] = chunk[key][mask]
+ del mask
+ for key in first.keys():
+ ds = out.create_dataset(key, (dataLength,),
+ dtype=first[key].dtype,
+ compression="gzip",
+ chunks=(chunkSize,),
+ compression_opts=2)
+ datasets[key] = ds
+ log.info("adding first chunk in {f}".format(f=chunked_files[0]))
+ _add_chunk(first)
+ out.flush()
+ for file_name in chunked_files[1:]:
+ #out = h5py.File(output_file, "r+")
+ chunk = h5py.File(file_name)
+ log.info("adding chunk in {f}".format(f=file_name))
+ _add_chunk(chunk)
+ out.close()
+ return 0
+
+
+def gather_kinetics_h5_byref(chunked_files, output_file):
+ """
+ Gather a set of hdf5 files containing with per-reference datasets. This
+ implementation sacrifices some computational efficiency to limit the
+ memory overhead.
+ """
+ log.info("creating {f}...".format(f=output_file))
+ out = h5py.File(output_file, "w")
+ first = h5py.File(chunked_files[0])
+ refs_by_file = defaultdict(list)
+ ds_types = {}
+ ref_sizes = {}
+ for file_name in chunked_files:
+ chunk = h5py.File(file_name)
+ for rname in chunk.keys():
+ refs_by_file[rname].append(file_name)
+ grp = chunk[rname]
+ for ds_id in grp.keys():
+ if not ds_id in ds_types:
+ ds_types[ds_id] = grp[ds_id].dtype
+ else:
+ assert ds_types[ds_id] == grp[ds_id].dtype
+ if not rname in ref_sizes:
+ ref_sizes[rname] = len(grp[ds_id])
+ else:
+ assert ref_sizes[rname] == len(grp[ds_id])
+ chunk.close()
+ for ref_name, file_names in refs_by_file.iteritems():
+ log.info("creating group {r} (dataset length {s})".format(
+ r=ref_name,
+ s=ref_sizes[ref_name]))
+ grp = out.create_group(ref_name)
+ dataLength = ref_sizes[ref_name]
+ chunkSize = min(dataLength, 8192)
+ for ds_id, ds_type in ds_types.iteritems():
+ log.debug(" dataset {i} ({t})".format(i=ds_id, t=ds_type))
+ ds = grp.create_dataset(ds_id, (dataLength,), dtype=ds_type,
+ compression="gzip", chunks=(chunkSize,))
+ for file_name in file_names:
+ log.debug(" reading from {f}".format(f=file_name))
+ chunk = h5py.File(file_name)
+ grp_chk = chunk[ref_name]
+ mask = grp_chk['base'].__array__() != ''
+ ds[mask] = grp_chk[ds_id][mask]
+ del mask
+ chunk.close()
+ out.flush()
+ out.close()
+ return 0
+
+
+def _run_main(chunk_input_json, output_file, chunk_key):
+ chunks = load_pipeline_chunks_from_json(chunk_input_json)
+ chunked_files = []
+ for chunk in chunks:
+ if chunk_key in chunk.chunk_keys:
+ chunked_files.append(chunk.chunk_d[chunk_key])
+ else:
+ raise KeyError("Unable to find chunk key '{i}' in {p}".format(i=chunk_key, p=chunk))
+ return gather_kinetics_h5_byref(chunked_files, output_file)
+
+
+def args_runner(args):
+ return _run_main(args.cjson_in, args.bigwig_out, args.chunk_key)
+
+
+def rtc_runner(rtc):
+ return _run_main(rtc.task.input_files[0], rtc.task.output_files[0], rtc.task.chunk_key)
+
+
+def main(argv=sys.argv):
+ return pbparser_runner(argv[1:],
+ get_parser(),
+ args_runner,
+ rtc_runner,
+ log,
+ setup_log)
+
+
+if __name__ == '__main__':
+ sys.exit(main())
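The two gather functions above can also be driven directly from Python when combining per-chunk HDF5 outputs outside of pbsmrtpipe. A hedged sketch with placeholder chunk file names:

    from kineticsTools.tasks.gather_kinetics_h5 import (gather_kinetics_h5,
                                                        gather_kinetics_h5_byref)

    # hypothetical chunk outputs from a chunked ipdSummary run
    chunked_files = ["basemods.chunk0.h5", "basemods.chunk1.h5"]

    # flat layout (one dataset per column, as written by hdf5CsvConsumer)
    gather_kinetics_h5(chunked_files, "basemods.flat.h5")

    # per-reference groups (as written by alt_hdf5CsvConsumer); this variant
    # trades some speed for a bounded memory footprint
    gather_kinetics_h5_byref(chunked_files, "basemods.byref.h5")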
diff --git a/requirements-ci.txt b/requirements-ci.txt
index a43e0c4..002d11d 100644
--- a/requirements-ci.txt
+++ b/requirements-ci.txt
@@ -1,12 +1,13 @@
cython
numpy
-h5py
+h5py >= 1.3.0
jinja2
networkx
jsonschema
xmlbuilder
functools32
pyxb
+pyBigWig
# Install from github
-e git://github.com/PacificBiosciences/pbcore.git@master#egg=pbcore
-e git://github.com/PacificBiosciences/pbcommand.git#egg=pbcommand
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 80e9144..0af0380 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,3 +1,2 @@
--r requirements.txt
sphinx
nose
diff --git a/setup.py b/setup.py
index bc1f56b..e1eb4bf 100755
--- a/setup.py
+++ b/setup.py
@@ -4,12 +4,11 @@ import sys
setup(
name='kineticsTools',
- version='0.5.2',
+ version='0.6.0',
author='Pacific Biosciences',
author_email='devnet@pacificbiosciences.com',
license=open('LICENSES.txt').read(),
- packages=find_packages('.'),
- package_dir={'': '.'},
+ packages=find_packages("."),
package_data={'kineticsTools': ['resources/*.h5']},
ext_modules=[Extension('kineticsTools/tree_predict', ['kineticsTools/tree_predict.c'],
extra_compile_args=["-O3", "-shared", "-std=c99"],
@@ -21,6 +20,7 @@ setup(
'h5py >= 1.3.0',
'scipy >= 0.9.0',
'pbcommand >= 0.3.22',
+ #'pyBigWig'
],
entry_points={'console_scripts': [
"ipdSummary = kineticsTools.ipdSummary:main",
diff --git a/test/cram/extra/bigwig.t b/test/cram/extra/bigwig.t
new file mode 100644
index 0000000..5742c29
--- /dev/null
+++ b/test/cram/extra/bigwig.t
@@ -0,0 +1,15 @@
+Test support for BigWig output of IpdRatio.
+
+ $ . $TESTDIR/../portability.sh
+
+Load in data:
+
+ $ DATA=/pbi/dept/secondary/siv/testdata/kineticsTools
+ $ INPUT=$DATA/Hpyl_1_5000.xml
+ $ REFERENCE=/pbi/dept/secondary/siv/references/Helicobacter_pylori_J99/sequence/Helicobacter_pylori_J99.fasta
+
+Run basic ipdSummary:
+
+ $ ipdSummary --log-level=WARNING --bigwig tmp1.bw --numWorkers 12 --pvalue 0.001 --identify m6A,m4C --reference $REFERENCE --referenceWindows="gi|12057207|gb|AE001439.1|:0-5000" $INPUT
+ $ ls tmp1.bw
+ tmp1.bw
diff --git a/test/cram/version.t b/test/cram/version.t
index ef00278..91ce3dd 100644
--- a/test/cram/version.t
+++ b/test/cram/version.t
@@ -9,14 +9,15 @@ A simple test of the version and help options:
[--log-file LOG_FILE]
[--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL} | --debug | --quiet | -v]
--reference REFERENCE [--gff GFF] [--csv CSV]
- [--numWorkers NUMWORKERS] [--pvalue PVALUE]
- [--maxLength MAXLENGTH] [--identify IDENTIFY]
- [--methylFraction] [--outfile OUTFILE] [--m5Cgff M5CGFF]
- [--m5Cclassifer M5CCLASSIFIER] [--csv_h5 CSV_H5]
- [--pickle PICKLE] [--summary_h5 SUMMARY_H5]
- [--ms_csv MS_CSV] [--control CONTROL] [--useLDA]
- [--paramsPath PARAMSPATH] [--minCoverage MINCOVERAGE]
- [--maxQueueSize MAXQUEUESIZE] [--maxCoverage MAXCOVERAGE]
+ [--bigwig BIGWIG] [--numWorkers NUMWORKERS]
+ [--pvalue PVALUE] [--maxLength MAXLENGTH]
+ [--identify IDENTIFY] [--methylFraction] [--outfile OUTFILE]
+ [--m5Cgff M5CGFF] [--m5Cclassifer M5CCLASSIFIER]
+ [--csv_h5 CSV_H5] [--pickle PICKLE]
+ [--summary_h5 SUMMARY_H5] [--ms_csv MS_CSV]
+ [--control CONTROL] [--useLDA] [--paramsPath PARAMSPATH]
+ [--minCoverage MINCOVERAGE] [--maxQueueSize MAXQUEUESIZE]
+ [--maxCoverage MAXCOVERAGE]
[--mapQvThreshold MAPQVTHRESHOLD] [--ipdModel IPDMODEL]
[--modelIters MODELITERS] [--cap_percentile CAP_PERCENTILE]
[--methylMinCov METHYLMINCOV]
@@ -27,7 +28,7 @@ A simple test of the version and help options:
[-W REFERENCEWINDOWSASSTRING]
[--skipUnrecognizedContigs SKIPUNRECOGNIZEDCONTIGS]
[--alignmentSetRefWindows] [--threaded] [--profile] [--pdb]
- [--seed RANDOMSEED]
+ [--seed RANDOMSEED] [--referenceStride REFERENCESTRIDE]
alignment_set
ipdSummary: error: too few arguments
[2]
diff --git a/test/test_gather_h5.py b/test/test_gather_h5.py
new file mode 100644
index 0000000..954fa26
--- /dev/null
+++ b/test/test_gather_h5.py
@@ -0,0 +1,77 @@
+
+import unittest
+import tempfile
+
+import numpy as np
+import h5py
+
+import pbcommand.testkit.core
+from pbcommand.models import PipelineChunk
+from pbcommand.pb_io.common import write_pipeline_chunks
+
+from kineticsTools.tasks.gather_kinetics_h5 import gather_kinetics_h5
+
+class SetUpHDF5(object):
+ DATALENGTH = 10000
+ CHUNKSIZE = 2048
+ CHUNKED_FILES = [
+ tempfile.NamedTemporaryFile(suffix=".h5").name,
+ tempfile.NamedTemporaryFile(suffix=".h5").name
+ ]
+
+ @classmethod
+ def makeInputs(cls):
+ for k, ifn in enumerate(cls.CHUNKED_FILES):
+ f = h5py.File(cls.CHUNKED_FILES[k], "w")
+ a = f.create_dataset("base", (cls.DATALENGTH,),
+ dtype="a1", compression="gzip",
+ chunks=(cls.CHUNKSIZE,), compression_opts=2)
+ b = f.create_dataset("score", (cls.DATALENGTH,),
+ dtype="u4", compression="gzip",
+ chunks=(cls.CHUNKSIZE,), compression_opts=2)
+ c = f.create_dataset("tMean", (cls.DATALENGTH,),
+ dtype="f4", compression="gzip",
+ chunks=(cls.CHUNKSIZE,), compression_opts=2)
+ start = k * (cls.DATALENGTH / 2)
+ end = (k + 1) * (cls.DATALENGTH / 2)
+ a[start:end] = np.array(['A' for x in range(start, end)])
+ b[start:end] = np.array([2*(k+1) for x in range(start, end)])
+ c[start:end] = np.sqrt(np.array(range(start+1, end+1)))
+ f.close()
+
+
+class TestGatherHDF5(unittest.TestCase, SetUpHDF5):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.makeInputs()
+
+ def test_gather_kinetics_h5(self):
+ ofn = tempfile.NamedTemporaryFile(suffix=".h5").name
+ gather_kinetics_h5(self.CHUNKED_FILES, ofn)
+ f = h5py.File(ofn)
+ self.assertTrue(all(f['base'].__array__() == 'A'))
+ self.assertTrue(all(f['score'].__array__() > 0))
+ self.assertEqual(f['score'].__array__().mean(), 3)
+ self.assertTrue(all(f['tMean'].__array__() > 0))
+ d = np.round(f['tMean'].__array__() ** 2).astype("u4")
+ self.assertTrue(all(d == np.array(range(1, self.DATALENGTH+1))))
+
+
+class TestGatherH5ToolContract(pbcommand.testkit.core.PbTestGatherApp, SetUpHDF5):
+ DRIVER_BASE = "python -m kineticsTools.tasks.gather_kinetics_h5 "
+ INPUT_FILES = [tempfile.NamedTemporaryFile(suffix=".json").name]
+ CHUNK_KEY = "$chunk.h5_id"
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestGatherH5ToolContract, cls).setUpClass()
+ cls.makeInputs()
+ chunks = [PipelineChunk(chunk_id="chunk_data_{i}".format(i=i),
+ **({cls.CHUNK_KEY:fn}))
+ for i, fn in enumerate(cls.CHUNKED_FILES)]
+ write_pipeline_chunks(chunks, cls.INPUT_FILES[0], None)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/test_outputs.py b/test/test_outputs.py
new file mode 100644
index 0000000..03430de
--- /dev/null
+++ b/test/test_outputs.py
@@ -0,0 +1,105 @@
+
+"""
+Test sanity of various output formats for a minimal real-world example.
+"""
+
+import subprocess
+import tempfile
+import unittest
+import os.path
+import csv
+import re
+
+import h5py
+
+
+os.environ["PACBIO_TEST_ENV"] = "1" # turns off --verbose
+
+DATA_DIR = "/pbi/dept/secondary/siv/testdata/kineticsTools"
+REF_DIR = "/pbi/dept/secondary/siv/references/Helicobacter_pylori_J99"
+ALIGNMENTS = os.path.join(DATA_DIR, "Hpyl_1_5000.xml")
+REFERENCE = os.path.join(REF_DIR, "sequence", "Helicobacter_pylori_J99.fasta")
+
+skip_if_no_data = unittest.skipUnless(
+ os.path.isdir(DATA_DIR) and os.path.isdir(REF_DIR),
+ "%s or %s not available" % (DATA_DIR, REF_DIR))
+
+@skip_if_no_data
+class TestOutputs(unittest.TestCase):
+
+ @classmethod
+ @skip_if_no_data
+ def setUpClass(cls):
+ prefix = tempfile.NamedTemporaryFile().name
+ cls.h5_file = "{p}.h5".format(p=prefix)
+ cls.csv_file = "{p}.csv".format(p=prefix)
+ cls.gff_file = "{p}.gff".format(p=prefix)
+ cls.bw_file = "{p}.bw".format(p=prefix)
+ args = [
+ "ipdSummary", "--log-level", "WARNING",
+ "--csv_h5", cls.h5_file,
+ "--csv", cls.csv_file,
+ "--gff", cls.gff_file,
+ "--bigwig", cls.bw_file,
+ "--numWorkers", "12",
+ "--pvalue", "0.001",
+ "--identify", "m6A,m4C",
+ "--referenceStride", "100",
+ "--referenceWindows", "gi|12057207|gb|AE001439.1|:0-200",
+ "--reference", REFERENCE,
+ ALIGNMENTS
+ ]
+ print " ".join(args)
+ assert subprocess.call(args) == 0
+ with open(cls.csv_file) as f:
+ cls.csv_records = [l.split(",") for l in f.read().splitlines()][1:]
+
+ @classmethod
+ def tearDownClass(cls):
+ return
+ for fn in [cls.h5_file, cls.csv_file, cls.gff_file, cls.bw_file]:
+ if os.path.exists(fn):
+ os.remove(fn)
+
+ def test_h5_output(self):
+ f = h5py.File(self.h5_file)
+ bases = f['base'].__array__()
+ self.assertEqual(bases[0], "A")
+ self.assertEqual(bases[100], "T")
+ self.assertEqual(list(bases[0:400] != "").count(True), 400)
+ seqh_fwd = ''.join([f['base'][x*2] for x in range(200)])
+ seqc_fwd = ''.join([self.csv_records[x*2][3] for x in range(200)])
+ self.assertEqual(seqh_fwd, seqc_fwd)
+ seqh_rev = ''.join([f['base'][(x*2)+1] for x in range(200)])
+ seqc_rev = ''.join([self.csv_records[(x*2)+1][3] for x in range(200)])
+ self.assertEqual(seqh_rev, seqc_rev)
+ tpl_fwd = [f['tpl'][x*2] for x in range(200)]
+ self.assertEqual(tpl_fwd, range(1, 201))
+ f = h5py.File(self.h5_file)
+ for i_rec, rec in enumerate(self.csv_records):
+ self.assertEqual(str(f['tpl'][i_rec]), self.csv_records[i_rec][1])
+ self.assertEqual("%.3f"%f['ipdRatio'][i_rec],
+ self.csv_records[i_rec][8])
+
+ def test_csv_output(self):
+ self.assertEqual(len(self.csv_records), 400)
+ self.assertEqual(self.csv_records[0][3], "A")
+ self.assertEqual(self.csv_records[100][3], "T")
+
+ def test_bigwig(self):
+ import pyBigWig
+ f = pyBigWig.open(self.bw_file)
+ for i_rec, rec in enumerate(self.csv_records):
+ seqid = re.sub('\"', "", rec[0])
+ tpl = int(rec[1]) - 1
+ s = int(f.values(seqid, tpl, tpl+1)[0])
+ ipd_minus = (s % 65536) / 100.0
+ ipd_plus = (s >> 16) / 100.0
+ if rec[2] == "1":
+ self.assertAlmostEqual(ipd_minus, float(rec[8]), places=1)
+ else:
+ self.assertAlmostEqual(ipd_plus, float(rec[8]), places=1)
+
+
+if __name__ == "__main__":
+ unittest.main()
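These tests rely on the interleaved layout of the csv_h5 output: for a 0-based template position tpl, the plus-strand record sits at array index 2*tpl and the minus-strand record at 2*tpl + 1 (the stored 'tpl' column itself is 1-based). A small sketch of splitting a gathered file back into per-strand arrays, assuming a placeholder file name:

    import h5py

    f = h5py.File("basemods.h5", "r")
    ipd = f['ipdRatio'][:]
    fwd_ipd = ipd[0::2]   # plus strand (strand == 0)
    rev_ipd = ipd[1::2]   # minus strand (strand == 1)
    f.close()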
diff --git a/test/test_tool_contract.py b/test/test_tool_contract.py
index 5056b24..023d585 100755
--- a/test/test_tool_contract.py
+++ b/test/test_tool_contract.py
@@ -9,6 +9,8 @@ import logging
import os.path
import csv
+import h5py
+
import pbcommand.testkit
os.environ["PACBIO_TEST_ENV"] = "1" # turns off --verbose
@@ -29,6 +31,7 @@ gi|12057207|gb|AE001439.1|\tkinModCall\tm6A\t35\t35\t187\t-\t.\tcoverage=118;con
gi|12057207|gb|AE001439.1|\tkinModCall\tm4C\t60\t60\t49\t-\t.\tcoverage=112;context=AAAAAGCTCGCTCAAAAACCCTTGATTTAAGGGCGTTTTAT;IPDRatio=2.58;identificationQv=33
gi|12057207|gb|AE001439.1|\tkinModCall\tm6A\t89\t89\t223\t+\t.\tcoverage=139;context=AGCGAGCTTTTTGCTCAAAGAATCCAAGATAGCGTTTAAAA;IPDRatio=5.69;identificationQv=187"""
+# FIXME this is much too slow
@unittest.skipUnless(os.path.isdir(DATA_DIR) and os.path.isdir(REF_DIR),
"%s or %s not available" % (DATA_DIR, REF_DIR))
class TestIpdSummary(pbcommand.testkit.PbTestApp):
@@ -49,27 +52,29 @@ class TestIpdSummary(pbcommand.testkit.PbTestApp):
def run_after(self, rtc, output_dir):
gff_file = os.path.join(output_dir, rtc.task.output_files[0])
- csv_file = os.path.join(output_dir, rtc.task.output_files[1])
def lc(fn): return len(open(fn).readlines())
self.assertEqual(lc(gff_file), Constants.N_LINES_GFF)
- self.assertEqual(lc(csv_file), Constants.N_LINES_CSV)
- def head(fn,n): return "\n".join( open(fn).read().splitlines()[0:n] )
- self.assertEqual(head(csv_file, 3), Constants.INITIAL_LINES_CSV)
def head2(fn,n):
out = []
- i = 0
for line in open(fn).read().splitlines():
if line[0] != '#':
out.append(line)
- i += 1
- if i == n:
+ if len(out) == n:
break
return "\n".join(out)
self.assertEqual(head2(gff_file, 3), Constants.INITIAL_LINES_GFF)
-
-
-@unittest.skipUnless(os.path.isdir(DATA_DIR) and os.path.isdir(REF_DIR),
- "%s or %s not available" % (DATA_DIR, REF_DIR))
+ f = h5py.File(rtc.task.output_files[2])
+ seqh_fwd = ''.join([f['base'][x*2] for x in range(5000)])
+ seqh_rev = ''.join([f['base'][(x*2)+1] for x in range(5000)])
+ self.assertEqual(len(seqh_fwd), 5000)
+ self.assertEqual(len(seqh_rev), 5000)
+
+
+# FIXME this is covered by integration tests but the output is twitchy enough
+# that it would be good to test here as well
+#@unittest.skipUnless(os.path.isdir(DATA_DIR) and os.path.isdir(REF_DIR),
+# "%s or %s not available" % (DATA_DIR, REF_DIR))
+@unittest.skip("FIXME")
class TestIpdSummaryChunk(TestIpdSummary):
"""
This test is identical to the above except for using an AlignmentSet with
@@ -84,7 +89,6 @@ class TestIpdSummaryChunk(TestIpdSummary):
def run_after(self, rtc, output_dir):
gff_file = os.path.join(output_dir, rtc.task.output_files[0])
- csv_file = os.path.join(output_dir, rtc.task.output_files[1])
logging.critical(gff_file)
logging.critical("%s %s" % (csv_file, os.path.getsize(csv_file)))
with open(csv_file) as f:
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/kineticstools.git