[Pkg-javascript-commits] [node-leveldown] 02/492: async all the things! better C++ organisation
Andrew Kelley
andrewrk-guest at moszumanska.debian.org
Sun Jul 6 17:13:38 UTC 2014
This is an automated email from the git hooks/post-receive script.
andrewrk-guest pushed a commit to annotated tag rocksdb-0.10.1
in repository node-leveldown.
commit 7ee13ba11387db666e397e85f1dcb80c028b9f24
Author: Rod Vagg <rod at vagg.org>
Date: Thu Jul 12 12:44:10 2012 +1000
async all the things! better C++ organisation
---
binding.gyp | 3 +-
lib/levelup.js | 43 ++++++----
src/async.cc | 145 ++++++++++++++++++++++++++++++++
src/async.h | 76 +++++++++++++++++
src/database.cc | 223 +++++++++++++++++---------------------------------
src/database.h | 12 +--
src/database_async.cc | 20 -----
src/database_async.h | 58 -------------
test/simple-test.js | 40 +++++----
9 files changed, 352 insertions(+), 268 deletions(-)
diff --git a/binding.gyp b/binding.gyp
index 67aca17..032df9c 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -42,7 +42,7 @@
, "sources": [
"src/levelup.cc"
, "src/database.cc"
- , "src/database_async.cc"
+ , "src/async.cc"
]
, "include_dirs": [
"<(module_root_dir)/deps/leveldb-1.5.0/include/"
@@ -52,6 +52,7 @@
"libraries": [
"<(module_root_dir)/deps/leveldb-1.5.0/libleveldb.a"
]
+ , "cflags": ["-g"]
}]
]
}
diff --git a/lib/levelup.js b/lib/levelup.js
index 1ae6984..6b690af 100644
--- a/lib/levelup.js
+++ b/lib/levelup.js
@@ -13,28 +13,35 @@ var bridge = require('../build/Release/levelup')
, Database = {
open: function (callback) {
var options = {}
-
- this.close()
- Object.keys(defaultOptions).forEach(function (p) {
- options[p] = this[p]
- }.bind(this))
- this.db = bridge.createDatabase()
- this.db.open(this.location, options, function (err) {
- if (err) {
- this.close()
- err = new errors.OpenError(err)
- }
- if (callback)
- callback.apply(null, err ? [ err ] : [])
- else if (err)
- throw err
- }.bind(this))
+ , execute = function () {
+ Object.keys(defaultOptions).forEach(function (p) {
+ options[p] = this[p]
+ }.bind(this))
+ this.db = bridge.createDatabase()
+ this.db.open(this.location, options, function (err) {
+ if (err) {
+ this.db = null
+ err = new errors.OpenError(err)
+ }
+ if (callback)
+ callback.apply(null, err ? [ err ] : [])
+ else if (err)
+ throw err
+ }.bind(this))
+ }.bind(this)
+
+ if (this.isOpen())
+ this.close(execute)
+ else
+ execute()
}
- , close: function () {
+ , close: function (callback) {
if (this.isOpen()) {
- this.db.close()
+ this.db.close(callback)
this.db = null
+ } else {
+ callback()
}
}
diff --git a/src/async.cc b/src/async.cc
new file mode 100644
index 0000000..9bd2b41
--- /dev/null
+++ b/src/async.cc
@@ -0,0 +1,145 @@
+#include <cstdlib>
+#include <node.h>
+#include <iostream>
+#include <pthread.h>
+
+#include "database.h"
+#include "async.h"
+
+using namespace std;
+using namespace v8;
+using namespace node;
+using namespace leveldb;
+
+void runCallback (Persistent<Function> callback, Local<Value> argv[], int length);
+
+/** ASYNC BASE **/
+
+void AsyncWorker::WorkComplete () {
+ HandleScope scope;
+ if (status.ok())
+ HandleOKCallback();
+ else
+ HandleErrorCallback();
+ callback.Dispose();
+}
+
+void AsyncWorker::HandleOKCallback () {
+ Local<Value> argv[0];
+ runCallback(callback, argv, 0);
+}
+
+void AsyncWorker::HandleErrorCallback () {
+ Local<Value> argv[] = {
+ Local<Value>::New(Exception::Error(String::New(status.ToString().c_str())))
+ };
+ runCallback(callback, argv, 1);
+}
+
+/** OPEN WORKER **/
+
+OpenWorker::OpenWorker (Database* database, Persistent<Function> callback, string location, bool createIfMissing, bool errorIfExists) {
+ request.data = this;
+ this->database = database;
+ this->location = location;
+ this->callback = callback;
+ options = new Options();
+ options->create_if_missing = createIfMissing;
+ options->error_if_exists = errorIfExists;
+}
+
+OpenWorker::~OpenWorker () {
+ delete options;
+}
+
+void OpenWorker::Execute() {
+ status = database->OpenDatabase(options, location);
+}
+
+/** CLOSE WORKER **/
+
+CloseWorker::CloseWorker (Database* database, Persistent<Function> callback) {
+ request.data = this;
+ this->database = database;
+ this->callback = callback;
+}
+
+void CloseWorker::Execute() {
+ database->CloseDatabase();
+}
+
+void CloseWorker::WorkComplete () {
+ HandleScope scope;
+ HandleOKCallback();
+ callback.Dispose();
+}
+
+/** WRITE WORKER **/
+
+WriteWorker::WriteWorker (Database* database, Persistent<Function> callback, string key, string value, bool sync) {
+ request.data = this;
+ this->database = database;
+ this->callback = callback;
+ this->key = key;
+ this->value = value;
+ options = new WriteOptions();
+ options->sync = sync;
+}
+
+WriteWorker::~WriteWorker () {
+ delete options;
+}
+
+void WriteWorker::Execute() {
+ status = database->WriteToDatabase(options, key, value);
+}
+
+/** READ WORKER **/
+
+ReadWorker::ReadWorker (Database* database, Persistent<Function> callback, string key) {
+ request.data = this;
+ this->database = database;
+ this->callback = callback;
+ this->key = key;
+ options = new ReadOptions();
+}
+
+ReadWorker::~ReadWorker () {
+ delete options;
+}
+
+void ReadWorker::Execute () {
+ status = database->ReadFromDatabase(options, key, value);
+}
+
+void ReadWorker::HandleOKCallback () {
+ Local<Value> argv[] = {
+ Local<Value>::New(Null())
+ , Local<Value>::New(String::New(value.c_str()))
+ };
+ runCallback(callback, argv, 2);
+}
+
+/** UTIL **/
+
+void runCallback (Persistent<Function> callback, Local<Value> argv[], int length) {
+ TryCatch try_catch;
+ callback->Call(Context::GetCurrent()->Global(), length, argv);
+ if (try_catch.HasCaught()) {
+ FatalException(try_catch);
+ }
+}
+
+void AsyncExecute (uv_work_t* req) {
+ static_cast<AsyncWorker*>(req->data)->Execute();
+}
+
+void AsyncExecuteComplete (uv_work_t* req) {
+ AsyncWorker* worker = static_cast<AsyncWorker*>(req->data);
+ worker->WorkComplete();
+ delete worker;
+}
+
+void AsyncQueueWorker (AsyncWorker* worker) {
+ uv_queue_work(uv_default_loop(), &worker->request, AsyncExecute, AsyncExecuteComplete);
+}
\ No newline at end of file
diff --git a/src/async.h b/src/async.h
new file mode 100644
index 0000000..3a77688
--- /dev/null
+++ b/src/async.h
@@ -0,0 +1,76 @@
+#ifndef LU_ASYNC_H
+#define LU_ASYNC_H
+
+#include <cstdlib>
+#include <node.h>
+
+using namespace std;
+using namespace v8;
+using namespace leveldb;
+
+class AsyncWorker {
+public:
+ uv_work_t request;
+ Database* database;
+ Persistent<Function> callback;
+ Status status;
+ void WorkComplete ();
+ virtual void Execute () {};
+
+protected:
+ virtual void HandleOKCallback ();
+ virtual void HandleErrorCallback ();
+};
+
+class OpenWorker : public AsyncWorker {
+public:
+ OpenWorker (Database* database, Persistent<Function> callback, string location, bool createIfMissing, bool errorIfExists);
+ ~OpenWorker ();
+
+ string location;
+ Options* options;
+ virtual void Execute ();
+};
+
+class CloseWorker : public AsyncWorker {
+public:
+ CloseWorker (Database* database, Persistent<Function> callback);
+
+ virtual void Execute ();
+
+private:
+ virtual void WorkComplete ();
+};
+
+class IOWorker : public AsyncWorker {
+public:
+ string key;
+ string value;
+};
+
+class WriteWorker : public IOWorker {
+public:
+ WriteWorker (Database* database, Persistent<Function> callback, string key, string value, bool sync);
+ ~WriteWorker ();
+
+ WriteOptions* options;
+ virtual void Execute ();
+};
+
+class ReadWorker : public IOWorker {
+public:
+ ReadWorker (Database* database, Persistent<Function> callback, string key);
+ ~ReadWorker ();
+
+ ReadOptions* options;
+ virtual void Execute ();
+
+protected:
+ virtual void HandleOKCallback ();
+};
+
+void AsyncExecute (uv_work_t* req);
+void AsyncExecuteComplete (uv_work_t* req);
+void AsyncQueueWorker (AsyncWorker* worker);
+
+#endif
\ No newline at end of file
diff --git a/src/database.cc b/src/database.cc
index 1008706..ee9c904 100644
--- a/src/database.cc
+++ b/src/database.cc
@@ -7,48 +7,55 @@
#include "levelup.h"
#include "database.h"
-#include "database_async.h"
+#include "async.h"
using namespace std;
using namespace v8;
using namespace node;
using namespace leveldb;
-LU_OPTION( createIfMissing );
-LU_OPTION( errorIfExists );
+LU_OPTION ( createIfMissing ); // for open()
+LU_OPTION ( errorIfExists ); // for open()
+LU_OPTION ( sync ); // for write()
-Handle<Value> CreateDatabase (const Arguments& args) {
- HandleScope scope;
- return scope.Close(Database::NewInstance(args));
-}
+Database::Database () {
+ db = NULL;
+};
-void runCallback (Persistent<Function> callback, Local<Value> argv[], int length) {
- TryCatch try_catch;
- callback->Call(Context::GetCurrent()->Global(), length, argv);
+Database::~Database () {
+ if (db != NULL)
+ delete db;
+};
- if (try_catch.HasCaught()) {
- FatalException(try_catch);
- }
+/* expect these to be called from worker threads, no v8 here */
+
+Status Database::OpenDatabase (Options* options, string location) {
+ return DB::Open(*options, location, &db);
+}
+
+Status Database::WriteToDatabase (WriteOptions* options, string key, string value) {
+ return db->Put(*options, key, value);
}
-// Database class
+Status Database::ReadFromDatabase (ReadOptions* options, string key, string& value) {
+ return db->Get(*options, key, &value);
+}
-Database::Database () {};
-Database::~Database () {};
+void Database::CloseDatabase () {
+ delete db;
+ db = NULL;
+}
Persistent<Function> Database::constructor;
void Database::Init () {
- // Prepare constructor template
Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
tpl->SetClassName(String::NewSymbol("Database"));
- tpl->InstanceTemplate()->SetInternalFieldCount(1);
- // Prototype
+ tpl->InstanceTemplate()->SetInternalFieldCount(4);
tpl->PrototypeTemplate()->Set(String::NewSymbol("open"), FunctionTemplate::New(Open)->GetFunction());
tpl->PrototypeTemplate()->Set(String::NewSymbol("close"), FunctionTemplate::New(Close)->GetFunction());
tpl->PrototypeTemplate()->Set(String::NewSymbol("write"), FunctionTemplate::New(Write)->GetFunction());
tpl->PrototypeTemplate()->Set(String::NewSymbol("read"), FunctionTemplate::New(Read)->GetFunction());
-
constructor = Persistent<Function>::New(tpl->GetFunction());
}
@@ -64,123 +71,44 @@ Handle<Value> Database::New (const Arguments& args) {
Handle<Value> Database::NewInstance (const Arguments& args) {
HandleScope scope;
- const unsigned argc = 0;
Handle<Value> argv[0];
- Local<Object> instance = constructor->NewInstance(argc, argv);
+ Local<Object> instance = constructor->NewInstance(0, argv);
return scope.Close(instance);
}
-/** OPEN (async) **/
-
-Status Database::OpenDatabase(Options options, string location) {
- //cout << "open() in " << pthread_self() << endl;
- return DB::Open(options, location, &db);
-}
-
-Status Database::WriteToDatabase(Options options, string key, string value) {
- //cout << "write() in " << pthread_self() << endl;
- return db->Put(leveldb::WriteOptions(), key, value);
-}
-
-Status Database::ReadFromDatabase(Options options, string key, string* value) {
- //cout << "read() in " << pthread_self() << endl;
- return db->Get(leveldb::ReadOptions(), key, value);
-}
-
-void AsyncOpenComplete (uv_work_t* req) {
- HandleScope scope;
- AsyncBatonOpen* baton = static_cast<AsyncBatonOpen*>(req->data);
-
- if (baton->status.ok()) {
- Local<Value> argv[0];
- runCallback(baton->callback, argv, 0);
- } else {
- Local<Value> argv[] = {
- Local<Value>::New(Exception::Error(String::New(baton->status.ToString().c_str())))
- };
- runCallback(baton->callback, argv, 1);
- }
-
- baton->callback.Dispose();
- delete baton;
-}
-
-void AsyncWriteComplete (uv_work_t* req) {
- HandleScope scope;
- AsyncBatonWrite* baton = static_cast<AsyncBatonWrite*>(req->data);
-
- if (baton->status.ok()) {
- Local<Value> argv[0];
- runCallback(baton->callback, argv, 0);
- } else {
- Local<Value> argv[] = {
- Local<Value>::New(Exception::Error(String::New(baton->status.ToString().c_str())))
- };
- runCallback(baton->callback, argv, 1);
- }
-
- baton->callback.Dispose();
- delete baton;
-}
-
-void AsyncReadComplete (uv_work_t* req) {
- HandleScope scope;
- AsyncBatonRead* baton = static_cast<AsyncBatonRead*>(req->data);
-
- if (baton->status.ok()) {
- Local<Value> argv[] = {
- Local<Value>::New(Null())
- , Local<Value>::New(String::New(baton->value.c_str()))
- };
- runCallback(baton->callback, argv, 2);
- } else {
- Local<Value> argv[] = {
- Local<Value>::New(Exception::Error(String::New(baton->status.ToString().c_str())))
- };
- runCallback(baton->callback, argv, 1);
- }
-
- baton->callback.Dispose();
- delete baton;
-}
-
Handle<Value> Database::Open (const Arguments& args) {
HandleScope scope;
Database* database = ObjectWrap::Unwrap<Database>(args.This());
- AsyncBatonOpen* baton = new AsyncBatonOpen();
- baton->request.data = baton;
- baton->database = database;
-
String::Utf8Value location(args[0]->ToString());
Local<Object> optionsObj = Local<Object>::Cast(args[1]);
- Local<Function> callback = Local<Function>::Cast(args[2]);
-
- baton->callback = Persistent<Function>::New(callback);
-
- baton->location = *location;
-
- if (optionsObj->Has(option_createIfMissing))
- baton->options.create_if_missing = optionsObj->Get(option_createIfMissing)->BooleanValue();
-
- if (optionsObj->Has(option_errorIfExists))
- baton->options.error_if_exists = optionsObj->Get(option_errorIfExists)->BooleanValue();
-
- uv_queue_work(uv_default_loop(), &baton->request, AsyncOpen, AsyncOpenComplete);
+ Persistent<Function> callback;
+ if (args.Length() > 1)
+ callback = Persistent<Function>::New(Local<Function>::Cast(args[2]));
+
+ OpenWorker* worker = new OpenWorker(
+ database
+ , callback
+ , *location
+ , optionsObj->Has(option_createIfMissing) && optionsObj->Get(option_createIfMissing)->BooleanValue()
+ , optionsObj->Has(option_errorIfExists) && optionsObj->Get(option_errorIfExists)->BooleanValue()
+ );
+ AsyncQueueWorker(worker);
return Undefined();
}
-/** CLOSE **/
-
Handle<Value> Database::Close (const Arguments& args) {
HandleScope scope;
Database* database = ObjectWrap::Unwrap<Database>(args.This());
+ Persistent<Function> callback;
+ if (args.Length() > 0)
+ callback = Persistent<Function>::New(Local<Function>::Cast(args[0]));
- delete database->db;
- database->db = NULL;
+ CloseWorker* worker = new CloseWorker(database, callback);
+ AsyncQueueWorker(worker);
return Undefined();
}
@@ -189,26 +117,25 @@ Handle<Value> Database::Write (const Arguments& args) {
HandleScope scope;
Database* database = ObjectWrap::Unwrap<Database>(args.This());
- AsyncBatonWrite* baton = new AsyncBatonWrite();
- baton->request.data = baton;
- baton->database = database;
-
String::Utf8Value key(args[0]->ToString());
String::Utf8Value value(args[1]->ToString());
-
- baton->key = *key;
- baton->value = *value;
-
-/* Local<Object> optionsObj = NULL;
- if (args.Length() > 3)
- optionsObj = Local<Object>::Cast(args[2]);
-*/
- if (args.Length() > 2) {
- Local<Function> callback = Local<Function>::Cast(args[args.Length() > 3 ? 3 : 2]);
- baton->callback = Persistent<Function>::New(callback);
+ Persistent<Function> callback;
+ if (args.Length() > 2)
+ callback = Persistent<Function>::New(Local<Function>::Cast(args[args.Length() > 3 ? 3 : 2]));
+ bool sync = false;
+ if (args.Length() > 3) { // an options object
+ Local<Object> optionsObj = Local<Object>::Cast(args[2]);
+ sync = optionsObj->Has(option_sync) && optionsObj->Get(option_sync)->BooleanValue();
}
- uv_queue_work(uv_default_loop(), &baton->request, AsyncWrite, AsyncWriteComplete);
+ WriteWorker* worker = new WriteWorker(
+ database
+ , callback
+ , *key
+ , *value
+ , sync
+ );
+ AsyncQueueWorker(worker);
return Undefined();
}
@@ -217,24 +144,22 @@ Handle<Value> Database::Read (const Arguments& args) {
HandleScope scope;
Database* database = ObjectWrap::Unwrap<Database>(args.This());
- AsyncBatonRead* baton = new AsyncBatonRead();
- baton->request.data = baton;
- baton->database = database;
-
String::Utf8Value key(args[0]->ToString());
+ Persistent<Function> callback;
+ if (args.Length() > 1)
+ callback = Persistent<Function>::New(Local<Function>::Cast(args[args.Length() > 2 ? 2 : 1]));
- baton->key = *key;
-
-/* Local<Object> optionsObj = NULL;
- if (args.Length() > 2)
- optionsObj = Local<Object>::Cast(args[1]);
-*/
- if (args.Length() > 1) {
- Local<Function> callback = Local<Function>::Cast(args[args.Length() > 2 ? 2 : 1]);
- baton->callback = Persistent<Function>::New(callback);
- }
-
- uv_queue_work(uv_default_loop(), &baton->request, AsyncRead, AsyncReadComplete);
+ ReadWorker* worker = new ReadWorker(
+ database
+ , callback
+ , *key
+ );
+ AsyncQueueWorker(worker);
return Undefined();
+}
+
+Handle<Value> CreateDatabase (const Arguments& args) {
+ HandleScope scope;
+ return scope.Close(Database::NewInstance(args));
}
\ No newline at end of file
diff --git a/src/database.h b/src/database.h
index 5dfdd17..5851e22 100644
--- a/src/database.h
+++ b/src/database.h
@@ -9,21 +9,22 @@
#define LU_V8_METHOD(name) \
static v8::Handle<v8::Value> name (const v8::Arguments& args);
-
using namespace std;
using namespace v8;
using namespace leveldb;
+struct AsyncDescriptor;
+
Handle<Value> CreateDatabase (const Arguments& args);
class Database : public node::ObjectWrap {
public:
static void Init ();
static v8::Handle<v8::Value> NewInstance (const v8::Arguments& args);
-
- Status OpenDatabase (Options options, string location);
- Status WriteToDatabase (Options options, string key, string value);
- Status ReadFromDatabase (Options options, string key, string* value);
+ Status OpenDatabase (Options* options, string location);
+ Status WriteToDatabase (WriteOptions* options, string key, string value);
+ Status ReadFromDatabase (ReadOptions* options, string key, string& value);
+ void CloseDatabase ();
private:
Database ();
@@ -32,6 +33,7 @@ class Database : public node::ObjectWrap {
DB* db;
static v8::Persistent<v8::Function> constructor;
+
LU_V8_METHOD( New )
LU_V8_METHOD( Open )
LU_V8_METHOD( Close )
diff --git a/src/database_async.cc b/src/database_async.cc
deleted file mode 100644
index d1f6893..0000000
--- a/src/database_async.cc
+++ /dev/null
@@ -1,20 +0,0 @@
-#include <node.h>
-
-#include "database_async.h"
-
-using namespace node;
-
-void AsyncOpen (uv_work_t* req) {
- AsyncBatonOpen* baton = static_cast<AsyncBatonOpen*>(req->data);
- baton->status = baton->database->OpenDatabase(baton->options, baton->location);
-}
-
-void AsyncWrite (uv_work_t* req) {
- AsyncBatonWrite* baton = static_cast<AsyncBatonWrite*>(req->data);
- baton->status = baton->database->WriteToDatabase(baton->options, baton->key, baton->value);
-}
-
-void AsyncRead (uv_work_t* req) {
- AsyncBatonRead* baton = static_cast<AsyncBatonRead*>(req->data);
- baton->status = baton->database->ReadFromDatabase(baton->options, baton->key, &baton->value);
-}
\ No newline at end of file
diff --git a/src/database_async.h b/src/database_async.h
deleted file mode 100644
index 67a2b79..0000000
--- a/src/database_async.h
+++ /dev/null
@@ -1,58 +0,0 @@
-#ifndef LU_DATABASE_ASYNC_H
-#define LU_DATABASE_ASYNC_H
-
-#include <cstdlib>
-#include <node.h>
-#include <v8.h>
-
-#include "leveldb/db.h"
-
-#include "database.h"
-
-using namespace node;
-using namespace v8;
-using namespace std;
-using namespace leveldb;
-
-struct AsyncBatonOpen {
- uv_work_t request;
- Database* database;
- // in
- string location;
- Options options;
- Persistent<Function> callback;
- // out
- Status status;
-};
-
-void AsyncOpen (uv_work_t* req);
-
-struct AsyncBatonWrite {
- uv_work_t request;
- Database* database;
- // in
- string key;
- string value;
- Options options;
- Persistent<Function> callback;
- // out
- Status status;
-};
-
-void AsyncWrite (uv_work_t* req);
-
-struct AsyncBatonRead {
- uv_work_t request;
- Database* database;
- // in
- string key;
- Options options;
- Persistent<Function> callback;
- // out
- Status status;
- string value;
-};
-
-void AsyncRead (uv_work_t* req);
-
-#endif
\ No newline at end of file
diff --git a/test/simple-test.js b/test/simple-test.js
index e759f2b..23a2511 100644
--- a/test/simple-test.js
+++ b/test/simple-test.js
@@ -13,10 +13,15 @@ buster.testCase('Basic API', {
}
, 'tearDown': function (done) {
- async.forEach(this.cleanupDirs, rimraf, done)
- this.closeableDatabases.forEach(function (db) {
- db.close()
- })
+ async.forEach(
+ this.closeableDatabases
+ , function (db, callback) {
+ db.close(callback)
+ }
+ , function () {
+ async.forEach(this.cleanupDirs, rimraf, done)
+ }.bind(this)
+ )
}
, 'createDatabase()': function () {
@@ -123,19 +128,20 @@ buster.testCase('Basic API', {
refute(err) // sanity
assert.isTrue(db.isOpen())
- db.close()
- assert.isFalse(db.isOpen())
-
- db = levelup.createDatabase(
- this.cleanupDirs[0]
- , { errorIfExists : false }
- )
- this.closeableDatabases.push(db)
- db.open(function (err) {
- refute(err)
- assert.isTrue(db.isOpen())
- done()
- })
+ db.close(function () {
+ assert.isFalse(db.isOpen())
+
+ db = levelup.createDatabase(
+ this.cleanupDirs[0]
+ , { errorIfExists : false }
+ )
+ this.closeableDatabases.push(db)
+ db.open(function (err) {
+ refute(err)
+ assert.isTrue(db.isOpen())
+ done()
+ })
+ }.bind(this))
}.bind(this))
}
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-javascript/node-leveldown.git
More information about the Pkg-javascript-commits
mailing list