[Pkg-javascript-commits] [node-leveldown] 09/492: iterator -> ReadStream, WriteStream for put/batch

Andrew Kelley andrewrk-guest at moszumanska.debian.org
Sun Jul 6 17:13:38 UTC 2014


This is an automated email from the git hooks/post-receive script.

andrewrk-guest pushed a commit to annotated tag rocksdb-0.10.1
in repository node-leveldown.

commit fb58afe851d3aee0bee910c27270d85cf1e2ffdd
Author: Rod Vagg <rod at vagg.org>
Date:   Sat Aug 11 22:56:14 2012 +1000

    iterator -> ReadStream, WriteStream for put/batch
---
 binding.gyp                       |   3 +
 lib/levelup.js                    | 155 ++++++++++++++++++++++++-------------
 lib/read-stream.js                |  45 +++++++++++
 lib/util.js                       |  12 +++
 lib/write-stream.js               |  94 +++++++++++++++++++++++
 package.json                      |   3 +-
 src/async.cc                      | 105 +++----------------------
 src/async.h                       | 156 +-------------------------------------
 src/database.cc                   |  59 ++++++++++++--
 src/database.h                    |  32 ++++----
 src/database_async.cc             | 151 ++++++++++++++++++++++++++++++++++++
 src/{async.h => database_async.h} |  38 ++--------
 src/iterator.cc                   | 114 ++++++++++++++++++++++++++++
 src/iterator.h                    |  63 +++++++++++++++
 src/iterator_async.cc             |  37 +++++++++
 src/iterator_async.h              |  57 ++++++++++++++
 src/levelup.cc                    |   4 +
 test/read-stream-test.js          |  95 +++++++++++++++++++++++
 test/simple-test.js               |   4 +-
 test/write-stream-test.js         | 118 ++++++++++++++++++++++++++++
 20 files changed, 986 insertions(+), 359 deletions(-)

diff --git a/binding.gyp b/binding.gyp
index bad4981..2feb539 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -43,6 +43,9 @@
                 "src/async.cc"
               , "src/batch.cc"
               , "src/database.cc"
+              , "src/database_async.cc"
+              , "src/iterator.cc"
+              , "src/iterator_async.cc"
               , "src/levelup.cc"
             ]
           , "include_dirs": [
diff --git a/lib/levelup.js b/lib/levelup.js
index bfc5777..93c069a 100644
--- a/lib/levelup.js
+++ b/lib/levelup.js
@@ -1,5 +1,11 @@
-var bridge              = require('../build/Release/levelup')
-  , errors              = require('./errors')
+var bridge       = require('../build/Release/levelup')
+
+  , errors       = require('./errors')
+  , ReadStream   = require('./read-stream')
+  , WriteStream  = require('./write-stream')
+  , toEncoding   = require('./util').toEncoding
+  , toBuffer     = require('./util').toBuffer
+  , EventEmitter = require('events').EventEmitter
 
   , defaultOptions = {
         createIfMissing : false
@@ -7,10 +13,6 @@ var bridge              = require('../build/Release/levelup')
       , encoding        : 'utf8'
     }
 
-  , toString = function () {
-      return "LevelUPDatabase"
-    }
-
   , encodingOpts = (function (enc) {
       var eo = {}
       enc.forEach(function (e) { eo[e] = { encoding: e } })
@@ -38,31 +40,25 @@ var bridge              = require('../build/Release/levelup')
       return callback_
     }
 
-  , toBuffer = function (data, encoding) {
-      return data === undefined || data === null || Buffer.isBuffer(data) ? data : new Buffer('' + data, encoding)
-    }
-
-  , toEncoding = function (buffer, encoding) {
-      return encoding == 'binary' ? buffer : buffer.toString(encoding)
-    }
-
   , Database = {
         open: function (callback) {
           var options = {}
             , execute = function () {
+                var db = bridge.createDatabase()
                 Object.keys(defaultOptions).forEach(function (p) {
                   options[p] = this[p]
                 }.bind(this))
-                this.db = bridge.createDatabase()
-                this.db.open(this.location, options, function (err) {
+                db.open(this.location, options, function (err) {
                   if (err) {
-                    this.db = null
                     err = new errors.OpenError(err)
+                    if (callback)
+                      return callback(err)
+                    this.ee.emit('error', err)
+                  } else {
+                    this.db = db
+                    callback()
+                    this.ee.emit('ready')
                   }
-                  if (callback)
-                    callback.apply(null, err ? [ err ] : [])
-                  else if (err)
-                    throw err
                 }.bind(this))
               }.bind(this)
 
@@ -74,7 +70,10 @@ var bridge              = require('../build/Release/levelup')
 
       , close: function (callback) {
           if (this.isOpen()) {
-            this.db.close(callback)
+            this.db.close(function () {
+              this.ee.emit('closed')
+              callback.apply(null, arguments)
+            }.bind(this))
             this.db = null
           } else {
             callback()
@@ -85,28 +84,35 @@ var bridge              = require('../build/Release/levelup')
           return !!this.db
         }
 
-      , get: function (key, options_, callback_) {
+      , get: function (key_, options_, callback_) {
           var callback = getCallback(options_, callback_)
-            , options
+            , options, key, err
+
           if (this.isOpen()) {
             options  = getOptions(options_, this.encoding)
-            key      = toBuffer(key,   options.keyEncoding   || options.encoding)
+            key      = toBuffer(key_, options.keyEncoding || options.encoding)
             this.db.get(key, options, function (err, value) {
               if (err) {
-                err = new errors.NotFoundError('Key not found in database [' + key + ']')
+                err = new errors.NotFoundError('Key not found in database [' + key_ + ']')
                 if (callback)
                   return callback(err)
                 throw err
               }
-              callback && callback(null, toEncoding(value, options.valueEncoding || options.encoding), key)
+              callback && callback(null, toEncoding(value, options.valueEncoding || options.encoding), key_)
             })
-          } else
-            callback(new errors.ReadError('Database has not been opened'))
+          } else {
+            err = new errors.ReadError('Database has not been opened')
+            if (callback)
+              callback(err)
+            else
+              throw err
+          }
         }
 
       , put: function (key, value, options_, callback_) {
           var callback = getCallback(options_, callback_)
-            , options
+            , options, err
+
           if (this.isOpen()) {
             options  = getOptions(options_, this.encoding)
             key      = toBuffer(key,   options.keyEncoding   || options.encoding)
@@ -116,17 +122,25 @@ var bridge              = require('../build/Release/levelup')
                 err = new errors.WriteError(err)
                 if (callback)
                   return callback(err)
-                throw err
+                this.ee.emit('error', err)
+              } else {
+                this.ee.emit('put', key, value)
+                callback && callback(null, key, value)
               }
-              callback && callback(null, key, value)
-            })
-          } else
-            callback(new errors.ReadError('Database has not been opened'))
+            }.bind(this))
+          } else {
+            err = new errors.WriteError('Database has not been opened')
+            if (callback)
+              callback(err)
+            else
+              throw err
+          }
         }
 
       , del: function (key, options_, callback_) {
           var callback = getCallback(options_, callback_)
-            , options
+            , options, err
+
           if (this.isOpen()) {
             options  = getOptions(options_, this.encoding)
             key      = toBuffer(key,   options.keyEncoding   || options.encoding)
@@ -135,32 +149,44 @@ var bridge              = require('../build/Release/levelup')
                 err = new errors.WriteError(err)
                 if (callback)
                   return callback(err)
-                throw err
+                this.ee.emit('error', err)
+              } else {
+                this.ee.emit('del', key)
+                callback && callback(null, key)
               }
-              callback && callback(null, key)
-            })
-          } else
-            callback(new errors.ReadError('Database has not been opened'))
+            }.bind(this))
+          } else {
+            err = new errors.WriteError('Database has not been opened')
+            if (callback)
+              callback(err)
+            else
+              throw err
+          }
         }
 
       , batch: function (arr, options_, callback_) {
           var callback = getCallback(options_, callback_)
             , empty    = {}
-            , options, keyEncoding, valueEncoding
+            , options, keyEncoding, valueEncoding, err
 
-          if (!this.isOpen())
-            return callback(new errors.ReadError('Database has not been opened'))
+          if (!this.isOpen()) {
+            err = new errors.WriteError('Database has not been opened')
+            if (callback)
+              return callback(err)
+            else
+              throw err
+          }
 
           options       = getOptions(options_, this.encoding)
           keyEncoding   = options.keyEncoding   || options.encoding
           valueEncoding = options.valueEncoding || options.encoding
           arr           = arr.map(function (e) {
-            if (e.type && e.key) {
+            if (e.type !== undefined && e.key !== undefined) {
               var o = {
                   type  : e.type
                 , key   : toBuffer(e.key, keyEncoding)
               }
-              if (e.value)
+              if (e.value !== undefined)
                 o.value = toBuffer(e.value, valueEncoding)
               return o
             }
@@ -171,10 +197,33 @@ var bridge              = require('../build/Release/levelup')
               err = new errors.WriteError(err)
               if (callback)
                 return callback(err)
-              throw err
+              this.ee.emit('error', err)
+            } else {
+              this.ee.emit('batch', arr)
+              callback && callback()
             }
-            callback && callback()
-          })
+          }.bind(this))
+        }
+
+      , readStream: function (options) {
+          return new ReadStream(
+              options || {}
+            , this
+            , function () {
+                return bridge.createIterator(this.db)
+              }.bind(this)
+          )
+        }
+
+      , writeStream: function (options) {
+         return new WriteStream(
+              options || {}
+            , this
+          )
+        }
+
+      , toString: function () {
+          return "LevelUPDatabase"
         }
     }
 
@@ -193,11 +242,13 @@ var bridge              = require('../build/Release/levelup')
       if (typeof location != 'string')
         throw new errors.InitializationError('Must provide a location for the database')
 
-      database = Object.create({ toString: toString })
-      ctx = {}
+      database = new EventEmitter()
+      ctx = {
+          ee       : database
+        , location : location
+      }
 
       defineReadOnlyProperty(database, 'location', location)
-      ctx.location = location
       Object.keys(defaultOptions).forEach(function (p) {
         var value = (options && options[p]) || defaultOptions[p]
         defineReadOnlyProperty(database, p, value)
diff --git a/lib/read-stream.js b/lib/read-stream.js
new file mode 100644
index 0000000..560dd2c
--- /dev/null
+++ b/lib/read-stream.js
@@ -0,0 +1,45 @@
+var Stream     = require("stream").Stream
+  , inherits   = require("inherits")
+
+  , toEncoding = require('./util').toEncoding
+
+function ReadStream (options, db, iteratorFactory) {
+  Stream.call(this)
+  this._options = options
+  this._status  = 'ready'
+
+  var ready = function () {
+    this._iterator = iteratorFactory()
+    this.emit('ready')
+    this._read()
+  }.bind(this)
+
+  if (db.isOpen())
+    process.nextTick(ready)
+  else
+    db.ee.once('ready', ready)
+}
+
+inherits(ReadStream, Stream)
+
+ReadStream.prototype._read = function () {
+  if (this._status == 'ready' || this._status == 'reading') {
+    this._iterator.next(this._onEnd.bind(this), this._onData.bind(this))
+  }
+}
+
+ReadStream.prototype._onData = function (key, value) {
+  if (this._status == 'ready') this._status = 'reading'
+  this.emit('data', {
+      key   : toEncoding(key   , this._options.keyEncoding   || this._options.encoding)
+    , value : toEncoding(value , this._options.valueEncoding || this._options.encoding)
+  })
+  this._read()
+}
+
+ReadStream.prototype._onEnd = function () {
+  this._status = 'ended'
+  this.emit('end')
+}
+
+module.exports = ReadStream
\ No newline at end of file
diff --git a/lib/util.js b/lib/util.js
new file mode 100644
index 0000000..566bd3d
--- /dev/null
+++ b/lib/util.js
@@ -0,0 +1,12 @@
+var toBuffer = function (data, encoding) {
+      return data === undefined || data === null || Buffer.isBuffer(data) ? data : new Buffer('' + data, encoding)
+    }
+
+  , toEncoding = function (buffer, encoding) {
+      return encoding == 'binary' ? buffer : buffer.toString(encoding)
+    }
+
+module.exports = {
+    toBuffer   : toBuffer
+  , toEncoding : toEncoding
+}
\ No newline at end of file
diff --git a/lib/write-stream.js b/lib/write-stream.js
new file mode 100644
index 0000000..3d53014
--- /dev/null
+++ b/lib/write-stream.js
@@ -0,0 +1,94 @@
+var Stream     = require("stream").Stream
+  , inherits   = require("inherits")
+
+  , toEncoding = require('./util').toEncoding
+
+function WriteStream (options, db) {
+  Stream.call(this)
+  this._options = options
+  this._db      = db
+  this._buffer  = []
+  this._status  = 'init'
+  this._end     = false
+
+  var ready = function () {
+    this._status = 'ready'
+    this.emit('ready')
+    this._process()
+  }.bind(this)
+
+  if (db.isOpen())
+    process.nextTick(ready)
+  else
+    db.ee.once('ready', ready)
+}
+
+inherits(WriteStream, Stream)
+
+WriteStream.prototype.write = function (data) {
+  this._buffer.push(data)
+  if (this._status != 'init')
+    this._processDelayed()
+  if (this._options.maxBufferLength && this._buffer.length > this._options.maxBufferLength) {
+    this._writeBlock = true
+    return false
+  }
+  return true
+}
+
+WriteStream.prototype._processDelayed = function() {
+  process.nextTick(this._process.bind(this))
+}
+
+WriteStream.prototype._process = function() {
+  var entry
+    , cb = function (err) {
+        if (this._status != 'closed')
+          this._status = 'ready'
+        if (err)
+          return this.emit('error', err)
+        this._process()
+      }.bind(this)
+
+  if (this._status != 'ready') {
+    if (this._buffer.length && this._status != 'closed')
+      this._processDelayed()
+    return
+  }
+
+  if (this._end) {
+    this.emit('close')
+    this._status = 'closed'
+    return
+  }
+
+  if (!this._buffer.length)
+    return
+
+  if (this._buffer.length == 1) {
+    entry = this._buffer.pop()
+    if (entry.key !== undefined && entry.value !== undefined) {
+      this._status = 'writing'
+      this._db.put(entry.key, entry.value, cb)
+    }
+  } else {
+    this._status = 'writing'
+    this._db.batch(this._buffer.map(function (d) {
+      return { type: 'put', key: d.key, value: d.value }
+    }), cb)
+    this._buffer = []
+  }
+  if (this._writeBlock) {
+    this._writeBlock = false
+    this.emit('drain')
+  }
+}
+
+WriteStream.prototype.end = function() {
+  process.nextTick(function () {
+    this._end = true
+    this._process()
+  }.bind(this))
+}
+
+module.exports = WriteStream
\ No newline at end of file
diff --git a/package.json b/package.json
index 6bf3b52..ca6d77f 100644
--- a/package.json
+++ b/package.json
@@ -3,7 +3,8 @@
   , "version"         : "0.0.0"
   , "main"            : "lib/levelup.js"
   , "dependencies"    : {
-        "node-errno"    : "*"
+        "errno"         : "*"
+      , "inherits"      : "~1.0.0"
     }
   , "devDependencies" : {
         "buster"        : "*"
diff --git a/src/async.cc b/src/async.cc
index d7b678b..4183e79 100644
--- a/src/async.cc
+++ b/src/async.cc
@@ -14,10 +14,17 @@ using namespace v8;
 using namespace node;
 using namespace leveldb;
 
-void runCallback (Persistent<Function> callback, Local<Value> argv[], int length);
-
 /** ASYNC BASE **/
 
+void runCallback (Persistent<Function> callback, Local<Value> argv[], int length) {
+  TryCatch try_catch;
+ 
+  callback->Call(Context::GetCurrent()->Global(), length, argv);
+  if (try_catch.HasCaught()) {
+    FatalException(try_catch);
+  }
+}
+
 void AsyncWorker::WorkComplete () {
   HandleScope scope;
   if (status.ok())
@@ -39,100 +46,6 @@ void AsyncWorker::HandleErrorCallback () {
   runCallback(callback, argv, 1);
 }
 
-/** OPEN WORKER **/
-
-void OpenWorker::Execute() {
-  status = database->OpenDatabase(options, location);
-}
-
-/** CLOSE WORKER **/
-
-void CloseWorker::Execute() {
-  database->CloseDatabase();
-}
-
-void CloseWorker::WorkComplete () {
-  HandleScope scope;
-  HandleOKCallback();
-  callback.Dispose();
-}
-
-/** IO WORKER (abstract) **/
-
-void IOWorker::WorkComplete() {
-  AsyncWorker::WorkComplete();
-  keyPtr.Dispose();
-}
-
-/** WRITE WORKER **/
-
-WriteWorker::~WriteWorker () {
-  delete options;
-}
-
-void WriteWorker::Execute() {
-  status = database->PutToDatabase(options, key, value);
-}
-
-void WriteWorker::WorkComplete() {
-  IOWorker::WorkComplete();
-  valuePtr.Dispose();
-}
-
-/** READ WORKER **/
-
-ReadWorker::~ReadWorker () {
-  delete options;
-}
-
-void ReadWorker::Execute () {
-  status = database->GetFromDatabase(options, key, value);
-}
-
-void ReadWorker::HandleOKCallback () {
-  Local<Value> argv[] = {
-      Local<Value>::New(Null())
-    , Local<Value>::New(Buffer::New((char*)value.data(), value.size())->handle_)
-  };
-  runCallback(callback, argv, 2);
-}
-
-/** DELETE WORKER **/
-
-DeleteWorker::~DeleteWorker () {
-  delete options;
-}
-
-void DeleteWorker::Execute() {
-  status = database->DeleteFromDatabase(options, key);
-}
-
-/** BATCH WORKER **/
-
-BatchWorker::~BatchWorker () {
-  for (unsigned int i = 0; i < operations.size(); i++)
-    delete operations[i];
-  operations.clear();
-}
-
-void BatchWorker::Execute() {
-  WriteBatch batch;
-  for (unsigned int i = 0; i < operations.size(); i++)
-    operations[i]->Execute(&batch);
-  status = database->WriteBatchToDatabase(options, &batch);
-}
-
-/** UTIL **/
-
-void runCallback (Persistent<Function> callback, Local<Value> argv[], int length) {
-  TryCatch try_catch;
- 
-  callback->Call(Context::GetCurrent()->Global(), length, argv);
-  if (try_catch.HasCaught()) {
-    FatalException(try_catch);
-  }
-}
-
 void AsyncExecute (uv_work_t* req) {
   static_cast<AsyncWorker*>(req->data)->Execute();
 }
diff --git a/src/async.h b/src/async.h
index 1537723..6948a90 100644
--- a/src/async.h
+++ b/src/async.h
@@ -5,12 +5,12 @@
 #include <vector>
 #include <node.h>
 
-#include "batch.h"
-
 using namespace std;
 using namespace v8;
 using namespace leveldb;
 
+void runCallback (Persistent<Function> callback, Local<Value> argv[], int length);
+
 class AsyncWorker {
 public:
   AsyncWorker (
@@ -33,158 +33,6 @@ protected:
   virtual void         HandleErrorCallback ();
 };
 
-class OpenWorker  : public AsyncWorker {
-public:
-  OpenWorker (
-      Database* database
-    , Persistent<Function> callback
-    , string location
-    , bool createIfMissing
-    , bool errorIfExists
-  ) : AsyncWorker(database, callback)
-    , location(location)
-  {
-    options                    = new Options();
-    options->create_if_missing = createIfMissing;
-    options->error_if_exists   = errorIfExists;
-  };
-
-  ~OpenWorker () {
-    delete options;
-  }
-
-  string               location;
-  Options*             options;
-  virtual void         Execute ();
-};
-
-class CloseWorker : public AsyncWorker {
-public:
-  CloseWorker (
-      Database* database
-    , Persistent<Function> callback
-  ) : AsyncWorker(database, callback)
-  {};
-
-  virtual void         Execute ();
-
-private:
-  virtual void         WorkComplete ();
-};
-
-class IOWorker    : public AsyncWorker {
-public:
-  IOWorker (
-      Database* database
-    , Persistent<Function> callback
-    , Slice key
-    , Persistent<Object> keyPtr
-  ) : AsyncWorker(database, callback)
-    , key(key)
-    , keyPtr(keyPtr)
-  {};
-
-  virtual void         WorkComplete ();
-
-protected:
-  Slice                key;
-  Persistent<Object>   keyPtr;
-};
-
-class ReadWorker : public IOWorker {
-public:
-  ReadWorker(
-      Database* database
-    , Persistent<Function> callback
-    , Slice key
-    , Persistent<Object> keyPtr
-  ) : IOWorker(database, callback, key, keyPtr)
-  {
-    options = new ReadOptions();
-  };
-
-  ~ReadWorker ();
-
-  ReadOptions*         options;
-  virtual void         Execute ();
-
-protected:
-  virtual void         HandleOKCallback ();
-
-private:
-  string               value;
-};
-
-class DeleteWorker : public IOWorker {
-public:
-  DeleteWorker(
-      Database* database
-    , Persistent<Function> callback
-    , Slice key
-    , bool sync
-    , Persistent<Object> keyPtr
-  ) : IOWorker(database, callback, key, keyPtr)
-  {
-    options        = new WriteOptions();
-    options->sync  = sync;
-  };
-
-  ~DeleteWorker ();
-
-  virtual void         Execute ();
-
-protected:
-  WriteOptions*        options;
-};
-
-class WriteWorker : public DeleteWorker {
-public:
-  WriteWorker (
-      Database* database
-    , Persistent<Function> callback
-    , Slice key
-    , Slice value
-    , bool sync
-    , Persistent<Object> keyPtr
-    , Persistent<Object> valuePtr
-  ) : DeleteWorker(database, callback, key, sync, keyPtr)
-    , value(value)
-    , valuePtr(valuePtr)
-  {};
-
-  ~WriteWorker ();
-
-  virtual void         Execute ();
-  virtual void         WorkComplete ();
-
-private:
-  Slice                value;
-  Persistent<Object>   valuePtr;
-};
-
-class BatchWorker : public AsyncWorker {
-public:
-  BatchWorker(
-      Database* database
-    , Persistent<Function> callback
-    , vector<BatchOp*> operations
-    , bool sync
-  ) : AsyncWorker(database, callback)
-    , operations(operations)
-  {
-    options        = new WriteOptions();
-    options->sync  = sync;
-  };
-
-  ~BatchWorker ();
-
-  virtual void         Execute ();
-
-private:
-  WriteOptions*        options;
-  vector<BatchOp*>     operations;
-};
-
 void AsyncExecute (uv_work_t* req);
 void AsyncExecuteComplete (uv_work_t* req);
 void AsyncQueueWorker (AsyncWorker* worker);
diff --git a/src/database.cc b/src/database.cc
index 12b9242..70a3b51 100644
--- a/src/database.cc
+++ b/src/database.cc
@@ -10,7 +10,9 @@
 #include "levelup.h"
 #include "database.h"
 #include "async.h"
+#include "database_async.h"
 #include "batch.h"
+#include "iterator.h"
 
 using namespace std;
 using namespace v8;
@@ -57,6 +59,10 @@ Status Database::WriteBatchToDatabase (WriteOptions* options, WriteBatch* batch)
   return db->Write(*options, batch);
 }
 
+leveldb::Iterator* Database::NewIterator (ReadOptions* options) {
+  return db->NewIterator(*options);
+}
+
 void Database::CloseDatabase () {
   delete db;
   db = NULL;
@@ -67,13 +73,14 @@ Persistent<Function> Database::constructor;
 void Database::Init () {
   Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
   tpl->SetClassName(String::NewSymbol("Database"));
-  tpl->InstanceTemplate()->SetInternalFieldCount(5);
-  tpl->PrototypeTemplate()->Set(String::NewSymbol("open")  , FunctionTemplate::New(Open)->GetFunction());
-  tpl->PrototypeTemplate()->Set(String::NewSymbol("close") , FunctionTemplate::New(Close)->GetFunction());
-  tpl->PrototypeTemplate()->Set(String::NewSymbol("put")   , FunctionTemplate::New(Put)->GetFunction());
-  tpl->PrototypeTemplate()->Set(String::NewSymbol("get")   , FunctionTemplate::New(Get)->GetFunction());
-  tpl->PrototypeTemplate()->Set(String::NewSymbol("del")   , FunctionTemplate::New(Delete)->GetFunction());
-  tpl->PrototypeTemplate()->Set(String::NewSymbol("batch") , FunctionTemplate::New(Batch)->GetFunction());
+  tpl->InstanceTemplate()->SetInternalFieldCount(7);
+  tpl->PrototypeTemplate()->Set(String::NewSymbol("open")     , FunctionTemplate::New(Open)->GetFunction());
+  tpl->PrototypeTemplate()->Set(String::NewSymbol("close")    , FunctionTemplate::New(Close)->GetFunction());
+  tpl->PrototypeTemplate()->Set(String::NewSymbol("put")      , FunctionTemplate::New(Put)->GetFunction());
+  tpl->PrototypeTemplate()->Set(String::NewSymbol("get")      , FunctionTemplate::New(Get)->GetFunction());
+  tpl->PrototypeTemplate()->Set(String::NewSymbol("del")      , FunctionTemplate::New(Delete)->GetFunction());
+  tpl->PrototypeTemplate()->Set(String::NewSymbol("batch")    , FunctionTemplate::New(Batch)->GetFunction());
+//  tpl->PrototypeTemplate()->Set(String::NewSymbol("iterator") , FunctionTemplate::New(Iterator)->GetFunction());
   constructor = Persistent<Function>::New(tpl->GetFunction());
 }
 
@@ -239,6 +246,44 @@ Handle<Value> Database::Batch (const Arguments& args) {
   return Undefined();
 }
 
+/*
+Handle<Value> Database::Iterator (const Arguments& args) {
+  HandleScope scope;
+
+  cout << "Database::Iterator" << endl;
+
+  Database* database = ObjectWrap::Unwrap<Database>(args.This());
+  Persistent<Function> dataCallback = Persistent<Function>::New(Local<Function>::Cast(args[0]));
+  Persistent<Function> callback = Persistent<Function>::New(Local<Function>::Cast(args[1]));
+
+  cout << "Database::Iterator making worker" << endl;
+
+  levelup::Iterator iterator = new Iterator(
+      this
+    , callback
+    , dataCallback
+    , NULL
+    , NULL
+  );
+*/
+/*
+  IteratorWorker* worker = new IteratorWorker(
+      database
+    , callback
+    , dataCallback
+    , NULL
+    , NULL
+  );
+
+  cout << "Queueing iterator worker..." << endl;
+
+  AsyncQueueWorker(worker);
+*/
+/*  
+  return Undefined();
+}
+*/
+
 Handle<Value> CreateDatabase (const Arguments& args) {
   HandleScope scope;
   return scope.Close(Database::NewInstance(args));
diff --git a/src/database.h b/src/database.h
index 4a3f8a6..b27e897 100644
--- a/src/database.h
+++ b/src/database.h
@@ -18,18 +18,19 @@ struct AsyncDescriptor;
 Handle<Value> CreateDatabase (const Arguments& args);
 
 class Database : public node::ObjectWrap {
- public:
+public:
   static void Init ();
   static v8::Handle<v8::Value> NewInstance (const v8::Arguments& args);
 
-  Status OpenDatabase         (Options* options, string location);
-  Status PutToDatabase        (WriteOptions* options, Slice key, Slice value);
-  Status GetFromDatabase      (ReadOptions* options, Slice key, string& value);
-  Status DeleteFromDatabase   (WriteOptions* options, Slice key);
-  Status WriteBatchToDatabase (WriteOptions* options, WriteBatch* batch);
-  void   CloseDatabase        ();
+  Status OpenDatabase           (Options* options, string location);
+  Status PutToDatabase          (WriteOptions* options, Slice key, Slice value);
+  Status GetFromDatabase        (ReadOptions* options, Slice key, string& value);
+  Status DeleteFromDatabase     (WriteOptions* options, Slice key);
+  Status WriteBatchToDatabase   (WriteOptions* options, WriteBatch* batch);
+  leveldb::Iterator* NewIterator (ReadOptions* options);
+  void   CloseDatabase          ();
 
- private:
+private:
   Database ();
   ~Database ();
 
@@ -37,13 +38,14 @@ class Database : public node::ObjectWrap {
 
   static v8::Persistent<v8::Function> constructor;
 
-  LU_V8_METHOD( New    )
-  LU_V8_METHOD( Open   )
-  LU_V8_METHOD( Close  )
-  LU_V8_METHOD( Put  )
-  LU_V8_METHOD( Delete )
-  LU_V8_METHOD( Get   )
-  LU_V8_METHOD( Batch  )
+  LU_V8_METHOD( New      )
+  LU_V8_METHOD( Open     )
+  LU_V8_METHOD( Close    )
+  LU_V8_METHOD( Put      )
+  LU_V8_METHOD( Delete   )
+  LU_V8_METHOD( Get      )
+  LU_V8_METHOD( Batch    )
+//  LU_V8_METHOD( Iterator )
 };
 
 #endif
\ No newline at end of file
diff --git a/src/database_async.cc b/src/database_async.cc
new file mode 100644
index 0000000..a3e4088
--- /dev/null
+++ b/src/database_async.cc
@@ -0,0 +1,151 @@
+#include <cstdlib>
+#include <node.h>
+#include <node_buffer.h>
+#include <iostream>
+#include <pthread.h>
+
+#include "database.h"
+
+#include "async.h"
+#include "database_async.h"
+#include "batch.h"
+
+using namespace std;
+using namespace v8;
+using namespace node;
+using namespace leveldb;
+
+/** OPEN WORKER **/
+
+void OpenWorker::Execute () {
+  status = database->OpenDatabase(options, location);
+}
+
+/** CLOSE WORKER **/
+
+void CloseWorker::Execute () {
+  database->CloseDatabase();
+}
+
+void CloseWorker::WorkComplete () {
+  HandleScope scope;
+  HandleOKCallback();
+  callback.Dispose();
+}
+
+/** IO WORKER (abstract) **/
+
+void IOWorker::WorkComplete () {
+  AsyncWorker::WorkComplete();
+  keyPtr.Dispose();
+}
+
+/** WRITE WORKER **/
+
+WriteWorker::~WriteWorker () {
+  delete options;
+}
+
+void WriteWorker::Execute () {
+  status = database->PutToDatabase(options, key, value);
+}
+
+void WriteWorker::WorkComplete () {
+  IOWorker::WorkComplete();
+  valuePtr.Dispose();
+}
+
+/** READ WORKER **/
+
+ReadWorker::~ReadWorker () {
+  delete options;
+}
+
+void ReadWorker::Execute () {
+  status = database->GetFromDatabase(options, key, value);
+}
+
+void ReadWorker::HandleOKCallback () {
+  Local<Value> argv[] = {
+      Local<Value>::New(Null())
+    , Local<Value>::New(Buffer::New((char*)value.data(), value.size())->handle_)
+  };
+  runCallback(callback, argv, 2);
+}
+
+/** DELETE WORKER **/
+
+DeleteWorker::~DeleteWorker () {
+  delete options;
+}
+
+void DeleteWorker::Execute () {
+  status = database->DeleteFromDatabase(options, key);
+}
+
+/** BATCH WORKER **/
+
+BatchWorker::~BatchWorker () {
+  for (unsigned int i = 0; i < operations.size(); i++)
+    delete operations[i];
+  operations.clear();
+}
+
+void BatchWorker::Execute () {
+  WriteBatch batch;
+  for (unsigned int i = 0; i < operations.size(); i++)
+    operations[i]->Execute(&batch);
+  status = database->WriteBatchToDatabase(options, &batch);
+}
+/*
+IteratorWorker::~IteratorWorker () {
+  delete options;
+}
+
+struct IteratorCallbackChunk {
+ Persistent<Function> callback;
+ Buffer* key;
+ Buffer* value;
+ uv_async_t async;
+};
+
+void dataCallbackProxy (void* ctx, Slice key, Slice value) {
+  IteratorWorker* worker = static_cast<IteratorWorker*>(ctx);
+  worker->DataCallback(key, value);
+}
+
+void handleDataCallback(uv_async_t *handle, int status) {
+cout<<"handleDataCallback..." << endl;
+ IteratorCallbackChunk *chunk = static_cast<IteratorCallbackChunk*>(handle->data);
+ uv_close((uv_handle_t*) &chunk->async, NULL); // necessary otherwise UV will block
+ */
+ /*
+ baton->callback->Call(v8::Context::GetCurrent()->Global(), 1, argv); // call the JS callback method as usual
+ baton->callback.Dispose(); // delete the baton
+ baton->data.Dispose();
+ */
+/* delete chunk;
+}
+
+ */
+/*
+void IteratorWorker::DataCallback (Slice key, Slice value) {
+  cout << "IT cb: " << key.ToString() << " = "  << value.ToString() << endl;
+
+  IteratorCallbackChunk* work = new IteratorCallbackChunk();
+  work->async.data = work;
+  work->callback = dataCallback;
+  work->key = Buffer::New((char*)key.data(), key.size());
+  work->value = Buffer::New((char*)value.data(), value.size());
+cout<<"uv_async_send..." << endl;
+  uv_async_init(uv_default_loop(), &work->async, handleDataCallback); // tell UV to call After_cb() async
+  uv_async_send(&work->async);
+}
+
+void IteratorWorker::Execute () {
+//Status Database::Iterator (ReadOptions* options, Slice start, Slice end, void (*callback)(Slice key, Slice value)) {
+
+cout<< "IT execute" << endl;
+  status = database->Iterator(options, start, end, this, &dataCallbackProxy);
+}
+*/
\ No newline at end of file
diff --git a/src/async.h b/src/database_async.h
similarity index 80%
copy from src/async.h
copy to src/database_async.h
index 1537723..0d1e0a0 100644
--- a/src/async.h
+++ b/src/database_async.h
@@ -1,38 +1,16 @@
-#ifndef LU_ASYNC_H
-#define LU_ASYNC_H
+#ifndef LU_DATABASE_ASYNC_H
+#define LU_DATABASE_ASYNC_H
 
 #include <cstdlib>
 #include <vector>
 #include <node.h>
-
+#include "async.h"
 #include "batch.h"
 
 using namespace std;
 using namespace v8;
 using namespace leveldb;
 
-class AsyncWorker {
-public:
-  AsyncWorker (
-      Database* database
-    , Persistent<Function> callback
-  ) : database(database)
-    , callback(callback) {
-        request.data               = this;
-      };
-
-  uv_work_t            request;
-  Database*            database;
-  Persistent<Function> callback;
-  Status               status;
-  virtual void         WorkComplete ();
-  virtual void         Execute () {};
-
-protected:
-  virtual void         HandleOKCallback ();
-  virtual void         HandleErrorCallback ();
-};
-
 class OpenWorker  : public AsyncWorker {
 public:
   OpenWorker (
@@ -93,7 +71,7 @@ protected:
 
 class ReadWorker : public IOWorker {
 public:
-  ReadWorker(
+  ReadWorker (
       Database* database
     , Persistent<Function> callback
     , Slice key
@@ -117,7 +95,7 @@ private:
 
 class DeleteWorker : public IOWorker {
 public:
-  DeleteWorker(
+  DeleteWorker (
       Database* database
     , Persistent<Function> callback
     , Slice key
@@ -164,7 +142,7 @@ private:
 
 class BatchWorker : public AsyncWorker {
 public:
-  BatchWorker(
+  BatchWorker (
       Database* database
     , Persistent<Function> callback
     , vector<BatchOp*> operations
@@ -185,8 +163,4 @@ private:
   vector<BatchOp*>     operations;
 };
 
-void AsyncExecute (uv_work_t* req);
-void AsyncExecuteComplete (uv_work_t* req);
-void AsyncQueueWorker (AsyncWorker* worker);
-
 #endif
\ No newline at end of file
diff --git a/src/iterator.cc b/src/iterator.cc
new file mode 100644
index 0000000..993b127
--- /dev/null
+++ b/src/iterator.cc
@@ -0,0 +1,114 @@
+#include <cstdlib>
+#include <node.h>
+#include <node_buffer.h>
+#include <iostream>
+#include <pthread.h>
+
+#include "database.h"
+#include "iterator.h"
+#include "iterator_async.h"
+
+using namespace std;
+using namespace v8;
+using namespace node;
+using namespace levelup;
+
+/*
+void dataCallbackProxy (void* ctx, Slice key, Slice value) {
+  IteratorWorker* worker = static_cast<IteratorWorker*>(ctx);
+  worker->DataCallback(key, value);
+}
+*/
+
+bool levelup::Iterator::GetIterator () {
+  if (dbIterator == NULL) {
+    dbIterator = database->NewIterator(options);
+    if (start != NULL)
+      dbIterator->Seek(*start);
+    else
+      dbIterator->SeekToFirst();
+    return true;
+  }
+  return false;
+}
+
+bool levelup::Iterator::IteratorNext (string& key, string& value) {
+  if (!GetIterator()) dbIterator->Next();
+  if (dbIterator->Valid()) { // && (end == NULL || dbIterator->key().data() < end->data())) {
+    key.assign(dbIterator->key().data(), dbIterator->key().size());
+    value.assign(dbIterator->value().data(), dbIterator->value().size());
+    return true;
+  } else {
+    return false;
+  }
+}
+
+void levelup::Iterator::IteratorEnd () {
+  //TODO: could return it->status()
+  delete dbIterator;
+  dbIterator = NULL;
+}
+
+//void *ctx, void (*callback)(void *ctx, Slice key, Slice value)
+Handle<Value> levelup::Iterator::Next (const Arguments& args) {
+  HandleScope scope;
+  Iterator* iterator = ObjectWrap::Unwrap<Iterator>(args.This());
+  Persistent<Function> endCallback = Persistent<Function>::New(Local<Function>::Cast(args[0]));
+  Persistent<Function> dataCallback = Persistent<Function>::New(Local<Function>::Cast(args[1]));
+  NextWorker* worker = new NextWorker(
+      iterator
+    , dataCallback
+    , endCallback
+  );
+  AsyncQueueWorker(worker);
+  return Undefined();
+}
+
+Handle<Value> levelup::Iterator::End (const Arguments& args) {
+  HandleScope scope;
+  Iterator* iterator = ObjectWrap::Unwrap<Iterator>(args.This());
+  Persistent<Function> endCallback = Persistent<Function>::New(Local<Function>::Cast(args[0]));
+  EndWorker* worker = new EndWorker(
+      iterator
+    , endCallback
+  );
+  AsyncQueueWorker(worker);
+  return Undefined();
+}
+
+Persistent<Function> levelup::Iterator::constructor;
+
+void levelup::Iterator::Init () {
+  Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
+  tpl->SetClassName(String::NewSymbol("Iterator"));
+  tpl->InstanceTemplate()->SetInternalFieldCount(2);
+  tpl->PrototypeTemplate()->Set(String::NewSymbol("next") , FunctionTemplate::New(Next)->GetFunction());
+  tpl->PrototypeTemplate()->Set(String::NewSymbol("end")  , FunctionTemplate::New(End)->GetFunction());
+  constructor = Persistent<Function>::New(tpl->GetFunction());
+}
+
+Handle<Value> levelup::Iterator::NewInstance (const Arguments& args) {
+  HandleScope scope;
+
+  Handle<Value> argv[1] = {
+      args[0]->ToObject()
+  };
+  Local<Object> instance = constructor->NewInstance(1, argv);
+
+  return scope.Close(instance);
+}
+
+Handle<Value> levelup::Iterator::New (const Arguments& args) {
+  HandleScope scope;
+
+  Database* database = ObjectWrap::Unwrap<Database>(args[0]->ToObject());
+  Iterator* iterator = new Iterator(database, NULL, NULL);
+  iterator->Wrap(args.This());
+
+  return args.This();
+}
+
+Handle<Value> levelup::CreateIterator (const Arguments& args) {
+  HandleScope scope;
+  return scope.Close(levelup::Iterator::NewInstance(args));
+}
\ No newline at end of file
diff --git a/src/iterator.h b/src/iterator.h
new file mode 100644
index 0000000..5adf9e9
--- /dev/null
+++ b/src/iterator.h
@@ -0,0 +1,63 @@
+#ifndef LU_ITERATOR_H
+#define LU_ITERATOR_H
+
+#include <cstdlib>
+#include <node.h>
+
+#include "database.h"
+
+using namespace std;
+using namespace v8;
+using namespace leveldb;
+
+namespace levelup {
+
+Handle<Value> CreateIterator (const Arguments& args);
+
+class Iterator : public node::ObjectWrap {
+public:
+  static void Init  ();
+  static v8::Handle<v8::Value> NewInstance (const v8::Arguments& args);
+
+  bool IteratorNext (string& key, string& value);
+  void IteratorEnd  ();
+
+private:
+  Iterator (
+      Database*            database
+    , Slice*               start
+    , Slice*               end
+  ) : database(database)
+    , start(start)
+    , end(end)
+  {
+    options = new ReadOptions();
+    dbIterator = NULL;
+  };
+
+  ~Iterator () {
+    delete options;
+    if (start != NULL)
+      delete start;
+    if (end != NULL)
+      delete end;
+  };
+
+  Database*            database;
+  leveldb::Iterator*   dbIterator;
+  ReadOptions*         options;
+  Slice*               start;
+  Slice*               end;
+
+  bool GetIterator ();
+
+  static v8::Persistent<v8::Function> constructor;
+
+  LU_V8_METHOD( New  )
+  LU_V8_METHOD( Next )
+  LU_V8_METHOD( End  )
+};
+
+};
+
+#endif
\ No newline at end of file
diff --git a/src/iterator_async.cc b/src/iterator_async.cc
new file mode 100644
index 0000000..a7dad2e
--- /dev/null
+++ b/src/iterator_async.cc
@@ -0,0 +1,37 @@
+#include <cstdlib>
+#include <node.h>
+#include <node_buffer.h>
+#include <iostream>
+#include <pthread.h>
+
+#include "database.h"
+
+#include "async.h"
+#include "iterator_async.h"
+
+using namespace std;
+using namespace v8;
+using namespace node;
+using namespace leveldb;
+
+void NextWorker::Execute () {
+  ok = iterator->IteratorNext(key, value);
+}
+
+void NextWorker::HandleOKCallback () {
+  if (ok) {
+    Local<Value> argv[] = {
+        Local<Value>::New(Buffer::New((char*)key.data(), key.size())->handle_)
+      , Local<Value>::New(Buffer::New((char*)value.data(), value.size())->handle_)
+    };
+    //delete value;
+    runCallback(callback, argv, 2);
+  } else {
+    Local<Value> argv[0];
+    runCallback(endCallback, argv, 0);
+  }
+}
+
+void EndWorker::Execute () {
+  iterator->IteratorEnd();
+}
\ No newline at end of file
diff --git a/src/iterator_async.h b/src/iterator_async.h
new file mode 100644
index 0000000..babae38
--- /dev/null
+++ b/src/iterator_async.h
@@ -0,0 +1,57 @@
+#ifndef LU_ITERATOR_ASYNC_H
+#define LU_ITERATOR_ASYNC_H
+
+#include <cstdlib>
+#include <vector>
+#include <node.h>
+#include "async.h"
+#include "iterator.h"
+
+using namespace std;
+using namespace v8;
+using namespace leveldb;
+
+class NextWorker  : public AsyncWorker {
+public:
+  NextWorker (
+      levelup::Iterator*   iterator
+    , Persistent<Function> dataCallback
+    , Persistent<Function> endCallback
+  ) : AsyncWorker(database, dataCallback)
+    , iterator(iterator)
+    , endCallback(endCallback)
+  { };
+
+  ~NextWorker () {};
+
+  virtual void         Execute ();
+
+protected:
+  virtual void         HandleOKCallback ();
+
+private:
+  levelup::Iterator*   iterator;
+  Persistent<Function> endCallback;
+  string               key;
+  string               value;
+  bool                 ok;
+};
+
+class EndWorker  : public AsyncWorker {
+public:
+  EndWorker (
+      levelup::Iterator* iterator
+    , Persistent<Function> endCallback
+  ) : AsyncWorker(database, endCallback)
+    , iterator(iterator)
+  {};
+
+  ~EndWorker () {};
+
+  virtual void         Execute ();
+
+private:
+  levelup::Iterator*   iterator;
+};
+
+#endif
\ No newline at end of file
diff --git a/src/levelup.cc b/src/levelup.cc
index d96be0f..ae77a5f 100644
--- a/src/levelup.cc
+++ b/src/levelup.cc
@@ -2,13 +2,17 @@
 
 #include "levelup.h"
 #include "database.h"
+#include "iterator.h"
 
 using namespace v8;
+using namespace levelup;
 
 void Init (Handle<Object> target) {
   Database::Init();
+  levelup::Iterator::Init();
 
   target->Set(String::NewSymbol("createDatabase"), FunctionTemplate::New(CreateDatabase)->GetFunction());
+  target->Set(String::NewSymbol("createIterator"), FunctionTemplate::New(CreateIterator)->GetFunction());
 }
 
 NODE_MODULE(levelup, Init)
diff --git a/test/read-stream-test.js b/test/read-stream-test.js
new file mode 100644
index 0000000..9172466
--- /dev/null
+++ b/test/read-stream-test.js
@@ -0,0 +1,95 @@
+/*global cleanUp:true, openTestDatabase:true*/
+
+var buster  = require('buster')
+  , assert  = buster.assert
+  , levelup = require('../lib/levelup.js')
+  , errors  = require('../lib/errors.js')
+  , rimraf  = require('rimraf')
+  , async   = require('async')
+  , fs      = require('fs')
+
+buster.testCase('ReadStream', {
+    'setUp': function () {
+      this.cleanupDirs = []
+      this.closeableDatabases = []
+      this.openTestDatabase = openTestDatabase.bind(this)
+
+      this.readySpy   = this.spy()
+      this.dataSpy    = this.spy()
+      this.sourceData = []
+
+      for (var i = 0; i < 10; i++) {
+        this.sourceData.push({
+            type  : 'put'
+          , key   : i
+          , value : Math.random()
+        })
+      }
+
+      this.verify = function (done) {
+        assert.equals(this.readySpy.callCount, 1, 'ReadStream emitted single "ready" event')
+        assert.equals(this.dataSpy.callCount, this.sourceData.length, 'ReadStream emitted correct number of "data" events')
+        this.sourceData.forEach(function (d, i) {
+          var call = this.dataSpy.getCall(i)
+          if (call) {
+            assert.equals(call.args.length, 1, 'ReadStream "data" event #' + i + ' fired with 1 argument')
+            refute.isNull(call.args[0].key, 'ReadStream "data" event #' + i + ' argument has "key" property')
+            refute.isNull(call.args[0].value, 'ReadStream "data" event #' + i + ' argument has "value" property')
+            assert.equals(call.args[0].key, d.key, 'ReadStream "data" event #' + i + ' argument has correct "key"')
+            assert.equals(call.args[0].value, d.value, 'ReadStream "data" event #' + i + ' argument has correct "value"')
+          }
+        }.bind(this))
+        done()
+      }.bind(this)
+    }
+
+  , 'tearDown': function (done) {
+      cleanUp(this.closeableDatabases, this.cleanupDirs, done)
+    }
+
+  //TODO: test various encodings
+
+  , 'test simple ReadStream': function (done) {
+      this.openTestDatabase(function (db) {
+        // execute
+        db.batch(this.sourceData.slice(), function (err) {
+          refute(err)
+
+          var rs = db.readStream()
+          rs.on('ready' , this.readySpy)
+          rs.on('data'  , this.dataSpy)
+          rs.on('end'   , this.verify.bind(this, done))
+        }.bind(this))
+      }.bind(this))
+    }
+
+  , 'test delayed open': function (done) {
+      var execute = function () {
+        var db = levelup.createDatabase(
+                this.cleanupDirs[0] = '/tmp/levelup_test_db'
+              , { createIfMissing: true, errorIfExists: false }
+            )
+        this.closeableDatabases.push(db)
+        db.open(function (err) {
+          refute(err)
+
+          var rs = db.readStream()
+          rs.on('ready' , this.readySpy)
+          rs.on('data'  , this.dataSpy)
+          rs.on('end'   , this.verify.bind(this, done))
+        }.bind(this))
+      }.bind(this)
+
+      // setup -- open db, write stuff to it, close it again so we can reopen it
+      this.openTestDatabase(function (db) {
+        db.batch(this.sourceData.slice(), function (err) {
+          refute(err)
+          db.close(function () {
+            setTimeout(function () {
+              execute()
+            }, 10)
+          })
+        })
+      }.bind(this))
+    }
+})
\ No newline at end of file
diff --git a/test/simple-test.js b/test/simple-test.js
index 6d84994..54130c5 100644
--- a/test/simple-test.js
+++ b/test/simple-test.js
@@ -157,7 +157,7 @@ buster.testCase('Basic API', {
             refute(value)
             assert.isInstanceOf(err, Error)
             assert.isInstanceOf(err, errors.LevelUPError)
-            assert.isInstanceOf(err, errors.ReadError)
+            assert.isInstanceOf(err, errors.WriteError)
             assert.match(err, /not .*open/)
             done()
           })
@@ -193,7 +193,7 @@ buster.testCase('Basic API', {
           levelup.createDatabase('foobar').del('undefkey', function (err) {
             assert.isInstanceOf(err, Error)
             assert.isInstanceOf(err, errors.LevelUPError)
-            assert.isInstanceOf(err, errors.ReadError)
+            assert.isInstanceOf(err, errors.WriteError)
             assert.match(err, /not .*open/)
             done()
           })
diff --git a/test/write-stream-test.js b/test/write-stream-test.js
new file mode 100644
index 0000000..ed9968a
--- /dev/null
+++ b/test/write-stream-test.js
@@ -0,0 +1,118 @@
+/*global cleanUp:true, openTestDatabase:true*/
+
+var buster  = require('buster')
+  , assert  = buster.assert
+  , levelup = require('../lib/levelup.js')
+  , errors  = require('../lib/errors.js')
+  , rimraf  = require('rimraf')
+  , async   = require('async')
+  , fs      = require('fs')
+
+buster.testCase('WriteStream', {
+    'setUp': function () {
+      this.cleanupDirs = []
+      this.closeableDatabases = []
+      this.openTestDatabase = openTestDatabase.bind(this)
+      this.timeout = 1000
+
+      this.sourceData = []
+
+      for (var i = 0; i < 10; i++) {
+        this.sourceData.push({
+            type  : 'put'
+          , key   : i
+          , value : Math.random()
+        })
+      }
+
+      this.verify = function (db, done) {
+        async.forEach(
+            this.sourceData
+          , function (data, callback) {
+              db.get(data.key, function (err, value) {
+                refute(err)
+                assert.equals(value, data.value, 'WriteStream data #' + data.key + ' has correct value')
+                callback()
+              })
+            }
+          , done
+        )
+      }
+    }
+
+  , 'tearDown': function (done) {
+      cleanUp(this.closeableDatabases, this.cleanupDirs, done)
+    }
+
+  //TODO: test various encodings
+
+  , 'test simple WriteStream': function (done) {
+      this.openTestDatabase(function (db) {
+        var ws = db.writeStream()
+        ws.on('error', function (err) {
+          refute(err)
+        })
+        ws.on('close', this.verify.bind(this, db, done))
+        this.sourceData.forEach(function (d) {
+          ws.write(d)
+        })
+        ws.once('ready', ws.end) // end after it's ready, nextTick makes this work OK
+      }.bind(this))
+    }
+
+  , 'test WriteStream with async writes': function (done) {
+      this.openTestDatabase(function (db) {
+        var ws = db.writeStream()
+
+        ws.on('error', function (err) {
+          refute(err)
+        })
+        ws.on('close', this.verify.bind(this, db, done))
+        async.forEachSeries(
+            this.sourceData
+          , function (d, callback) {
+              // some should batch() and some should put()
+              if (d.key % 3) {
+                setTimeout(function () {
+                  ws.write(d)
+                  callback()
+                }, 10)
+              } else {
+                ws.write(d)
+                callback()
+              }
+            }
+          , function () {
+              ws.end()
+            }
+        )
+      }.bind(this))
+    }
+
+  , 'test delayed open with maxBufferLength': function (done) {
+      var db = levelup.createDatabase(
+              this.cleanupDirs[0] = '/tmp/levelup_test_db'
+            , { createIfMissing: true, errorIfExists: false }
+          )
+        , ws = db.writeStream({ maxBufferLength: 1 })
+
+      this.closeableDatabases.push(db)
+      // should be able to push first element in just fine
+      assert.isTrue(ws.write(this.sourceData[0]))
+      // second element should warn that the buffer isn't being cleared
+      assert.isFalse(ws.write(this.sourceData[1]))
+
+      ws.once('close', this.verify.bind(this, db, done))
+      ws.once('drain', function () {
+        this.sourceData.slice(2).forEach(function (d, i) {
+          assert[i != 0 ? 'isFalse' : 'isTrue'](ws.write(d), 'correct return value for element #' + i)
+        })
+        ws.end()
+      }.bind(this))
+
+      db.open(function (err) {
+        // should lead to a 'drain' event
+        refute(err)
+      })
+    }
+})
\ No newline at end of file

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-javascript/node-leveldown.git



More information about the Pkg-javascript-commits mailing list