@@ -12,17 +12,15 @@ function RedisPubSub(options) {
1212 options || ( options = { } ) ;
1313
1414 this . client = options . client || redis . createClient ( options ) ;
15+ this . _clientConnection = null ;
1516
1617 // Redis doesn't allow the same connection to both listen to channels and do
1718 // operations. Make an extra redis connection for subscribing with the same
1819 // options if not provided
1920 this . observer = options . observer || redis . createClient ( this . client . options ) ;
21+ this . _observerConnection = null ;
2022
21- var pubsub = this ;
22- this . observer . on ( 'message' , function ( channel , message ) {
23- var data = JSON . parse ( message ) ;
24- pubsub . _emit ( channel , data ) ;
25- } ) ;
23+ this . _connect ( ) ;
2624}
2725module . exports = RedisPubSub ;
2826
@@ -37,27 +35,59 @@ RedisPubSub.prototype.close = function(callback) {
3735 var pubsub = this ;
3836 PubSub . prototype . close . call ( this , function ( err ) {
3937 if ( err ) return callback ( err ) ;
40- pubsub . client . quit ( function ( err ) {
41- if ( err ) return callback ( err ) ;
42- pubsub . observer . quit ( callback ) ;
43- } ) ;
38+ pubsub . _close ( ) . then ( function ( ) {
39+ callback ( ) ;
40+ } , callback ) ;
4441 } ) ;
4542} ;
4643
44+ RedisPubSub . prototype . _close = function ( ) {
45+ return this . _closing = this . _closing || this . _connect ( ) . then ( Promise . all ( [
46+ this . client . quit ( ) ,
47+ this . observer . quit ( )
48+ ] ) ) ;
49+ } ;
50+
4751RedisPubSub . prototype . _subscribe = function ( channel , callback ) {
48- this . observer . subscribe ( channel , callback ) ;
52+ var pubsub = this ;
53+ pubsub . observer
54+ . subscribe ( channel , function ( message ) {
55+ var data = JSON . parse ( message ) ;
56+ pubsub . _emit ( channel , data ) ;
57+ } )
58+ . then ( function ( ) {
59+ callback ( ) ;
60+ } , callback ) ;
4961} ;
5062
5163RedisPubSub . prototype . _unsubscribe = function ( channel , callback ) {
52- this . observer . unsubscribe ( channel , callback ) ;
64+ this . observer . unsubscribe ( channel )
65+ . then ( function ( ) {
66+ callback ( ) ;
67+ } , callback ) ;
5368} ;
5469
5570RedisPubSub . prototype . _publish = function ( channels , data , callback ) {
5671 var message = JSON . stringify ( data ) ;
57- var args = [ PUBLISH_SCRIPT , 0 , message ] . concat ( channels ) ;
58- this . client . eval ( args , callback ) ;
72+ var args = [ message ] . concat ( channels ) ;
73+ this . client . eval ( PUBLISH_SCRIPT , { arguments : args } ) . then ( function ( ) {
74+ callback ( ) ;
75+ } , callback ) ;
5976} ;
6077
78+ RedisPubSub . prototype . _connect = function ( ) {
79+ this . _clientConnection = this . _clientConnection || connect ( this . client ) ;
80+ this . _observerConnection = this . _observerConnection || connect ( this . observer ) ;
81+ return Promise . all ( [
82+ this . _clientConnection ,
83+ this . _observerConnection
84+ ] ) ;
85+ } ;
86+
87+ function connect ( client ) {
88+ return client . isOpen ? Promise . resolve ( ) : client . connect ( ) ;
89+ }
90+
6191var PUBLISH_SCRIPT =
6292 'for i = 2, #ARGV do ' +
6393 'redis.call("publish", ARGV[i], ARGV[1]) ' +
0 commit comments