[Pkg-javascript-commits] [node-node-redis] 01/19: Imported Upstream version 0.1.6
Mike Gabriel
sunweaver at debian.org
Wed Aug 20 13:18:16 UTC 2014
This is an automated email from the git hooks/post-receive script.
sunweaver pushed a commit to branch master
in repository node-node-redis.
commit 0c61d82f60841d0675fbb888723edaf355a83844
Author: Mike Gabriel <mike.gabriel at das-netzwerkteam.de>
Date: Thu May 9 22:34:00 2013 +0200
Imported Upstream version 0.1.6
---
.gitignore | 4 +
.npmignore | 1 +
README | 26 ++++
bench.js | 179 +++++++++++++++++++++++++++
index.js | 395 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
package.json | 15 +++
parser.js | 379 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
test/main.js | 94 ++++++++++++++
utils.js | 113 +++++++++++++++++
9 files changed, 1206 insertions(+)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..596f28b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+tags
+dump.rdb
+bench/
+node_modules/
diff --git a/.npmignore b/.npmignore
new file mode 100644
index 0000000..95923ab
--- /dev/null
+++ b/.npmignore
@@ -0,0 +1 @@
+bench*
diff --git a/README b/README
new file mode 100644
index 0000000..50bcf53
--- /dev/null
+++ b/README
@@ -0,0 +1,26 @@
+node-redis
+==========
+
+A redis client.
+
+Usage
+-----
+
+ var redis = require('node-redis')
+
+ var client = redis.createClient(port, host, auth)
+
+ client.get('key', function (error, buffer) { ... })
+
+ client.multi()
+ client.lrange('key', 0, -1)
+ client.lrange('key', [0, -1])
+ client.exec(function (error, result) { result[0]; result[1] })
+
+ client.subscribe('channel')
+ client.on('message', function (channel, buffer) { ... })
+ client.on('message:channel', function (buffer) { ... })
+ client.unsubscribe('channel')
+
+ client.end()
+
diff --git a/bench.js b/bench.js
new file mode 100644
index 0000000..451133d
--- /dev/null
+++ b/bench.js
@@ -0,0 +1,179 @@
+var redis = require('./'),
+ //redis2 = require('redis'),
+ //redis3 = require('./bench/redis-node/redis'),
+ //redis4 = require('./bench/redis-client'),
+ Seq = require('parallel').Sequence,
+ assert = require('assert');
+
+// Clients under test, keyed by display name. Only this package's client is
+// enabled; the alternatives are commented out.
+var clients = { 'node-redis': redis.createClient()/*, 'node_redis': redis2.createClient(),*/
+ /*'redis-node': redis3.createClient(), 'redis-node-client': redis4.createClient()*/ }
+
+// iterations: commands issued per bench run.
+// number: how many times every bench is scheduled per client.
+var iterations = 5000,
+ number = 5;
+
+// Alternative payloads, kept for reference.
+//var buffer = JSON.stringify({
+ //name: 'Bob Marley',
+ //dob: '31/12/1980',
+ //hash: '213897d9827o7fn437nv348n534',
+ //salt: '12345',
+ //bio: 'p983u4f hh sdh khfkslh kfjh dkshljdh k hslhlhuil huihsuidh liushuilsh lihgudif hlugfdhliughduil hgdf',
+ //created: Date.now()
+//});
+//var buffer = require('fs').readFileSync('binary');
+//var buffer = new Buffer(Array(1025 * 2).join('x'));
+//var buffer = Array(1025 * 2).join('x');
+// Payload written by the set, lpush and hmset benches.
+var buffer = 'Some some random text for the benchmark.';
+//var buffer = 'xxx';
+//var buffer = require('fs').readFileSync('bench/html');
+
+var benches = {
+ set: function (client, callback) {
+ for (var i = 0; i < iterations - 1; i++) {
+ client.set('bench' + i, buffer);
+ }
+ client.set('bench' + i, buffer, callback);
+ },
+ get: function (client, callback) {
+ for (var i = 0; i < iterations - 1; i++) {
+ client.get('bench' + i);
+ }
+ client.get('bench' + i, callback);
+ },
+ del: function (client, callback) {
+ for (var i = 0; i < iterations - 1; i++) {
+ client.del('bench' + i);
+ }
+ client.del('bench' + i, callback);
+ },
+ lpush: function (client, callback) {
+ for (var i = 0; i < iterations - 1; i++) {
+ client.lpush('bench', buffer);
+ }
+ client.lpush('bench', buffer, callback);
+ },
+ lrange: function (client, callback) {
+ for (var i = 0; i < iterations - 1; i++) {
+ client.lrange('bench', 0, 10);
+ }
+ client.lrange('bench', 0, 10, callback);
+ },
+ hmset: function (client, callback) {
+ if ('redis-node' === client._name) return callback();
+ for (var i = 0; i < iterations - 1; i++) {
+ client.hmset('bench' + i, 'key', buffer, 'key2', buffer);
+ }
+ client.hmset('bench' + i, 'key', buffer, 'key2', buffer, callback);
+ },
+ hmget: function (client, callback) {
+ if ('redis-node' === client._name) return callback();
+ for (var i = 0; i < iterations - 1; i++) {
+ client.hmget('bench' + i, 'key', 'key2');
+ }
+ client.hmget('bench' + i, 'key', 'key2', callback);
+ },
+ del2: function (client, callback) {
+ for (var i = 0; i < iterations - 1; i++) {
+ client.del('bench' + i);
+ }
+ client.del('bench' + i, callback);
+ },
+};
+
+// Two queues of steps: `task` holds the timed bench runs, `warmup` holds the
+// steps executed once before them.
+// NOTE(review): `Seq` comes from the third-party 'parallel' package; it
+// presumably runs the added functions one after another - confirm.
+var task = new Seq(),
+ warmup = new Seq();
+
+// Schedule `number` rounds of every bench for every client.
+Object.keys(clients).forEach(function (client) {
+ clients[client]._name = client;
+ // From here on `client` is the client object, not its name.
+ client = clients[client];
+ client.benches = {};
+
+ for (var i = 0; i < number; i++) {
+ Object.keys(benches).forEach(function (bench) {
+ // Timings collected for this bench (elapsed milliseconds per run).
+ client.benches[bench] = [];
+
+ task.add(function (next, error) {
+ process.stdout.write('.');
+ var time = Date.now();
+ benches[bench](client, function (error) {
+ client.benches[bench].push(Date.now() - time);
+ next();
+ });
+ });
+ });
+
+ // Remove the list key used by lpush/lrange before the next round.
+ task.add(function (next) {
+ client.del('bench', next);
+ });
+ }
+});
+// Warm-up steps: one FLUSHALL per bench name for every client.
+Object.keys(clients).forEach(function (client) {
+ clients[client]._name = client;
+ client = clients[client];
+ client.benches = {};
+
+ Object.keys(benches).forEach(function (bench) {
+ client.benches[bench] = [];
+
+ warmup.add(function (next) {
+ client.flushall(next);
+ });
+ });
+});
+
+// Start once the node-redis client is connected: run the warm-up with a
+// reduced iteration count, restore it, then start the timed runs a second
+// later.
+clients['node-redis'].on('connect', function () {
+ var old_iter = iterations;
+
+ iterations = 100;
+
+ warmup.run(function () {
+ //throw new Error;
+ iterations = old_iter;
+ setTimeout(function () {
+ task.run(end);
+ }, 1000);
+ });
+});
+
+var end = function end () {
+ process.stdout.write('\r\n');
+ var bench, client_name, client,
+ keys = Object.keys(clients),
+ bench_keys = Object.keys(benches);
+
+ for (var i = 0, il = bench_keys.length; i < il; i++) {
+ bench = bench_keys[i];
+ console.log('=== ' + bench + ' x' + iterations + ' ===');
+
+ for (var j = 0, jl = keys.length; j < jl; j++) {
+ client_name = keys[j];
+ client = clients[client_name];
+
+ console.log(client_name + ' results: ' + client.benches[bench].join(', '));
+ }
+
+ console.log('');
+
+ for (j = 0, jl = keys.length; j < jl; j++) {
+ client_name = keys[j];
+ client = clients[client_name];
+
+ client.benches[bench] = eval(client.benches[bench].join('+')) / client.benches[bench].length;
+
+ console.log(client_name + ' avg: ' + client.benches[bench]);
+ }
+
+ console.log('');
+
+ for (j = 0, jl = keys.length; j < jl; j++) {
+ client_name = keys[j];
+ client = clients[client_name];
+
+ console.log(client_name + ' ops/s: ' + ((iterations / client.benches[bench]) * 1000));
+ }
+
+ console.log('\r\n');
+ }
+
+ // Bye!
+ process.exit();
+};
diff --git a/index.js b/index.js
new file mode 100644
index 0000000..0967c44
--- /dev/null
+++ b/index.js
@@ -0,0 +1,395 @@
+// The MIT License
+//
+// Copyright (c) 2013 Tim Smart
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files
+// (the "Software"), to deal in the Software without restriction,
+// including without limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of the Software, and
+// to permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var net = require('net'),
+ utils = require('./utils'),
+ Parser = require('./parser');
+
+// Redis client constructor.
+//
+// Opens a TCP connection to `host`:`port` straight away and wires the socket
+// to a reply parser. `auth`, when truthy, is sent as an AUTH command on
+// every (re)connect. The instance is an event emitter; this constructor
+// emits 'connect', 'error', 'end', 'close', 'data' and the pub/sub events.
+var RedisClient = function RedisClient(port, host, auth) {
+ this.host = host;
+ this.port = port;
+ this.auth = auth;
+ this.stream = net.createConnection(port, host);;
+ this.connected = false;
+ // Pub/sub monitor etc.
+ // false, true (MONITOR) or 'pubsub'.
+ this.blocking = false;
+ // Command queue.
+ // max_size: once the pending string reaches this length it is written out.
+ this.max_size = 1300;
+ // command: string data accumulated but not yet written.
+ this.command = '';
+ // commands: [name, args, callback] entries still waiting for a reply.
+ this.commands = new utils.Queue();
+ // For the retry timer.
+ this.retry = false;
+ this.retry_attempts = 0;
+ this.retry_delay = 250;
+ this.retry_backoff = 1.7;
+ // If we want to quit.
+ this.quitting = false;
+ // For when we have a full send buffer.
+ this.paused = false;
+ this.send_buffer = [];
+ this.flushing = false;
+
+ var self = this;
+
+ this.stream.on("connect", function () {
+ // Reset the retry backoff.
+ self.retry = false;
+ self.retry_delay = 250;
+ self.retry_attempts = 0;
+ self.stream.setNoDelay();
+ self.stream.setTimeout(0);
+ self.connected = true;
+
+ // Resend commands if we need to.
+ // Entries before `offset` have already been shifted off the queue.
+ var command,
+ commands = self.commands.array.slice(self.commands.offset);
+
+ // Send auth.
+ if (self.auth) {
+ commands.unshift(['AUTH', [self.auth], null]);
+ }
+
+ // sendCommand() queues each command again, so start from an empty queue.
+ self.commands = new utils.Queue();
+
+ for (var i = 0, il = commands.length; i < il; i++) {
+ command = commands[i];
+ self.sendCommand(command[0], command[1], command[2]);
+ }
+
+ // give connect listeners a chance to run first in case they need to auth
+ self.emit("connect");
+ });
+
+ // Feed incoming bytes to the parser; anything it throws is reported as an
+ // 'error' event and the parser is reset.
+ this.stream.on("data", function (buffer) {
+ try {
+ self.parser.onIncoming(buffer);
+ } catch (err) {
+ self.emit("error", err);
+ // Reset state.
+ self.parser.resetState();
+ }
+ });
+
+ // _write
+ // So we can pipeline requests.
+ // Moves the pending command string into send_buffer, then writes the
+ // buffered chunks in order until the stream refuses more.
+ this._flush = function () {
+ if ('' !== self.command) {
+ self.send_buffer.push(self.command);
+ self.command = '';
+ }
+
+ for (var i = 0, il = self.send_buffer.length; i < il; i++) {
+ // NOTE(review): when the stream is not writable, chunk i was never
+ // written yet slice(i + 1) discards it, and `flushing` stays true
+ // - confirm this is intended.
+ if (!self.stream.writable || false === self.stream.write(self.send_buffer[i])) {
+ return self.send_buffer = self.send_buffer.slice(i + 1);
+ }
+ }
+
+ self.send_buffer.length = 0;
+ self.paused = self.flushing = false;
+ };
+
+ // When we can write more.
+ this.stream.on('drain', this._flush);
+
+ this.stream.on("error", function (error) {
+ self.emit("error", error);
+ });
+
+ // Shared handler for the stream's "end" and "close" events.
+ var onClose = function onClose () {
+ // Ignore if we are already retrying. Or we want to quit.
+ if (self.retry) return;
+ self.emit('end');
+ self.emit('close');
+ if (self.quitting) {
+ // Emit an empty reply once per queue slot.
+ for (var i = 0, il = self.commands.length; i < il; i++) {
+ self.parser.emit('reply');
+ }
+ return;
+ }
+
+ self.onDisconnect();
+ };
+
+ this.stream.on("end", onClose);
+ this.stream.on("close", onClose);
+
+ // Setup the parser.
+ this.parser = new Parser();
+
+ this.parser.on('reply', function onReply (reply) {
+ // While blocking, replies are not matched against the command queue.
+ if (false !== self.blocking) {
+ if ('pubsub' === self.blocking) {
+ var type = reply[0].toString();
+
+ switch (type) {
+ case 'psubscribe':
+ case 'punsubscribe':
+ case 'subscribe':
+ case 'unsubscribe':
+ var channel = reply[1].toString(),
+ count = reply[2];
+
+ // A count of zero ends pub/sub mode.
+ if (0 === count) {
+ self.blocking = false;
+ }
+
+ self.emit(type, channel, count);
+ self.emit(type + ':' + channel, count);
+ break;
+ case 'message':
+ var key = reply[1].toString(),
+ data = reply[2];
+ self.emit('message', key, data);
+ self.emit('message:' + key, data);
+ break;
+ case 'pmessage':
+ var pattern = reply[1].toString(),
+ key = reply[2].toString(),
+ data = reply[3];
+ self.emit('pmessage', pattern, key, data);
+ self.emit('pmessage:' + pattern, key, data);
+ break;
+ }
+ } else {
+ // MONITOR mode: hand the raw reply to 'data' listeners.
+ self.emit('data', reply);
+ }
+ return;
+ }
+
+ var command = self.commands.shift();
+ if (command) {
+ switch (command[0]) {
+ case 'MONITOR':
+ self.blocking = true;
+ break;
+ case 'SUBSCRIBE':
+ case 'PSUBSCRIBE':
+ // Switch to pub/sub mode and run this reply through the pub/sub branch.
+ self.blocking = 'pubsub';
+ onReply(reply);
+ return;
+ }
+
+ if (command[2]) {
+ command[2](null, reply);
+ }
+ }
+ });
+
+ // DB error
+ // Goes to the callback of the oldest queued command, or is emitted as
+ // 'error' when there is no callback.
+ this.parser.on('error', function (error) {
+ var command = self.commands.shift();
+ error = new Error(error);
+ if (command && command[2]) command[2](error);
+ else self.emit('error', error);
+ });
+
+ process.EventEmitter.call(this);
+
+ return this;
+};
+
+RedisClient.prototype = Object.create(process.EventEmitter.prototype);
+
+// Exports
+exports.RedisClient = RedisClient;
+
+// createClient
+exports.createClient = function createClient (port, host, auth) {
+ return new RedisClient(port || 6379, host, auth);
+};
+
+RedisClient.prototype.connect = function () {
+ return this.stream.connect();
+};
+
+RedisClient.prototype.onDisconnect = function (error) {
+ var self = this;
+
+ // Make sure the stream is reset.
+ this.connected = false;
+ this.stream.destroy();
+ this.parser.resetState();
+
+ // Increment the attempts, so we know what to set the timeout to.
+ this.retry_attempts++;
+
+ // Set the retry timer.
+ setTimeout(function () {
+ self.stream.connect(self.port, self.host);
+ }, this.retry_delay);
+
+ this.retry_delay *= this.retry_backoff;
+ this.retry = true;
+};
+
+RedisClient.prototype._write = function (data) {
+ if (!this.paused) {
+ if (false === this.stream.write(data)) {
+ this.paused = true;
+ }
+ } else {
+ this.send_buffer.push(data);
+ }
+};
+
+// We use this so we can watch for a full send buffer.
+RedisClient.prototype.write = function write (data, buffer) {
+ if (true !== buffer) {
+ this.command += data;
+ if (this.max_size <= this.command.length) {
+ this._write(this.command);
+ this.command = '';
+ }
+ } else {
+ if ('' !== this.command) {
+ this._write(this.command);
+ this.command = '';
+ }
+ this._write(data);
+ }
+
+ if (!this.flushing) {
+ process.nextTick(this._flush);
+ this.flushing = true;
+ }
+};
+
+// We make some assumptions:
+//
+// * command WILL be uppercase and valid.
+// * args IS an array
+RedisClient.prototype.sendCommand = function (command, args, callback) {
+ // Push the command to the stack.
+ if (false === this.blocking) {
+ this.commands.push([command, args, callback]);
+ }
+
+ // Writable?
+ if (false === this.connected) return;
+
+ // Do we have to send a multi bulk command?
+ // Assume it is a valid command for speed reasons.
+ var args_length;
+
+ if (args && 0 < (args_length = args.length)) {
+ var arg, arg_type, last,
+ previous = ['*', (args_length + 1), '\r\n', '$', command.length, '\r\n', command, '\r\n'];
+
+ for (i = 0, il = args_length; i < il; i++) {
+ arg = args[i];
+ arg_type = typeof arg;
+
+ if ('string' === arg_type) {
+ // We can send this in one go.
+ previous.push('$', Buffer.byteLength(arg), '\r\n', arg, '\r\n');
+ } else if ('number' === arg_type) {
+ // We can send this in one go.
+ previous.push('$', ('' + arg).length, '\r\n', arg, '\r\n');
+ } else if (null === arg || 'undefined' === arg_type) {
+ // Send NIL
+ previous.push('$\r\b\r\b')
+ this.write(previous.join(''));
+ previous = [];
+ } else {
+ // Assume we are a buffer.
+ previous.push('$', arg.length, '\r\n');
+ this.write(previous.join(''));
+ this.write(arg, true);
+ previous = ['\r\n'];
+ }
+ }
+
+ // Anything left?
+ this.write(previous.join(''));
+ } else {
+ // We are just sending a stand alone command.
+ this.write(command_buffers[command]);
+ }
+};
+
+RedisClient.prototype.destroy = function () {
+ this.quitting = true;
+ return this.stream.destroy();
+};
+
+// http://redis.io/commands.json
+exports.commands = [
+ 'APPEND', 'AUTH', 'BGREWRITEAOF', 'BGSAVE', 'BLPOP', 'BRPOP', 'BRPOPLPUSH', 'CONFIG GET',
+ 'CONFIG SET', 'CONFIG RESETSTAT', 'DBSIZE', 'DEBUG OBJECT', 'DEBUG SEGFAULT', 'DECR',
+ 'DECRBY', 'DEL', 'DISCARD', 'ECHO', 'EXEC', 'EXISTS', 'EXPIRE', 'EXPIREAT', 'FLUSHALL',
+ 'FLUSHDB', 'GET', 'GETBIT', 'GETRANGE', 'GETSET', 'HDEL', 'HEXISTS', 'HGET', 'HGETALL',
+ 'HINCRBY', 'HKEYS', 'HLEN', 'HMGET', 'HMSET', 'HSET', 'HSETNX', 'HVALS', 'INCR', 'INCRBY',
+ 'INFO', 'KEYS', 'LASTSAVE', 'LINDEX', 'LINSERT', 'LLEN', 'LPOP', 'LPUSH', 'LPUSHX', 'LRANGE',
+ 'LREM', 'LSET', 'LTRIM', 'MGET', 'MONITOR', 'MOVE', 'MSET', 'MSETNX', 'MULTI', 'PERSIST',
+ 'PING', 'PSUBSCRIBE', 'PUBLISH', 'PUNSUBSCRIBE', 'QUIT', 'RANDOMKEY', 'RENAME', 'RENAMENX',
+ 'RPOP', 'RPOPLPUSH', 'RPUSH', 'RPUSHX', 'SADD', 'SAVE', 'SCARD', 'SDIFF', 'SDIFFSTORE', 'SELECT',
+ 'SET', 'SETBIT', 'SETEX', 'SETNX', 'SETRANGE', 'SHUTDOWN', 'SINTER', 'SINTERSTORE', 'SISMEMBER',
+ 'SLAVEOF', 'SMEMBERS', 'SMOVE', 'SORT', 'SPOP', 'SRANDMEMBER', 'SREM', 'STRLEN', 'SUBSCRIBE',
+ 'SUNION', 'SUNIONSTORE', 'SYNC', 'TTL', 'TYPE', 'UNSUBSCRIBE', 'UNWATCH', 'WATCH', 'ZADD',
+ 'ZCARD', 'ZCOUNT', 'ZINCRBY', 'ZINTERSTORE', 'ZRANGE', 'ZRANGEBYSCORE', 'ZRANK', 'ZREM',
+ 'ZREMRANGEBYRANK', 'ZREMRANGEBYSCORE', 'ZREVRANGE', 'ZREVRANGEBYSCORE', 'ZREVRANK', 'ZSCORE',
+ 'ZUNIONSTORE'
+];
+
+// NOTE(review): at module top level `this` is presumably module.exports,
+// which would make this an exported property - confirm.
+this.blocking_commands = ["MONITOR"];
+
+// For each command, make a buffer for it.
+var command_buffers = {};
+
+exports.commands.forEach(function (command) {
+ // Pre-alloc buffers for non-multi commands.
+ //command_buffers[command] = new Buffer('*1\r\n$' + command.length + '\r\n' + command + '\r\n');
+ command_buffers[command] = '*1\r\n$' + command.length + '\r\n' + command + '\r\n';
+
+ // Don't override stuff.
+ if (!RedisClient.prototype[command.toLowerCase()]) {
+ RedisClient.prototype[command.toLowerCase()] = function (array, callback) {
+ // An array of args.
+ // Assume we only have two args.
+ if (Array.isArray(array)) {
+ return this.sendCommand(command, array, callback);
+ }
+
+ // Arbitary amount of arguments.
+ var args = [];
+ args.push.apply(args, arguments);
+ callback = 'function' === typeof args[args.length - 1];
+
+ if (callback) {
+ callback = args.pop();
+ } else {
+ callback = null;
+ }
+
+ this.sendCommand(command, args, callback);
+ };
+ }
+});
+
+// Overwrite quit
+RedisClient.prototype.quit = RedisClient.prototype.end =
+function (callback) {
+ this.quitting = true;
+ return this.sendCommand('QUIT', null, callback);
+};
+
diff --git a/package.json b/package.json
new file mode 100644
index 0000000..a594898
--- /dev/null
+++ b/package.json
@@ -0,0 +1,15 @@
+{
+ "name": "node-redis",
+ "description": "Lightweight, fast, Redis client.",
+ "version": "0.1.6",
+ "author": "Tim Smart",
+ "contributors": [
+ "Matt Ranney"
+ ],
+ "repository": {
+ "type": "git",
+ "url": "http://github.com/Tim-Smart/node-redis.git"
+ },
+ "engine": [ "node >=0.2.2" ],
+ "main": "./"
+}
diff --git a/parser.js b/parser.js
new file mode 100644
index 0000000..5a65cfb
--- /dev/null
+++ b/parser.js
@@ -0,0 +1,379 @@
+// The MIT License
+//
+// Copyright (c) 2013 Tim Smart
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files
+// (the "Software"), to deal in the Software without restriction,
+// including without limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of the Software, and
+// to permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var utils = require('./utils');
+
+var RedisParser = function RedisParser () {
+ this.resetState();
+
+ process.EventEmitter.call(this);
+
+ return this;
+};
+
+module.exports = RedisParser;
+
+RedisParser.prototype = Object.create(process.EventEmitter.prototype);
+
+// Reset state, no matter where we are at.
+RedisParser.prototype.resetState = function resetState () {
+ this.reply = null;
+ this.expected = null;
+ this.multi = null;
+ this.replies = null;
+ this.pos = null;
+ this.flag = 'TYPE';
+ this.data = null;
+ this.last_data = null;
+ this.remaining = null;
+};
+
+// Handle an incoming buffer.
+//
+// Walks `buffer` byte by byte as a state machine driven by this.flag:
+// 'TYPE' reads the reply type byte; 'SINGLE', 'ERROR', 'INTEGER',
+// 'BULK_LENGTH' and 'MULTI_BULK' accumulate characters in this.data until
+// a CR; 'BULK' copies a payload of this.last_data bytes. Completed values
+// are handed to onData() / onError(). State kept on `this` lets a reply
+// continue in the next call.
+RedisParser.prototype.onIncoming = function onIncoming (buffer) {
+ // pos starts from the offset carried over by the previous call.
+ var char_code,
+ pos = this.pos || 0,
+ length = buffer.length;
+
+ // Make sure the buffer is joint properly.
+ if ('TYPE' !== this.flag && 'BULK' !== this.flag && null !== this.data) {
+ // We need to wind back a step.
+ // If we have CR now, it would break the parser.
+ if (0 !== this.data.length) {
+ char_code = this.data.charCodeAt(this.data.length - 1);
+ this.data = this.data.slice(0, -1);
+ --pos;
+ } else {
+ char_code = buffer[pos];
+ }
+ }
+
+ for (; length > pos;) {
+ switch (this.flag) {
+ case 'TYPE':
+ // What are we doing next?
+ switch (buffer[pos++]) {
+ // Single line status reply.
+ case 43: // + SINGLE
+ this.flag = 'SINGLE';
+ break;
+
+ // Tells us the length of upcoming data.
+ case 36: // $ LENGTH
+ this.flag = 'BULK_LENGTH';
+ break;
+
+ // Tells us how many args are coming up.
+ case 42: // * MULTI
+ this.flag = 'MULTI_BULK';
+ break;
+
+ case 58: // : INTEGER
+ this.flag = 'INTEGER';
+ break;
+
+ // Errors
+ case 45: // - ERROR
+ this.flag = 'ERROR';
+ break;
+ }
+ // Fast forward a char.
+ char_code = buffer[pos];
+ this.data = '';
+ break;
+
+ // Single line status replies.
+ case 'SINGLE':
+ case 'ERROR':
+ // Add char to the data
+ this.data += String.fromCharCode(char_code);
+ pos++;
+
+ // Optimize for the common use case.
+ if ('O' === this.data && 75 === buffer[pos]) { // OK
+ // Send off the reply.
+ this.data = 'OK';
+ this.onData();
+
+ pos += 3; // Skip the `K\r\n`
+
+ // break early.
+ break;
+ }
+
+ // Otherwise check for CR
+ char_code = buffer[pos];
+ if (13 === char_code) { // \r CR
+ // Send the reply.
+ if ('SINGLE' === this.flag) {
+ this.onData();
+ } else {
+ this.onError();
+ }
+
+ // Skip \r\n
+ pos += 2;
+ }
+ break;
+
+ // We have a integer coming up. Look for a CR
+ // then assume that is the end.
+ case 'BULK_LENGTH':
+ // We are still looking for more digits.
+ // char_code already set by TYPE state.
+ this.data += String.fromCharCode(char_code);
+ pos++;
+
+ // Is the next char the end? Set next char_code while
+ // we are at it.
+ char_code = buffer[pos];
+ if (13 === char_code) { // \r CR
+ // Cast to int
+ this.data = +this.data;
+
+ // Null reply?
+ if (-1 !== this.data) {
+ // last_data now holds the payload length in bytes.
+ this.flag = 'BULK';
+ this.last_data = this.data;
+ this.data = null;
+ } else {
+ this.data = null;
+ this.onData();
+ }
+
+ // Skip the \r\n
+ pos += 2;
+ }
+ break;
+
+ // Short bulk reply.
+ // Three cases: the whole payload is in this buffer; a join is already in
+ // progress (this.data is set); or a join has to be started.
+ case 'BULK':
+ if (null === this.data && length >= (pos + this.last_data)) {
+ // Slow slice is slow.
+ if (14 > this.last_data) {
+ this.data = new Buffer(this.last_data);
+ for (var i = 0; i < this.last_data; i++) {
+ this.data[i] = buffer[i + pos];
+ }
+ } else {
+ this.data = buffer.slice(pos, this.last_data + pos);
+ }
+
+ // Fast forward past data.
+ pos += this.last_data + 2;
+
+ // Send it off.
+ this.onData();
+ } else if (this.data) {
+ // Still joining. pos = amount left to go.
+ if (this.remaining <= length) {
+ // End is within this buffer.
+ if (13 < this.remaining) {
+ buffer.copy(this.data, this.last_data - this.remaining, 0, this.remaining)
+ } else {
+ utils.copyBuffer(buffer, this.data, this.last_data - this.remaining, 0, this.remaining);
+ }
+
+ // Fast forward past data.
+ pos = this.remaining + 2;
+ this.remaining = null;
+
+ this.onData();
+ } else {
+ // We have more to come. Copy what we got then move on,
+ // decrementing the amount we have copied from this.remaining
+ // NOTE(review): unlike the branches above, the hand-rolled copy is
+ // picked for the larger size here; both calls copy the same bytes.
+ if (13 < (this.remaining - length)) {
+ utils.copyBuffer(buffer, this.data, this.last_data - this.remaining, 0, length);
+ } else {
+ buffer.copy(this.data, this.last_data - this.remaining, 0, length);
+ }
+
+ // More to go.
+ this.remaining -= length;
+ pos = length;
+ }
+ } else {
+ // We will have to do a join.
+ this.data = new Buffer(this.last_data);
+
+ // Fast copy if small.
+ if (15 > this.last_data) {
+ utils.copyBuffer(buffer, this.data, 0, pos);
+ } else {
+ buffer.copy(this.data, 0, pos)
+ }
+
+ // Point pos to the amount we need.
+ this.remaining = this.last_data - (length - pos);
+ pos = length;
+ }
+ break;
+
+ // How many bulk's are coming?
+ case 'MULTI_BULK':
+ // We are still looking for more digits.
+ // char_code already set by TYPE state.
+ this.data += String.fromCharCode(char_code);
+ pos++;
+
+ // Is the next char the end? Set next char_code while
+ // we are at it.
+ char_code = buffer[pos];
+ if (13 === char_code) { // \r CR
+ // Cast to int
+ this.last_data = +this.data;
+ this.data = null;
+
+ // Are we multi?
+ // A second count seen while one is pending means nested replies: the
+ // pending count becomes the number of replies expected.
+ if (null === this.expected) {
+ this.expected = this.last_data;
+ this.reply = [];
+ } else if (null === this.multi) {
+ this.multi = this.expected;
+ this.expected = null;
+ this.replies = [];
+ }
+
+ // Skip the \r\n
+ pos += 2;
+ this.flag = 'TYPE';
+
+ // Zero length replies.
+ if (0 === this.last_data) {
+ this.expected = this.reply = null;
+ this.data = [];
+ this.onData();
+ break;
+ } else if (-1 === this.last_data) {
+ // NIL reply.
+ this.expected = this.reply = null;
+ this.data = null;
+ this.onData();
+ break;
+ }
+
+ char_code = buffer[pos];
+
+ // Will have to look ahead to check for another MULTI in case
+ // we are a multi transaction.
+ if (36 === char_code) { // $ - BULK_LENGTH
+ // We are bulk data.
+ this.flag = 'BULK_LENGTH';
+
+ // We are skipping the TYPE check. Skip the $
+ pos++;
+ // We need to set char code and data.
+ char_code = buffer[pos];
+ this.data = '';
+ } else if (null === this.multi && char_code) {
+ // Multi trans time.
+ this.multi = this.expected;
+ this.expected = null;
+ this.replies = [];
+ }
+ }
+ break;
+
+ case 'INTEGER':
+ // We are still looking for more digits.
+ // char_code already set by TYPE state.
+ this.data += String.fromCharCode(char_code);
+ pos++;
+
+ // Is the next char the end? Set next char_code while
+ // we are at it.
+ char_code = buffer[pos];
+ if (13 === char_code) { // \r CR
+ // Cast to int
+ this.data = +this.data;
+ this.onData();
+
+ // Skip the \r\n
+ pos += 2;
+ }
+ break;
+ }
+ }
+
+ // In case we have multiple packets.
+ // Carry the overshoot past this buffer's end (zero when it was consumed
+ // exactly) into the next call.
+ this.pos = pos - length;
+};
+
+// When we have recieved a chunk of response data.
+RedisParser.prototype.onData = function onData () {
+ if (null !== this.expected) {
+ // Decrement the expected data replies and add the data.
+ this.reply.push(this.data);
+ this.expected--;
+
+ // Finished? Send it off.
+ if (0 === this.expected) {
+ if (null !== this.multi) {
+ this.replies.push(this.reply);
+ this.multi--;
+
+ if (0 === this.multi) {
+ this.emit('reply', this.replies);
+ this.replies = this.multi = null;
+ }
+ } else {
+ this.emit('reply', this.reply);
+ }
+ this.reply = this.expected = null;
+ }
+ } else {
+ if (null === this.multi) {
+ this.emit('reply', this.data);
+ } else {
+ this.replies.push(this.data);
+ this.multi--;
+
+ if (0 === this.multi) {
+ this.emit('reply', this.replies);
+ this.replies = this.multi = null;
+ }
+ }
+ }
+
+ this.last_data = null;
+ this.data = null;
+ this.flag = 'TYPE';
+};
+
+// Found an error.
+//
+// Outside nested replies the message is emitted as 'error'; otherwise it
+// takes a slot in this.replies like any other value.
+RedisParser.prototype.onError = function onError () {
+    var nested = null !== this.multi;
+
+    if (!nested) {
+        this.emit('error', this.data);
+    } else {
+        this.replies.push(this.data);
+        this.multi--;
+
+        if (0 === this.multi) {
+            this.emit('reply', this.replies);
+            this.replies = this.multi = null;
+        }
+    }
+
+    // Back to waiting for the next type byte.
+    this.last_data = null;
+    this.data = null;
+    this.flag = 'TYPE';
+};
diff --git a/test/main.js b/test/main.js
new file mode 100644
index 0000000..a522131
--- /dev/null
+++ b/test/main.js
@@ -0,0 +1,94 @@
+// Integration tests; they talk to a Redis server on the default host and port.
+// `c` is created with the auth string 'test', `c2` without one.
+var c = require('../').createClient(null, null, 'test'),
+ c2 = require('../').createClient(),
+ assert = require('assert');
+
+// 1024 bytes of 'x', used as the stress-test payload.
+var buffer = new Buffer(new Array(1025).join('x'));
+
+// NOTE(review): the exported-object shape, the `done` callbacks and the
+// `after` hook presumably belong to an external test runner - confirm which.
+module.exports = {
+ // SET, GET, DEL, then GET again on the same key.
+ "test basic commands": function (done) {
+ c.set('1', 'test');
+ c.get('1', function (error, value) {
+ assert.ok(!error);
+ assert.equal(value, 'test');
+ });
+
+ c.del('1', function (error) {
+ assert.ok(!error);
+ });
+
+ c.get('1', function (error, value) {
+ assert.ok(!error);
+ // NOTE(review): assert.isNull is presumably supplied by the test runner - confirm.
+ assert.isNull(value);
+ done();
+ });
+
+ },
+ // 10000 SETs followed by 10000 DELs; a final PING ends the test and the
+ // callback counters are checked when the process exits.
+ "test stress": function (done) {
+ var n = 0,
+ o = 0;
+
+ for (var i = 0; i < 10000; i++) {
+ c.set('2' + i, buffer, function (error) {
+ assert.ok(!error);
+ ++n;
+ });
+ }
+
+ for (i = 0; i < 10000; i++) {
+ c.del('2' + i, function (error) {
+ assert.ok(!error);
+ ++o;
+ });
+ }
+
+ c.ping(function (error) {
+ assert.ok(!error)
+ done()
+ })
+
+ process.on('exit', function () {
+ assert.equal(10000, n);
+ assert.equal(10000, o);
+ });
+ },
+ // Subscribe on `c`, publish from `c2`, then unsubscribe and check that `c`
+ // accepts ordinary commands again.
+ "test pubsub": function (done) {
+ c.subscribe('test');
+ c.on('subscribe:test', function (count) {
+ assert.equal(1, count);
+ c2.publish('test', '123', function (error) {
+ assert.ok(!error);
+ });
+ });
+ c.on('message:test', function (data) {
+ assert.equal('123', data.toString());
+ c.unsubscribe('test');
+ });
+ c.on('unsubscribe:test', function (count) {
+ assert.equal(0, count);
+ assert.equal(false, c.blocking);
+ c.ping(function (error) {
+ assert.ok(!error);
+ done();
+ });
+ });
+ },
+ // Put `c` into MONITOR mode and expect MONITOR, then the SET and DEL issued
+ // by `c2`, on successive 'data' events.
+ "test monitor": function (done) {
+ c.monitor();
+ c.once('data', function (data) {
+ assert.ok(/MONITOR/.test(data));
+ c.once('data', function (data) {
+ assert.ok(/SET/.test(data));
+ c.once('data', function (data) {
+ assert.ok(/DEL/.test(data));
+ done();
+ });
+ });
+ });
+ c2.set('test', 123);
+ c2.del('test');
+ },
+ // Send QUIT on both clients.
+ after: function () {
+ c.quit();
+ c2.quit();
+ }
+};
diff --git a/utils.js b/utils.js
new file mode 100644
index 0000000..52eb029
--- /dev/null
+++ b/utils.js
@@ -0,0 +1,113 @@
+// The MIT License
+//
+// Copyright (c) 2013 Tim Smart
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files
+// (the "Software"), to deal in the Software without restriction,
+// including without limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of the Software, and
+// to permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+// noop to keep references low.
+exports.noop = function () {};
+
+// Logger function.
+exports.log = function log (error, results) {
+ if (error) return console.error(error);
+
+ var ret;
+
+ if (results instanceof Array) {
+ var result;
+ ret = [];
+
+ for (var i = 0, il = results.length; i < il; i++) {
+ result = results[i];
+
+ if (result instanceof Buffer) {
+ ret.push(result.toString());
+ } else {
+ ret.push(result);
+ }
+ }
+ } else if (results instanceof Buffer) {
+ ret = results.toString();
+ } else ret = results;
+
+ console.log(ret);
+};
+
+// Fast copyBuffer method for small buffers.
+exports.copyBuffer = function copyBuffer (source, target, start, s_start, s_end) {
+ s_end || (s_end = source.length);
+
+ for (var i = s_start; i < s_end; i++) {
+ target[i - s_start + start] = source[i];
+ }
+
+ return target;
+};
+
+// Fast write buffer for small uns.
+var writeBuffer = exports.writeBuffer = function writeBuffer (buffer, string, offset) {
+ for (var i = 0, il = string.length; i < il; i++) {
+ buffer[i + offset] = string.charCodeAt(i);
+ }
+
+ return il;
+};
+
+var toArray = exports.toArray = function toArray (args) {
+ var len = args.length,
+ arr = new Array(len), i;
+
+ for (i = 0; i < len; i++) {
+ arr[i] = args[i];
+ }
+
+ return arr;
+};
+
+// Queue class adapted from Tim Caswell's pattern library
+// http://github.com/creationix/pattern/blob/master/lib/pattern/queue.js
+var Queue = function () {
+ this.array = Array.prototype.slice.call(arguments);
+ this.offset = 0;
+};
+
+exports.Queue = Queue;
+
+Queue.prototype.shift = function () {
+ if (this.array.length === 0) return;
+ var ret = this.array[this.offset];
+ this.array[this.offset++] = undefined;
+ if (this.offset === this.array.length) {
+ this.array.length = 0;
+ this.offset = 0;
+ }
+ return ret;
+}
+
+Queue.prototype.push = function (item) {
+ return this.array.push(item);
+};
+
+Object.defineProperty(Queue.prototype, 'length', {
+ get: function () {
+ return this.array.length;
+ }
+});
+;
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-javascript/node-node-redis.git
More information about the Pkg-javascript-commits
mailing list