[Pkg-javascript-commits] [node-leveldown] 279/492: auto cleanup of open iterators on db.close()
Andrew Kelley
andrewrk-guest at moszumanska.debian.org
Sun Jul 6 17:14:08 UTC 2014
This is an automated email from the git hooks/post-receive script.
andrewrk-guest pushed a commit to annotated tag rocksdb-0.10.1
in repository node-leveldown.
commit 0557d3297763663f75fffa6e05772c72241bf27c
Author: Rod Vagg <rod at vagg.org>
Date: Thu Mar 21 13:49:24 2013 +1100
auto cleanup of open iterators on db.close()
closes #22
---
src/database.cc | 71 ++++++++++++++++++++++++++++++-
src/database.h | 8 +++-
src/iterator.cc | 28 +++++++++----
src/iterator.h | 14 ++++---
src/iterator_async.cc | 6 +++
src/iterator_async.h | 1 +
test/cleanup-hanging-iterators-test.js | 77 ++++++++++++++++++++++++++++++++++
test/common.js | 1 +
8 files changed, 189 insertions(+), 17 deletions(-)
diff --git a/src/database.cc b/src/database.cc
index b46825f..2614643 100644
--- a/src/database.cc
+++ b/src/database.cc
@@ -21,6 +21,8 @@ namespace leveldown {
Database::Database (char* location) : location(location) {
db = NULL;
+ currentIteratorId = 0; // next id Database::Iterator() will hand out
+ pendingCloseWorker = NULL; // non-NULL only while Close() waits on open iterators
};
Database::~Database () {
@@ -87,6 +89,19 @@ void Database::ReleaseSnapshot (const leveldb::Snapshot* snapshot) {
return db->ReleaseSnapshot(snapshot);
}
+void Database::ReleaseIterator (uint32_t id) {
+ // Runs in the main thread every time an Iterator has been End()ed.
+ // Drop the reference we hold for that iterator; if it was the last
+ // one still registered and a CloseWorker is parked in
+ // pendingCloseWorker (i.e. Close() is waiting for the iterators to
+ // finish before the database can be closed), queue that worker now.
+ iterators.erase(id);
+ if (pendingCloseWorker != NULL && iterators.empty()) {
+ AsyncQueueWorker(static_cast<AsyncWorker*>(pendingCloseWorker));
+ pendingCloseWorker = NULL;
+ }
+}
+
void Database::CloseDatabase () {
delete db;
db = NULL;
@@ -206,7 +221,45 @@ v8::Handle<v8::Value> Database::Close (const v8::Arguments& args) {
LD_METHOD_SETUP_COMMON_ONEARG(close)
CloseWorker* worker = new CloseWorker(database, callback);
- AsyncQueueWorker(worker);
+
+ if (database->iterators.size() > 0) {
+ // yikes, we still have iterators open! naughty naughty.
+ // we have to queue up a CloseWorker and manually close each of them.
+ // the CloseWorker will be invoked once they are all cleaned up
+ database->pendingCloseWorker = worker;
+
+ for (
+ std::map< uint32_t, v8::Persistent<v8::Object> >::iterator it
+ = database->iterators.begin()
+ ; it != database->iterators.end()
+ ; ++it) {
+
+ // for each iterator still open, first check if it's already in
+ // the process of ending (ended==true means an async End() is
+ // in progress), if not, then we call End() with an empty callback
+ // function and wait for it to hit ReleaseIterator() where our
+ // CloseWorker will be invoked
+
+ leveldown::Iterator* iterator =
+ node::ObjectWrap::Unwrap<leveldown::Iterator>(it->second);
+
+ if (!iterator->ended) {
+ v8::Local<v8::Function> end =
+ v8::Local<v8::Function>::Cast(it->second->Get(
+ v8::String::NewSymbol("end")));
+ v8::Local<v8::Value> argv[] = {
+ v8::FunctionTemplate::New()->GetFunction() // empty callback
+ };
+ v8::TryCatch try_catch;
+ end->Call(it->second, 1, argv);
+ if (try_catch.HasCaught()) {
+ node::FatalException(try_catch);
+ }
+ }
+ }
+ } else {
+ AsyncQueueWorker(worker);
+ }
return v8::Undefined();
}
@@ -424,12 +477,26 @@ v8::Handle<v8::Value> Database::ApproximateSize (const v8::Arguments& args) {
v8::Handle<v8::Value> Database::Iterator (const v8::Arguments& args) {
v8::HandleScope scope;
+ Database* database = node::ObjectWrap::Unwrap<Database>(args.This());
+ // args[0], when it is an object, is forwarded as the iterator options
v8::Local<v8::Object> optionsObj;
if (args.Length() > 0 && args[0]->IsObject()) {
optionsObj = v8::Local<v8::Object>::Cast(args[0]);
}
- return scope.Close(Iterator::NewInstance(args.This(), optionsObj));
+ // every iterator gets an id that is unique within this Database; the id
+ // is the key under which it is stored in (and erased from) `iterators`
+ uint32_t id = database->currentIteratorId++;
+ v8::Handle<v8::Object> iterator = Iterator::NewInstance(
+ args.This()
+ , v8::Number::New(id)
+ , optionsObj
+ );
+ // register the handle so Close() can End() it if still open; ReleaseIterator(id) erases it
+ database->iterators[id] =
+ node::ObjectWrap::Unwrap<leveldown::Iterator>(iterator)->handle_;
+
+ return scope.Close(iterator);
}
diff --git a/src/database.h b/src/database.h
index c2c1a0f..bf5027c 100644
--- a/src/database.h
+++ b/src/database.h
@@ -6,6 +6,7 @@
#ifndef LD_DATABASE_H
#define LD_DATABASE_H
+#include <map>
#include <node.h>
#include "leveldb/db.h"
@@ -56,13 +57,18 @@ public:
void ReleaseSnapshot (const leveldb::Snapshot* snapshot);
void CloseDatabase ();
const char* Location() const;
+ void ReleaseIterator (uint32_t id);
-private:
Database (char* location);
~Database ();
+private:
leveldb::DB* db;
char* location;
+ uint32_t currentIteratorId;
+ void(*pendingCloseWorker);
+
+ std::map< uint32_t, v8::Persistent<v8::Object> > iterators;
static v8::Persistent<v8::Function> constructor;
static void WriteDoing(uv_work_t *req);
diff --git a/src/iterator.cc b/src/iterator.cc
index b6b9cd4..e5fdd22 100644
--- a/src/iterator.cc
+++ b/src/iterator.cc
@@ -14,6 +14,7 @@ namespace leveldown {
Iterator::Iterator (
Database* database
+ , uint32_t id
, leveldb::Slice* start
, std::string* end
, bool reverse
@@ -25,6 +26,7 @@ Iterator::Iterator (
, bool valueAsBuffer
, v8::Persistent<v8::Value> startPtr
) : database(database)
+ , id(id)
, start(start)
, end(end)
, reverse(reverse)
@@ -104,6 +106,10 @@ void Iterator::IteratorEnd () {
dbIterator = NULL;
}
+void Iterator::Release () {
+ this->database->ReleaseIterator(this->id); // unregister from the owning Database
+}
+
void checkEndCallback (Iterator* iterator) {
iterator->nexting = false;
if (iterator->endWorker != NULL) {
@@ -142,7 +148,7 @@ v8::Handle<v8::Value> Iterator::Next (const v8::Arguments& args) {
iterator->nexting = true;
AsyncQueueWorker(worker);
- return v8::Undefined();
+ return scope.Close(args.Holder());
}
v8::Handle<v8::Value> Iterator::End (const v8::Arguments& args) {
@@ -175,7 +181,7 @@ v8::Handle<v8::Value> Iterator::End (const v8::Arguments& args) {
AsyncQueueWorker(worker);
}
- return v8::Undefined();
+ return scope.Close(args.Holder());
}
v8::Persistent<v8::Function> Iterator::constructor;
@@ -197,8 +203,9 @@ void Iterator::Init () {
tpl->GetFunction());
}
-v8::Handle<v8::Value> Iterator::NewInstance (
+v8::Handle<v8::Object> Iterator::NewInstance (
v8::Handle<v8::Object> database
+ , v8::Handle<v8::Number> id
, v8::Handle<v8::Object> optionsObj
) {
@@ -206,11 +213,11 @@ v8::Handle<v8::Value> Iterator::NewInstance (
v8::Local<v8::Object> instance;
if (optionsObj.IsEmpty()) {
- v8::Handle<v8::Value> argv[1] = { database };
- instance = constructor->NewInstance(1, argv);
- } else {
- v8::Handle<v8::Value> argv[2] = { database, optionsObj };
+ v8::Handle<v8::Value> argv[2] = { database, id };
instance = constructor->NewInstance(2, argv);
+ } else {
+ v8::Handle<v8::Value> argv[3] = { database, id, optionsObj };
+ instance = constructor->NewInstance(3, argv);
}
return scope.Close(instance);
@@ -229,10 +236,12 @@ v8::Handle<v8::Value> Iterator::New (const v8::Arguments& args) {
std::string* end = NULL;
int limit = -1;
+ v8::Local<v8::Value> id = args[1];
+
v8::Local<v8::Object> optionsObj;
- if (args.Length() > 1 && args[1]->IsObject()) {
- optionsObj = v8::Local<v8::Object>::Cast(args[1]);
+ if (args.Length() > 1 && args[2]->IsObject()) {
+ optionsObj = v8::Local<v8::Object>::Cast(args[2]);
if (optionsObj->Has(option_start)
&& (node::Buffer::HasInstance(optionsObj->Get(option_start))
@@ -268,6 +277,7 @@ v8::Handle<v8::Value> Iterator::New (const v8::Arguments& args) {
Iterator* iterator = new Iterator(
database
+ , (uint32_t)id->Int32Value()
, start
, end
, reverse
diff --git a/src/iterator.h b/src/iterator.h
index 6df6957..fe9545c 100644
--- a/src/iterator.h
+++ b/src/iterator.h
@@ -28,17 +28,15 @@ v8::Handle<v8::Value> CreateIterator (const v8::Arguments& args);
class Iterator : public node::ObjectWrap {
public:
static void Init ();
- static v8::Handle<v8::Value> NewInstance (
+ static v8::Handle<v8::Object> NewInstance (
v8::Handle<v8::Object> database
+ , v8::Handle<v8::Number> id
, v8::Handle<v8::Object> optionsObj
);
- bool IteratorNext (std::string& key, std::string& value);
- leveldb::Status IteratorStatus ();
- void IteratorEnd ();
-
Iterator (
Database* database
+ , uint32_t id
, leveldb::Slice* start
, std::string* end
, bool reverse
@@ -53,8 +51,14 @@ public:
~Iterator ();
+ bool IteratorNext (std::string& key, std::string& value);
+ leveldb::Status IteratorStatus ();
+ void IteratorEnd ();
+ void Release ();
+
private:
Database* database;
+ uint32_t id;
leveldb::Iterator* dbIterator;
leveldb::ReadOptions* options;
leveldb::Slice* start;
diff --git a/src/iterator_async.cc b/src/iterator_async.cc
index 243048b..2e39953 100644
--- a/src/iterator_async.cc
+++ b/src/iterator_async.cc
@@ -81,4 +81,10 @@ void EndWorker::Execute () {
iterator->IteratorEnd();
}
+void EndWorker::HandleOKCallback () {
+ iterator->Release(); // unregister from the Database before the JS callback runs
+ v8::Local<v8::Value> argv[1]; // never read (argc is 0); a [0] bound is not ISO C++
+ LD_RUN_CALLBACK(callback, argv, 0);
+}
+
} // namespace leveldown
diff --git a/src/iterator_async.h b/src/iterator_async.h
index 9d44593..1c5e3fb 100644
--- a/src/iterator_async.h
+++ b/src/iterator_async.h
@@ -42,6 +42,7 @@ public:
virtual ~EndWorker ();
virtual void Execute ();
+ virtual void HandleOKCallback ();
private:
Iterator* iterator;
diff --git a/test/cleanup-hanging-iterators-test.js b/test/cleanup-hanging-iterators-test.js
new file mode 100644
index 0000000..b97890c
--- /dev/null
+++ b/test/cleanup-hanging-iterators-test.js
@@ -0,0 +1,77 @@
+const test = require('tap').test
+ , testCommon = require('./common')
+ , leveldown = require('../')
+
+ , makeTest = function (name, testFn) {
+ test(name, function (t) {
+ testCommon.cleanup(function () {
+ var db = leveldown(testCommon.location())
+ , done = function () {
+ db.close(function (err) {
+ t.notOk(err, 'no error from close()')
+ testCommon.cleanup(t.end.bind(t))
+ })
+ }
+ db.open(function (err) {
+ t.notOk(err, 'no error from open()')
+ db.batch(
+ [
+ { type: 'put', key: 'one', value: '1' }
+ , { type: 'put', key: 'two', value: '2' }
+ , { type: 'put', key: 'three', value: '3' }
+ ]
+ , function (err) {
+ t.notOk(err, 'no error from batch()')
+ testFn(db, t, done)
+ }
+ )
+ })
+ })
+ })
+ }
+
+makeTest('test ended iterator', function (db, t, done) {
+ // standard iterator with an end() properly called, easy
+
+ var it = db.iterator({ keyAsBuffer: false, valueAsBuffer: false })
+ it.next(function (err, key, value) {
+ t.notOk(err, 'no error from next()')
+ t.equal(key, 'one', 'correct key')
+ t.equal(value, '1', 'correct value')
+ it.end(function (err) {
+ t.notOk(err, 'no error from next()')
+ done()
+ })
+ })
+})
+
+makeTest('test non-ended iterator', function (db, t, done) {
+ // no end() call on our iterator, cleanup should crash Node if not handled properly
+ var it = db.iterator({ keyAsBuffer: false, valueAsBuffer: false })
+ it.next(function (err, key, value) {
+ t.notOk(err, 'no error from next()')
+ t.equal(key, 'one', 'correct key')
+ t.equal(value, '1', 'correct value')
+ done()
+ })
+})
+
+makeTest('test multiple non-ended iterators', function (db, t, done) {
+ // no end() call on our iterator, cleanup should crash Node if not handled properly
+ db.iterator()
+ db.iterator().next(function () {})
+ db.iterator().next(function () {})
+ db.iterator().next(function () {})
+ setTimeout(done, 50)
+})
+
+makeTest('test ending iterators', function (db, t, done) {
+ // no end() call on our iterator, cleanup should crash Node if not handled properly
+ var it1 = db.iterator().next(function () {
+ it1.end(function () {})
+ })
+ , it2 = db.iterator().next(function () {
+ it2.end(function () {})
+ done()
+ })
+})
\ No newline at end of file
diff --git a/test/common.js b/test/common.js
index 204ec92..331ad83 100644
--- a/test/common.js
+++ b/test/common.js
@@ -65,6 +65,7 @@ var dbidx = 0
module.exports = {
location : location
+ , cleanup : cleanup
, lastLocation : lastLocation
, setUp : setUp
, tearDown : tearDown
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-javascript/node-leveldown.git
More information about the Pkg-javascript-commits
mailing list