[Pkg-javascript-commits] [node-leveldown] 10/492: (mostly) complete basic stream implementations

Andrew Kelley andrewrk-guest at moszumanska.debian.org
Sun Jul 6 17:13:38 UTC 2014


This is an automated email from the git hooks/post-receive script.

andrewrk-guest pushed a commit to annotated tag rocksdb-0.10.1
in repository node-leveldown.

commit 13f4712ba66d24e9ae6af37e4311fd97d547277f
Author: Rod Vagg <rod at vagg.org>
Date:   Sun Aug 12 22:50:01 2012 +1000

    (mostly) complete basic stream implementations
---
 lib/read-stream.js        |  46 +++++++++++++++--
 lib/write-stream.js       |  67 +++++++++++++++---------
 src/iterator.cc           |   4 ++
 src/iterator.h            |   7 +--
 src/iterator_async.cc     |   8 +--
 test/read-stream-test.js  | 128 +++++++++++++++++++++++++++++++++++++++++++---
 test/write-stream-test.js |  65 +++++++++++++++++++++--
 7 files changed, 278 insertions(+), 47 deletions(-)

diff --git a/lib/read-stream.js b/lib/read-stream.js
index 560dd2c..c32d649 100644
--- a/lib/read-stream.js
+++ b/lib/read-stream.js
@@ -7,8 +7,12 @@ function ReadStream (options, db, iteratorFactory) {
   Stream.call(this)
   this._options = options
   this._status  = 'ready'
+  this.readable = true
+  this.writable = false
 
   var ready = function () {
+    if (this._status == 'ended')
+      return
     this._iterator = iteratorFactory()
     this.emit('ready')
     this._read()
@@ -24,22 +28,54 @@ inherits(ReadStream, Stream)
 
 ReadStream.prototype._read = function () {
   if (this._status == 'ready' || this._status == 'reading') {
-    this._iterator.next(this._onEnd.bind(this), this._onData.bind(this))
+    this._iterator.next(this._cleanup.bind(this), this._onData.bind(this))
   }
 }
 
-ReadStream.prototype._onData = function (key, value) {
+ReadStream.prototype._onData = function (err, key, value) {
+  if (err)
+    return this._cleanup(err)
+  if (this._status == 'ended')
+    return
   if (this._status == 'ready') this._status = 'reading'
+  this._read()
   this.emit('data', {
       key   : toEncoding(key   , this._options.keyEncoding   || this._options.encoding)
     , value : toEncoding(value , this._options.valueEncoding || this._options.encoding)
   })
-  this._read()
 }
 
-ReadStream.prototype._onEnd = function () {
+ReadStream.prototype._cleanup = function(err) {
+  var s = this._status
   this._status = 'ended'
-  this.emit('end')
+  this.readable = false
+  if (this._iterator) {
+    this._iterator.end(function () {
+      this.emit('close')
+    }.bind(this))
+  } else
+    this.emit('close')
+  if (err)
+    this.emit('error', err)
+  else (s != 'destroyed')
+    this.emit('end')
+}
+
+ReadStream.prototype.destroy = function() {
+  this._status = 'destroyed'
+  this._cleanup()
+}
+
+ReadStream.prototype.pause = function() {
+  if (this._status != 'ended')
+    this._status += '+paused' // preserve existing status
+}
+
+ReadStream.prototype.resume = function() {
+  if (this._status != 'ended') {
+    this._status = this._status.replace(/\+paused$/, '')
+    this._read()
+  }
 }
 
 module.exports = ReadStream
\ No newline at end of file
diff --git a/lib/write-stream.js b/lib/write-stream.js
index 3d53014..0d35c49 100644
--- a/lib/write-stream.js
+++ b/lib/write-stream.js
@@ -10,8 +10,12 @@ function WriteStream (options, db) {
   this._buffer  = []
   this._status  = 'init'
   this._end     = false
+  this.writable = true
+  this.readable = false
 
   var ready = function () {
+    if (!this.writable)
+      return
     this._status = 'ready'
     this.emit('ready')
     this._process()
@@ -26,6 +30,8 @@ function WriteStream (options, db) {
 inherits(WriteStream, Stream)
 
 WriteStream.prototype.write = function (data) {
+  if (!this.writable)
+    return false
   this._buffer.push(data)
   if (this._status != 'init')
     this._processDelayed()
@@ -43,44 +49,48 @@ WriteStream.prototype._processDelayed = function() {
 WriteStream.prototype._process = function() {
   var entry
     , cb = function (err) {
+        if (!this.writable)
+          return
         if (this._status != 'closed')
           this._status = 'ready'
-        if (err)
+        if (err) {
+          this.writable = false
           return this.emit('error', err)
+        }
         this._process()
       }.bind(this)
 
-  if (this._status != 'ready') {
+  if (this._status != 'ready' && this.writable) {
     if (this._buffer.length && this._status != 'closed')
       this._processDelayed()
     return
   }
 
-  if (this._end) {
-    this.emit('close')
-    this._status = 'closed'
-    return
-  }
-
-  if (!this._buffer.length)
-    return
-
-  if (this._buffer.length == 1) {
-    entry = this._buffer.pop()
-    if (entry.key !== undefined && entry.value !== undefined) {
+  if (this._buffer.length && this.writable) {
+    if (this._buffer.length == 1) {
+      entry = this._buffer.pop()
+      if (entry.key !== undefined && entry.value !== undefined) {
+        this._status = 'writing'
+        this._db.put(entry.key, entry.value, cb)
+      }
+    } else {
       this._status = 'writing'
-      this._db.put(entry.key, entry.value, cb)
+      this._db.batch(this._buffer.map(function (d) {
+        return { type: 'put', key: d.key, value: d.value }
+      }), cb)
+      this._buffer = []
+    }
+    if (this._writeBlock) {
+      this._writeBlock = false
+      this.emit('drain')
     }
-  } else {
-    this._status = 'writing'
-    this._db.batch(this._buffer.map(function (d) {
-      return { type: 'put', key: d.key, value: d.value }
-    }), cb)
-    this._buffer = []
   }
-  if (this._writeBlock) {
-    this._writeBlock = false
-    this.emit('drain')
+
+  if (this._end) {
+    this._status = 'closed'
+    this.writable = false
+    this.emit('close')
+    return
   }
 }
 
@@ -91,4 +101,13 @@ WriteStream.prototype.end = function() {
   }.bind(this))
 }
 
+WriteStream.prototype.destroy = function() {
+  this.writable = false
+  this.end()
+}
+
+WriteStream.prototype.destroySoon = function() {
+  this.end()
+}
+
 module.exports = WriteStream
\ No newline at end of file
diff --git a/src/iterator.cc b/src/iterator.cc
index 993b127..590c6be 100644
--- a/src/iterator.cc
+++ b/src/iterator.cc
@@ -43,6 +43,10 @@ bool levelup::Iterator::IteratorNext (string& key, string& value) {
   }
 }
 
+Status levelup::Iterator::IteratorStatus () {
+  return dbIterator->status();
+}
+
 void levelup::Iterator::IteratorEnd () {
   //TODO: could return it->status()
   delete dbIterator;
diff --git a/src/iterator.h b/src/iterator.h
index 5adf9e9..7d7a8a0 100644
--- a/src/iterator.h
+++ b/src/iterator.h
@@ -16,11 +16,12 @@ Handle<Value> CreateIterator (const Arguments& args);
 
 class Iterator : public node::ObjectWrap {
 public:
-  static void Init  ();
+  static void Init      ();
   static v8::Handle<v8::Value> NewInstance (const v8::Arguments& args);
 
-  bool IteratorNext (string& key, string& value);
-  void IteratorEnd  ();
+  bool   IteratorNext   (string& key, string& value);
+  Status IteratorStatus ();
+  void   IteratorEnd    ();
 
 private:
   Iterator (
diff --git a/src/iterator_async.cc b/src/iterator_async.cc
index a7dad2e..122a8db 100644
--- a/src/iterator_async.cc
+++ b/src/iterator_async.cc
@@ -16,16 +16,18 @@ using namespace leveldb;
 
 void NextWorker::Execute () {
   ok = iterator->IteratorNext(key, value);
+  if (!ok)
+    status = iterator->IteratorStatus();
 }
 
 void NextWorker::HandleOKCallback () {
   if (ok) {
     Local<Value> argv[] = {
-        Local<Value>::New(Buffer::New((char*)key.data(), key.size())->handle_)
+        Local<Value>::New(Null())
+      , Local<Value>::New(Buffer::New((char*)key.data(), key.size())->handle_)
       , Local<Value>::New(Buffer::New((char*)value.data(), value.size())->handle_)
     };
-    //delete value;
-    runCallback(callback, argv, 2);
+    runCallback(callback, argv, 3);
   } else {
     Local<Value> argv[0];
     runCallback(endCallback, argv, 0);
diff --git a/test/read-stream-test.js b/test/read-stream-test.js
index 9172466..93b4d7c 100644
--- a/test/read-stream-test.js
+++ b/test/read-stream-test.js
@@ -16,6 +16,7 @@ buster.testCase('ReadStream', {
 
       this.readySpy   = this.spy()
       this.dataSpy    = this.spy()
+      this.endSpy     = this.spy()
       this.sourceData = []
 
       for (var i = 0; i < 10; i++) {
@@ -26,8 +27,11 @@ buster.testCase('ReadStream', {
         })
       }
 
-      this.verify = function (done) {
+      this.verify = function (rs, done) {
+        assert.isFalse(rs.writable)
+        assert.isFalse(rs.readable)
         assert.equals(this.readySpy.callCount, 1, 'ReadStream emitted single "ready" event')
+        assert.equals(this.endSpy.callCount, 1, 'ReadStream emitted single "end" event')
         assert.equals(this.dataSpy.callCount, this.sourceData.length, 'ReadStream emitted correct number of "data" events')
         this.sourceData.forEach(function (d, i) {
           var call = this.dataSpy.getCall(i)
@@ -56,9 +60,12 @@ buster.testCase('ReadStream', {
           refute(err)
 
           var rs = db.readStream()
-          rs.on('ready' , this.readySpy)
-          rs.on('data'  , this.dataSpy)
-          rs.on('end'   , this.verify.bind(this, done))
+          assert.isFalse(rs.writable)
+          assert.isTrue(rs.readable)
+          rs.on('ready', this.readySpy)
+          rs.on('data' , this.dataSpy)
+          rs.on('end'  , this.endSpy)
+          rs.on('close', this.verify.bind(this, rs, done))
         }.bind(this))
       }.bind(this))
     }
@@ -74,9 +81,12 @@ buster.testCase('ReadStream', {
           refute(err)
 
           var rs = db.readStream()
-          rs.on('ready' , this.readySpy)
-          rs.on('data'  , this.dataSpy)
-          rs.on('end'   , this.verify.bind(this, done))
+          assert.isFalse(rs.writable)
+          assert.isTrue(rs.readable)
+          rs.on('ready', this.readySpy)
+          rs.on('data' , this.dataSpy)
+          rs.on('end'  , this.endSpy)
+          rs.on('close', this.verify.bind(this, rs, done))
         }.bind(this))
       }.bind(this)
 
@@ -92,4 +102,108 @@ buster.testCase('ReadStream', {
         })
       }.bind(this))
     }
+
+  , 'test pausing': function (done) {
+      var calls = 0
+        , rs
+        , pauseVerify = function () {
+            // NOTE: another one *will* slip through because the stream triggers an async read before triggering the event
+            assert.equals(calls, 6, 'stream should still be paused')
+            rs.resume()
+            pauseVerify.called = true
+          }
+        , onData = function () {
+            if (++calls == 5) {
+              rs.pause()
+              setTimeout(pauseVerify, 50)
+            }
+          }
+        , verify = function () {
+            assert.equals(calls, 10, 'onData was used in test')
+            assert(pauseVerify.called, 'pauseVerify was used in test')
+            this.verify(rs, done)
+          }.bind(this)
+
+      this.dataSpy = this.spy(onData) // so we can still verify
+
+      this.openTestDatabase(function (db) {
+        // execute
+        db.batch(this.sourceData.slice(), function (err) {
+          refute(err)
+
+          rs = db.readStream()
+          assert.isFalse(rs.writable)
+          assert.isTrue(rs.readable)
+          rs.on('ready', this.readySpy)
+          rs.on('data' , this.dataSpy)
+          rs.on('end'  , this.endSpy)
+          rs.on('close', verify.bind(this))
+
+        }.bind(this))
+      }.bind(this))
+    }
+
+  , 'test destroy() immediately': function (done) {
+      this.openTestDatabase(function (db) {
+        db.batch(this.sourceData.slice(), function (err) {
+          refute(err)
+
+          var rs = db.readStream()
+          assert.isFalse(rs.writable)
+          assert.isTrue(rs.readable)
+          rs.on('ready', this.readySpy)
+          rs.on('data' , this.dataSpy)
+          rs.on('end'  , this.endSpy)
+          rs.on('close', function () {
+            assert.isFalse(rs.writable)
+            assert.isFalse(rs.readable)
+            assert.equals(this.readySpy.callCount, 0, '"ready" event was not fired')
+            assert.equals(this.dataSpy.callCount , 0, '"data" event was not fired')
+            assert.equals(this.endSpy.callCount  , 0, '"end" event was not fired')
+            done()
+          }.bind(this))
+          rs.destroy()
+        }.bind(this))
+      }.bind(this))
+    }
+
+  , 'test destroy() half way through': function (done) {
+      this.openTestDatabase(function (db) {
+        db.batch(this.sourceData.slice(), function (err) {
+          refute(err)
+
+          var rs = db.readStream()
+            , endSpy = this.spy()
+            , calls = 0
+          this.dataSpy = this.spy(function () {
+            if (++calls == 5)
+              rs.destroy()
+          })
+          assert.isFalse(rs.writable)
+          assert.isTrue(rs.readable)
+          rs.on('ready', this.readySpy)
+          rs.on('data' , this.dataSpy)
+          rs.on('end'  , endSpy)
+          rs.on('close', function () {
+            assert.isFalse(rs.writable)
+            assert.isFalse(rs.readable)
+            assert.equals(this.readySpy.callCount, 1, 'ReadStream emitted single "ready" event')
+            // should do "data" 5 times ONLY
+            assert.equals(this.dataSpy.callCount, 5, 'ReadStream emitted correct number of "data" events (5)')
+            this.sourceData.slice(0, 5).forEach(function (d, i) {
+              var call = this.dataSpy.getCall(i)
+              assert(call)
+              if (call) {
+                assert.equals(call.args.length, 1, 'ReadStream "data" event #' + i + ' fired with 1 argument')
+                refute.isNull(call.args[0].key, 'ReadStream "data" event #' + i + ' argument has "key" property')
+                refute.isNull(call.args[0].value, 'ReadStream "data" event #' + i + ' argument has "value" property')
+                assert.equals(call.args[0].key, d.key, 'ReadStream "data" event #' + i + ' argument has correct "key"')
+                assert.equals(call.args[0].value, d.value, 'ReadStream "data" event #' + i + ' argument has correct "value"')
+              }
+            }.bind(this))
+            done()
+          }.bind(this))
+        }.bind(this))
+      }.bind(this))
+    }
 })
\ No newline at end of file
diff --git a/test/write-stream-test.js b/test/write-stream-test.js
index ed9968a..7997c85 100644
--- a/test/write-stream-test.js
+++ b/test/write-stream-test.js
@@ -25,7 +25,9 @@ buster.testCase('WriteStream', {
         })
       }
 
-      this.verify = function (db, done) {
+      this.verify = function (ws, db, done) {
+        assert.isFalse(ws.writable)
+        assert.isFalse(ws.readable)
         async.forEach(
             this.sourceData
           , function (data, callback) {
@@ -52,7 +54,7 @@ buster.testCase('WriteStream', {
         ws.on('error', function (err) {
           refute(err)
         })
-        ws.on('close', this.verify.bind(this, db, done))
+        ws.on('close', this.verify.bind(this, ws, db, done))
         this.sourceData.forEach(function (d) {
           ws.write(d)
         })
@@ -67,7 +69,7 @@ buster.testCase('WriteStream', {
         ws.on('error', function (err) {
           refute(err)
         })
-        ws.on('close', this.verify.bind(this, db, done))
+        ws.on('close', this.verify.bind(this, ws, db, done))
         async.forEachSeries(
             this.sourceData
           , function (d, callback) {
@@ -102,11 +104,13 @@ buster.testCase('WriteStream', {
       // second element should warn that the buffer isn't being cleared
       assert.isFalse(ws.write(this.sourceData[1]))
 
-      ws.once('close', this.verify.bind(this, db, done))
+      ws.once('close', this.verify.bind(this, ws, db, done))
       ws.once('drain', function () {
         this.sourceData.slice(2).forEach(function (d, i) {
-          assert[i != 0 ? 'isFalse' : 'isTrue'](ws.write(d), 'correct return value for element #' + i)
+          assert[i !== 0 ? 'isFalse' : 'isTrue'](ws.write(d), 'correct return value for element #' + i)
         })
+        assert.isTrue(ws.writable)
+        assert.isFalse(ws.readable)
         ws.end()
       }.bind(this))
 
@@ -115,4 +119,55 @@ buster.testCase('WriteStream', {
         refute(err)
       })
     }
+
+    // at the moment, destroySoon() is basically just end()
+  , 'test destroySoon()': function (done) {
+      this.openTestDatabase(function (db) {
+        var ws = db.writeStream()
+        ws.on('error', function (err) {
+          refute(err)
+        })
+        ws.on('close', this.verify.bind(this, ws, db, done))
+        this.sourceData.forEach(function (d) {
+          ws.write(d)
+        })
+        ws.once('ready', ws.destroySoon) // end after it's ready, nextTick makes this work OK
+      }.bind(this))
+    }
+
+  , 'test destroy()': function (done) {
+      var verify = function (ws, db) {
+        assert.isFalse(ws.writable)
+        async.forEach(
+            this.sourceData
+          , function (data, callback) {
+              db.get(data.key, function (err, value) {
+                // none of them should exist
+                assert(err)
+                refute(value)
+                callback()
+              })
+            }
+          , done
+        )
+      }
+
+      this.openTestDatabase(function (db) {
+        var ws = db.writeStream()
+        ws.on('error', function (err) {
+          refute(err)
+        })
+        assert.isTrue(ws.writable)
+        assert.isFalse(ws.readable)
+        ws.on('close', verify.bind(this, ws, db))
+        this.sourceData.forEach(function (d) {
+          ws.write(d)
+          assert.isTrue(ws.writable)
+          assert.isFalse(ws.readable)
+        })
+        assert.isTrue(ws.writable)
+        assert.isFalse(ws.readable)
+        ws.once('ready', ws.destroy)
+      }.bind(this))
+    }
 })
\ No newline at end of file

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-javascript/node-leveldown.git



More information about the Pkg-javascript-commits mailing list