[Pkg-javascript-commits] [node-leveldown] 279/492: auto cleanup of open iterators on db.close()

Andrew Kelley andrewrk-guest at moszumanska.debian.org
Sun Jul 6 17:14:08 UTC 2014


This is an automated email from the git hooks/post-receive script.

andrewrk-guest pushed a commit to annotated tag rocksdb-0.10.1
in repository node-leveldown.

commit 0557d3297763663f75fffa6e05772c72241bf27c
Author: Rod Vagg <rod at vagg.org>
Date:   Thu Mar 21 13:49:24 2013 +1100

    auto cleanup of open iterators on db.close()
    
    closes #22
---
 src/database.cc                        | 71 ++++++++++++++++++++++++++++++-
 src/database.h                         |  8 +++-
 src/iterator.cc                        | 28 +++++++++----
 src/iterator.h                         | 14 ++++---
 src/iterator_async.cc                  |  6 +++
 src/iterator_async.h                   |  1 +
 test/cleanup-hanging-iterators-test.js | 77 ++++++++++++++++++++++++++++++++++
 test/common.js                         |  1 +
 8 files changed, 189 insertions(+), 17 deletions(-)

diff --git a/src/database.cc b/src/database.cc
index b46825f..2614643 100644
--- a/src/database.cc
+++ b/src/database.cc
@@ -21,6 +21,8 @@ namespace leveldown {
 
 Database::Database (char* location) : location(location) {
   db = NULL;
+  currentIteratorId = 0;
+  pendingCloseWorker = NULL;
 };
 
 Database::~Database () {
@@ -87,6 +89,19 @@ void Database::ReleaseSnapshot (const leveldb::Snapshot* snapshot) {
   return db->ReleaseSnapshot(snapshot);
 }
 
+void Database::ReleaseIterator (uint32_t id) {
+  // called each time an Iterator is End()ed, in the main thread
+  // we have to remove our reference to it and if it's the last iterator
+  // we have to invoke a pending CloseWorker if there is one
+  // if there is a pending CloseWorker it means that we're waiting for
+  // iterators to end before we can close the database
+  iterators.erase(id);
+  if (iterators.size() == 0 && pendingCloseWorker != NULL) {
+    AsyncQueueWorker((AsyncWorker*)pendingCloseWorker);
+    pendingCloseWorker = NULL;
+  }
+}
+
 void Database::CloseDatabase () {
   delete db;
   db = NULL;
@@ -206,7 +221,45 @@ v8::Handle<v8::Value> Database::Close (const v8::Arguments& args) {
   LD_METHOD_SETUP_COMMON_ONEARG(close)
 
   CloseWorker* worker = new CloseWorker(database, callback);
-  AsyncQueueWorker(worker);
+
+  if (database->iterators.size() > 0) {
+    // yikes, we still have iterators open! naughty naughty.
+    // we have to queue up a CloseWorker and manually close each of them.
+    // the CloseWorker will be invoked once they are all cleaned up
+    database->pendingCloseWorker = worker;
+
+    for (
+        std::map< uint32_t, v8::Persistent<v8::Object> >::iterator it
+            = database->iterators.begin()
+      ; it != database->iterators.end()
+      ; ++it) {
+
+        // for each iterator still open, first check if it's already in
+        // the process of ending (ended==true means an async End() is
+        // in progress), if not, then we call End() with an empty callback
+        // function and wait for it to hit ReleaseIterator() where our
+        // CloseWorker will be invoked
+
+        leveldown::Iterator* iterator =
+            node::ObjectWrap::Unwrap<leveldown::Iterator>(it->second);
+
+        if (!iterator->ended) {
+          v8::Local<v8::Function> end =
+              v8::Local<v8::Function>::Cast(it->second->Get(
+                  v8::String::NewSymbol("end")));
+          v8::Local<v8::Value> argv[] = {
+              v8::FunctionTemplate::New()->GetFunction() // empty callback
+          };
+          v8::TryCatch try_catch;
+          end->Call(it->second, 1, argv);
+          if (try_catch.HasCaught()) {
+            node::FatalException(try_catch);
+          }
+        }
+    }
+  } else {
+    AsyncQueueWorker(worker);
+  }
 
   return v8::Undefined();
 }
@@ -424,12 +477,26 @@ v8::Handle<v8::Value> Database::ApproximateSize (const v8::Arguments& args) {
 v8::Handle<v8::Value> Database::Iterator (const v8::Arguments& args) {
   v8::HandleScope scope;
 
+  Database* database = node::ObjectWrap::Unwrap<Database>(args.This());
+
   v8::Local<v8::Object> optionsObj;
   if (args.Length() > 0 && args[0]->IsObject()) {
     optionsObj = v8::Local<v8::Object>::Cast(args[0]);
   }
 
-  return scope.Close(Iterator::NewInstance(args.This(), optionsObj));
+  // each iterator gets a unique id for this Database, so we can
+  // easily store & lookup on our `iterators` map
+  uint32_t id = database->currentIteratorId++;
+  v8::Handle<v8::Object> iterator = Iterator::NewInstance(
+      args.This()
+    , v8::Number::New(id)
+    , optionsObj
+  );
+  // register our iterator
+  database->iterators[id] =
+      node::ObjectWrap::Unwrap<leveldown::Iterator>(iterator)->handle_;
+
+  return scope.Close(iterator);
 }
 
 
diff --git a/src/database.h b/src/database.h
index c2c1a0f..bf5027c 100644
--- a/src/database.h
+++ b/src/database.h
@@ -6,6 +6,7 @@
 #ifndef LD_DATABASE_H
 #define LD_DATABASE_H
 
+#include <map>
 #include <node.h>
 
 #include "leveldb/db.h"
@@ -56,13 +57,18 @@ public:
   void ReleaseSnapshot (const leveldb::Snapshot* snapshot);
   void CloseDatabase ();
   const char* Location() const;
+  void ReleaseIterator (uint32_t id);
 
-private:
   Database (char* location);
   ~Database ();
 
+private:
   leveldb::DB* db;
   char* location;
+  uint32_t currentIteratorId;
+  void(*pendingCloseWorker);
+
+  std::map< uint32_t, v8::Persistent<v8::Object> > iterators;
 
   static v8::Persistent<v8::Function> constructor;
   static void WriteDoing(uv_work_t *req);
diff --git a/src/iterator.cc b/src/iterator.cc
index b6b9cd4..e5fdd22 100644
--- a/src/iterator.cc
+++ b/src/iterator.cc
@@ -14,6 +14,7 @@ namespace leveldown {
 
 Iterator::Iterator (
     Database* database
+  , uint32_t id
   , leveldb::Slice* start
   , std::string* end
   , bool reverse
@@ -25,6 +26,7 @@ Iterator::Iterator (
   , bool valueAsBuffer
   , v8::Persistent<v8::Value> startPtr
 ) : database(database)
+  , id(id)
   , start(start)
   , end(end)
   , reverse(reverse)
@@ -104,6 +106,10 @@ void Iterator::IteratorEnd () {
   dbIterator = NULL;
 }
 
+void Iterator::Release () {
+  database->ReleaseIterator(id);
+}
+
 void checkEndCallback (Iterator* iterator) {
   iterator->nexting = false;
   if (iterator->endWorker != NULL) {
@@ -142,7 +148,7 @@ v8::Handle<v8::Value> Iterator::Next (const v8::Arguments& args) {
   iterator->nexting = true;
   AsyncQueueWorker(worker);
 
-  return v8::Undefined();
+  return scope.Close(args.Holder());
 }
 
 v8::Handle<v8::Value> Iterator::End (const v8::Arguments& args) {
@@ -175,7 +181,7 @@ v8::Handle<v8::Value> Iterator::End (const v8::Arguments& args) {
     AsyncQueueWorker(worker);
   }
 
-  return v8::Undefined();
+  return scope.Close(args.Holder());
 }
 
 v8::Persistent<v8::Function> Iterator::constructor;
@@ -197,8 +203,9 @@ void Iterator::Init () {
       tpl->GetFunction());
 }
 
-v8::Handle<v8::Value> Iterator::NewInstance (
+v8::Handle<v8::Object> Iterator::NewInstance (
         v8::Handle<v8::Object> database
+      , v8::Handle<v8::Number> id
       , v8::Handle<v8::Object> optionsObj
     ) {
 
@@ -206,11 +213,11 @@ v8::Handle<v8::Value> Iterator::NewInstance (
   v8::Local<v8::Object> instance;
 
   if (optionsObj.IsEmpty()) {
-    v8::Handle<v8::Value> argv[1] = { database };
-    instance = constructor->NewInstance(1, argv);
-  } else {
-    v8::Handle<v8::Value> argv[2] = { database, optionsObj };
+    v8::Handle<v8::Value> argv[2] = { database, id };
     instance = constructor->NewInstance(2, argv);
+  } else {
+    v8::Handle<v8::Value> argv[3] = { database, id, optionsObj };
+    instance = constructor->NewInstance(3, argv);
   }
 
   return scope.Close(instance);
@@ -229,10 +236,12 @@ v8::Handle<v8::Value> Iterator::New (const v8::Arguments& args) {
   std::string* end = NULL;
   int limit = -1;
 
+  v8::Local<v8::Value> id = args[1];
+
   v8::Local<v8::Object> optionsObj;
 
-  if (args.Length() > 1 && args[1]->IsObject()) {
-    optionsObj = v8::Local<v8::Object>::Cast(args[1]);
+  if (args.Length() > 1 && args[2]->IsObject()) {
+    optionsObj = v8::Local<v8::Object>::Cast(args[2]);
 
     if (optionsObj->Has(option_start)
         && (node::Buffer::HasInstance(optionsObj->Get(option_start))
@@ -268,6 +277,7 @@ v8::Handle<v8::Value> Iterator::New (const v8::Arguments& args) {
 
   Iterator* iterator = new Iterator(
       database
+    , (uint32_t)id->Int32Value()
     , start
     , end
     , reverse
diff --git a/src/iterator.h b/src/iterator.h
index 6df6957..fe9545c 100644
--- a/src/iterator.h
+++ b/src/iterator.h
@@ -28,17 +28,15 @@ v8::Handle<v8::Value> CreateIterator (const v8::Arguments& args);
 class Iterator : public node::ObjectWrap {
 public:
   static void Init ();
-  static v8::Handle<v8::Value> NewInstance (
+  static v8::Handle<v8::Object> NewInstance (
       v8::Handle<v8::Object> database
+    , v8::Handle<v8::Number> id
     , v8::Handle<v8::Object> optionsObj
   );
 
-  bool IteratorNext (std::string& key, std::string& value);
-  leveldb::Status IteratorStatus ();
-  void IteratorEnd ();
-
   Iterator (
       Database* database
+    , uint32_t id
     , leveldb::Slice* start
     , std::string* end
     , bool reverse
@@ -53,8 +51,14 @@ public:
 
   ~Iterator ();
 
+  bool IteratorNext (std::string& key, std::string& value);
+  leveldb::Status IteratorStatus ();
+  void IteratorEnd ();
+  void Release ();
+
 private:
   Database* database;
+  uint32_t id;
   leveldb::Iterator* dbIterator;
   leveldb::ReadOptions* options;
   leveldb::Slice* start;
diff --git a/src/iterator_async.cc b/src/iterator_async.cc
index 243048b..2e39953 100644
--- a/src/iterator_async.cc
+++ b/src/iterator_async.cc
@@ -81,4 +81,10 @@ void EndWorker::Execute () {
   iterator->IteratorEnd();
 }
 
+void EndWorker::HandleOKCallback () {
+  iterator->Release();
+  v8::Local<v8::Value> argv[0];
+  LD_RUN_CALLBACK(callback, argv, 0);  
+}
+
 } // namespace leveldown
diff --git a/src/iterator_async.h b/src/iterator_async.h
index 9d44593..1c5e3fb 100644
--- a/src/iterator_async.h
+++ b/src/iterator_async.h
@@ -42,6 +42,7 @@ public:
 
   virtual ~EndWorker ();
   virtual void Execute ();
+  virtual void HandleOKCallback ();
 
 private:
   Iterator* iterator;
diff --git a/test/cleanup-hanging-iterators-test.js b/test/cleanup-hanging-iterators-test.js
new file mode 100644
index 0000000..b97890c
--- /dev/null
+++ b/test/cleanup-hanging-iterators-test.js
@@ -0,0 +1,77 @@
+const test       = require('tap').test
+    , testCommon = require('./common')
+    , leveldown  = require('../')
+
+    , makeTest   = function (name, testFn) {
+        test(name, function (t) {
+          testCommon.cleanup(function () {
+            var db   = leveldown(testCommon.location())
+              , done = function () {
+                  db.close(function (err) {
+                    t.notOk(err, 'no error from close()')
+                    testCommon.cleanup(t.end.bind(t))
+                  })
+                }
+            db.open(function (err) {
+              t.notOk(err, 'no error from open()')
+              db.batch(
+                  [
+                      { type: 'put', key: 'one', value: '1' }
+                    , { type: 'put', key: 'two', value: '2' }
+                    , { type: 'put', key: 'three', value: '3' }
+                  ]
+                , function (err) {
+                    t.notOk(err, 'no error from batch()')
+                    testFn(db, t, done)
+                  }
+              )
+            })
+          })
+        })
+      }
+
+makeTest('test ended iterator', function (db, t, done) {
+  // standard iterator with an end() properly called, easy
+
+  var it = db.iterator({ keyAsBuffer: false, valueAsBuffer: false })
+  it.next(function (err, key, value) {
+    t.notOk(err, 'no error from next()')
+    t.equal(key, 'one', 'correct key')
+    t.equal(value, '1', 'correct value')
+    it.end(function (err) {
+      t.notOk(err, 'no error from next()')
+      done()
+    })
+  })
+})
+
+makeTest('test non-ended iterator', function (db, t, done) {
+  // no end() call on our iterator, cleanup should crash Node if not handled properly
+  var it = db.iterator({ keyAsBuffer: false, valueAsBuffer: false })
+  it.next(function (err, key, value) {
+    t.notOk(err, 'no error from next()')
+    t.equal(key, 'one', 'correct key')
+    t.equal(value, '1', 'correct value')
+    done()
+  })
+})
+
+makeTest('test multiple non-ended iterators', function (db, t, done) {
+  // no end() call on any of our iterators, cleanup should crash Node if not handled properly
+  db.iterator()
+  db.iterator().next(function () {})
+  db.iterator().next(function () {})
+  db.iterator().next(function () {})
+  setTimeout(done, 50)
+})
+
+makeTest('test ending iterators', function (db, t, done) {
+  // end() is called on each iterator but close() may run before they finish, cleanup should crash Node if not handled properly
+  var it1 = db.iterator().next(function () {
+        it1.end(function () {})
+      })
+    , it2 = db.iterator().next(function () {
+        it2.end(function () {})
+        done()
+      })
+})
\ No newline at end of file
diff --git a/test/common.js b/test/common.js
index 204ec92..331ad83 100644
--- a/test/common.js
+++ b/test/common.js
@@ -65,6 +65,7 @@ var dbidx = 0
 
 module.exports = {
     location       : location
+  , cleanup        : cleanup
   , lastLocation   : lastLocation
   , setUp          : setUp
   , tearDown       : tearDown

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-javascript/node-leveldown.git



More information about the Pkg-javascript-commits mailing list