[Pkg-javascript-commits] [node-leveldown] 98/492: added useBatch=false option on writeStream

Andrew Kelley andrewrk-guest at moszumanska.debian.org
Sun Jul 6 17:13:48 UTC 2014


This is an automated email from the git hooks/post-receive script.

andrewrk-guest pushed a commit to annotated tag rocksdb-0.10.1
in repository node-leveldown.

commit 469b095afa0e1bc2a2ee34c511009e86da964057
Author: Rod Vagg <rod at vagg.org>
Date:   Fri Dec 14 18:58:00 2012 +1100

    added useBatch=false option on writeStream
---
 README.md                 |  3 +++
 lib/util.js               |  5 ++++-
 lib/write-stream.js       | 28 +++++++++++++++++-----------
 test/write-stream-test.js | 35 +++++++++++++++++++++++++++++++++++
 4 files changed, 59 insertions(+), 12 deletions(-)

diff --git a/README.md b/README.md
index 94bfba6..a945a31 100644
--- a/README.md
+++ b/README.md
@@ -319,6 +319,9 @@ db.writeStream()
 
 The standard `write()`, `end()`, `destroy()` and `destroySoon()` methods are implemented on the WriteStream. `'drain'`, `'error'`, `'close'` and `'pipe'` events are emitted.
 
+Additionally, you can supply an options object as the first parameter to `writeStream()` with the following option:
+
+* `'useBatch'`: a boolean (defaults to `true`). If set to `false`, your *WriteStream* will avoid the use of `batch()` and instead use `put()` to write all data to the database. Since `batch()` is much quicker than multiple `put()` operations, you are advised to leave this as `true` unless you have a good reason to change it.
 
 #### Pipes and Node Stream compatibility
 
diff --git a/lib/util.js b/lib/util.js
index 4531c5e..6c8550b 100644
--- a/lib/util.js
+++ b/lib/util.js
@@ -22,7 +22,10 @@ var toBuffer = function (data, encoding) {
     }
 
   , copy = function (srcdb, dstdb, callback) {
-      srcdb.readStream().pipe(dstdb.writeStream().on('close', callback))
+      srcdb.readStream()
+        .pipe(dstdb.writeStream({useBatch:false}))
+        .on('close', callback ? callback : function () {})
+        .on('error', callback ? callback : function (err) { throw err })
     }
 
 module.exports = {
diff --git a/lib/write-stream.js b/lib/write-stream.js
index 0eeece9..7ab9f17 100644
--- a/lib/write-stream.js
+++ b/lib/write-stream.js
@@ -2,6 +2,7 @@
 
 var Stream       = require('stream').Stream
   , concatStream = require('concat-stream')
+  , async        = require('async')
 
   , extend       = require('./util').extend
 
@@ -77,8 +78,7 @@ WriteStream.prototype = {
     }
 
   , _process: function() {
-      var entry
-        , cb = function (err) {
+      var cb = function (err) {
             if (!this.writable)
               return
             if (this._status != 'closed')
@@ -89,6 +89,7 @@ WriteStream.prototype = {
             }
             this._process()
           }.bind(this)
+        , buffer
 
       if (this._status != 'ready' && this.writable) {
         if (this._buffer.length && this._status != 'closed')
@@ -97,18 +98,23 @@ WriteStream.prototype = {
       }
 
       if (this._buffer.length && this.writable) {
-        if (this._buffer.length == 1) {
-          entry = this._buffer.pop()
-          if (entry.key !== undefined && entry.value !== undefined) {
-            this._status = 'writing'
-            this._db.put(entry.key, entry.value, cb)
-          }
+        this._status = 'writing'
+        buffer = this._buffer
+        this._buffer = []
+        if ((this._options.useBatch != null && !this._options.useBatch) || this._buffer.length == 1) {
+          async.forEach(
+              buffer
+            , function (entry, cb) {
+                if (entry.key === undefined || entry.value === undefined)
+                  return cb()
+                this._db.put(entry.key, entry.value, cb)
+              }.bind(this)
+            , cb
+          )
         } else {
-          this._status = 'writing'
-          this._db.batch(this._buffer.map(function (d) {
+          this._db.batch(buffer.map(function (d) {
             return { type: 'put', key: d.key, value: d.value }
           }), cb)
-          this._buffer = []
         }
         if (this._writeBlock) {
           this._writeBlock = false
diff --git a/test/write-stream-test.js b/test/write-stream-test.js
index ee4446d..1e36232 100644
--- a/test/write-stream-test.js
+++ b/test/write-stream-test.js
@@ -86,6 +86,41 @@ buster.testCase('WriteStream', {
       }.bind(this))
     }
 
+    // exactly the same as previous but should avoid batch() writes
+  , 'test WriteStream with async writes and useBatch=false': function (done) {
+      this.openTestDatabase(function (db) {
+        db.batch = function () {
+          Array.prototype.slice.call(arguments).forEach(function (a) {
+            if (typeof a == 'function') a('Should not call batch()')
+          })
+        }
+
+        var ws = db.writeStream({ useBatch: false })
+
+        ws.on('error', function (err) {
+          refute(err)
+        })
+        ws.on('close', this.verify.bind(this, ws, db, done))
+        async.forEachSeries(
+            this.sourceData
+          , function (d, callback) {
+              if (d.key % 3) {
+                setTimeout(function () {
+                  ws.write(d)
+                  callback()
+                }, 10)
+              } else {
+                ws.write(d)
+                callback()
+              }
+            }
+          , function () {
+              ws.end()
+            }
+        )
+      }.bind(this))
+    }
+
     // at the moment, destroySoon() is basically just end()
   , 'test destroySoon()': function (done) {
       this.openTestDatabase(function (db) {

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-javascript/node-leveldown.git



More information about the Pkg-javascript-commits mailing list