[Pkg-javascript-commits] [node-node-redis] 01/19: Imported Upstream version 0.1.6

Mike Gabriel sunweaver at debian.org
Wed Aug 20 13:18:16 UTC 2014


This is an automated email from the git hooks/post-receive script.

sunweaver pushed a commit to branch master
in repository node-node-redis.

commit 0c61d82f60841d0675fbb888723edaf355a83844
Author: Mike Gabriel <mike.gabriel at das-netzwerkteam.de>
Date:   Thu May 9 22:34:00 2013 +0200

    Imported Upstream version 0.1.6
---
 .gitignore   |   4 +
 .npmignore   |   1 +
 README       |  26 ++++
 bench.js     | 179 +++++++++++++++++++++++++++
 index.js     | 395 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 package.json |  15 +++
 parser.js    | 379 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 test/main.js |  94 ++++++++++++++
 utils.js     | 113 +++++++++++++++++
 9 files changed, 1206 insertions(+)

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..596f28b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+tags
+dump.rdb
+bench/
+node_modules/
diff --git a/.npmignore b/.npmignore
new file mode 100644
index 0000000..95923ab
--- /dev/null
+++ b/.npmignore
@@ -0,0 +1 @@
+bench*
diff --git a/README b/README
new file mode 100644
index 0000000..50bcf53
--- /dev/null
+++ b/README
@@ -0,0 +1,26 @@
+node-redis
+==========
+
+A redis client.
+
+Usage
+-----
+
+    var redis = require('node-redis')
+
+    var client = redis.createClient(port, host, auth)
+
+    client.get('key', function (error, buffer) { ... })
+
+    client.multi()
+    client.lrange('key', 0, -1)
+    client.lrange('key', [0, -1])
+    client.exec(function (error, result) { result[0]; result[1] })
+
+    client.subscribe('channel')
+    client.on('message', function (channel, buffer) { ... })
+    client.on('message:channel', function (buffer) { ... })
+    client.unsubscribe('channel')
+
+    client.end()
+
diff --git a/bench.js b/bench.js
new file mode 100644
index 0000000..451133d
--- /dev/null
+++ b/bench.js
@@ -0,0 +1,179 @@
+var redis  = require('./'),
+    //redis2 = require('redis'),
+    //redis3 = require('./bench/redis-node/redis'),
+    //redis4 = require('./bench/redis-client'),
+    Seq    = require('parallel').Sequence,
+    assert = require('assert');
+
+var clients = { 'node-redis': redis.createClient()/*,  'node_redis':        redis2.createClient(),*/
+                /*'redis-node': redis3.createClient(), 'redis-node-client': redis4.createClient()*/ }
+
+var iterations = 5000,
+    number     = 5;
+
+//var buffer = JSON.stringify({
+  //name: 'Bob Marley',
+  //dob: '31/12/1980',
+  //hash: '213897d9827o7fn437nv348n534',
+  //salt: '12345',
+  //bio: 'p983u4f hh sdh khfkslh kfjh dkshljdh k hslhlhuil huihsuidh liushuilsh lihgudif hlugfdhliughduil hgdf',
+  //created: Date.now()
+//});
+//var buffer = require('fs').readFileSync('binary');
+//var buffer = new Buffer(Array(1025 * 2).join('x'));
+//var buffer = Array(1025 * 2).join('x');
+var buffer = 'Some some random text for the benchmark.';
+//var buffer = 'xxx';
+//var buffer = require('fs').readFileSync('bench/html');
+
+var benches = { // Each bench issues `iterations` commands back to back; only the last one carries the callback.
+  set: function (client, callback) {
+    for (var i = 0; i < iterations - 1; i++) {
+      client.set('bench' + i, buffer);
+    }
+    client.set('bench' + i, buffer, callback); // i === iterations - 1 here: final command signals completion
+  },
+  get: function (client, callback) {
+    for (var i = 0; i < iterations - 1; i++) {
+      client.get('bench' + i);
+    }
+    client.get('bench' + i, callback);
+  },
+  del: function (client, callback) {
+    for (var i = 0; i < iterations - 1; i++) {
+      client.del('bench' + i);
+    }
+    client.del('bench' + i, callback);
+  },
+  lpush: function (client, callback) {
+    for (var i = 0; i < iterations - 1; i++) {
+      client.lpush('bench', buffer); // always the same list key, 'bench'
+    }
+    client.lpush('bench', buffer, callback);
+  },
+  lrange: function (client, callback) {
+    for (var i = 0; i < iterations - 1; i++) {
+      client.lrange('bench', 0, 10);
+    }
+    client.lrange('bench', 0, 10, callback);
+  },
+  hmset: function (client, callback) {
+    if ('redis-node' === client._name) return callback(); // skipped for the client registered as 'redis-node'
+    for (var i = 0; i < iterations - 1; i++) {
+      client.hmset('bench' + i, 'key', buffer, 'key2', buffer);
+    }
+    client.hmset('bench' + i, 'key', buffer, 'key2', buffer, callback);
+  },
+  hmget: function (client, callback) {
+    if ('redis-node' === client._name) return callback(); // skipped for the client registered as 'redis-node'
+    for (var i = 0; i < iterations - 1; i++) {
+      client.hmget('bench' + i, 'key', 'key2');
+    }
+    client.hmget('bench' + i, 'key', 'key2', callback);
+  },
+  del2: function (client, callback) { // same commands as `del`; runs after hmset/hmget re-created the 'bench' + i keys
+    for (var i = 0; i < iterations - 1; i++) {
+      client.del('bench' + i);
+    }
+    client.del('bench' + i, callback);
+  },
+};
+
+var task   = new Seq(),
+    warmup = new Seq();
+
+Object.keys(clients).forEach(function (client) { // Build the timed `task` sequence for every client.
+  clients[client]._name = client; // tag the instance with its key in `clients`
+  client                = clients[client]; // rebind the parameter from key to client instance
+  client.benches        = {};
+
+  for (var i = 0; i < number; i++) { // every bench is queued `number` times
+    Object.keys(benches).forEach(function (bench) {
+      client.benches[bench] = []; // reset synchronously here, before any queued task has pushed a timing
+
+      task.add(function (next, error) {
+        process.stdout.write('.');
+        var time = Date.now();
+        benches[bench](client, function (error) {
+          client.benches[bench].push(Date.now() - time); // elapsed milliseconds for this run
+          next();
+        });
+      });
+    });
+
+    task.add(function (next) {
+      client.del('bench', next); // drop the list key used by lpush/lrange before the next round
+    });
+  }
+});
+Object.keys(clients).forEach(function (client) { // Build the `warmup` sequence for every client.
+  clients[client]._name = client;
+  client                = clients[client];
+  client.benches        = {};
+
+  Object.keys(benches).forEach(function (bench) {
+    client.benches[bench] = [];
+
+    warmup.add(function (next) {
+      client.flushall(next); // one FLUSHALL is queued per bench name
+    });
+  });
+});
+
+clients['node-redis'].on('connect', function () { // Kick everything off once the 'node-redis' client is connected.
+  var old_iter = iterations;
+
+  iterations = 100; // lowered while the warmup sequence runs
+
+  warmup.run(function () {
+    //throw new Error;
+    iterations = old_iter; // restore the configured count for the timed run
+    setTimeout(function () {
+      task.run(end); // `end` prints the report when the timed sequence finishes
+    }, 1000);
+  });
+});
+
+var end = function end () {
+  process.stdout.write('\r\n');
+  var bench, client_name, client,
+      keys       = Object.keys(clients),
+      bench_keys = Object.keys(benches);
+
+  for (var i = 0, il = bench_keys.length; i < il; i++) {
+    bench = bench_keys[i];
+    console.log('=== ' + bench + ' x' + iterations + ' ===');
+
+    for (var j = 0, jl = keys.length; j < jl; j++) {
+      client_name = keys[j];
+      client      = clients[client_name];
+
+      console.log(client_name + ' results: ' + client.benches[bench].join(', '));
+    }
+
+    console.log('');
+
+    for (j = 0, jl = keys.length; j < jl; j++) {
+      client_name = keys[j];
+      client      = clients[client_name];
+
+      client.benches[bench] = eval(client.benches[bench].join('+')) / client.benches[bench].length;
+
+      console.log(client_name + ' avg: ' + client.benches[bench]);
+    }
+
+    console.log('');
+
+    for (j = 0, jl = keys.length; j < jl; j++) {
+      client_name = keys[j];
+      client      = clients[client_name];
+
+      console.log(client_name + ' ops/s: ' + ((iterations / client.benches[bench]) * 1000));
+    }
+
+    console.log('\r\n');
+  }
+
+  // Bye!
+  process.exit();
+};
diff --git a/index.js b/index.js
new file mode 100644
index 0000000..0967c44
--- /dev/null
+++ b/index.js
@@ -0,0 +1,395 @@
+// The MIT License
+//
+// Copyright (c) 2013 Tim Smart
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files
+// (the "Software"), to deal in the Software without restriction,
+// including without limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of the Software, and
+// to permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var net    = require('net'),
+    utils  = require('./utils'),
+    Parser = require('./parser');
+
+var RedisClient = function RedisClient(port, host, auth) { // Opens the connection immediately and wires the stream and parser events.
+  this.host           = host;
+  this.port           = port;
+  this.auth           = auth;
+  this.stream         = net.createConnection(port, host);;
+  this.connected      = false;
+  // false, true (after MONITOR) or 'pubsub' (after SUBSCRIBE/PSUBSCRIBE).
+  this.blocking       = false;
+  // Command queue.
+  this.max_size       = 1300; // pending command string is written out once it reaches this length (see write)
+  this.command        = '';
+  this.commands       = new utils.Queue();
+  // For the retry timer.
+  this.retry          = false;
+  this.retry_attempts = 0;
+  this.retry_delay    = 250;
+  this.retry_backoff  = 1.7; // retry_delay is multiplied by this in onDisconnect
+  // If we want to quit.
+  this.quitting       = false;
+  // For when we have a full send buffer.
+  this.paused         = false;
+  this.send_buffer    = [];
+  this.flushing       = false;
+
+  var self = this;
+
+  this.stream.on("connect", function () {
+    // Reset the retry backoff.
+    self.retry          = false;
+    self.retry_delay    = 250;
+    self.retry_attempts = 0;
+    self.stream.setNoDelay();
+    self.stream.setTimeout(0);
+    self.connected      = true;
+
+    // Resend commands if we need to.
+    var command,
+        commands = self.commands.array.slice(self.commands.offset);
+
+    // Send auth.
+    if (self.auth) {
+      commands.unshift(['AUTH', [self.auth], null]); // AUTH goes out ahead of the re-queued commands
+    }
+
+    self.commands = new utils.Queue();
+
+    for (var i = 0, il = commands.length; i < il; i++) {
+      command = commands[i];
+      self.sendCommand(command[0], command[1], command[2]); // [name, args, callback]
+    }
+
+    // give connect listeners a chance to run first in case they need to auth
+    self.emit("connect");
+  });
+
+  this.stream.on("data", function (buffer) {
+    try {
+      self.parser.onIncoming(buffer);
+    } catch (err) {
+      self.emit("error", err);
+      // Reset state.
+      self.parser.resetState();
+    }
+  });
+
+  // _flush
+  // So we can pipeline requests.
+  this._flush = function () {
+    if ('' !== self.command) {
+      self.send_buffer.push(self.command);
+      self.command = '';
+    }
+
+    for (var i = 0, il = self.send_buffer.length; i < il; i++) {
+      if (!self.stream.writable || false === self.stream.write(self.send_buffer[i])) {
+        return self.send_buffer = self.send_buffer.slice(i + 1); // keep only entries after index i; paused/flushing stay set
+      }
+    }
+
+    self.send_buffer.length = 0;
+    self.paused = self.flushing = false;
+  };
+
+  // When we can write more.
+  this.stream.on('drain', this._flush);
+
+  this.stream.on("error", function (error) {
+    self.emit("error", error);
+  });
+
+  var onClose = function onClose () {
+    // Ignore if we are already retrying. Or we want to quit.
+    if (self.retry) return;
+    self.emit('end');
+    self.emit('close');
+    if (self.quitting) {
+      for (var i = 0, il = self.commands.length; i < il; i++) {
+        self.parser.emit('reply'); // reply is undefined here; each emit shifts one queued command
+      }
+      return;
+    }
+
+    self.onDisconnect();
+  };
+
+  this.stream.on("end", onClose);
+  this.stream.on("close", onClose);
+
+  // Setup the parser.
+  this.parser = new Parser();
+
+  this.parser.on('reply', function onReply (reply) {
+    if (false !== self.blocking) {
+      if ('pubsub' === self.blocking) {
+        var type = reply[0].toString();
+
+        switch (type) {
+        case 'psubscribe':
+        case 'punsubscribe':
+        case 'subscribe':
+        case 'unsubscribe':
+          var channel = reply[1].toString(),
+              count   = reply[2];
+
+          if (0 === count) {
+            self.blocking = false; // count reached zero: leave pub/sub mode
+          }
+
+          self.emit(type, channel, count);
+          self.emit(type + ':' + channel, count);
+          break;
+        case 'message':
+          var key  = reply[1].toString(),
+              data = reply[2];
+          self.emit('message', key, data);
+          self.emit('message:' + key, data);
+          break;
+        case 'pmessage':
+          var pattern = reply[1].toString(),
+              key     = reply[2].toString(),
+              data    = reply[3];
+          self.emit('pmessage', pattern, key, data);
+          self.emit('pmessage:' + pattern, key, data);
+          break;
+        }
+      } else {
+        self.emit('data', reply); // blocking === true (MONITOR): replies are passed on as 'data' events
+      }
+      return;
+    }
+
+    var command = self.commands.shift(); // replies are matched to commands in queue order
+    if (command) {
+      switch (command[0]) {
+      case 'MONITOR':
+        self.blocking = true;
+        break;
+      case 'SUBSCRIBE':
+      case 'PSUBSCRIBE':
+        self.blocking = 'pubsub';
+        onReply(reply); // re-dispatch so the reply is emitted through the pub/sub branch above
+        return;
+      }
+
+      if (command[2]) {
+        command[2](null, reply);
+      }
+    }
+  });
+
+  // DB error
+  this.parser.on('error', function (error) {
+    var command = self.commands.shift();
+    error = new Error(error);
+    if (command && command[2]) command[2](error); // callback if the command has one ...
+    else self.emit('error', error); // ... otherwise an 'error' event on the client
+  });
+
+  process.EventEmitter.call(this);
+
+  return this;
+};
+
+RedisClient.prototype = Object.create(process.EventEmitter.prototype);
+
+// Exports
+exports.RedisClient = RedisClient;
+
+// createClient: returns a new RedisClient; a falsy port falls back to 6379.
+exports.createClient = function createClient (port, host, auth) {
+  return new RedisClient(port || 6379, host, auth);
+};
+
+RedisClient.prototype.connect = function () {
+  return this.stream.connect();
+};
+
+RedisClient.prototype.onDisconnect = function (error) { // `error` is not used in the body
+  var self = this;
+
+  // Make sure the stream is reset.
+  this.connected = false;
+  this.stream.destroy();
+  this.parser.resetState();
+
+  // Increment the attempts, so we know what to set the timeout to.
+  this.retry_attempts++;
+
+  // Set the retry timer.
+  setTimeout(function () {
+    self.stream.connect(self.port, self.host);
+  }, this.retry_delay);
+
+  this.retry_delay *= this.retry_backoff; // the next attempt waits longer
+  this.retry        = true; // makes onClose ignore further end/close events; reset by the "connect" handler
+};
+
+RedisClient.prototype._write = function (data) { // Write straight to the stream, or queue in send_buffer while paused.
+  if (!this.paused) {
+    if (false === this.stream.write(data)) {
+      this.paused = true; // stream.write returned false: later writes are queued until _flush clears paused
+    }
+  } else {
+    this.send_buffer.push(data);
+  }
+};
+
+// Queue data for sending. Strings are appended to this.command until it reaches max_size; pass buffer === true to send data as its own chunk.
+RedisClient.prototype.write = function write (data, buffer) {
+  if (true !== buffer) {
+    this.command += data;
+    if (this.max_size <= this.command.length) {
+      this._write(this.command);
+      this.command = '';
+    }
+  } else {
+    if ('' !== this.command) {
+      this._write(this.command); // pending string goes out first so ordering is kept
+      this.command = '';
+    }
+    this._write(data);
+  }
+
+  if (!this.flushing) {
+    process.nextTick(this._flush); // schedule a single flush for whatever is still pending
+    this.flushing = true;
+  }
+};
+
+// We make some assumptions:
+//
+// * command WILL be uppercase and valid.
+// * args IS an array
+RedisClient.prototype.sendCommand = function (command, args, callback) {
+  // Push the command to the stack.
+  if (false === this.blocking) {
+    this.commands.push([command, args, callback]);
+  }
+
+  // Writable?
+  if (false === this.connected) return;
+
+  // Do we have to send a multi bulk command?
+  // Assume it is a valid command for speed reasons.
+  var args_length;
+
+  if (args && 0 < (args_length = args.length)) {
+    var arg, arg_type, last,
+        previous = ['*', (args_length + 1), '\r\n', '$', command.length, '\r\n', command, '\r\n'];
+
+    for (i = 0, il = args_length; i < il; i++) {
+      arg      = args[i];
+      arg_type = typeof arg;
+
+      if ('string' === arg_type) {
+        // We can send this in one go.
+        previous.push('$', Buffer.byteLength(arg), '\r\n', arg, '\r\n');
+      } else if ('number' === arg_type) {
+        // We can send this in one go.
+        previous.push('$', ('' + arg).length, '\r\n', arg, '\r\n');
+      } else if (null === arg || 'undefined' === arg_type) {
+        // Send NIL
+        previous.push('$\r\b\r\b')
+        this.write(previous.join(''));
+        previous = [];
+      } else {
+        // Assume we are a buffer.
+        previous.push('$', arg.length, '\r\n');
+        this.write(previous.join(''));
+        this.write(arg, true);
+        previous  = ['\r\n'];
+      }
+    }
+
+    // Anything left?
+    this.write(previous.join(''));
+  } else {
+    // We are just sending a stand alone command.
+    this.write(command_buffers[command]);
+  }
+};
+
+RedisClient.prototype.destroy = function () { // Tear the stream down without sending QUIT.
+  this.quitting = true; // stops onClose from scheduling a reconnect
+  return this.stream.destroy();
+};
+
+// http://redis.io/commands.json
+exports.commands = [
+  'APPEND', 'AUTH', 'BGREWRITEAOF', 'BGSAVE', 'BLPOP', 'BRPOP', 'BRPOPLPUSH', 'CONFIG GET',
+  'CONFIG SET', 'CONFIG RESETSTAT', 'DBSIZE', 'DEBUG OBJECT', 'DEBUG SEGFAULT', 'DECR',
+  'DECRBY', 'DEL', 'DISCARD', 'ECHO', 'EXEC', 'EXISTS', 'EXPIRE', 'EXPIREAT', 'FLUSHALL',
+  'FLUSHDB', 'GET', 'GETBIT', 'GETRANGE', 'GETSET', 'HDEL', 'HEXISTS', 'HGET', 'HGETALL',
+  'HINCRBY', 'HKEYS', 'HLEN', 'HMGET', 'HMSET', 'HSET', 'HSETNX', 'HVALS', 'INCR', 'INCRBY',
+  'INFO', 'KEYS', 'LASTSAVE', 'LINDEX', 'LINSERT', 'LLEN', 'LPOP', 'LPUSH', 'LPUSHX', 'LRANGE',
+  'LREM', 'LSET', 'LTRIM', 'MGET', 'MONITOR', 'MOVE', 'MSET', 'MSETNX', 'MULTI', 'PERSIST',
+  'PING', 'PSUBSCRIBE', 'PUBLISH', 'PUNSUBSCRIBE', 'QUIT', 'RANDOMKEY', 'RENAME', 'RENAMENX',
+  'RPOP', 'RPOPLPUSH', 'RPUSH', 'RPUSHX', 'SADD', 'SAVE', 'SCARD', 'SDIFF', 'SDIFFSTORE', 'SELECT',
+  'SET', 'SETBIT', 'SETEX', 'SETNX', 'SETRANGE', 'SHUTDOWN', 'SINTER', 'SINTERSTORE', 'SISMEMBER',
+  'SLAVEOF', 'SMEMBERS', 'SMOVE', 'SORT', 'SPOP', 'SRANDMEMBER', 'SREM', 'STRLEN', 'SUBSCRIBE',
+  'SUNION', 'SUNIONSTORE', 'SYNC', 'TTL', 'TYPE', 'UNSUBSCRIBE', 'UNWATCH', 'WATCH', 'ZADD',
+  'ZCARD', 'ZCOUNT', 'ZINCRBY', 'ZINTERSTORE', 'ZRANGE', 'ZRANGEBYSCORE', 'ZRANK', 'ZREM',
+  'ZREMRANGEBYRANK', 'ZREMRANGEBYSCORE', 'ZREVRANGE', 'ZREVRANGEBYSCORE', 'ZREVRANK', 'ZSCORE',
+  'ZUNIONSTORE'
+];
+
+this.blocking_commands = ["MONITOR"];
+
+// For each command, pre-build its argument-less request string and add a lower-case method to the prototype.
+var command_buffers = {};
+
+exports.commands.forEach(function (command) {
+  // Pre-alloc buffers for non-multi commands.
+  //command_buffers[command] = new Buffer('*1\r\n$' + command.length + '\r\n' + command + '\r\n');
+  command_buffers[command] = '*1\r\n$' + command.length + '\r\n' + command + '\r\n';
+
+  // Don't override stuff.
+  if (!RedisClient.prototype[command.toLowerCase()]) {
+    RedisClient.prototype[command.toLowerCase()] = function (array, callback) {
+      // An array of args.
+      // Assume we only have two args.
+      if (Array.isArray(array)) {
+        return this.sendCommand(command, array, callback);
+      }
+
+      // Arbitrary amount of arguments.
+      var args    = [];
+      args.push.apply(args, arguments);
+      callback    = 'function' === typeof args[args.length - 1]; // boolean for now: is the last argument a function?
+
+      if (callback) {
+        callback  = args.pop(); // take the trailing function off the argument list
+      } else {
+        callback  = null;
+      }
+
+      this.sendCommand(command, args, callback);
+    };
+  }
+});
+
+// Overwrite the generated quit (also exposed as end): flag the client as quitting, then send QUIT.
+RedisClient.prototype.quit = RedisClient.prototype.end =
+function (callback) {
+  this.quitting = true; // stops onClose from scheduling a reconnect
+  return this.sendCommand('QUIT', null, callback);
+};
+
diff --git a/package.json b/package.json
new file mode 100644
index 0000000..a594898
--- /dev/null
+++ b/package.json
@@ -0,0 +1,15 @@
+{
+  "name": "node-redis",
+  "description": "Lightweight, fast, Redis client.",
+  "version": "0.1.6",
+  "author": "Tim Smart",
+  "contributors": [
+    "Matt Ranney"
+  ],
+  "repository": {
+    "type": "git",
+    "url": "http://github.com/Tim-Smart/node-redis.git"
+  },
+  "engine": [ "node >=0.2.2" ],
+  "main": "./"
+}
diff --git a/parser.js b/parser.js
new file mode 100644
index 0000000..5a65cfb
--- /dev/null
+++ b/parser.js
@@ -0,0 +1,379 @@
+// The MIT License
+//
+// Copyright (c) 2013 Tim Smart
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files
+// (the "Software"), to deal in the Software without restriction,
+// including without limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of the Software, and
+// to permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var utils = require('./utils');
+
+var RedisParser = function RedisParser () { // Parser for server replies; emits 'reply' and 'error' events.
+  this.resetState();
+
+  process.EventEmitter.call(this);
+
+  return this;
+};
+
+module.exports = RedisParser;
+
+RedisParser.prototype = Object.create(process.EventEmitter.prototype);
+
+// Reset state, no matter where we are at. Parsing resumes in the 'TYPE' state.
+RedisParser.prototype.resetState = function resetState () {
+  this.reply     = null; // items collected for the current multi bulk reply
+  this.expected  = null; // items still missing from this.reply
+  this.multi     = null; // replies still missing from this.replies
+  this.replies   = null; // collected replies of a nested (transaction) reply
+  this.pos       = null; // offset carried over into the next buffer
+  this.flag      = 'TYPE';
+  this.data      = null;
+  this.last_data = null; // last parsed length (bulk size or multi bulk count)
+  this.remaining = null; // bytes still missing from a bulk split across buffers
+};
+
+// Handle an incoming buffer. State (this.flag, this.data, this.pos, ...) persists between calls.
+RedisParser.prototype.onIncoming = function onIncoming (buffer) {
+  var char_code,
+      pos    = this.pos || 0,
+      length = buffer.length;
+
+  // Make sure the buffer is joined properly.
+  if ('TYPE' !== this.flag && 'BULK' !== this.flag && null !== this.data) {
+    // We need to wind back a step.
+    // If we have CR now, it would break the parser.
+    if (0 !== this.data.length) {
+      char_code = this.data.charCodeAt(this.data.length - 1);
+      this.data = this.data.slice(0, -1);
+      --pos;
+    } else {
+      char_code = buffer[pos];
+    }
+  }
+
+  for (; length > pos;) {
+    switch (this.flag) {
+    case 'TYPE':
+      // What are we doing next?
+      switch (buffer[pos++]) {
+      // Single line status reply.
+      case 43: // + SINGLE
+        this.flag = 'SINGLE';
+        break;
+
+      // Tells us the length of upcoming data.
+      case 36: // $ LENGTH
+        this.flag = 'BULK_LENGTH';
+        break;
+
+      // Tells us how many args are coming up.
+      case 42: // * MULTI
+        this.flag = 'MULTI_BULK';
+        break;
+
+      case 58: // : INTEGER
+        this.flag = 'INTEGER';
+        break;
+
+      // Errors
+      case 45: // - ERROR
+        this.flag = 'ERROR';
+        break;
+      }
+      // Fast forward a char.
+      char_code = buffer[pos];
+      this.data = '';
+      break;
+
+    // Single line status replies.
+    case 'SINGLE':
+    case 'ERROR':
+      // Add char to the data
+      this.data += String.fromCharCode(char_code);
+      pos++;
+
+      // Optimize for the common use case.
+      if ('O' === this.data && 75 === buffer[pos]) { // OK
+        // Send off the reply.
+        this.data = 'OK';
+        this.onData();
+
+        pos += 3; // Skip the `K\r\n`
+
+        // break early.
+        break;
+      }
+
+      // Otherwise check for CR
+      char_code = buffer[pos];
+      if (13 === char_code) { // \r CR
+        // Send the reply.
+        if ('SINGLE' === this.flag) {
+          this.onData();
+        } else {
+          this.onError();
+        }
+
+        // Skip \r\n
+        pos += 2;
+      }
+      break;
+
+    // We have a integer coming up. Look for a CR
+    // then assume that is the end.
+    case 'BULK_LENGTH':
+      // We are still looking for more digits.
+      // char_code already set by TYPE state.
+      this.data += String.fromCharCode(char_code);
+      pos++;
+
+      // Is the next char the end? Set next char_code while
+      // we are at it.
+      char_code = buffer[pos];
+      if (13 === char_code) { // \r CR
+        // Cast to int
+        this.data = +this.data;
+
+        // Null reply?
+        if (-1 !== this.data) {
+          this.flag      = 'BULK';
+          this.last_data = this.data;
+          this.data      = null;
+        } else {
+          this.data = null;
+          this.onData();
+        }
+
+        // Skip the \r\n
+        pos += 2;
+      }
+      break;
+
+    // Short bulk reply.
+    case 'BULK':
+      if (null === this.data && length >= (pos + this.last_data)) {
+        // Slow slice is slow.
+        if (14 > this.last_data) {
+          this.data = new Buffer(this.last_data);
+          for (var i = 0; i < this.last_data; i++) {
+            this.data[i] = buffer[i + pos];
+          }
+        } else {
+          this.data = buffer.slice(pos, this.last_data + pos);
+        }
+
+        // Fast forward past data.
+        pos += this.last_data + 2;
+
+        // Send it off.
+        this.onData();
+      } else if (this.data) {
+        // Still joining. pos = amount left to go.
+        if (this.remaining <= length) {
+          // End is within this buffer.
+          if (13 < this.remaining) {
+            buffer.copy(this.data, this.last_data - this.remaining, 0, this.remaining)
+          } else {
+            utils.copyBuffer(buffer, this.data, this.last_data - this.remaining, 0, this.remaining);
+          }
+
+          // Fast forward past data.
+          pos = this.remaining + 2;
+          this.remaining = null;
+
+          this.onData();
+        } else {
+          // We have more to come. Copy what we got then move on,
+          // decrementing the amount we have copied from this.remaining
+          if (13 < (this.remaining - length)) {
+            utils.copyBuffer(buffer, this.data, this.last_data - this.remaining, 0, length);
+          } else {
+            buffer.copy(this.data, this.last_data - this.remaining, 0, length);
+          }
+
+          // More to go.
+          this.remaining -= length;
+          pos             = length;
+        }
+      } else {
+        // We will have to do a join.
+        this.data = new Buffer(this.last_data);
+
+        // Fast copy if small.
+        if (15 > this.last_data) {
+          utils.copyBuffer(buffer, this.data, 0, pos);
+        } else {
+          buffer.copy(this.data, 0, pos)
+        }
+
+        // Point pos to the amount we need.
+        this.remaining = this.last_data - (length - pos);
+        pos            = length;
+      }
+      break;
+
+    // How many bulk's are coming?
+    case 'MULTI_BULK':
+      // We are still looking for more digits.
+      // char_code already set by TYPE state.
+      this.data += String.fromCharCode(char_code);
+      pos++;
+
+      // Is the next char the end? Set next char_code while
+      // we are at it.
+      char_code = buffer[pos];
+      if (13 === char_code) { // \r CR
+        // Cast to int
+        this.last_data = +this.data;
+        this.data      = null;
+
+        // Are we multi?
+        if (null === this.expected) {
+          this.expected = this.last_data;
+          this.reply    = [];
+        } else if (null === this.multi) {
+          this.multi    = this.expected;
+          this.expected = null;
+          this.replies  = [];
+        }
+
+        // Skip the \r\n
+        pos += 2;
+        this.flag = 'TYPE';
+
+        // Zero length replies.
+        if (0 === this.last_data) {
+          this.expected = this.reply = null;
+          this.data     = [];
+          this.onData();
+          break;
+        } else if (-1 === this.last_data) {
+          // NIL reply.
+          this.expected = this.reply = null;
+          this.data     = null;
+          this.onData();
+          break;
+        }
+
+        char_code = buffer[pos];
+
+        // Will have to look ahead to check for another MULTI in case
+        // we are a multi transaction.
+        if (36 === char_code) { // $ - BULK_LENGTH
+          // We are bulk data.
+          this.flag = 'BULK_LENGTH';
+
+          // We are skipping the TYPE check. Skip the $
+          pos++;
+          // We need to set char code and data.
+          char_code = buffer[pos];
+          this.data = '';
+        } else if (null === this.multi && char_code) {
+          // Multi trans time.
+          this.multi    = this.expected;
+          this.expected = null;
+          this.replies  = [];
+        }
+      }
+      break;
+
+    case 'INTEGER':
+      // We are still looking for more digits.
+      // char_code already set by TYPE state.
+      this.data += String.fromCharCode(char_code);
+      pos++;
+
+      // Is the next char the end? Set next char_code while
+      // we are at it.
+      char_code = buffer[pos];
+      if (13 === char_code) { // \r CR
+        // Cast to int
+        this.data = +this.data;
+        this.onData();
+
+        // Skip the \r\n
+        pos += 2;
+      }
+      break;
+    }
+  }
+
+  // In case we have multiple packets. Any overshoot past this buffer is carried into the next call.
+  this.pos = pos - length;
+};
+
+// When we have received a chunk of response data (this.data): collect it or emit 'reply'.
+RedisParser.prototype.onData = function onData () {
+  if (null !== this.expected) {
+    // Decrement the expected data replies and add the data.
+    this.reply.push(this.data);
+    this.expected--;
+
+    // Finished? Send it off.
+    if (0 === this.expected) {
+      if (null !== this.multi) {
+        this.replies.push(this.reply); // completed reply becomes one entry of the nested result
+        this.multi--;
+
+        if (0 === this.multi) {
+          this.emit('reply', this.replies);
+          this.replies = this.multi = null;
+        }
+      } else {
+        this.emit('reply', this.reply);
+      }
+      this.reply = this.expected = null;
+    }
+  } else {
+    if (null === this.multi) {
+      this.emit('reply', this.data); // plain reply outside any multi bulk
+    } else {
+      this.replies.push(this.data);
+      this.multi--;
+
+      if (0 === this.multi) {
+        this.emit('reply', this.replies);
+        this.replies = this.multi = null;
+      }
+    }
+  }
+
+  this.last_data = null;
+  this.data      = null;
+  this.flag      = 'TYPE'; // next byte is read as a reply type marker
+};
+
+// Found an error. Emitted as 'error' on its own, or stored as a plain entry when inside a nested reply.
+RedisParser.prototype.onError = function onError () {
+  if (null === this.multi) {
+    this.emit('error', this.data);
+  } else {
+    this.replies.push(this.data); // the error text is pushed as-is, not wrapped in an Error
+    this.multi--;
+
+    if (0 === this.multi) {
+      this.emit('reply', this.replies);
+      this.replies = this.multi = null;
+    }
+  }
+
+  this.last_data = null;
+  this.data      = null;
+  this.flag      = 'TYPE';
+};
diff --git a/test/main.js b/test/main.js
new file mode 100644
index 0000000..a522131
--- /dev/null
+++ b/test/main.js
@@ -0,0 +1,94 @@
+// Tests for the client, driven through two client instances.
+// NOTE(review): presumably needs a reachable Redis server -- confirm.
+//
+// `c` passes 'test' in the third createClient() position; `c2` uses the
+// defaults for every argument.
+var c      = require('../').createClient(null, null, 'test'),
+    c2     = require('../').createClient(),
+    assert = require('assert');
+
+// Payload of 1024 'x' characters (joining 1025 empty slots yields 1024
+// separators).
+var buffer = new Buffer(new Array(1025).join('x'));
+
+module.exports = {
+  // SET a key, read it back, DEL it, then check a second GET yields null.
+  // All four commands are issued without waiting for earlier callbacks;
+  // done() is called from the last callback only.
+  "test basic commands": function (done) {
+    c.set('1', 'test');
+    c.get('1', function (error, value) {
+      assert.ok(!error);
+      assert.equal(value, 'test');
+    });
+
+    c.del('1', function (error) {
+      assert.ok(!error);
+    });
+
+    c.get('1', function (error, value) {
+      assert.ok(!error);
+      assert.isNull(value);
+      done();
+    });
+
+  },
+  // Issues 10000 SETs of `buffer` followed by 10000 DELs on the same keys,
+  // counting successful callbacks in `n` and `o`.
+  "test stress": function (done) {
+    var n = 0,
+        o = 0;
+
+    for (var i = 0; i < 10000; i++) {
+      c.set('2' + i, buffer, function (error) {
+        assert.ok(!error);
+        ++n;
+      });
+    }
+
+    for (i = 0; i < 10000; i++) {
+      c.del('2' + i, function (error) {
+        assert.ok(!error);
+        ++o;
+      });
+    }
+
+    // The test is reported done from the PING callback, issued after all of
+    // the SET/DEL commands above.
+    c.ping(function (error) {
+      assert.ok(!error)
+      done()
+    })
+
+    // The callback counters themselves are only checked when the process
+    // exits.
+    process.on('exit', function () {
+      assert.equal(10000, n);
+      assert.equal(10000, o);
+    });
+  },
+  // Subscribe on `c`, publish from `c2`, and unsubscribe again.
+  "test pubsub": function (done) {
+    c.subscribe('test');
+    // Once subscribed (count 1), publish a message from the second client.
+    c.on('subscribe:test', function (count) {
+      assert.equal(1, count);
+      c2.publish('test', '123', function (error) {
+        assert.ok(!error);
+      });
+    });
+    // The published payload must arrive on the channel; then unsubscribe.
+    c.on('message:test', function (data) {
+      assert.equal('123', data.toString());
+      c.unsubscribe('test');
+    });
+    // After unsubscribing (count 0) `c.blocking` must be false and `c` must
+    // answer a normal command again.
+    c.on('unsubscribe:test', function (count) {
+      assert.equal(0, count);
+      assert.equal(false, c.blocking);
+      c.ping(function (error) {
+        assert.ok(!error);
+        done();
+      });
+    });
+  },
+  // Put `c` into MONITOR mode and expect three successive 'data' events
+  // matching MONITOR, SET and DEL; the SET and DEL are issued from `c2`.
+  "test monitor": function (done) {
+    c.monitor();
+    c.once('data', function (data) {
+      assert.ok(/MONITOR/.test(data));
+      c.once('data', function (data) {
+        assert.ok(/SET/.test(data));
+        c.once('data', function (data) {
+          assert.ok(/DEL/.test(data));
+          done();
+        });
+      });
+    });
+    c2.set('test', 123);
+    c2.del('test');
+  },
+  // Sends QUIT on both clients.
+  // NOTE(review): presumably run by the test runner after all tests -- confirm.
+  after: function () {
+    c.quit();
+    c2.quit();
+  }
+};
diff --git a/utils.js b/utils.js
new file mode 100644
index 0000000..52eb029
--- /dev/null
+++ b/utils.js
@@ -0,0 +1,113 @@
+// The MIT License
+//
+// Copyright (c) 2013 Tim Smart
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files
+// (the "Software"), to deal in the Software without restriction,
+// including without limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of the Software, and
+// to permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+// Shared do-nothing function: takes no arguments and returns undefined.
+// Exported once so that a single function object can be reused wherever an
+// empty function is needed, keeping references low.
+exports.noop = function () {};
+
+// Logger function taking the same (error, results) arguments as a command
+// callback.
+//
+// A truthy `error` is printed with console.error and nothing else happens.
+// Otherwise `results` is printed with console.log, after converting a Buffer
+// -- or each Buffer found directly inside an Array -- with toString().
+// Any other value is printed unchanged.
+exports.log = function log (error, results) {
+  if (error) return console.error(error);
+
+  var printable = results;
+
+  if (results instanceof Array) {
+    printable = [];
+
+    for (var idx = 0, count = results.length; idx < count; idx++) {
+      var item = results[idx];
+      printable[idx] = item instanceof Buffer ? item.toString() : item;
+    }
+  } else if (results instanceof Buffer) {
+    printable = results.toString();
+  }
+
+  console.log(printable);
+};
+
+// Fast copyBuffer method for small buffers.
+//
+// Copies source[s_start] .. source[s_end - 1] into target, one element at a
+// time, placing the first copied element at target[start].
+//
+//   source  - indexable object to read from
+//   target  - indexable object to write into
+//   start   - index in target that receives the first copied element
+//   s_start - index in source of the first element to copy
+//   s_end   - index in source to stop before; defaults to source.length
+//
+// Returns target.
+exports.copyBuffer = function copyBuffer (source, target, start, s_start, s_end) {
+  // Only a missing s_end falls back to the whole source. An explicit 0 is a
+  // valid (empty) range and must not be widened to source.length.
+  if (null == s_end) s_end = source.length;
+
+  for (var i = s_start; i < s_end; i++) {
+    target[i - s_start + start] = source[i];
+  }
+
+  return target;
+};
+
+// Fast write buffer for small ones.
+//
+// Stores string.charCodeAt(i) for every index of `string` into
+// buffer[i + offset] and returns the number of code units written
+// (string.length).
+// NOTE(review): no encoding is applied -- presumably meant for ASCII-only
+// strings; confirm against callers.
+var writeBuffer = exports.writeBuffer = function writeBuffer (buffer, string, offset) {
+  var count = string.length,
+      idx   = 0;
+
+  while (idx < count) {
+    buffer[idx + offset] = string.charCodeAt(idx);
+    idx++;
+  }
+
+  return count;
+};
+
+// Copy an array-like value (anything with `length` and indexed elements,
+// e.g. `arguments`) into a new Array of the same length.
+var toArray = exports.toArray = function toArray (args) {
+  var count  = args.length,
+      copied = new Array(count),
+      idx    = 0;
+
+  while (idx < count) {
+    copied[idx] = args[idx];
+    idx++;
+  }
+
+  return copied;
+};
+
+// Queue class adapted from Tim Caswell's pattern library
+// http://github.com/creationix/pattern/blob/master/lib/pattern/queue.js
+//
+// FIFO queue backed by a plain array. shift() does not move the remaining
+// elements: it blanks the consumed slot and advances `offset`, and the
+// backing array is only truncated once every item has been consumed.
+// Items still waiting therefore live in array[offset .. array.length - 1].
+//
+// Any constructor arguments become the initial items, in order.
+var Queue = function () {
+  this.array = Array.prototype.slice.call(arguments);
+  this.offset = 0;
+};
+
+exports.Queue = Queue;
+
+// Remove and return the oldest item, or undefined when the queue is empty.
+Queue.prototype.shift = function () {
+  if (this.array.length === 0) return;
+  var ret = this.array[this.offset];
+  // Drop the reference so the consumed item can be garbage collected.
+  this.array[this.offset++] = undefined;
+  // Fully drained: release the backing storage and start over.
+  if (this.offset === this.array.length) {
+    this.array.length = 0;
+    this.offset       = 0;
+  }
+  return ret;
+};
+
+// Append an item. Returns the number of items now waiting in the queue.
+Queue.prototype.push = function (item) {
+  // Slots before `offset` were already consumed and must not be counted.
+  return this.array.push(item) - this.offset;
+};
+
+// Number of items waiting in the queue. Slots before `offset` have already
+// been handed out by shift(), so they are excluded from the count.
+Object.defineProperty(Queue.prototype, 'length', {
+  get: function () {
+    return this.array.length - this.offset;
+  }
+});
+;

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-javascript/node-node-redis.git



More information about the Pkg-javascript-commits mailing list