[Pkg-javascript-commits] [node-leveldown] 10/492: (mostly) complete basic stream implementations
Andrew Kelley
andrewrk-guest at moszumanska.debian.org
Sun Jul 6 17:13:38 UTC 2014
This is an automated email from the git hooks/post-receive script.
andrewrk-guest pushed a commit to annotated tag rocksdb-0.10.1
in repository node-leveldown.
commit 13f4712ba66d24e9ae6af37e4311fd97d547277f
Author: Rod Vagg <rod at vagg.org>
Date: Sun Aug 12 22:50:01 2012 +1000
(mostly) complete basic stream implementations
---
lib/read-stream.js | 46 +++++++++++++++--
lib/write-stream.js | 67 +++++++++++++++---------
src/iterator.cc | 4 ++
src/iterator.h | 7 +--
src/iterator_async.cc | 8 +--
test/read-stream-test.js | 128 +++++++++++++++++++++++++++++++++++++++++++---
test/write-stream-test.js | 65 +++++++++++++++++++++--
7 files changed, 278 insertions(+), 47 deletions(-)
diff --git a/lib/read-stream.js b/lib/read-stream.js
index 560dd2c..c32d649 100644
--- a/lib/read-stream.js
+++ b/lib/read-stream.js
@@ -7,8 +7,12 @@ function ReadStream (options, db, iteratorFactory) {
Stream.call(this)
this._options = options
this._status = 'ready'
+ this.readable = true
+ this.writable = false
var ready = function () {
+ if (this._status == 'ended')
+ return
this._iterator = iteratorFactory()
this.emit('ready')
this._read()
@@ -24,22 +28,54 @@ inherits(ReadStream, Stream)
ReadStream.prototype._read = function () {
if (this._status == 'ready' || this._status == 'reading') {
- this._iterator.next(this._onEnd.bind(this), this._onData.bind(this))
+ this._iterator.next(this._cleanup.bind(this), this._onData.bind(this))
}
}
-ReadStream.prototype._onData = function (key, value) {
+ReadStream.prototype._onData = function (err, key, value) {
+ if (err)
+ return this._cleanup(err)
+ if (this._status == 'ended')
+ return
if (this._status == 'ready') this._status = 'reading'
+ this._read()
this.emit('data', {
key : toEncoding(key , this._options.keyEncoding || this._options.encoding)
, value : toEncoding(value , this._options.valueEncoding || this._options.encoding)
})
- this._read()
}
-ReadStream.prototype._onEnd = function () {
+ReadStream.prototype._cleanup = function(err) {
+ var s = this._status
this._status = 'ended'
- this.emit('end')
+ this.readable = false
+ if (this._iterator) {
+ this._iterator.end(function () {
+ this.emit('close')
+ }.bind(this))
+ } else
+ this.emit('close')
+ if (err)
+ this.emit('error', err)
+ else (s != 'destroyed')
+ this.emit('end')
+}
+
+ReadStream.prototype.destroy = function() {
+ this._status = 'destroyed'
+ this._cleanup()
+}
+
+ReadStream.prototype.pause = function() {
+ if (this._status != 'ended')
+ this._status += '+paused' // preserve existing status
+}
+
+ReadStream.prototype.resume = function() {
+ if (this._status != 'ended') {
+ this._status = this._status.replace(/\+paused$/, '')
+ this._read()
+ }
}
module.exports = ReadStream
\ No newline at end of file
diff --git a/lib/write-stream.js b/lib/write-stream.js
index 3d53014..0d35c49 100644
--- a/lib/write-stream.js
+++ b/lib/write-stream.js
@@ -10,8 +10,12 @@ function WriteStream (options, db) {
this._buffer = []
this._status = 'init'
this._end = false
+ this.writable = true
+ this.readable = false
var ready = function () {
+ if (!this.writable)
+ return
this._status = 'ready'
this.emit('ready')
this._process()
@@ -26,6 +30,8 @@ function WriteStream (options, db) {
inherits(WriteStream, Stream)
WriteStream.prototype.write = function (data) {
+ if (!this.writable)
+ return false
this._buffer.push(data)
if (this._status != 'init')
this._processDelayed()
@@ -43,44 +49,48 @@ WriteStream.prototype._processDelayed = function() {
WriteStream.prototype._process = function() {
var entry
, cb = function (err) {
+ if (!this.writable)
+ return
if (this._status != 'closed')
this._status = 'ready'
- if (err)
+ if (err) {
+ this.writable = false
return this.emit('error', err)
+ }
this._process()
}.bind(this)
- if (this._status != 'ready') {
+ if (this._status != 'ready' && this.writable) {
if (this._buffer.length && this._status != 'closed')
this._processDelayed()
return
}
- if (this._end) {
- this.emit('close')
- this._status = 'closed'
- return
- }
-
- if (!this._buffer.length)
- return
-
- if (this._buffer.length == 1) {
- entry = this._buffer.pop()
- if (entry.key !== undefined && entry.value !== undefined) {
+ if (this._buffer.length && this.writable) {
+ if (this._buffer.length == 1) {
+ entry = this._buffer.pop()
+ if (entry.key !== undefined && entry.value !== undefined) {
+ this._status = 'writing'
+ this._db.put(entry.key, entry.value, cb)
+ }
+ } else {
this._status = 'writing'
- this._db.put(entry.key, entry.value, cb)
+ this._db.batch(this._buffer.map(function (d) {
+ return { type: 'put', key: d.key, value: d.value }
+ }), cb)
+ this._buffer = []
+ }
+ if (this._writeBlock) {
+ this._writeBlock = false
+ this.emit('drain')
}
- } else {
- this._status = 'writing'
- this._db.batch(this._buffer.map(function (d) {
- return { type: 'put', key: d.key, value: d.value }
- }), cb)
- this._buffer = []
}
- if (this._writeBlock) {
- this._writeBlock = false
- this.emit('drain')
+
+ if (this._end) {
+ this._status = 'closed'
+ this.writable = false
+ this.emit('close')
+ return
}
}
@@ -91,4 +101,13 @@ WriteStream.prototype.end = function() {
}.bind(this))
}
+WriteStream.prototype.destroy = function() {
+ this.writable = false
+ this.end()
+}
+
+WriteStream.prototype.destroySoon = function() {
+ this.end()
+}
+
module.exports = WriteStream
\ No newline at end of file
diff --git a/src/iterator.cc b/src/iterator.cc
index 993b127..590c6be 100644
--- a/src/iterator.cc
+++ b/src/iterator.cc
@@ -43,6 +43,10 @@ bool levelup::Iterator::IteratorNext (string& key, string& value) {
}
}
+Status levelup::Iterator::IteratorStatus () {
+ return dbIterator->status();
+}
+
void levelup::Iterator::IteratorEnd () {
//TODO: could return it->status()
delete dbIterator;
diff --git a/src/iterator.h b/src/iterator.h
index 5adf9e9..7d7a8a0 100644
--- a/src/iterator.h
+++ b/src/iterator.h
@@ -16,11 +16,12 @@ Handle<Value> CreateIterator (const Arguments& args);
class Iterator : public node::ObjectWrap {
public:
- static void Init ();
+ static void Init ();
static v8::Handle<v8::Value> NewInstance (const v8::Arguments& args);
- bool IteratorNext (string& key, string& value);
- void IteratorEnd ();
+ bool IteratorNext (string& key, string& value);
+ Status IteratorStatus ();
+ void IteratorEnd ();
private:
Iterator (
diff --git a/src/iterator_async.cc b/src/iterator_async.cc
index a7dad2e..122a8db 100644
--- a/src/iterator_async.cc
+++ b/src/iterator_async.cc
@@ -16,16 +16,18 @@ using namespace leveldb;
void NextWorker::Execute () {
ok = iterator->IteratorNext(key, value);
+ if (!ok)
+ status = iterator->IteratorStatus();
}
void NextWorker::HandleOKCallback () {
if (ok) {
Local<Value> argv[] = {
- Local<Value>::New(Buffer::New((char*)key.data(), key.size())->handle_)
+ Local<Value>::New(Null())
+ , Local<Value>::New(Buffer::New((char*)key.data(), key.size())->handle_)
, Local<Value>::New(Buffer::New((char*)value.data(), value.size())->handle_)
};
- //delete value;
- runCallback(callback, argv, 2);
+ runCallback(callback, argv, 3);
} else {
Local<Value> argv[0];
runCallback(endCallback, argv, 0);
diff --git a/test/read-stream-test.js b/test/read-stream-test.js
index 9172466..93b4d7c 100644
--- a/test/read-stream-test.js
+++ b/test/read-stream-test.js
@@ -16,6 +16,7 @@ buster.testCase('ReadStream', {
this.readySpy = this.spy()
this.dataSpy = this.spy()
+ this.endSpy = this.spy()
this.sourceData = []
for (var i = 0; i < 10; i++) {
@@ -26,8 +27,11 @@ buster.testCase('ReadStream', {
})
}
- this.verify = function (done) {
+ this.verify = function (rs, done) {
+ assert.isFalse(rs.writable)
+ assert.isFalse(rs.readable)
assert.equals(this.readySpy.callCount, 1, 'ReadStream emitted single "ready" event')
+ assert.equals(this.endSpy.callCount, 1, 'ReadStream emitted single "end" event')
assert.equals(this.dataSpy.callCount, this.sourceData.length, 'ReadStream emitted correct number of "data" events')
this.sourceData.forEach(function (d, i) {
var call = this.dataSpy.getCall(i)
@@ -56,9 +60,12 @@ buster.testCase('ReadStream', {
refute(err)
var rs = db.readStream()
- rs.on('ready' , this.readySpy)
- rs.on('data' , this.dataSpy)
- rs.on('end' , this.verify.bind(this, done))
+ assert.isFalse(rs.writable)
+ assert.isTrue(rs.readable)
+ rs.on('ready', this.readySpy)
+ rs.on('data' , this.dataSpy)
+ rs.on('end' , this.endSpy)
+ rs.on('close', this.verify.bind(this, rs, done))
}.bind(this))
}.bind(this))
}
@@ -74,9 +81,12 @@ buster.testCase('ReadStream', {
refute(err)
var rs = db.readStream()
- rs.on('ready' , this.readySpy)
- rs.on('data' , this.dataSpy)
- rs.on('end' , this.verify.bind(this, done))
+ assert.isFalse(rs.writable)
+ assert.isTrue(rs.readable)
+ rs.on('ready', this.readySpy)
+ rs.on('data' , this.dataSpy)
+ rs.on('end' , this.endSpy)
+ rs.on('close', this.verify.bind(this, rs, done))
}.bind(this))
}.bind(this)
@@ -92,4 +102,108 @@ buster.testCase('ReadStream', {
})
}.bind(this))
}
+
+ , 'test pausing': function (done) {
+ var calls = 0
+ , rs
+ , pauseVerify = function () {
+ // NOTE: another one *will* slip through because the stream triggers an async read before triggering the event
+ assert.equals(calls, 6, 'stream should still be paused')
+ rs.resume()
+ pauseVerify.called = true
+ }
+ , onData = function () {
+ if (++calls == 5) {
+ rs.pause()
+ setTimeout(pauseVerify, 50)
+ }
+ }
+ , verify = function () {
+ assert.equals(calls, 10, 'onData was used in test')
+ assert(pauseVerify.called, 'pauseVerify was used in test')
+ this.verify(rs, done)
+ }.bind(this)
+
+ this.dataSpy = this.spy(onData) // so we can still verify
+
+ this.openTestDatabase(function (db) {
+ // execute
+ db.batch(this.sourceData.slice(), function (err) {
+ refute(err)
+
+ rs = db.readStream()
+ assert.isFalse(rs.writable)
+ assert.isTrue(rs.readable)
+ rs.on('ready', this.readySpy)
+ rs.on('data' , this.dataSpy)
+ rs.on('end' , this.endSpy)
+ rs.on('close', verify.bind(this))
+
+ }.bind(this))
+ }.bind(this))
+ }
+
+ , 'test destroy() immediately': function (done) {
+ this.openTestDatabase(function (db) {
+ db.batch(this.sourceData.slice(), function (err) {
+ refute(err)
+
+ var rs = db.readStream()
+ assert.isFalse(rs.writable)
+ assert.isTrue(rs.readable)
+ rs.on('ready', this.readySpy)
+ rs.on('data' , this.dataSpy)
+ rs.on('end' , this.endSpy)
+ rs.on('close', function () {
+ assert.isFalse(rs.writable)
+ assert.isFalse(rs.readable)
+ assert.equals(this.readySpy.callCount, 0, '"ready" event was not fired')
+ assert.equals(this.dataSpy.callCount , 0, '"data" event was not fired')
+ assert.equals(this.endSpy.callCount , 0, '"end" event was not fired')
+ done()
+ }.bind(this))
+ rs.destroy()
+ }.bind(this))
+ }.bind(this))
+ }
+
+ , 'test destroy() half way through': function (done) {
+ this.openTestDatabase(function (db) {
+ db.batch(this.sourceData.slice(), function (err) {
+ refute(err)
+
+ var rs = db.readStream()
+ , endSpy = this.spy()
+ , calls = 0
+ this.dataSpy = this.spy(function () {
+ if (++calls == 5)
+ rs.destroy()
+ })
+ assert.isFalse(rs.writable)
+ assert.isTrue(rs.readable)
+ rs.on('ready', this.readySpy)
+ rs.on('data' , this.dataSpy)
+ rs.on('end' , endSpy)
+ rs.on('close', function () {
+ assert.isFalse(rs.writable)
+ assert.isFalse(rs.readable)
+ assert.equals(this.readySpy.callCount, 1, 'ReadStream emitted single "ready" event')
+ // should do "data" 5 times ONLY
+ assert.equals(this.dataSpy.callCount, 5, 'ReadStream emitted correct number of "data" events (5)')
+ this.sourceData.slice(0, 5).forEach(function (d, i) {
+ var call = this.dataSpy.getCall(i)
+ assert(call)
+ if (call) {
+ assert.equals(call.args.length, 1, 'ReadStream "data" event #' + i + ' fired with 1 argument')
+ refute.isNull(call.args[0].key, 'ReadStream "data" event #' + i + ' argument has "key" property')
+ refute.isNull(call.args[0].value, 'ReadStream "data" event #' + i + ' argument has "value" property')
+ assert.equals(call.args[0].key, d.key, 'ReadStream "data" event #' + i + ' argument has correct "key"')
+ assert.equals(call.args[0].value, d.value, 'ReadStream "data" event #' + i + ' argument has correct "value"')
+ }
+ }.bind(this))
+ done()
+ }.bind(this))
+ }.bind(this))
+ }.bind(this))
+ }
})
\ No newline at end of file
diff --git a/test/write-stream-test.js b/test/write-stream-test.js
index ed9968a..7997c85 100644
--- a/test/write-stream-test.js
+++ b/test/write-stream-test.js
@@ -25,7 +25,9 @@ buster.testCase('WriteStream', {
})
}
- this.verify = function (db, done) {
+ this.verify = function (ws, db, done) {
+ assert.isFalse(ws.writable)
+ assert.isFalse(ws.readable)
async.forEach(
this.sourceData
, function (data, callback) {
@@ -52,7 +54,7 @@ buster.testCase('WriteStream', {
ws.on('error', function (err) {
refute(err)
})
- ws.on('close', this.verify.bind(this, db, done))
+ ws.on('close', this.verify.bind(this, ws, db, done))
this.sourceData.forEach(function (d) {
ws.write(d)
})
@@ -67,7 +69,7 @@ buster.testCase('WriteStream', {
ws.on('error', function (err) {
refute(err)
})
- ws.on('close', this.verify.bind(this, db, done))
+ ws.on('close', this.verify.bind(this, ws, db, done))
async.forEachSeries(
this.sourceData
, function (d, callback) {
@@ -102,11 +104,13 @@ buster.testCase('WriteStream', {
// second element should warn that the buffer isn't being cleared
assert.isFalse(ws.write(this.sourceData[1]))
- ws.once('close', this.verify.bind(this, db, done))
+ ws.once('close', this.verify.bind(this, ws, db, done))
ws.once('drain', function () {
this.sourceData.slice(2).forEach(function (d, i) {
- assert[i != 0 ? 'isFalse' : 'isTrue'](ws.write(d), 'correct return value for element #' + i)
+ assert[i !== 0 ? 'isFalse' : 'isTrue'](ws.write(d), 'correct return value for element #' + i)
})
+ assert.isTrue(ws.writable)
+ assert.isFalse(ws.readable)
ws.end()
}.bind(this))
@@ -115,4 +119,55 @@ buster.testCase('WriteStream', {
refute(err)
})
}
+
+ // at the moment, destroySoon() is basically just end()
+ , 'test destroySoon()': function (done) {
+ this.openTestDatabase(function (db) {
+ var ws = db.writeStream()
+ ws.on('error', function (err) {
+ refute(err)
+ })
+ ws.on('close', this.verify.bind(this, ws, db, done))
+ this.sourceData.forEach(function (d) {
+ ws.write(d)
+ })
+ ws.once('ready', ws.destroySoon) // end after it's ready, nextTick makes this work OK
+ }.bind(this))
+ }
+
+ , 'test destroy()': function (done) {
+ var verify = function (ws, db) {
+ assert.isFalse(ws.writable)
+ async.forEach(
+ this.sourceData
+ , function (data, callback) {
+ db.get(data.key, function (err, value) {
+ // none of them should exist
+ assert(err)
+ refute(value)
+ callback()
+ })
+ }
+ , done
+ )
+ }
+
+ this.openTestDatabase(function (db) {
+ var ws = db.writeStream()
+ ws.on('error', function (err) {
+ refute(err)
+ })
+ assert.isTrue(ws.writable)
+ assert.isFalse(ws.readable)
+ ws.on('close', verify.bind(this, ws, db))
+ this.sourceData.forEach(function (d) {
+ ws.write(d)
+ assert.isTrue(ws.writable)
+ assert.isFalse(ws.readable)
+ })
+ assert.isTrue(ws.writable)
+ assert.isFalse(ws.readable)
+ ws.once('ready', ws.destroy)
+ }.bind(this))
+ }
})
\ No newline at end of file
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-javascript/node-leveldown.git
More information about the Pkg-javascript-commits mailing list