[Pkg-javascript-commits] [node-leveldown] 04/492: batch writes!

Andrew Kelley andrewrk-guest at moszumanska.debian.org
Sun Jul 6 17:13:38 UTC 2014


This is an automated email from the git hooks/post-receive script.

andrewrk-guest pushed a commit to annotated tag rocksdb-0.10.1
in repository node-leveldown.

commit a3e5388531b377677fa07fb87e20900f96bd31bd
Author: Rod Vagg <rod at vagg.org>
Date:   Thu Jul 12 16:09:25 2012 +1000

    batch writes!
---
 binding.gyp         |   5 ++-
 lib/levelup.js      |  15 +++++++
 src/async.cc        |  27 ++++++++++++
 src/async.h         |  15 +++++++
 src/batch.cc        |  24 +++++++++++
 src/batch.h         |  33 +++++++++++++++
 src/database.cc     |  53 +++++++++++++++++++++++
 src/database.h      |  10 +++--
 src/levelup.h       |   3 ++
 test/simple-test.js | 120 ++++++++++++++++++++++++++++++++++++++++++++--------
 10 files changed, 281 insertions(+), 24 deletions(-)

diff --git a/binding.gyp b/binding.gyp
index 032df9c..bad4981 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -40,9 +40,10 @@
         {
             "target_name": "levelup"
           , "sources": [
-                "src/levelup.cc"
+                "src/async.cc"
+              , "src/batch.cc"
               , "src/database.cc"
-              , "src/async.cc"
+              , "src/levelup.cc"
             ]
           , "include_dirs": [
                 "<(module_root_dir)/deps/leveldb-1.5.0/include/"
diff --git a/lib/levelup.js b/lib/levelup.js
index f806afd..9c2bfdf 100644
--- a/lib/levelup.js
+++ b/lib/levelup.js
@@ -93,6 +93,21 @@ var bridge              = require('../build/Release/levelup')
           } else
             callback(new errors.ReadError('Database has not been opened'))
         }
+
+      , batch: function (arr, callback) { // callback is optional; without one, failures are thrown
+          if (!this.isOpen()) {
+            var notOpen = new errors.ReadError('Database has not been opened')
+            if (!callback) throw notOpen
+            callback(notOpen)
+            return
+          }
+          this.db.batch(arr, function (err) {
+            if (!err) return callback && callback()
+            err = new errors.WriteError(err)
+            if (callback) return callback(err)
+            throw err
+          })
+        }
     }
 
   , defineReadOnlyProperty = function (obj, key, value) {
diff --git a/src/async.cc b/src/async.cc
index 5dad5f5..c50c5e5 100644
--- a/src/async.cc
+++ b/src/async.cc
@@ -4,7 +4,9 @@
 #include <pthread.h>
 
 #include "database.h"
+
 #include "async.h"
+#include "batch.h"
 
 using namespace std;
 using namespace v8;
@@ -139,6 +141,31 @@ void DeleteWorker::Execute() {
   status = database->DeleteFromDatabase(options, key);
 }
 
+/** BATCH WORKER **/
+
+BatchWorker::BatchWorker  (Database* database, Persistent<Function> callback, vector<BatchOp*> operations, bool sync) { // builds a worker that applies `operations` to `database` as one batch
+  request.data     = this; // NOTE(review): presumably read back by AsyncExecute/AsyncExecuteComplete — confirm
+  this->database   = database;
+  this->callback   = callback;
+  this->operations = operations; // copies the pointers; ~BatchWorker deletes each BatchOp
+  options          = new WriteOptions();
+  options->sync    = sync; // passed to the database write in Execute() via `options`
+}
+
+BatchWorker::~BatchWorker () { // frees what the worker owns; the vector itself is destroyed with the object
+  for (unsigned int i = 0; i < operations.size(); i++)
+    delete operations[i]; // each BatchOp was heap-allocated and handed to the constructor
+  delete options; // allocated with `new WriteOptions()` in the constructor
+}
+
+
+void BatchWorker::Execute() { // builds one WriteBatch from the queued operations and writes it
+  WriteBatch batch;
+  for (unsigned int i = 0; i < operations.size(); i++)
+    operations[i]->Execute(&batch); // each op appends its own Put or Delete to the batch
+  status = database->WriteBatchToDatabase(options, &batch); // outcome is kept in `status`
+}
+
 /** UTIL **/
 
 void runCallback (Persistent<Function> callback, Local<Value> argv[], int length) {
diff --git a/src/async.h b/src/async.h
index 8093dde..ac81ca1 100644
--- a/src/async.h
+++ b/src/async.h
@@ -2,8 +2,11 @@
 #define LU_ASYNC_H
 
 #include <cstdlib>
+#include <vector>
 #include <node.h>
 
+#include "batch.h"
+
 using namespace std;
 using namespace v8;
 using namespace leveldb;
@@ -78,6 +81,18 @@ public:
   virtual void         Execute ();
 };
 
+class BatchWorker : public AsyncWorker { // async worker that writes a list of BatchOps as one WriteBatch
+public:
+  BatchWorker  (Database* database, Persistent<Function> callback, vector<BatchOp*> operations, bool sync);
+  ~BatchWorker (); // deletes every BatchOp held in `operations`
+
+  virtual void         Execute (); // fills a WriteBatch from `operations` and writes it to the database
+
+private:
+  WriteOptions*        options; // allocated in the constructor; carries the `sync` flag
+  vector<BatchOp*>     operations; // operations to apply, in vector order
+};
+
 void AsyncExecute (uv_work_t* req);
 void AsyncExecuteComplete (uv_work_t* req);
 void AsyncQueueWorker (AsyncWorker* worker);
diff --git a/src/batch.cc b/src/batch.cc
new file mode 100644
index 0000000..8ed7707
--- /dev/null
+++ b/src/batch.cc
@@ -0,0 +1,24 @@
+#include <cstdlib>
+#include <iostream>
+
+#include "leveldb/write_batch.h"
+
+#include "batch.h"
+
+using namespace std;
+
+BatchDelete::BatchDelete (string key) { // stores the key this operation acts on
+  this->key   = key;
+}
+
+void BatchDelete::Execute (WriteBatch* batch) { // appends a Delete for `key` to the batch
+  batch->Delete(key);
+}
+
+BatchWrite::BatchWrite (string key, string value) : BatchDelete (key) { // the BatchDelete base stores `key`
+  this->value = value;
+}
+
+void BatchWrite::Execute (WriteBatch* batch) { // appends a Put of `key` -> `value` to the batch
+  batch->Put(key, value);
+}
\ No newline at end of file
diff --git a/src/batch.h b/src/batch.h
new file mode 100644
index 0000000..0c0b2c4
--- /dev/null
+++ b/src/batch.h
@@ -0,0 +1,33 @@
+#ifndef LU_BATCH_H
+#define LU_BATCH_H
+
+#include <cstdlib>
+
+#include "database.h"
+#include "leveldb/write_batch.h"
+
+class BatchOp { // abstract base: one operation that can append itself to a WriteBatch
+public:
+  virtual ~BatchOp () {} // ops are deleted through BatchOp*, so the destructor must be virtual
+  virtual void Execute (WriteBatch* batch) = 0; // implemented by BatchDelete and BatchWrite
+};
+
+class BatchDelete : public BatchOp { // batch operation that deletes `key`
+public:
+  BatchDelete (string key);
+  virtual void Execute (WriteBatch* batch); // calls batch->Delete(key)
+
+protected:
+  string key; // protected: also read by the derived BatchWrite
+};
+
+class BatchWrite : public BatchDelete { // batch operation that puts `value` at `key`; `key` is inherited from BatchDelete
+public:
+  BatchWrite (string key, string value);
+  virtual void Execute (WriteBatch* batch); // calls batch->Put(key, value)
+
+private:
+  string value; // value written for `key`
+};
+
+#endif
\ No newline at end of file
diff --git a/src/database.cc b/src/database.cc
index 4490a86..267924f 100644
--- a/src/database.cc
+++ b/src/database.cc
@@ -1,4 +1,5 @@
 #include <cstdlib>
+#include <vector>
 #include <node.h>
 #include <iostream>
 #include <pthread.h>
@@ -8,6 +9,7 @@
 #include "levelup.h"
 #include "database.h"
 #include "async.h"
+#include "batch.h"
 
 using namespace std;
 using namespace v8;
@@ -17,6 +19,11 @@ using namespace leveldb;
 LU_OPTION ( createIfMissing ); // for open()
 LU_OPTION ( errorIfExists   ); // for open()
 LU_OPTION ( sync            ); // for write() and delete()
+LU_STR    ( key );
+LU_STR    ( value );
+LU_STR    ( type );
+LU_STR    ( del );
+LU_STR    ( put );
 
 Database::Database () {
   db = NULL;
@@ -45,6 +52,10 @@ Status Database::DeleteFromDatabase (WriteOptions* options, string key) {
   return db->Delete(*options, key);
 }
 
+Status Database::WriteBatchToDatabase (WriteOptions* options, WriteBatch* batch) { // applies `batch` through db->Write and returns its Status
+  return db->Write(*options, batch); // NOTE(review): no check here that db is non-NULL (it is NULL until opened and after CloseDatabase)
+}
+
 void Database::CloseDatabase () {
   delete db;
   db = NULL;
@@ -61,6 +72,7 @@ void Database::Init () {
   tpl->PrototypeTemplate()->Set(String::NewSymbol("put")   , FunctionTemplate::New(Write)->GetFunction());
   tpl->PrototypeTemplate()->Set(String::NewSymbol("get")   , FunctionTemplate::New(Read)->GetFunction());
   tpl->PrototypeTemplate()->Set(String::NewSymbol("del")   , FunctionTemplate::New(Delete)->GetFunction());
+  tpl->PrototypeTemplate()->Set(String::NewSymbol("batch") , FunctionTemplate::New(Batch)->GetFunction());
   constructor = Persistent<Function>::New(tpl->GetFunction());
 }
 
@@ -189,6 +201,47 @@ Handle<Value> Database::Delete (const Arguments& args) {
   return Undefined();
 }
 
+Handle<Value> Database::Batch (const Arguments& args) {
+  HandleScope scope;
+  // JS signature: batch(array, callback) or batch(array, options, callback)
+  if (!args[0]->IsArray()) // Local<Array>::Cast performs no type check of its own
+    return ThrowException(Exception::TypeError(String::New("batch() requires an array argument")));
+
+  Database* database = ObjectWrap::Unwrap<Database>(args.This());
+  Local<Array> array = Local<Array>::Cast(args[0]);
+  Persistent<Function> callback;
+  if (args.Length() > 1)
+    callback = Persistent<Function>::New(Local<Function>::Cast(args[args.Length() > 2 ? 2 : 1]));
+  bool sync = false;
+  if (args.Length() > 2 && args[1]->IsObject()) { // a non-object options argument is ignored
+    Local<Object> optionsObj = Local<Object>::Cast(args[1]);
+    sync = optionsObj->Has(option_sync) && optionsObj->Get(option_sync)->BooleanValue();
+  }
+
+  // Entries that are not objects, lack `type` or `key`, have an unknown type,
+  // or are puts without a `value` are skipped rather than reported.
+  vector<BatchOp*> operations;
+  for (unsigned int i = 0; i < array->Length(); i++) {
+    if (!array->Get(i)->IsObject())
+      continue;
+    Local<Object> obj = Local<Object>::Cast(array->Get(i));
+    if (!obj->Has(str_type) || !obj->Has(str_key))
+      continue;
+    String::Utf8Value key(obj->Get(str_key)->ToString());
+    if (obj->Get(str_type)->StrictEquals(str_del)) {
+      operations.push_back(new BatchDelete(*key));
+    } else if (obj->Get(str_type)->StrictEquals(str_put)) {
+      if (obj->Has(str_value)) {
+        String::Utf8Value value(obj->Get(str_value)->ToString());
+        operations.push_back(new BatchWrite(*key, *value));
+      }
+    }
+  }
+  // The worker deletes the BatchOp objects in its destructor, so they are not freed here.
+  AsyncQueueWorker(new BatchWorker(database, callback, operations, sync));
+  return Undefined();
+}
+
 Handle<Value> CreateDatabase (const Arguments& args) {
   HandleScope scope;
   return scope.Close(Database::NewInstance(args));
diff --git a/src/database.h b/src/database.h
index 3c437cf..7438b9f 100644
--- a/src/database.h
+++ b/src/database.h
@@ -22,10 +22,11 @@ class Database : public node::ObjectWrap {
   static void Init ();
   static v8::Handle<v8::Value> NewInstance (const v8::Arguments& args);
 
-  Status OpenDatabase       (Options* options, string location);
-  Status WriteToDatabase    (WriteOptions* options, string key, string value);
-  Status ReadFromDatabase   (ReadOptions* options, string key, string& value);
-  Status DeleteFromDatabase (WriteOptions* options, string key);
+  Status OpenDatabase         (Options* options, string location);
+  Status WriteToDatabase      (WriteOptions* options, string key, string value);
+  Status ReadFromDatabase     (ReadOptions* options, string key, string& value);
+  Status DeleteFromDatabase   (WriteOptions* options, string key);
+  Status WriteBatchToDatabase (WriteOptions* options, WriteBatch* batch);
   void   CloseDatabase      ();
 
  private:
@@ -42,6 +43,7 @@ class Database : public node::ObjectWrap {
   LU_V8_METHOD( Write  )
   LU_V8_METHOD( Delete )
   LU_V8_METHOD( Read   )
+  LU_V8_METHOD( Batch  )
 };
 
 #endif
\ No newline at end of file
diff --git a/src/levelup.h b/src/levelup.h
index dfb4236..16f460b 100644
--- a/src/levelup.h
+++ b/src/levelup.h
@@ -1,6 +1,9 @@
 #ifndef LU_LEVELUP_H
 #define LU_LEVELUP_H
 
+#define LU_STR(key) /* defines a static Persistent<String> named str_<key> holding the text "<key>" */ \
+  static Persistent<String> str_ ## key = Persistent<String>::New(String::New(#key));
+
 #define LU_OPTION(key) \
   static Persistent<String> option_ ## key = Persistent<String>::New(String::New(#key));
 
diff --git a/test/simple-test.js b/test/simple-test.js
index 7742c45..f960e03 100644
--- a/test/simple-test.js
+++ b/test/simple-test.js
@@ -10,6 +10,20 @@ buster.testCase('Basic API', {
     'setUp': function () {
       this.cleanupDirs = []
       this.closeableDatabases = []
+      this.openTestDatabase = function (callback) { // helper for all test groups: creates and opens the test db, then passes it to callback
+        var db = levelup.createDatabase(
+                this.cleanupDirs[0] = '/tmp/levelup_test_db' // path is also stored in cleanupDirs (presumably removed in tearDown — confirm)
+              , {
+                    createIfMissing: true
+                  , errorIfExists: true
+                }
+            )
+        this.closeableDatabases.push(db) // tracked in closeableDatabases (presumably closed in tearDown — confirm)
+        db.open(function (err) {
+          refute(err) // opening must succeed before the test body runs
+          callback(db)
+        })
+      }.bind(this)
     }
 
   , 'tearDown': function (done) {
@@ -146,24 +160,7 @@ buster.testCase('Basic API', {
     }
 
   , 'Simple operations': {
-        'setUp': function () {
-          this.openTestDatabase = function (callback) {
-            var db = levelup.createDatabase(
-                    this.cleanupDirs[0] = '/tmp/levelup_test_db'
-                  , {
-                        createIfMissing: true
-                      , errorIfExists: true
-                    }
-                )
-            this.closeableDatabases.push(db)
-            db.open(function (err) {
-              refute(err)
-              callback(db)
-            })
-          }.bind(this)
-        }
-
-      , 'get() on non-open database causes error': function (done) {
+        'get() on non-open database causes error': function (done) {
           levelup.createDatabase('foobar').get('undefkey', function (err, value) {
             refute(value)
             assert.isInstanceOf(err, Error)
@@ -271,4 +268,91 @@ buster.testCase('Basic API', {
           })
         }
     }
+
+  , 'batch()': {
+        'batch() with multiple puts': function (done) { // three puts in one batch, then each key is read back
+          this.openTestDatabase(function (db) {
+            db.batch(
+                [
+                    { type: 'put', key: 'foo', value: 'afoovalue' }
+                  , { type: 'put', key: 'bar', value: 'abarvalue' }
+                  , { type: 'put', key: 'baz', value: 'abazvalue' }
+                ]
+              , function (err) {
+                  refute(err)
+                  async.forEach(
+                      ['foo', 'bar', 'baz']
+                    , function (key, callback) {
+                        db.get(key, function (err, value) {
+                          refute(err)
+                          assert.equals(value, 'a' + key + 'value') // stored values follow the pattern 'a<key>value'
+                          callback()
+                        })
+                      }
+                    , done
+                  )
+                }
+            )
+          })
+        }
+
+      , 'batch() with multiple puts and deletes': function (done) { // seeds keys, runs a mixed put/del batch, then checks which keys remain
+          this.openTestDatabase(function (db) {
+            async.series(
+                [
+                    function (callback) {
+                      db.batch( // step 1: seed keys '1', '2' and '3'
+                          [
+                              { type: 'put', key: '1', value: 'one' }
+                            , { type: 'put', key: '2', value: 'two' }
+                            , { type: 'put', key: '3', value: 'three' }
+                          ]
+                        , callback
+                      )
+                    }
+                  , function (callback) {
+                      db.batch( // step 2: mixed batch; deletes seeded '1' and puts then deletes 'foo' within the same batch
+                          [
+                              { type: 'put', key: 'foo', value: 'afoovalue' }
+                            , { type: 'del', key: '1' }
+                            , { type: 'put', key: 'bar', value: 'abarvalue' }
+                            , { type: 'del', key: 'foo' }
+                            , { type: 'put', key: 'baz', value: 'abazvalue' }
+                          ]
+                        , callback
+                      )
+                    }
+                  , function (callback) {
+                      // these should exist (only the absence of an error is checked, not the stored values)
+                      async.forEach(
+                          ['2', '3', 'bar', 'baz']
+                        , function (key, callback) {
+                            db.get(key, function (err, value) {
+                              refute(err)
+                              callback()
+                            })
+                          }
+                        , callback
+                      )
+                    }
+                  , function (callback) {
+                      // these shouldn't exist
+                      async.forEach(
+                          ['1', 'foo']
+                        , function (key, callback) {
+                            db.get(key, function (err, value) {
+                              assert(err)
+                              assert.isInstanceOf(err, errors.NotFoundError) // a missing key must surface as NotFoundError
+                              callback()
+                            })
+                          }
+                        , callback
+                      )
+                    }
+                ]
+              , done
+            )
+          })
+        }
+    }
 })
\ No newline at end of file

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-javascript/node-leveldown.git



More information about the Pkg-javascript-commits mailing list