diff --git a/Readme.md b/Readme.md index 1c83236..0e62afe 100644 --- a/Readme.md +++ b/Readme.md @@ -28,7 +28,7 @@ $ npm install nsq.js except fo the framer: ``` -$ DEBUG=nsq*,-nsq:framer node test +$ DEBUG=nsq*,-nsq:framer npm test nsq:reader connect nsqd 0.0.0.0:4150 events/ingestion [5] +0ms nsq:connection connect: 0.0.0.0:4150 V2 +0ms @@ -151,17 +151,17 @@ Close the writer's connection(s) and fire the optional [fn] when completed. A single message. -### Message#finish() +### Message#finish([fn]) - Mark message as complete. + Mark message as complete.. -### Message#requeue([delay]) +### Message#requeue([delay], [fn]) Re-queue the message immediately, or with the given `delay` in milliseconds, or a string such as "5s", "10m" etc. -### Message#touch() +### Message#touch([fn]) Reset the message's timeout, increasing the length of time before NSQD considers it timed out. diff --git a/circle.yml b/circle.yml index 4a36d5d..85e490f 100644 --- a/circle.yml +++ b/circle.yml @@ -4,14 +4,14 @@ machine: dependencies: pre: - - wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-0.3.6.linux-amd64.go1.5.1.tar.gz - - tar xvzf nsq-0.3.6.linux-amd64.go1.5.1.tar.gz - - cp nsq-0.3.6.linux-amd64.go1.5.1/bin/* . + - wget https://github.com/nsqio/nsq/releases/download/v0.3.8/nsq-0.3.8.linux-amd64.go1.6.2.tar.gz + - tar xvzf nsq-0.3.8.linux-amd64.go1.6.2.tar.gz + - cp nsq-0.3.8.linux-amd64.go1.6.2/bin/* . - ./nsqlookupd: background: true - ./nsqd --lookupd-tcp-address=127.0.0.1:4160: background: true cache_directories: - - nsq-0.3.6.linux-amd64.go1.5.1 + - nsq-0.3.8.linux-amd64.go1.6.2 override: - npm install diff --git a/lib/connection.js b/lib/connection.js index c46a368..2176be2 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -341,13 +341,17 @@ Connection.prototype.ready = function(n){ * @api private */ -Connection.prototype.finish = function(id){ +Connection.prototype.finish = function(id, fn){ assertValidMessageId(id); - var fn = this.onerror; var self = this; - if (!this._ready) return fn(new Error('cannot finish, connection not ready')); - this.command('FIN', [id], function(){ + if (!this._ready) { + var err = new Error('cannot finish, connection not ready'); + if (fn) fn(err); + return this.onerror(err); + } + this.command('FIN', [id], function(err){ self.emit('finish', id); + if (fn) fn(err); --self.inFlight; }); }; @@ -361,13 +365,17 @@ Connection.prototype.finish = function(id){ * @api private */ -Connection.prototype.requeue = function(id, timeout){ +Connection.prototype.requeue = function(id, timeout, fn){ assertValidMessageId(id); - var fn = this.onerror; var self = this; - if (!this._ready) return fn(new Error('cannot requeue, connection not ready')); - this.command('REQ', [id, timeout || 0], function(){ + if (!this._ready) { + var err = new Error('cannot requeue, connection not ready'); + if (fn) fn(err); + return this.onerror(err); + } + this.command('REQ', [id, timeout || 0], function(err){ self.emit('requeue', id); + if (fn) fn(err); --self.inFlight; }); }; @@ -379,13 +387,17 @@ Connection.prototype.requeue = function(id, timeout){ * @api private */ -Connection.prototype.touch = function(id){ +Connection.prototype.touch = function(id, fn){ assertValidMessageId(id); - var fn = this.onerror; var self = this; - if (!this._ready) return fn(new Error('cannot touch, connection not ready')); - this.command('TOUCH', [id], function(){ + if (!this._ready) { + var err = new Error('cannot touch, connection not ready'); + if (fn) fn(err); + return this.onerror(err); + } + this.command('TOUCH', [id], function(err){ self.emit('touch', id); + if (fn) fn(err); }); }; diff --git a/lib/message.js b/lib/message.js index 0b5a6e5..1045c80 100644 --- a/lib/message.js +++ b/lib/message.js @@ -41,12 +41,13 @@ function Message(body, conn) { /** * Mark the message as finished. * + * @param {Function} [fn] * @api public */ -Message.prototype.finish = function(){ +Message.prototype.finish = function(fn){ this.responded = true; - this.conn.finish(this.id); + this.conn.finish(this.id, fn); this.trace('message:finish', { msg: this }); }; @@ -77,25 +78,27 @@ Message.prototype.timedout = function(){ * Re-queue the message with optional `delay`. * * @param {Number|String} [delay] + * @param {Function} [fn] * @api public */ -Message.prototype.requeue = function(delay){ +Message.prototype.requeue = function(delay, fn){ if ('string' == typeof delay) delay = ms(delay); this.responded = true; - this.conn.requeue(this.id, delay); + this.conn.requeue(this.id, delay, fn); this.trace('message:requeue', { msg: this }); }; /** * Touch the message. * + * @param {Function} [fn] * @api public */ -Message.prototype.touch = function(){ +Message.prototype.touch = function(fn){ this.lastTouch = Date.now(); - this.conn.touch(this.id); + this.conn.touch(this.id, fn); this.trace('message:touch', { msg: this }); }; diff --git a/package.json b/package.json index 43fd2b9..92d7ee5 100644 --- a/package.json +++ b/package.json @@ -12,21 +12,21 @@ "queue" ], "dependencies": { - "backo": "~1.0.1", + "backo": "~1.1.0", "bignum": "~0.12.5", - "debug": "~0.7.4", - "ms": "~0.6.2", - "node-int64": "~0.3.0", - "nsq-lookup": "~1.0.0", + "debug": "~2.2.0", + "ms": "~0.7.1", + "node-int64": "~0.4.0", + "nsq-lookup": "~1.0.2", "set-component": "~1.0.0" }, "devDependencies": { - "bytes": "~0.3.0", - "jstrace": "~0.1.0", + "bytes": "~2.4.0", + "jstrace": "~0.3.0", + "lodash.get": "^4.4.2", "mocha": "*", "should": "*", - "superagent": "~0.17.0", - "uid": "0.0.2" + "superagent": "~2.3.0" }, "license": "MIT" } diff --git a/test/acceptance/connection.js b/test/acceptance/connection.js index d95d985..d7daf59 100644 --- a/test/acceptance/connection.js +++ b/test/acceptance/connection.js @@ -1,18 +1,10 @@ +require('./hooks'); + var Connection = require('../../lib/connection'); var assert = require('assert'); -var utils = require('../utils'); -var uid = require('uid'); describe('Connection', function(){ - var topic = uid(); - afterEach(function(done){ - utils.deleteTopic(topic, function(){ - topic = uid(); - done(); - }); - }) - it('should identify on connect', function(done){ var conn = new Connection; @@ -31,17 +23,17 @@ describe('Connection', function(){ var sub = new Connection; pub.on('ready', function(){ - pub.publish(topic, 'something'); + pub.publish('test', 'something'); }); sub.on('ready', function(){ - sub.subscribe(topic, 'tailer'); + sub.subscribe('test', 'tailer'); sub.ready(5); }); sub.on('message', function(msg){ msg.finish(); - done(); + sub.close(done); }); pub.connect(); @@ -52,7 +44,7 @@ describe('Connection', function(){ var conn = new Connection; conn.on('ready', function(){ - conn.subscribe(topic, 'tailer', function(err){ + conn.subscribe('test', 'tailer', function(err){ assert(!err); conn.close(done); }); @@ -68,7 +60,7 @@ describe('Connection', function(){ conn.on('error', function(){}); conn.on('ready', function(){ conn.sock.destroy(); - conn.publish(topic, 'something', function(err){ + conn.publish('test', 'something', function(err){ called++; }); assert.equal(called, 1); @@ -94,7 +86,7 @@ describe('Connection', function(){ conn.connect(); conn.on('ready', function(){ conn.end(); - conn.publish(topic, 'stuff'); + conn.publish('test', 'stuff'); conn.on('error', done); conn.on('end', done); }); diff --git a/test/acceptance/hooks.js b/test/acceptance/hooks.js new file mode 100644 index 0000000..6eed516 --- /dev/null +++ b/test/acceptance/hooks.js @@ -0,0 +1,17 @@ + +var assert = require('assert'); +var get = require('lodash.get'); +var utils = require('../utils'); + +beforeEach(function(done){ + utils.deleteTopic('test', done); +}) + +afterEach(function(done){ + utils.stats('test', 'reader', function(err, stats){ + if (err) return done(err); + var state = get(stats.body.data, 'topics[0].channels[0].clients[0].state'); + assert(state != 3, 'client in subscribed state'); + done(); + }) +}); diff --git a/test/acceptance/reader.js b/test/acceptance/reader.js index babac84..ab0372d 100644 --- a/test/acceptance/reader.js +++ b/test/acceptance/reader.js @@ -1,223 +1,191 @@ -var utils = require('../utils'); +require('./hooks'); + var assert = require('assert'); var nsq = require('../..'); -var uid = require('uid'); describe('Reader', function(){ - var topic = uid(); - afterEach(function(done){ - utils.deleteTopic(topic, function(){ - topic = uid(); - done(); - }) - }) - - describe('Reader()', function(){ - describe('with .nsqd addresses', function(){ - it('should subscribe to messages', function(done){ - var pub = nsq.writer(); - var sub = nsq.reader({ - topic: topic, - channel: 'reader', - nsqd: ['0.0.0.0:4150'] - }); - - sub.on('message', function(msg){ - msg.finish(); - done(); - }); - - pub.on('ready', function(){ - pub.publish(topic, 'something', function(err){ - if (err) return done(err); - }); - }); - }) - - it('should connect after event handlers are added', function(done){ - var sub = nsq.reader({ - topic: topic, - channel: 'reader', - nsqd: ['0.0.0.0:4150'] - }); - - sub.connect = function(){ - sub.emit('done'); - }; - - sub.on('done', done); - }); - }) - - describe('with .nsqlookupd addresses', function(){ - it('should subscribe to messages', function(done){ - var pub = nsq.writer(); - var sub = nsq.reader({ - topic: topic, - channel: 'reader', - nsqlookupd: ['0.0.0.0:4161'], - pollInterval: 100 - }); - sub.on('message', function(msg){ - msg.finish(); - done(); - }); - - pub.on('ready', function(){ - pub.publish(topic, 'something', function(err){ - if (err) return done(err); - }); - }); - }) - - it('should set .timer attribute', function(done){ - var sub = nsq.reader({ - topic: topic, - channel: 'reader', - nsqlookupd: ['0.0.0.0:4161'] - }); - - setImmediate(function(){ - assert(sub.timer !== null); - done(); - }); - }) - }) - - it('should discard messages after the max attempts', function(done){ + describe('with .nsqd addresses', function(){ + it('should subscribe to messages', function(done){ var pub = nsq.writer(); - var sub = nsq.reader({ - topic: topic, - channel: 'reader', - nsqd: ['0.0.0.0:4150'], - maxAttempts: 5, - }); - - sub.once('discard', function(msg){ - sub.removeAllListeners('message'); - done(); - }); - - sub.on('message', function(msg){ - msg.requeue(0); - }); - - pub.on('ready', function(){ - pub.publish(topic, 'something'); - }); - }) - }) - - describe('Reader#close()', function(){ - var topic = uid(); - beforeEach(function(done){ - var oldTopic = topic; - var newTopic = uid(); - topic = newTopic; - utils.deleteTopic(oldTopic, function(){ - utils.createTopic(newTopic, done); - }) - }) - - it('should wait for pending messages and emit "close"', function(done){ - var pub = nsq.writer(); - var recv = 0; var sub = nsq.reader({ - topic: topic, + topic: 'test', channel: 'reader', - nsqd: ['0.0.0.0:4150'], - maxInFlight: 10 + nsqd: ['0.0.0.0:4150'] }); - var n = 0; pub.on('ready', function(){ - var count = 30; - while (count--) pub.publish(topic, { n: n++ }); - }); - - sub.on('close', function(){ - assert(recv < 30, 'received too many messages'); - done(); + pub.publish('test', 'something'); }); - sub.on('message', function(msg){ - setTimeout(function(){ - if (recv++ == 10) sub.close(); - msg.finish(); - }, 50); + sub.once('message', function(msg){ + msg.finish(); + sub.close(done); }); }) }) - describe('Reader#close(fn)', function(){ - it('should wait for pending messages and invoke the callback', function(done){ + describe('with .nsqlookupd addresses', function(){ + it('should subscribe to messages', function(done){ var pub = nsq.writer(); - var recv = 0; - - var sub = nsq.reader({ - topic: topic, - channel: 'reader', - nsqd: ['0.0.0.0:4150'], - maxInFlight: 10 - }); - var n = 0; pub.on('ready', function(){ - var count = 30; - while (count--) pub.publish(topic, { n: n++ }); - }); + pub.publish('test', 'something', function(err){ + if (err) return done(err); + + var sub = nsq.reader({ + topic: 'test', + channel: 'reader', + nsqlookupd: ['0.0.0.0:4161'] + }); - sub.on('message', function(msg){ - setTimeout(function(){ - if (recv++ == 10) { - sub.close(function(){ - assert(recv < 30, 'received too many messages'); - done(); - }); - } - msg.finish(); - }, 50); + sub.once('message', function(msg){ + msg.finish(); + sub.close(done); + }); + }); }); }) - it('should close if there are no in-flight messages', function(done){ - var pub = nsq.writer(); + it('should set .timer attribute', function(){ var sub = nsq.reader({ - topic: topic, + topic: 'test', channel: 'reader', - nsqd: ['0.0.0.0:4150'], - maxInFlight: 10 + nsqlookupd: ['0.0.0.0:4161'] }); - sub.on('ready', function(){ - // give it some time to SUB... lame - // add subscribe event? - setTimeout(function(){ - sub.close(done); - }, 100); + setImmediate(function(){ + assert(sub.timer !== null); + sub.close(done); }); }) }) - describe('Reader#close(fn)', function(){ - it('should stop polling nsqlookupd if reader had been closed', function(done){ - var sub = nsq.reader({ - topic: topic, - channel: 'reader', - nsqlookupd: ['0.0.0.0:4161'], - pollInterval: 100 - }); + it('should discard messages after the max attempts', function(done){ + var pub = nsq.writer(); - setImmediate(function(){ - sub.close(); - sub.lookup = function(fn){ - done(new Error('setInterval() is still running')); - }; - }); + var sub = nsq.reader({ + topic: 'test', + channel: 'reader', + nsqd: ['0.0.0.0:4150'], + maxAttempts: 1 + }); - setTimeout(done, 500); - }) + pub.on('ready', function(){ + pub.publish('test', 'something'); + }); + + sub.on('message', function(msg){ + msg.requeue(); + }); + + sub.once('discard', function(msg){ + sub.removeAllListeners('message'); + sub.close(done); + }); + }) +}) + +describe('Reader#close()', function(){ + it('should wait for pending messages and emit "close"', function(done){ + var pub = nsq.writer(); + var recv = 0; + + var sub = nsq.reader({ + topic: 'test', + channel: 'reader', + nsqd: ['0.0.0.0:4150'], + maxInFlight: 10 + }); + + var n = 0; + pub.on('ready', function(){ + var count = 30; + while (count--) pub.publish('test', { n: n++ }); + }); + + sub.on('close', function(){ + assert(recv < 30, 'received too many messages'); + done(); + }); + + sub.on('message', function(msg){ + setTimeout(function(){ + if (recv++ == 10) sub.close(); + msg.finish(); + }, 50); + }); + }) +}) + +describe('Reader#close(fn)', function(){ + it('should wait for pending messages and invoke the callback', function(done){ + var pub = nsq.writer(); + var recv = 0; + + var sub = nsq.reader({ + topic: 'test', + channel: 'reader', + nsqd: ['0.0.0.0:4150'], + maxInFlight: 10 + }); + + var n = 0; + pub.on('ready', function(){ + var count = 30; + while (count--) pub.publish('test', { n: n++ }); + }); + + sub.on('message', function(msg){ + setTimeout(function(){ + if (recv++ == 10) { + sub.close(function(){ + assert(recv < 30, 'received too many messages'); + done(); + }); + } + msg.finish(); + }, 50); + }); + }) + + it('should close if there are no in-flight messages', function(done){ + var pub = nsq.writer(); + + var sub = nsq.reader({ + topic: 'empty', + channel: 'reader', + nsqd: ['0.0.0.0:4150'], + maxInFlight: 10 + }); + + sub.on('ready', function(){ + // give it some time to SUB... lame + // add subscribe event? + setTimeout(function(){ + sub.close(done); + }, 100); + }); + }) +}) + +describe('Reader#close(fn)', function(){ + it('should stop polling nsqlookupd if reader had been closed', function(done){ + var sub = nsq.reader({ + topic: 'test', + channel: 'reader', + nsqlookupd: ['0.0.0.0:4161'], + pollInterval: 100 + }); + + setImmediate(function(){ + sub.close(); + sub.lookup = function(fn){ + done(new Error('setInterval() is still running')); + }; + }); + + setTimeout(done, 500); }) -}); \ No newline at end of file +}) diff --git a/test/acceptance/writer.js b/test/acceptance/writer.js index 034fcea..a82a408 100644 --- a/test/acceptance/writer.js +++ b/test/acceptance/writer.js @@ -1,35 +1,27 @@ +require('./hooks'); + var Connection = require('../../lib/connection'); -var utils = require('../utils'); var assert = require('assert'); var nsq = require('../..'); -var uid = require('uid'); describe('Writer#publish()', function(){ - var topic = uid(); - afterEach(function(done){ - utils.deleteTopic(topic, function(){ - topic = uid(); - done(); - }); - }) - it('should publish messages', function(done){ var pub = nsq.writer(); var sub = new Connection; pub.on('ready', function(){ - pub.publish(topic, 'something'); + pub.publish('test', 'something'); }); sub.on('ready', function(){ - sub.subscribe(topic, 'tailer'); + sub.subscribe('test', 'tailer'); sub.ready(5); }); sub.on('message', function(msg){ msg.finish(); - done(); + sub.close(done); }); sub.connect(); @@ -40,7 +32,7 @@ describe('Writer#publish()', function(){ pub.on('error', function(){}); - pub.publish(topic, 'something', function(err){ + pub.publish('test', 'something', function(err){ err.message.should.equal('no nsqd nodes connected'); done(); }); @@ -71,17 +63,17 @@ describe('Writer#publish()', function(){ } pub.on('ready', function(){ - pub.publish(topic, new Buffer(1024), next); - pub.publish(topic, new Buffer(1024), next); - pub.publish(topic, new Buffer(1024), next); + pub.publish('test', new Buffer(1024), next); + pub.publish('test', new Buffer(1024), next); + pub.publish('test', new Buffer(1024), next); pub.close(function(){ assert(n === 3); - done(); + sub.close(done); }); }); sub.on('ready', function(){ - sub.subscribe(topic, 'tailer'); + sub.subscribe('test', 'tailer'); }); sub.on('message', function(msg){ @@ -99,11 +91,11 @@ describe('Writer#publish()', function(){ var n = 0; pub.on('ready', function(){ - pub.publish(topic, ['foo', 'bar', 'baz']); + pub.publish('test', ['foo', 'bar', 'baz']); }); sub.on('ready', function(){ - sub.subscribe(topic, 'something'); + sub.subscribe('test', 'something'); sub.ready(5); }); @@ -113,7 +105,7 @@ describe('Writer#publish()', function(){ if (++n == 3) { msgs.should.eql(['foo', 'bar', 'baz']); - done(); + sub.close(done); } }); @@ -127,18 +119,18 @@ describe('Writer#publish()', function(){ var sub = new Connection; pub.on('ready', function(){ - pub.publish(topic, Buffer('foobar')); + pub.publish('test', Buffer('foobar')); }); sub.on('ready', function(){ - sub.subscribe(topic, 'something'); + sub.subscribe('test', 'something'); sub.ready(5); }); sub.on('message', function(msg){ msg.finish(); msg.body.toString().should.eql('foobar'); - done(); + sub.close(done); }); sub.connect(); diff --git a/test/utils.js b/test/utils.js index de181b3..6cc42cb 100644 --- a/test/utils.js +++ b/test/utils.js @@ -13,30 +13,24 @@ var request = require('superagent'); */ exports.deleteTopic = function(topic, fn){ - req(topic, 'delete', fn) -}; - -/** - * Create `topic`. - * - * @param {String} topic - * @param {Function} fn - */ - -exports.createTopic = function(topic, fn){ - req(topic, 'create', fn); + request + .post('http://127.0.0.1:4151/topic/delete') + .query({ topic: topic }) + .end(function(err, res){ + if (err && err.status == 404) err = null; + fn(err, res); + }); }; -function req(topic, cmd, fn){ +exports.stats = function(topic, channel, fn){ request - .post('http://127.0.0.1:4151/topic/' + cmd) - .query({ topic: topic }) + .get('http://127.0.0.1:4151/stats') + .query({ format: 'json', topic: topic, channel: channel }) .end(function(err, res){ - if (err) return fn(err); - if (res.error) return fn(res.error); - fn(); - }); -} + if (err) return fn(err) + fn(null, res); + }); +}; exports.framerData = [ '000000b0000000007b226d61785f7264795f636f756e74223a323530302c2276657273696f6e223a22302e322e3234222c226d61785f6d73675f74696d656f7574223a3930303030302c226d73675f74696d656f7574223a36303030302c22746c735f7631223a66616c73652c226465666c617465223a66616c73652c226465666c6174655f6c6576656c223a302c226d61785f6465666c6174655f6c6576656c223a362c22736e61707079223a66616c73657d', @@ -44,4 +38,3 @@ exports.framerData = [ '0000002f00000002135a2ad167d76e45000130363236323534303166363566303038736f6d65206d6573736167652068657265', '062625401f65f008' ]; -