[Pkg-javascript-commits] [node-leveldown] 200/492: ensure iterator end & next don't conflict

Andrew Kelley andrewrk-guest at moszumanska.debian.org
Sun Jul 6 17:13:59 UTC 2014


This is an automated email from the git hooks/post-receive script.

andrewrk-guest pushed a commit to annotated tag rocksdb-0.10.1
in repository node-leveldown.

commit f4c7ee07a3241eaf3d6e3779da6e56fa1a2886fd
Author: Rod Vagg <rod at vagg.org>
Date:   Wed Feb 13 18:58:33 2013 +1100

    ensure iterator end & next don't conflict
    
    A race condition is possible when a next() call is still executing and
    end() is also called. On the main JS thread we can create a barrier to
    protect against this condition: end() is deferred until every pending
    next() call has completed.
---
 lib/read-stream.js       |  5 +++++
 package.json             |  2 +-
 src/async.cc             |  5 -----
 src/async.h              |  2 --
 src/batch.cc             |  3 ---
 src/batch.h              |  3 +--
 src/database.cc          |  4 ----
 src/database.h           |  1 -
 src/database_async.cc    |  4 ----
 src/database_async.h     |  5 +++--
 src/iterator.cc          | 29 +++++++++++++++++++++++++----
 src/iterator.h           | 12 +++++++++---
 src/iterator_async.cc    |  8 ++++----
 src/iterator_async.h     |  5 +++--
 src/levelup.cc           | 28 +++++++++++++++++-----------
 test/read-stream-test.js | 27 ++++++++++++++++++++++++++-
 16 files changed, 94 insertions(+), 49 deletions(-)

diff --git a/lib/read-stream.js b/lib/read-stream.js
index c149b7a..23c5a21 100644
--- a/lib/read-stream.js
+++ b/lib/read-stream.js
@@ -133,15 +133,20 @@ ReadStream.prototype._onData = function (err, key, value) {
 }
 
 ReadStream.prototype._cleanup = function (err) {
+  if (this._status == 'ended')
+    return err && this.emit('error', err)
+
   var s = this._status
   this._status = 'ended'
   this.readable = false
+
   if (this._iterator) {
     this._iterator.end(function () {
       this.emit('close')
     }.bind(this))
   } else
     this.emit('close')
+
   if (err)
     this.emit('error', err)
   else (s != 'destroyed')
diff --git a/package.json b/package.json
index 8083dbd..c3ac6c6 100644
--- a/package.json
+++ b/package.json
@@ -37,7 +37,7 @@
       , "tar"             : "*"
       , "mkfiletree"      : "*"
       , "readfiletree"    : "*"
-      , "slow-stream"     : ">=0.0.3"
+      , "slow-stream"     : ">=0.0.4"
       , "delayed"         : "*"
       , "boganipsum"      : "*"
       , "du"              : "*"
diff --git a/src/async.cc b/src/async.cc
index 3642ffe..20353ab 100644
--- a/src/async.cc
+++ b/src/async.cc
@@ -3,14 +3,9 @@
  * MIT +no-false-attribs License <https://github.com/rvagg/node-levelup/blob/master/LICENSE>
  */
 
-#include <cstdlib>
 #include <node.h>
-#include <node_buffer.h>
-#include <iostream>
-#include <pthread.h>
 
 #include "database.h"
-
 #include "levelup.h"
 #include "async.h"
 #include "batch.h"
diff --git a/src/async.h b/src/async.h
index 739b3f6..5379b6d 100644
--- a/src/async.h
+++ b/src/async.h
@@ -6,8 +6,6 @@
 #ifndef LU_ASYNC_H
 #define LU_ASYNC_H
 
-#include <cstdlib>
-#include <vector>
 #include <node.h>
 
 using namespace std;
diff --git a/src/batch.cc b/src/batch.cc
index 7ae3a90..bb6cb5e 100644
--- a/src/batch.cc
+++ b/src/batch.cc
@@ -3,9 +3,6 @@
  * MIT +no-false-attribs License <https://github.com/rvagg/node-levelup/blob/master/LICENSE>
  */
 
-#include <cstdlib>
-#include <iostream>
-
 #include "leveldb/write_batch.h"
 
 #include "batch.h"
diff --git a/src/batch.h b/src/batch.h
index 09f918b..3bcc749 100644
--- a/src/batch.h
+++ b/src/batch.h
@@ -6,10 +6,9 @@
 #ifndef LU_BATCH_H
 #define LU_BATCH_H
 
-#include <cstdlib>
+#include "leveldb/write_batch.h"
 
 #include "database.h"
-#include "leveldb/write_batch.h"
 
 class BatchOp {
 public:
diff --git a/src/database.cc b/src/database.cc
index 25957b0..1e97fff 100644
--- a/src/database.cc
+++ b/src/database.cc
@@ -3,12 +3,8 @@
  * MIT +no-false-attribs License <https://github.com/rvagg/node-levelup/blob/master/LICENSE>
  */
 
-#include <cstdlib>
-#include <vector>
 #include <node.h>
 #include <node_buffer.h>
-#include <iostream>
-#include <pthread.h>
 
 #include "leveldb/db.h"
 
diff --git a/src/database.h b/src/database.h
index 3d10bb5..1b953ff 100644
--- a/src/database.h
+++ b/src/database.h
@@ -6,7 +6,6 @@
 #ifndef LU_DATABASE_H
 #define LU_DATABASE_H
 
-#include <cstdlib>
 #include <node.h>
 
 #include "leveldb/db.h"
diff --git a/src/database_async.cc b/src/database_async.cc
index 4a527ee..311644d 100644
--- a/src/database_async.cc
+++ b/src/database_async.cc
@@ -3,14 +3,10 @@
  * MIT +no-false-attribs License <https://github.com/rvagg/node-levelup/blob/master/LICENSE>
  */
 
-#include <cstdlib>
 #include <node.h>
 #include <node_buffer.h>
-#include <iostream>
-#include <pthread.h>
 
 #include "database.h"
-
 #include "levelup.h"
 #include "async.h"
 #include "database_async.h"
diff --git a/src/database_async.h b/src/database_async.h
index d3f2fb2..cf97dd5 100644
--- a/src/database_async.h
+++ b/src/database_async.h
@@ -6,12 +6,13 @@
 #ifndef LU_DATABASE_ASYNC_H
 #define LU_DATABASE_ASYNC_H
 
-#include <cstdlib>
 #include <vector>
 #include <node.h>
+
+#include "leveldb/cache.h"
+
 #include "async.h"
 #include "batch.h"
-#include "leveldb/cache.h"
 
 using namespace std;
 using namespace v8;
diff --git a/src/iterator.cc b/src/iterator.cc
index 76220b2..7775855 100644
--- a/src/iterator.cc
+++ b/src/iterator.cc
@@ -3,11 +3,8 @@
  * MIT +no-false-attribs License <https://github.com/rvagg/node-levelup/blob/master/LICENSE>
  */
 
-#include <cstdlib>
 #include <node.h>
 #include <node_buffer.h>
-#include <iostream>
-#include <pthread.h>
 
 #include "database.h"
 #include "iterator.h"
@@ -68,17 +65,36 @@ void levelup::Iterator::IteratorEnd () {
   dbIterator = NULL;
 }
 
+void checkEndCallback (levelup::Iterator* iterator) {
+  iterator->nextCalls--;
+  if (iterator->nextCalls == 0 && iterator->endWorker != NULL) {
+    AsyncQueueWorker(iterator->endWorker);
+    iterator->endWorker = NULL;
+  }
+}
+
 //void *ctx, void (*callback)(void *ctx, Slice key, Slice value)
 Handle<Value> levelup::Iterator::Next (const Arguments& args) {
   HandleScope scope;
   Iterator* iterator = ObjectWrap::Unwrap<Iterator>(args.This());
   Persistent<Function> endCallback = Persistent<Function>::New(Local<Function>::Cast(args[0]));
   Persistent<Function> dataCallback = Persistent<Function>::New(Local<Function>::Cast(args[1]));
+
+  if (iterator->ended) {
+    Local<Value> argv[] = {
+        Local<Value>::New(Exception::Error(String::New("Cannot call next() after end()")))
+    };
+    RunCallback(dataCallback, argv, 1);
+    return Undefined();
+  }
+
   NextWorker* worker = new NextWorker(
       iterator
     , dataCallback
     , endCallback
+    , checkEndCallback
   );
+  iterator->nextCalls++;
   AsyncQueueWorker(worker);
   return Undefined();
 }
@@ -91,7 +107,12 @@ Handle<Value> levelup::Iterator::End (const Arguments& args) {
       iterator
     , endCallback
   );
-  AsyncQueueWorker(worker);
+  iterator->ended = true;
+  if (iterator->nextCalls == 0) {
+    AsyncQueueWorker(worker);
+  } else {
+    iterator->endWorker = worker;
+  }
   return Undefined();
 }
 
diff --git a/src/iterator.h b/src/iterator.h
index bbefb3a..c49f6d9 100644
--- a/src/iterator.h
+++ b/src/iterator.h
@@ -6,11 +6,11 @@
 #ifndef LU_ITERATOR_H
 #define LU_ITERATOR_H
 
-#include <cstdlib>
 #include <node.h>
 
 #include "levelup.h"
 #include "database.h"
+#include "async.h"
 
 using namespace std;
 using namespace v8;
@@ -60,10 +60,13 @@ public:
     , keyAsBuffer(keyAsBuffer)
     , valueAsBuffer(valueAsBuffer)
   {
-    options = new ReadOptions();
+    options    = new ReadOptions();
     options->fill_cache = fillCache;
     dbIterator = NULL;
-    count = 0;
+    count      = 0;
+    nextCalls  = 0;
+    ended      = false;
+    endWorker  = NULL;
   };
 
   ~Iterator () {
@@ -89,6 +92,9 @@ private:
 public:
   bool keyAsBuffer;
   bool valueAsBuffer;
+  int nextCalls;
+  bool ended;
+  AsyncWorker* endWorker;
 
 private:
   bool GetIterator ();
diff --git a/src/iterator_async.cc b/src/iterator_async.cc
index 08fd5c4..ab8dc6a 100644
--- a/src/iterator_async.cc
+++ b/src/iterator_async.cc
@@ -3,14 +3,10 @@
  * MIT +no-false-attribs License <https://github.com/rvagg/node-levelup/blob/master/LICENSE>
  */
 
-#include <cstdlib>
 #include <node.h>
 #include <node_buffer.h>
-#include <iostream>
-#include <pthread.h>
 
 #include "database.h"
-
 #include "levelup.h"
 #include "async.h"
 #include "iterator_async.h"
@@ -26,9 +22,11 @@ NextWorker::NextWorker (
     levelup::Iterator* iterator
   , Persistent<Function> dataCallback
   , Persistent<Function> endCallback
+  , void (*localCallback)(levelup::Iterator*)
 ) : AsyncWorker(database, dataCallback)
   , iterator(iterator)
   , endCallback(endCallback)
+  , localCallback(localCallback)
 {};
 
 NextWorker::~NextWorker () {}
@@ -61,6 +59,8 @@ void NextWorker::HandleOKCallback () {
     Local<Value> argv[0];
     RunCallback(endCallback, argv, 0);
   }
+
+  localCallback(iterator);
 }
 
 /** END WORKER **/
diff --git a/src/iterator_async.h b/src/iterator_async.h
index 8690730..d62ea31 100644
--- a/src/iterator_async.h
+++ b/src/iterator_async.h
@@ -6,9 +6,8 @@
 #ifndef LU_ITERATOR_ASYNC_H
 #define LU_ITERATOR_ASYNC_H
 
-#include <cstdlib>
-#include <vector>
 #include <node.h>
+
 #include "async.h"
 #include "iterator.h"
 
@@ -22,6 +21,7 @@ public:
       levelup::Iterator* iterator
     , Persistent<Function> dataCallback
     , Persistent<Function> endCallback
+    , void (*localCallback)(levelup::Iterator*)
   );
 
   virtual ~NextWorker ();
@@ -31,6 +31,7 @@ public:
 private:
   levelup::Iterator* iterator;
   Persistent<Function> endCallback;
+  void (*localCallback)(levelup::Iterator*);
   string key;
   string value;
   bool ok;
diff --git a/src/levelup.cc b/src/levelup.cc
index 7a893c5..e920959 100644
--- a/src/levelup.cc
+++ b/src/levelup.cc
@@ -9,27 +9,33 @@
 #include "database.h"
 #include "iterator.h"
 
-using namespace v8;
-using namespace node;
-using namespace levelup;
-
-void Init (Handle<Object> target) {
+void Init (v8::Handle<v8::Object> exports) {
   Database::Init();
   levelup::Iterator::Init();
 
-  target->Set(String::NewSymbol("createDatabase"), FunctionTemplate::New(CreateDatabase)->GetFunction());
-  target->Set(String::NewSymbol("createIterator"), FunctionTemplate::New(CreateIterator)->GetFunction());
+  exports->Set(
+      v8::String::NewSymbol("createDatabase")
+    , v8::FunctionTemplate::New(CreateDatabase)->GetFunction()
+  );
+  exports->Set(
+      v8::String::NewSymbol("createIterator")
+    , v8::FunctionTemplate::New(levelup::CreateIterator)->GetFunction()
+  );
 }
 
 NODE_MODULE(levelup, Init)
 
 // util
 
-void RunCallback (Persistent<Function> callback, Local<Value> argv[], int length) {
-  TryCatch try_catch;
+void RunCallback (
+      v8::Persistent<v8::Function> callback
+    , v8::Local<v8::Value> argv[], int length
+  ) {
+
+  v8::TryCatch try_catch;
  
-  callback->Call(Context::GetCurrent()->Global(), length, argv);
+  callback->Call(v8::Context::GetCurrent()->Global(), length, argv);
   if (try_catch.HasCaught()) {
-    FatalException(try_catch);
+    node::FatalException(try_catch);
   }
 }
diff --git a/test/read-stream-test.js b/test/read-stream-test.js
index 680d4cd..ae4d95b 100644
--- a/test/read-stream-test.js
+++ b/test/read-stream-test.js
@@ -12,6 +12,8 @@ var buster     = require('buster')
   , rimraf     = require('rimraf')
   , async      = require('async')
 
+  , bigBlob    = Array.apply(null, Array(1024 * 100)).map(function () { return 'aaaaaaaaaa' }).join('')
+
 buster.testCase('ReadStream', {
     'setUp': common.readStreamSetUp
 
@@ -495,7 +497,7 @@ buster.testCase('ReadStream', {
             d.readStream
               .pipe(new SlowStream({ maxWriteInterval: 5 }))
               .on('data', d.spy)
-              .on('end', delayed.delayed(callback, 0.05))
+              .on('close', delayed.delayed(callback, 0.05))
           }
         , open       = function (reopen, location, callback) {
             levelup(location, { createIfMissing: !reopen, errorIfExists: !reopen }, callback)
@@ -630,4 +632,27 @@ buster.testCase('ReadStream', {
         }.bind(this))
       }.bind(this))
     }
+
+    // can, fairly reliably, trigger a core dump if next/end isn't
+    // protected properly
+    // the use of large blobs means that next() takes time to return
+    // so we should be able to slip in an end() while it's working
+  , 'test iterator next/end race condition': function (done) {
+      var data = []
+        , i = 5
+        , v
+
+      while (i--) {
+        v = bigBlob + i
+        data.push({ type: 'put', key: v, value: v })
+      }
+
+      this.openTestDatabase(function (db) {
+        db.batch(data, function (err) {
+          refute(!!err)
+          var rs = db.readStream().on('close', done)
+          rs.once('data', rs.destroy.bind(rs))
+        }.bind(this))
+      }.bind(this))
+    }
 })
\ No newline at end of file

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-javascript/node-leveldown.git



More information about the Pkg-javascript-commits mailing list