[Pkg-javascript-commits] [node-leveldown] 09/492: iterator -> ReadStream, WriteStream for put/batch
Andrew Kelley
andrewrk-guest at moszumanska.debian.org
Sun Jul 6 17:13:38 UTC 2014
This is an automated email from the git hooks/post-receive script.
andrewrk-guest pushed a commit to annotated tag rocksdb-0.10.1
in repository node-leveldown.
commit fb58afe851d3aee0bee910c27270d85cf1e2ffdd
Author: Rod Vagg <rod at vagg.org>
Date: Sat Aug 11 22:56:14 2012 +1000
iterator -> ReadStream, WriteStream for put/batch
---
binding.gyp | 3 +
lib/levelup.js | 155 ++++++++++++++++++++++++-------------
lib/read-stream.js | 45 +++++++++++
lib/util.js | 12 +++
lib/write-stream.js | 94 +++++++++++++++++++++++
package.json | 3 +-
src/async.cc | 105 +++----------------------
src/async.h | 156 +-------------------------------------
src/database.cc | 59 ++++++++++++--
src/database.h | 32 ++++----
src/database_async.cc | 151 ++++++++++++++++++++++++++++++++++++
src/{async.h => database_async.h} | 38 ++--------
src/iterator.cc | 114 ++++++++++++++++++++++++++++
src/iterator.h | 63 +++++++++++++++
src/iterator_async.cc | 37 +++++++++
src/iterator_async.h | 57 ++++++++++++++
src/levelup.cc | 4 +
test/read-stream-test.js | 95 +++++++++++++++++++++++
test/simple-test.js | 4 +-
test/write-stream-test.js | 118 ++++++++++++++++++++++++++++
20 files changed, 986 insertions(+), 359 deletions(-)
diff --git a/binding.gyp b/binding.gyp
index bad4981..2feb539 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -43,6 +43,9 @@
"src/async.cc"
, "src/batch.cc"
, "src/database.cc"
+ , "src/database_async.cc"
+ , "src/iterator.cc"
+ , "src/iterator_async.cc"
, "src/levelup.cc"
]
, "include_dirs": [
diff --git a/lib/levelup.js b/lib/levelup.js
index bfc5777..93c069a 100644
--- a/lib/levelup.js
+++ b/lib/levelup.js
@@ -1,5 +1,11 @@
-var bridge = require('../build/Release/levelup')
- , errors = require('./errors')
+var bridge = require('../build/Release/levelup')
+
+ , errors = require('./errors')
+ , ReadStream = require('./read-stream')
+ , WriteStream = require('./write-stream')
+ , toEncoding = require('./util').toEncoding
+ , toBuffer = require('./util').toBuffer
+ , EventEmitter = require('events').EventEmitter
, defaultOptions = {
createIfMissing : false
@@ -7,10 +13,6 @@ var bridge = require('../build/Release/levelup')
, encoding : 'utf8'
}
- , toString = function () {
- return "LevelUPDatabase"
- }
-
, encodingOpts = (function (enc) {
var eo = {}
enc.forEach(function (e) { eo[e] = { encoding: e } })
@@ -38,31 +40,25 @@ var bridge = require('../build/Release/levelup')
return callback_
}
- , toBuffer = function (data, encoding) {
- return data === undefined || data === null || Buffer.isBuffer(data) ? data : new Buffer('' + data, encoding)
- }
-
- , toEncoding = function (buffer, encoding) {
- return encoding == 'binary' ? buffer : buffer.toString(encoding)
- }
-
, Database = {
open: function (callback) {
var options = {}
, execute = function () {
+ var db = bridge.createDatabase()
Object.keys(defaultOptions).forEach(function (p) {
options[p] = this[p]
}.bind(this))
- this.db = bridge.createDatabase()
- this.db.open(this.location, options, function (err) {
+ db.open(this.location, options, function (err) {
if (err) {
- this.db = null
err = new errors.OpenError(err)
+ if (callback)
+ return callback(err)
+ this.ee.emit('error', err)
+ } else {
+ this.db = db
+ callback()
+ this.ee.emit('ready')
}
- if (callback)
- callback.apply(null, err ? [ err ] : [])
- else if (err)
- throw err
}.bind(this))
}.bind(this)
@@ -74,7 +70,10 @@ var bridge = require('../build/Release/levelup')
, close: function (callback) {
if (this.isOpen()) {
- this.db.close(callback)
+ this.db.close(function () {
+ this.ee.emit('closed')
+ callback.apply(null, arguments)
+ }.bind(this))
this.db = null
} else {
callback()
@@ -85,28 +84,35 @@ var bridge = require('../build/Release/levelup')
return !!this.db
}
- , get: function (key, options_, callback_) {
+ , get: function (key_, options_, callback_) {
var callback = getCallback(options_, callback_)
- , options
+ , options, key, err
+
if (this.isOpen()) {
options = getOptions(options_, this.encoding)
- key = toBuffer(key, options.keyEncoding || options.encoding)
+ key = toBuffer(key_, options.keyEncoding || options.encoding)
this.db.get(key, options, function (err, value) {
if (err) {
- err = new errors.NotFoundError('Key not found in database [' + key + ']')
+ err = new errors.NotFoundError('Key not found in database [' + key_ + ']')
if (callback)
return callback(err)
throw err
}
- callback && callback(null, toEncoding(value, options.valueEncoding || options.encoding), key)
+ callback && callback(null, toEncoding(value, options.valueEncoding || options.encoding), key_)
})
- } else
- callback(new errors.ReadError('Database has not been opened'))
+ } else {
+ err = new errors.ReadError('Database has not been opened')
+ if (callback)
+ callback(err)
+ else
+ throw err
+ }
}
, put: function (key, value, options_, callback_) {
var callback = getCallback(options_, callback_)
- , options
+ , options, err
+
if (this.isOpen()) {
options = getOptions(options_, this.encoding)
key = toBuffer(key, options.keyEncoding || options.encoding)
@@ -116,17 +122,25 @@ var bridge = require('../build/Release/levelup')
err = new errors.WriteError(err)
if (callback)
return callback(err)
- throw err
+ this.ee.emit('error', err)
+ } else {
+ this.ee.emit('put', key, value)
+ callback && callback(null, key, value)
}
- callback && callback(null, key, value)
- })
- } else
- callback(new errors.ReadError('Database has not been opened'))
+ }.bind(this))
+ } else {
+ err = new errors.WriteError('Database has not been opened')
+ if (callback)
+ callback(err)
+ else
+ throw err
+ }
}
, del: function (key, options_, callback_) {
var callback = getCallback(options_, callback_)
- , options
+ , options, err
+
if (this.isOpen()) {
options = getOptions(options_, this.encoding)
key = toBuffer(key, options.keyEncoding || options.encoding)
@@ -135,32 +149,44 @@ var bridge = require('../build/Release/levelup')
err = new errors.WriteError(err)
if (callback)
return callback(err)
- throw err
+ this.ee.emit('error', err)
+ } else {
+ this.ee.emit('del', key)
+ callback && callback(null, key)
}
- callback && callback(null, key)
- })
- } else
- callback(new errors.ReadError('Database has not been opened'))
+ }.bind(this))
+ } else {
+ err = new errors.WriteError('Database has not been opened')
+ if (callback)
+ callback(err)
+ else
+ throw err
+ }
}
, batch: function (arr, options_, callback_) {
var callback = getCallback(options_, callback_)
, empty = {}
- , options, keyEncoding, valueEncoding
+ , options, keyEncoding, valueEncoding, err
- if (!this.isOpen())
- return callback(new errors.ReadError('Database has not been opened'))
+ if (!this.isOpen()) {
+ err = new errors.WriteError('Database has not been opened')
+ if (callback)
+ return callback(err)
+ else
+ throw err
+ }
options = getOptions(options_, this.encoding)
keyEncoding = options.keyEncoding || options.encoding
valueEncoding = options.valueEncoding || options.encoding
arr = arr.map(function (e) {
- if (e.type && e.key) {
+ if (e.type !== undefined && e.key !== undefined) {
var o = {
type : e.type
, key : toBuffer(e.key, keyEncoding)
}
- if (e.value)
+ if (e.value !== undefined)
o.value = toBuffer(e.value, valueEncoding)
return o
}
@@ -171,10 +197,33 @@ var bridge = require('../build/Release/levelup')
err = new errors.WriteError(err)
if (callback)
return callback(err)
- throw err
+ this.ee.emit('error', err)
+ } else {
+ this.ee.emit('batch', arr)
+ callback && callback()
}
- callback && callback()
- })
+ }.bind(this))
+ }
+
+ , readStream: function (options) {
+ return new ReadStream(
+ options || {}
+ , this
+ , function () {
+ return bridge.createIterator(this.db)
+ }.bind(this)
+ )
+ }
+
+ , writeStream: function (options) {
+ return new WriteStream(
+ options || {}
+ , this
+ )
+ }
+
+ , toString: function () {
+ return "LevelUPDatabase"
}
}
@@ -193,11 +242,13 @@ var bridge = require('../build/Release/levelup')
if (typeof location != 'string')
throw new errors.InitializationError('Must provide a location for the database')
- database = Object.create({ toString: toString })
- ctx = {}
+ database = new EventEmitter()
+ ctx = {
+ ee : database
+ , location : location
+ }
defineReadOnlyProperty(database, 'location', location)
- ctx.location = location
Object.keys(defaultOptions).forEach(function (p) {
var value = (options && options[p]) || defaultOptions[p]
defineReadOnlyProperty(database, p, value)
diff --git a/lib/read-stream.js b/lib/read-stream.js
new file mode 100644
index 0000000..560dd2c
--- /dev/null
+++ b/lib/read-stream.js
@@ -0,0 +1,45 @@
+var Stream = require("stream").Stream
+ , inherits = require("inherits")
+
+ , toEncoding = require('./util').toEncoding
+
+function ReadStream (options, db, iteratorFactory) {
+ Stream.call(this)
+ this._options = options
+ this._status = 'ready'
+
+ var ready = function () {
+ this._iterator = iteratorFactory()
+ this.emit('ready')
+ this._read()
+ }.bind(this)
+
+ if (db.isOpen())
+ process.nextTick(ready)
+ else
+ db.ee.once('ready', ready)
+}
+
+inherits(ReadStream, Stream)
+
+ReadStream.prototype._read = function () {
+ if (this._status == 'ready' || this._status == 'reading') {
+ this._iterator.next(this._onEnd.bind(this), this._onData.bind(this))
+ }
+}
+
+ReadStream.prototype._onData = function (key, value) {
+ if (this._status == 'ready') this._status = 'reading'
+ this.emit('data', {
+ key : toEncoding(key , this._options.keyEncoding || this._options.encoding)
+ , value : toEncoding(value , this._options.valueEncoding || this._options.encoding)
+ })
+ this._read()
+}
+
+ReadStream.prototype._onEnd = function () {
+ this._status = 'ended'
+ this.emit('end')
+}
+
+module.exports = ReadStream
\ No newline at end of file
diff --git a/lib/util.js b/lib/util.js
new file mode 100644
index 0000000..566bd3d
--- /dev/null
+++ b/lib/util.js
@@ -0,0 +1,12 @@
+var toBuffer = function (data, encoding) {
+ return data === undefined || data === null || Buffer.isBuffer(data) ? data : new Buffer('' + data, encoding)
+ }
+
+ , toEncoding = function (buffer, encoding) {
+ return encoding == 'binary' ? buffer : buffer.toString(encoding)
+ }
+
+module.exports = {
+ toBuffer : toBuffer
+ , toEncoding : toEncoding
+}
\ No newline at end of file
diff --git a/lib/write-stream.js b/lib/write-stream.js
new file mode 100644
index 0000000..3d53014
--- /dev/null
+++ b/lib/write-stream.js
@@ -0,0 +1,94 @@
+var Stream = require("stream").Stream
+ , inherits = require("inherits")
+
+ , toEncoding = require('./util').toEncoding
+
+function WriteStream (options, db) {
+ Stream.call(this)
+ this._options = options
+ this._db = db
+ this._buffer = []
+ this._status = 'init'
+ this._end = false
+
+ var ready = function () {
+ this._status = 'ready'
+ this.emit('ready')
+ this._process()
+ }.bind(this)
+
+ if (db.isOpen())
+ process.nextTick(ready)
+ else
+ db.ee.once('ready', ready)
+}
+
+inherits(WriteStream, Stream)
+
+WriteStream.prototype.write = function (data) {
+ this._buffer.push(data)
+ if (this._status != 'init')
+ this._processDelayed()
+ if (this._options.maxBufferLength && this._buffer.length > this._options.maxBufferLength) {
+ this._writeBlock = true
+ return false
+ }
+ return true
+}
+
+WriteStream.prototype._processDelayed = function() {
+ process.nextTick(this._process.bind(this))
+}
+
+WriteStream.prototype._process = function() {
+ var entry
+ , cb = function (err) {
+ if (this._status != 'closed')
+ this._status = 'ready'
+ if (err)
+ return this.emit('error', err)
+ this._process()
+ }.bind(this)
+
+ if (this._status != 'ready') {
+ if (this._buffer.length && this._status != 'closed')
+ this._processDelayed()
+ return
+ }
+
+ if (this._end) {
+ this.emit('close')
+ this._status = 'closed'
+ return
+ }
+
+ if (!this._buffer.length)
+ return
+
+ if (this._buffer.length == 1) {
+ entry = this._buffer.pop()
+ if (entry.key !== undefined && entry.value !== undefined) {
+ this._status = 'writing'
+ this._db.put(entry.key, entry.value, cb)
+ }
+ } else {
+ this._status = 'writing'
+ this._db.batch(this._buffer.map(function (d) {
+ return { type: 'put', key: d.key, value: d.value }
+ }), cb)
+ this._buffer = []
+ }
+ if (this._writeBlock) {
+ this._writeBlock = false
+ this.emit('drain')
+ }
+}
+
+WriteStream.prototype.end = function() {
+ process.nextTick(function () {
+ this._end = true
+ this._process()
+ }.bind(this))
+}
+
+module.exports = WriteStream
\ No newline at end of file
diff --git a/package.json b/package.json
index 6bf3b52..ca6d77f 100644
--- a/package.json
+++ b/package.json
@@ -3,7 +3,8 @@
, "version" : "0.0.0"
, "main" : "lib/levelup.js"
, "dependencies" : {
- "node-errno" : "*"
+ "errno" : "*"
+ , "inherits" : "~1.0.0"
}
, "devDependencies" : {
"buster" : "*"
diff --git a/src/async.cc b/src/async.cc
index d7b678b..4183e79 100644
--- a/src/async.cc
+++ b/src/async.cc
@@ -14,10 +14,17 @@ using namespace v8;
using namespace node;
using namespace leveldb;
-void runCallback (Persistent<Function> callback, Local<Value> argv[], int length);
-
/** ASYNC BASE **/
+void runCallback (Persistent<Function> callback, Local<Value> argv[], int length) {
+ TryCatch try_catch;
+
+ callback->Call(Context::GetCurrent()->Global(), length, argv);
+ if (try_catch.HasCaught()) {
+ FatalException(try_catch);
+ }
+}
+
void AsyncWorker::WorkComplete () {
HandleScope scope;
if (status.ok())
@@ -39,100 +46,6 @@ void AsyncWorker::HandleErrorCallback () {
runCallback(callback, argv, 1);
}
-/** OPEN WORKER **/
-
-void OpenWorker::Execute() {
- status = database->OpenDatabase(options, location);
-}
-
-/** CLOSE WORKER **/
-
-void CloseWorker::Execute() {
- database->CloseDatabase();
-}
-
-void CloseWorker::WorkComplete () {
- HandleScope scope;
- HandleOKCallback();
- callback.Dispose();
-}
-
-/** IO WORKER (abstract) **/
-
-void IOWorker::WorkComplete() {
- AsyncWorker::WorkComplete();
- keyPtr.Dispose();
-}
-
-/** WRITE WORKER **/
-
-WriteWorker::~WriteWorker () {
- delete options;
-}
-
-void WriteWorker::Execute() {
- status = database->PutToDatabase(options, key, value);
-}
-
-void WriteWorker::WorkComplete() {
- IOWorker::WorkComplete();
- valuePtr.Dispose();
-}
-
-/** READ WORKER **/
-
-ReadWorker::~ReadWorker () {
- delete options;
-}
-
-void ReadWorker::Execute () {
- status = database->GetFromDatabase(options, key, value);
-}
-
-void ReadWorker::HandleOKCallback () {
- Local<Value> argv[] = {
- Local<Value>::New(Null())
- , Local<Value>::New(Buffer::New((char*)value.data(), value.size())->handle_)
- };
- runCallback(callback, argv, 2);
-}
-
-/** DELETE WORKER **/
-
-DeleteWorker::~DeleteWorker () {
- delete options;
-}
-
-void DeleteWorker::Execute() {
- status = database->DeleteFromDatabase(options, key);
-}
-
-/** BATCH WORKER **/
-
-BatchWorker::~BatchWorker () {
- for (unsigned int i = 0; i < operations.size(); i++)
- delete operations[i];
- operations.clear();
-}
-
-void BatchWorker::Execute() {
- WriteBatch batch;
- for (unsigned int i = 0; i < operations.size(); i++)
- operations[i]->Execute(&batch);
- status = database->WriteBatchToDatabase(options, &batch);
-}
-
-/** UTIL **/
-
-void runCallback (Persistent<Function> callback, Local<Value> argv[], int length) {
- TryCatch try_catch;
-
- callback->Call(Context::GetCurrent()->Global(), length, argv);
- if (try_catch.HasCaught()) {
- FatalException(try_catch);
- }
-}
-
void AsyncExecute (uv_work_t* req) {
static_cast<AsyncWorker*>(req->data)->Execute();
}
diff --git a/src/async.h b/src/async.h
index 1537723..6948a90 100644
--- a/src/async.h
+++ b/src/async.h
@@ -5,12 +5,12 @@
#include <vector>
#include <node.h>
-#include "batch.h"
-
using namespace std;
using namespace v8;
using namespace leveldb;
+void runCallback (Persistent<Function> callback, Local<Value> argv[], int length);
+
class AsyncWorker {
public:
AsyncWorker (
@@ -33,158 +33,6 @@ protected:
virtual void HandleErrorCallback ();
};
-class OpenWorker : public AsyncWorker {
-public:
- OpenWorker (
- Database* database
- , Persistent<Function> callback
- , string location
- , bool createIfMissing
- , bool errorIfExists
- ) : AsyncWorker(database, callback)
- , location(location)
- {
- options = new Options();
- options->create_if_missing = createIfMissing;
- options->error_if_exists = errorIfExists;
- };
-
- ~OpenWorker () {
- delete options;
- }
-
- string location;
- Options* options;
- virtual void Execute ();
-};
-
-class CloseWorker : public AsyncWorker {
-public:
- CloseWorker (
- Database* database
- , Persistent<Function> callback
- ) : AsyncWorker(database, callback)
- {};
-
- virtual void Execute ();
-
-private:
- virtual void WorkComplete ();
-};
-
-class IOWorker : public AsyncWorker {
-public:
- IOWorker (
- Database* database
- , Persistent<Function> callback
- , Slice key
- , Persistent<Object> keyPtr
- ) : AsyncWorker(database, callback)
- , key(key)
- , keyPtr(keyPtr)
- {};
-
- virtual void WorkComplete ();
-
-protected:
- Slice key;
- Persistent<Object> keyPtr;
-};
-
-class ReadWorker : public IOWorker {
-public:
- ReadWorker(
- Database* database
- , Persistent<Function> callback
- , Slice key
- , Persistent<Object> keyPtr
- ) : IOWorker(database, callback, key, keyPtr)
- {
- options = new ReadOptions();
- };
-
- ~ReadWorker ();
-
- ReadOptions* options;
- virtual void Execute ();
-
-protected:
- virtual void HandleOKCallback ();
-
-private:
- string value;
-};
-
-class DeleteWorker : public IOWorker {
-public:
- DeleteWorker(
- Database* database
- , Persistent<Function> callback
- , Slice key
- , bool sync
- , Persistent<Object> keyPtr
- ) : IOWorker(database, callback, key, keyPtr)
- {
- options = new WriteOptions();
- options->sync = sync;
- };
-
- ~DeleteWorker ();
-
- virtual void Execute ();
-
-protected:
- WriteOptions* options;
-};
-
-class WriteWorker : public DeleteWorker {
-public:
- WriteWorker (
- Database* database
- , Persistent<Function> callback
- , Slice key
- , Slice value
- , bool sync
- , Persistent<Object> keyPtr
- , Persistent<Object> valuePtr
- ) : DeleteWorker(database, callback, key, sync, keyPtr)
- , value(value)
- , valuePtr(valuePtr)
- {};
-
- ~WriteWorker ();
-
- virtual void Execute ();
- virtual void WorkComplete ();
-
-private:
- Slice value;
- Persistent<Object> valuePtr;
-};
-
-class BatchWorker : public AsyncWorker {
-public:
- BatchWorker(
- Database* database
- , Persistent<Function> callback
- , vector<BatchOp*> operations
- , bool sync
- ) : AsyncWorker(database, callback)
- , operations(operations)
- {
- options = new WriteOptions();
- options->sync = sync;
- };
-
- ~BatchWorker ();
-
- virtual void Execute ();
-
-private:
- WriteOptions* options;
- vector<BatchOp*> operations;
-};
-
void AsyncExecute (uv_work_t* req);
void AsyncExecuteComplete (uv_work_t* req);
void AsyncQueueWorker (AsyncWorker* worker);
diff --git a/src/database.cc b/src/database.cc
index 12b9242..70a3b51 100644
--- a/src/database.cc
+++ b/src/database.cc
@@ -10,7 +10,9 @@
#include "levelup.h"
#include "database.h"
#include "async.h"
+#include "database_async.h"
#include "batch.h"
+#include "iterator.h"
using namespace std;
using namespace v8;
@@ -57,6 +59,10 @@ Status Database::WriteBatchToDatabase (WriteOptions* options, WriteBatch* batch)
return db->Write(*options, batch);
}
+leveldb::Iterator* Database::NewIterator (ReadOptions* options) {
+ return db->NewIterator(*options);
+}
+
void Database::CloseDatabase () {
delete db;
db = NULL;
@@ -67,13 +73,14 @@ Persistent<Function> Database::constructor;
void Database::Init () {
Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
tpl->SetClassName(String::NewSymbol("Database"));
- tpl->InstanceTemplate()->SetInternalFieldCount(5);
- tpl->PrototypeTemplate()->Set(String::NewSymbol("open") , FunctionTemplate::New(Open)->GetFunction());
- tpl->PrototypeTemplate()->Set(String::NewSymbol("close") , FunctionTemplate::New(Close)->GetFunction());
- tpl->PrototypeTemplate()->Set(String::NewSymbol("put") , FunctionTemplate::New(Put)->GetFunction());
- tpl->PrototypeTemplate()->Set(String::NewSymbol("get") , FunctionTemplate::New(Get)->GetFunction());
- tpl->PrototypeTemplate()->Set(String::NewSymbol("del") , FunctionTemplate::New(Delete)->GetFunction());
- tpl->PrototypeTemplate()->Set(String::NewSymbol("batch") , FunctionTemplate::New(Batch)->GetFunction());
+ tpl->InstanceTemplate()->SetInternalFieldCount(7);
+ tpl->PrototypeTemplate()->Set(String::NewSymbol("open") , FunctionTemplate::New(Open)->GetFunction());
+ tpl->PrototypeTemplate()->Set(String::NewSymbol("close") , FunctionTemplate::New(Close)->GetFunction());
+ tpl->PrototypeTemplate()->Set(String::NewSymbol("put") , FunctionTemplate::New(Put)->GetFunction());
+ tpl->PrototypeTemplate()->Set(String::NewSymbol("get") , FunctionTemplate::New(Get)->GetFunction());
+ tpl->PrototypeTemplate()->Set(String::NewSymbol("del") , FunctionTemplate::New(Delete)->GetFunction());
+ tpl->PrototypeTemplate()->Set(String::NewSymbol("batch") , FunctionTemplate::New(Batch)->GetFunction());
+// tpl->PrototypeTemplate()->Set(String::NewSymbol("iterator") , FunctionTemplate::New(Iterator)->GetFunction());
constructor = Persistent<Function>::New(tpl->GetFunction());
}
@@ -239,6 +246,44 @@ Handle<Value> Database::Batch (const Arguments& args) {
return Undefined();
}
+/*
+Handle<Value> Database::Iterator (const Arguments& args) {
+ HandleScope scope;
+
+ cout << "Database::Iterator" << endl;
+
+ Database* database = ObjectWrap::Unwrap<Database>(args.This());
+ Persistent<Function> dataCallback = Persistent<Function>::New(Local<Function>::Cast(args[0]));
+ Persistent<Function> callback = Persistent<Function>::New(Local<Function>::Cast(args[1]));
+
+ cout << "Database::Iterator making worker" << endl;
+
+ levelup::Iterator iterator = new Iterator(
+ this
+ , callback
+ , dataCallback
+ , NULL
+ , NULL
+ );
+*/
+/*
+ IteratorWorker* worker = new IteratorWorker(
+ database
+ , callback
+ , dataCallback
+ , NULL
+ , NULL
+ );
+
+ cout << "Queueing iterator worker..." << endl;
+
+ AsyncQueueWorker(worker);
+*/
+/*
+ return Undefined();
+}
+*/
+
Handle<Value> CreateDatabase (const Arguments& args) {
HandleScope scope;
return scope.Close(Database::NewInstance(args));
diff --git a/src/database.h b/src/database.h
index 4a3f8a6..b27e897 100644
--- a/src/database.h
+++ b/src/database.h
@@ -18,18 +18,19 @@ struct AsyncDescriptor;
Handle<Value> CreateDatabase (const Arguments& args);
class Database : public node::ObjectWrap {
- public:
+public:
static void Init ();
static v8::Handle<v8::Value> NewInstance (const v8::Arguments& args);
- Status OpenDatabase (Options* options, string location);
- Status PutToDatabase (WriteOptions* options, Slice key, Slice value);
- Status GetFromDatabase (ReadOptions* options, Slice key, string& value);
- Status DeleteFromDatabase (WriteOptions* options, Slice key);
- Status WriteBatchToDatabase (WriteOptions* options, WriteBatch* batch);
- void CloseDatabase ();
+ Status OpenDatabase (Options* options, string location);
+ Status PutToDatabase (WriteOptions* options, Slice key, Slice value);
+ Status GetFromDatabase (ReadOptions* options, Slice key, string& value);
+ Status DeleteFromDatabase (WriteOptions* options, Slice key);
+ Status WriteBatchToDatabase (WriteOptions* options, WriteBatch* batch);
+ leveldb::Iterator* NewIterator (ReadOptions* options);
+ void CloseDatabase ();
- private:
+private:
Database ();
~Database ();
@@ -37,13 +38,14 @@ class Database : public node::ObjectWrap {
static v8::Persistent<v8::Function> constructor;
- LU_V8_METHOD( New )
- LU_V8_METHOD( Open )
- LU_V8_METHOD( Close )
- LU_V8_METHOD( Put )
- LU_V8_METHOD( Delete )
- LU_V8_METHOD( Get )
- LU_V8_METHOD( Batch )
+ LU_V8_METHOD( New )
+ LU_V8_METHOD( Open )
+ LU_V8_METHOD( Close )
+ LU_V8_METHOD( Put )
+ LU_V8_METHOD( Delete )
+ LU_V8_METHOD( Get )
+ LU_V8_METHOD( Batch )
+// LU_V8_METHOD( Iterator )
};
#endif
\ No newline at end of file
diff --git a/src/database_async.cc b/src/database_async.cc
new file mode 100644
index 0000000..a3e4088
--- /dev/null
+++ b/src/database_async.cc
@@ -0,0 +1,151 @@
+#include <cstdlib>
+#include <node.h>
+#include <node_buffer.h>
+#include <iostream>
+#include <pthread.h>
+
+#include "database.h"
+
+#include "async.h"
+#include "database_async.h"
+#include "batch.h"
+
+using namespace std;
+using namespace v8;
+using namespace node;
+using namespace leveldb;
+
+/** OPEN WORKER **/
+
+void OpenWorker::Execute () {
+ status = database->OpenDatabase(options, location);
+}
+
+/** CLOSE WORKER **/
+
+void CloseWorker::Execute () {
+ database->CloseDatabase();
+}
+
+void CloseWorker::WorkComplete () {
+ HandleScope scope;
+ HandleOKCallback();
+ callback.Dispose();
+}
+
+/** IO WORKER (abstract) **/
+
+void IOWorker::WorkComplete () {
+ AsyncWorker::WorkComplete();
+ keyPtr.Dispose();
+}
+
+/** WRITE WORKER **/
+
+WriteWorker::~WriteWorker () {
+ delete options;
+}
+
+void WriteWorker::Execute () {
+ status = database->PutToDatabase(options, key, value);
+}
+
+void WriteWorker::WorkComplete () {
+ IOWorker::WorkComplete();
+ valuePtr.Dispose();
+}
+
+/** READ WORKER **/
+
+ReadWorker::~ReadWorker () {
+ delete options;
+}
+
+void ReadWorker::Execute () {
+ status = database->GetFromDatabase(options, key, value);
+}
+
+void ReadWorker::HandleOKCallback () {
+ Local<Value> argv[] = {
+ Local<Value>::New(Null())
+ , Local<Value>::New(Buffer::New((char*)value.data(), value.size())->handle_)
+ };
+ runCallback(callback, argv, 2);
+}
+
+/** DELETE WORKER **/
+
+DeleteWorker::~DeleteWorker () {
+ delete options;
+}
+
+void DeleteWorker::Execute () {
+ status = database->DeleteFromDatabase(options, key);
+}
+
+/** BATCH WORKER **/
+
+BatchWorker::~BatchWorker () {
+ for (unsigned int i = 0; i < operations.size(); i++)
+ delete operations[i];
+ operations.clear();
+}
+
+void BatchWorker::Execute () {
+ WriteBatch batch;
+ for (unsigned int i = 0; i < operations.size(); i++)
+ operations[i]->Execute(&batch);
+ status = database->WriteBatchToDatabase(options, &batch);
+}
+/*
+IteratorWorker::~IteratorWorker () {
+ delete options;
+}
+
+struct IteratorCallbackChunk {
+ Persistent<Function> callback;
+ Buffer* key;
+ Buffer* value;
+ uv_async_t async;
+};
+
+void dataCallbackProxy (void* ctx, Slice key, Slice value) {
+ IteratorWorker* worker = static_cast<IteratorWorker*>(ctx);
+ worker->DataCallback(key, value);
+}
+
+void handleDataCallback(uv_async_t *handle, int status) {
+cout<<"handleDataCallback..." << endl;
+ IteratorCallbackChunk *chunk = static_cast<IteratorCallbackChunk*>(handle->data);
+ uv_close((uv_handle_t*) &chunk->async, NULL); // necessary otherwise UV will block
+ */
+ /*
+ baton->callback->Call(v8::Context::GetCurrent()->Global(), 1, argv); // call the JS callback method as usual
+ baton->callback.Dispose(); // delete the baton
+ baton->data.Dispose();
+ */
+/* delete chunk;
+}
+
+ */
+/*
+void IteratorWorker::DataCallback (Slice key, Slice value) {
+ cout << "IT cb: " << key.ToString() << " = " << value.ToString() << endl;
+
+ IteratorCallbackChunk* work = new IteratorCallbackChunk();
+ work->async.data = work;
+ work->callback = dataCallback;
+ work->key = Buffer::New((char*)key.data(), key.size());
+ work->value = Buffer::New((char*)value.data(), value.size());
+cout<<"uv_async_send..." << endl;
+ uv_async_init(uv_default_loop(), &work->async, handleDataCallback); // tell UV to call After_cb() async
+ uv_async_send(&work->async);
+}
+
+void IteratorWorker::Execute () {
+//Status Database::Iterator (ReadOptions* options, Slice start, Slice end, void (*callback)(Slice key, Slice value)) {
+
+cout<< "IT execute" << endl;
+ status = database->Iterator(options, start, end, this, &dataCallbackProxy);
+}
+*/
\ No newline at end of file
diff --git a/src/async.h b/src/database_async.h
similarity index 80%
copy from src/async.h
copy to src/database_async.h
index 1537723..0d1e0a0 100644
--- a/src/async.h
+++ b/src/database_async.h
@@ -1,38 +1,16 @@
-#ifndef LU_ASYNC_H
-#define LU_ASYNC_H
+#ifndef LU_DATABASE_ASYNC_H
+#define LU_DATABASE_ASYNC_H
#include <cstdlib>
#include <vector>
#include <node.h>
-
+#include "async.h"
#include "batch.h"
using namespace std;
using namespace v8;
using namespace leveldb;
-class AsyncWorker {
-public:
- AsyncWorker (
- Database* database
- , Persistent<Function> callback
- ) : database(database)
- , callback(callback) {
- request.data = this;
- };
-
- uv_work_t request;
- Database* database;
- Persistent<Function> callback;
- Status status;
- virtual void WorkComplete ();
- virtual void Execute () {};
-
-protected:
- virtual void HandleOKCallback ();
- virtual void HandleErrorCallback ();
-};
-
class OpenWorker : public AsyncWorker {
public:
OpenWorker (
@@ -93,7 +71,7 @@ protected:
class ReadWorker : public IOWorker {
public:
- ReadWorker(
+ ReadWorker (
Database* database
, Persistent<Function> callback
, Slice key
@@ -117,7 +95,7 @@ private:
class DeleteWorker : public IOWorker {
public:
- DeleteWorker(
+ DeleteWorker (
Database* database
, Persistent<Function> callback
, Slice key
@@ -164,7 +142,7 @@ private:
class BatchWorker : public AsyncWorker {
public:
- BatchWorker(
+ BatchWorker (
Database* database
, Persistent<Function> callback
, vector<BatchOp*> operations
@@ -185,8 +163,4 @@ private:
vector<BatchOp*> operations;
};
-void AsyncExecute (uv_work_t* req);
-void AsyncExecuteComplete (uv_work_t* req);
-void AsyncQueueWorker (AsyncWorker* worker);
-
#endif
\ No newline at end of file
diff --git a/src/iterator.cc b/src/iterator.cc
new file mode 100644
index 0000000..993b127
--- /dev/null
+++ b/src/iterator.cc
@@ -0,0 +1,114 @@
+#include <cstdlib>
+#include <node.h>
+#include <node_buffer.h>
+#include <iostream>
+#include <pthread.h>
+
+#include "database.h"
+#include "iterator.h"
+#include "iterator_async.h"
+
+using namespace std;
+using namespace v8;
+using namespace node;
+using namespace levelup;
+
+/*
+void dataCallbackProxy (void* ctx, Slice key, Slice value) {
+ IteratorWorker* worker = static_cast<IteratorWorker*>(ctx);
+ worker->DataCallback(key, value);
+}
+*/
+
+bool levelup::Iterator::GetIterator () {
+ if (dbIterator == NULL) {
+ dbIterator = database->NewIterator(options);
+ if (start != NULL)
+ dbIterator->Seek(*start);
+ else
+ dbIterator->SeekToFirst();
+ return true;
+ }
+ return false;
+}
+
+bool levelup::Iterator::IteratorNext (string& key, string& value) {
+ if (!GetIterator()) dbIterator->Next();
+ if (dbIterator->Valid()) { // && (end == NULL || dbIterator->key().data() < end->data())) {
+ key.assign(dbIterator->key().data(), dbIterator->key().size());
+ value.assign(dbIterator->value().data(), dbIterator->value().size());
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void levelup::Iterator::IteratorEnd () {
+ //TODO: could return it->status()
+ delete dbIterator;
+ dbIterator = NULL;
+}
+
+//void *ctx, void (*callback)(void *ctx, Slice key, Slice value)
+Handle<Value> levelup::Iterator::Next (const Arguments& args) {
+ HandleScope scope;
+ Iterator* iterator = ObjectWrap::Unwrap<Iterator>(args.This());
+ Persistent<Function> endCallback = Persistent<Function>::New(Local<Function>::Cast(args[0]));
+ Persistent<Function> dataCallback = Persistent<Function>::New(Local<Function>::Cast(args[1]));
+ NextWorker* worker = new NextWorker(
+ iterator
+ , dataCallback
+ , endCallback
+ );
+ AsyncQueueWorker(worker);
+ return Undefined();
+}
+
+Handle<Value> levelup::Iterator::End (const Arguments& args) {
+ HandleScope scope;
+ Iterator* iterator = ObjectWrap::Unwrap<Iterator>(args.This());
+ Persistent<Function> endCallback = Persistent<Function>::New(Local<Function>::Cast(args[0]));
+ EndWorker* worker = new EndWorker(
+ iterator
+ , endCallback
+ );
+ AsyncQueueWorker(worker);
+ return Undefined();
+}
+
+Persistent<Function> levelup::Iterator::constructor;
+
+void levelup::Iterator::Init () {
+ Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
+ tpl->SetClassName(String::NewSymbol("Iterator"));
+ tpl->InstanceTemplate()->SetInternalFieldCount(2);
+ tpl->PrototypeTemplate()->Set(String::NewSymbol("next") , FunctionTemplate::New(Next)->GetFunction());
+ tpl->PrototypeTemplate()->Set(String::NewSymbol("end") , FunctionTemplate::New(End)->GetFunction());
+ constructor = Persistent<Function>::New(tpl->GetFunction());
+}
+
+Handle<Value> levelup::Iterator::NewInstance (const Arguments& args) {
+ HandleScope scope;
+
+ Handle<Value> argv[1] = {
+ args[0]->ToObject()
+ };
+ Local<Object> instance = constructor->NewInstance(1, argv);
+
+ return scope.Close(instance);
+}
+
+Handle<Value> levelup::Iterator::New (const Arguments& args) {
+ HandleScope scope;
+
+ Database* database = ObjectWrap::Unwrap<Database>(args[0]->ToObject());
+ Iterator* iterator = new Iterator(database, NULL, NULL);
+ iterator->Wrap(args.This());
+
+ return args.This();
+}
+
+Handle<Value> levelup::CreateIterator (const Arguments& args) {
+ HandleScope scope;
+ return scope.Close(levelup::Iterator::NewInstance(args));
+}
\ No newline at end of file
diff --git a/src/iterator.h b/src/iterator.h
new file mode 100644
index 0000000..5adf9e9
--- /dev/null
+++ b/src/iterator.h
@@ -0,0 +1,63 @@
+#ifndef LU_ITERATOR_H
+#define LU_ITERATOR_H
+
+#include <cstdlib>
+#include <node.h>
+
+#include "database.h"
+
+using namespace std;
+using namespace v8;
+using namespace leveldb;
+
+namespace levelup {
+
+Handle<Value> CreateIterator (const Arguments& args);
+
+class Iterator : public node::ObjectWrap {
+public:
+ static void Init ();
+ static v8::Handle<v8::Value> NewInstance (const v8::Arguments& args);
+
+ bool IteratorNext (string& key, string& value);
+ void IteratorEnd ();
+
+private:
+ Iterator (
+ Database* database
+ , Slice* start
+ , Slice* end
+ ) : database(database)
+ , start(start)
+ , end(end)
+ {
+ options = new ReadOptions();
+ dbIterator = NULL;
+ };
+
+ ~Iterator () {
+ delete options;
+ if (start != NULL)
+ delete start;
+ if (end != NULL)
+ delete end;
+ };
+
+ Database* database;
+ leveldb::Iterator* dbIterator;
+ ReadOptions* options;
+ Slice* start;
+ Slice* end;
+
+ bool GetIterator ();
+
+ static v8::Persistent<v8::Function> constructor;
+
+ LU_V8_METHOD( New )
+ LU_V8_METHOD( Next )
+ LU_V8_METHOD( End )
+};
+
+};
+
+#endif
\ No newline at end of file
diff --git a/src/iterator_async.cc b/src/iterator_async.cc
new file mode 100644
index 0000000..a7dad2e
--- /dev/null
+++ b/src/iterator_async.cc
@@ -0,0 +1,37 @@
+#include <cstdlib>
+#include <node.h>
+#include <node_buffer.h>
+#include <iostream>
+#include <pthread.h>
+
+#include "database.h"
+
+#include "async.h"
+#include "iterator_async.h"
+
+using namespace std;
+using namespace v8;
+using namespace node;
+using namespace leveldb;
+
+void NextWorker::Execute () {
+ ok = iterator->IteratorNext(key, value);
+}
+
+void NextWorker::HandleOKCallback () {
+ if (ok) {
+ Local<Value> argv[] = {
+ Local<Value>::New(Buffer::New((char*)key.data(), key.size())->handle_)
+ , Local<Value>::New(Buffer::New((char*)value.data(), value.size())->handle_)
+ };
+ //delete value;
+ runCallback(callback, argv, 2);
+ } else {
+ Local<Value> argv[0];
+ runCallback(endCallback, argv, 0);
+ }
+}
+
+void EndWorker::Execute () {
+ iterator->IteratorEnd();
+}
\ No newline at end of file
diff --git a/src/iterator_async.h b/src/iterator_async.h
new file mode 100644
index 0000000..babae38
--- /dev/null
+++ b/src/iterator_async.h
@@ -0,0 +1,57 @@
+#ifndef LU_ITERATOR_ASYNC_H
+#define LU_ITERATOR_ASYNC_H
+
+#include <cstdlib>
+#include <vector>
+#include <node.h>
+#include "async.h"
+#include "iterator.h"
+
+using namespace std;
+using namespace v8;
+using namespace leveldb;
+
+class NextWorker : public AsyncWorker {
+public:
+ NextWorker (
+ levelup::Iterator* iterator
+ , Persistent<Function> dataCallback
+ , Persistent<Function> endCallback
+ ) : AsyncWorker(database, dataCallback)
+ , iterator(iterator)
+ , endCallback(endCallback)
+ { };
+
+ ~NextWorker () {};
+
+ virtual void Execute ();
+
+protected:
+ virtual void HandleOKCallback ();
+
+private:
+ levelup::Iterator* iterator;
+ Persistent<Function> endCallback;
+ string key;
+ string value;
+ bool ok;
+};
+
+class EndWorker : public AsyncWorker {
+public:
+ EndWorker (
+ levelup::Iterator* iterator
+ , Persistent<Function> endCallback
+ ) : AsyncWorker(database, endCallback)
+ , iterator(iterator)
+ {};
+
+ ~EndWorker () {};
+
+ virtual void Execute ();
+
+private:
+ levelup::Iterator* iterator;
+};
+
+#endif
\ No newline at end of file
diff --git a/src/levelup.cc b/src/levelup.cc
index d96be0f..ae77a5f 100644
--- a/src/levelup.cc
+++ b/src/levelup.cc
@@ -2,13 +2,17 @@
#include "levelup.h"
#include "database.h"
+#include "iterator.h"
using namespace v8;
+using namespace levelup;
void Init (Handle<Object> target) {
Database::Init();
+ levelup::Iterator::Init();
target->Set(String::NewSymbol("createDatabase"), FunctionTemplate::New(CreateDatabase)->GetFunction());
+ target->Set(String::NewSymbol("createIterator"), FunctionTemplate::New(CreateIterator)->GetFunction());
}
NODE_MODULE(levelup, Init)
diff --git a/test/read-stream-test.js b/test/read-stream-test.js
new file mode 100644
index 0000000..9172466
--- /dev/null
+++ b/test/read-stream-test.js
@@ -0,0 +1,95 @@
+/*global cleanUp:true, openTestDatabase:true*/
+
+var buster = require('buster')
+ , assert = buster.assert
+ , levelup = require('../lib/levelup.js')
+ , errors = require('../lib/errors.js')
+ , rimraf = require('rimraf')
+ , async = require('async')
+ , fs = require('fs')
+
+buster.testCase('ReadStream', {
+ 'setUp': function () {
+ this.cleanupDirs = []
+ this.closeableDatabases = []
+ this.openTestDatabase = openTestDatabase.bind(this)
+
+ this.readySpy = this.spy()
+ this.dataSpy = this.spy()
+ this.sourceData = []
+
+ for (var i = 0; i < 10; i++) {
+ this.sourceData.push({
+ type : 'put'
+ , key : i
+ , value : Math.random()
+ })
+ }
+
+ this.verify = function (done) {
+ assert.equals(this.readySpy.callCount, 1, 'ReadStream emitted single "ready" event')
+ assert.equals(this.dataSpy.callCount, this.sourceData.length, 'ReadStream emitted correct number of "data" events')
+ this.sourceData.forEach(function (d, i) {
+ var call = this.dataSpy.getCall(i)
+ if (call) {
+ assert.equals(call.args.length, 1, 'ReadStream "data" event #' + i + ' fired with 1 argument')
+ refute.isNull(call.args[0].key, 'ReadStream "data" event #' + i + ' argument has "key" property')
+ refute.isNull(call.args[0].value, 'ReadStream "data" event #' + i + ' argument has "value" property')
+ assert.equals(call.args[0].key, d.key, 'ReadStream "data" event #' + i + ' argument has correct "key"')
+ assert.equals(call.args[0].value, d.value, 'ReadStream "data" event #' + i + ' argument has correct "value"')
+ }
+ }.bind(this))
+ done()
+ }.bind(this)
+ }
+
+ , 'tearDown': function (done) {
+ cleanUp(this.closeableDatabases, this.cleanupDirs, done)
+ }
+
+ //TODO: test various encodings
+
+ , 'test simple ReadStream': function (done) {
+ this.openTestDatabase(function (db) {
+ // execute
+ db.batch(this.sourceData.slice(), function (err) {
+ refute(err)
+
+ var rs = db.readStream()
+ rs.on('ready' , this.readySpy)
+ rs.on('data' , this.dataSpy)
+ rs.on('end' , this.verify.bind(this, done))
+ }.bind(this))
+ }.bind(this))
+ }
+
+ , 'test delayed open': function (done) {
+ var execute = function () {
+ var db = levelup.createDatabase(
+ this.cleanupDirs[0] = '/tmp/levelup_test_db'
+ , { createIfMissing: true, errorIfExists: false }
+ )
+ this.closeableDatabases.push(db)
+ db.open(function (err) {
+ refute(err)
+
+ var rs = db.readStream()
+ rs.on('ready' , this.readySpy)
+ rs.on('data' , this.dataSpy)
+ rs.on('end' , this.verify.bind(this, done))
+ }.bind(this))
+ }.bind(this)
+
+ // setup -- open db, write stuff to it, close it again so we can reopen it
+ this.openTestDatabase(function (db) {
+ db.batch(this.sourceData.slice(), function (err) {
+ refute(err)
+ db.close(function () {
+ setTimeout(function () {
+ execute()
+ }, 10)
+ })
+ })
+ }.bind(this))
+ }
+})
\ No newline at end of file
diff --git a/test/simple-test.js b/test/simple-test.js
index 6d84994..54130c5 100644
--- a/test/simple-test.js
+++ b/test/simple-test.js
@@ -157,7 +157,7 @@ buster.testCase('Basic API', {
refute(value)
assert.isInstanceOf(err, Error)
assert.isInstanceOf(err, errors.LevelUPError)
- assert.isInstanceOf(err, errors.ReadError)
+ assert.isInstanceOf(err, errors.WriteError)
assert.match(err, /not .*open/)
done()
})
@@ -193,7 +193,7 @@ buster.testCase('Basic API', {
levelup.createDatabase('foobar').del('undefkey', function (err) {
assert.isInstanceOf(err, Error)
assert.isInstanceOf(err, errors.LevelUPError)
- assert.isInstanceOf(err, errors.ReadError)
+ assert.isInstanceOf(err, errors.WriteError)
assert.match(err, /not .*open/)
done()
})
diff --git a/test/write-stream-test.js b/test/write-stream-test.js
new file mode 100644
index 0000000..ed9968a
--- /dev/null
+++ b/test/write-stream-test.js
@@ -0,0 +1,118 @@
+/*global cleanUp:true, openTestDatabase:true*/
+
+var buster = require('buster')
+ , assert = buster.assert
+ , levelup = require('../lib/levelup.js')
+ , errors = require('../lib/errors.js')
+ , rimraf = require('rimraf')
+ , async = require('async')
+ , fs = require('fs')
+
+buster.testCase('WriteStream', {
+ 'setUp': function () {
+ this.cleanupDirs = []
+ this.closeableDatabases = []
+ this.openTestDatabase = openTestDatabase.bind(this)
+ this.timeout = 1000
+
+ this.sourceData = []
+
+ for (var i = 0; i < 10; i++) {
+ this.sourceData.push({
+ type : 'put'
+ , key : i
+ , value : Math.random()
+ })
+ }
+
+ this.verify = function (db, done) {
+ async.forEach(
+ this.sourceData
+ , function (data, callback) {
+ db.get(data.key, function (err, value) {
+ refute(err)
+ assert.equals(value, data.value, 'WriteStream data #' + data.key + ' has correct value')
+ callback()
+ })
+ }
+ , done
+ )
+ }
+ }
+
+ , 'tearDown': function (done) {
+ cleanUp(this.closeableDatabases, this.cleanupDirs, done)
+ }
+
+ //TODO: test various encodings
+
+ , 'test simple WriteStream': function (done) {
+ this.openTestDatabase(function (db) {
+ var ws = db.writeStream()
+ ws.on('error', function (err) {
+ refute(err)
+ })
+ ws.on('close', this.verify.bind(this, db, done))
+ this.sourceData.forEach(function (d) {
+ ws.write(d)
+ })
+ ws.once('ready', ws.end) // end after it's ready, nextTick makes this work OK
+ }.bind(this))
+ }
+
+ , 'test WriteStream with async writes': function (done) {
+ this.openTestDatabase(function (db) {
+ var ws = db.writeStream()
+
+ ws.on('error', function (err) {
+ refute(err)
+ })
+ ws.on('close', this.verify.bind(this, db, done))
+ async.forEachSeries(
+ this.sourceData
+ , function (d, callback) {
+ // some should batch() and some should put()
+ if (d.key % 3) {
+ setTimeout(function () {
+ ws.write(d)
+ callback()
+ }, 10)
+ } else {
+ ws.write(d)
+ callback()
+ }
+ }
+ , function () {
+ ws.end()
+ }
+ )
+ }.bind(this))
+ }
+
+ , 'test delayed open with maxBufferLength': function (done) {
+ var db = levelup.createDatabase(
+ this.cleanupDirs[0] = '/tmp/levelup_test_db'
+ , { createIfMissing: true, errorIfExists: false }
+ )
+ , ws = db.writeStream({ maxBufferLength: 1 })
+
+ this.closeableDatabases.push(db)
+ // should be able to push first element in just fine
+ assert.isTrue(ws.write(this.sourceData[0]))
+ // second element should warn that the buffer isn't being cleared
+ assert.isFalse(ws.write(this.sourceData[1]))
+
+ ws.once('close', this.verify.bind(this, db, done))
+ ws.once('drain', function () {
+ this.sourceData.slice(2).forEach(function (d, i) {
+ assert[i != 0 ? 'isFalse' : 'isTrue'](ws.write(d), 'correct return value for element #' + i)
+ })
+ ws.end()
+ }.bind(this))
+
+ db.open(function (err) {
+ // should lead to a 'drain' event
+ refute(err)
+ })
+ }
+})
\ No newline at end of file
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-javascript/node-leveldown.git
More information about the Pkg-javascript-commits
mailing list