Skip to content

Commit dc321fe

Browse files
authored
Merge pull request #4 from krissss/master
Support prefix
2 parents 07999e0 + e060fe3 commit dc321fe

1 file changed

Lines changed: 14 additions & 13 deletions

File tree

src/Client.php

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class Client
6262
'max_attempts' => 5,
6363
'auth' => '',
6464
'db' => 0,
65+
'prefix' => '',
6566
];
6667

6768
/**
@@ -115,16 +116,16 @@ public function send($queue, $data, $delay = 0, $cb = null)
115116
$cb((bool)$ret);
116117
};
117118
if ($delay == 0) {
118-
$this->_redisSend->lPush(static::QUEUE_WAITING . $queue, $package_str, $cb);
119+
$this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_WAITING . $queue, $package_str, $cb);
119120
} else {
120-
$this->_redisSend->zAdd(static::QUEUE_DELAYED, $now + $delay, $package_str, $cb);
121+
$this->_redisSend->zAdd($this->_options['prefix'] . static::QUEUE_DELAYED, $now + $delay, $package_str, $cb);
121122
}
122123
return;
123124
}
124125
if ($delay == 0) {
125-
$this->_redisSend->lPush(static::QUEUE_WAITING . $queue, $package_str);
126+
$this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_WAITING . $queue, $package_str);
126127
} else {
127-
$this->_redisSend->zAdd(static::QUEUE_DELAYED, $now + $delay, $package_str);
128+
$this->_redisSend->zAdd($this->_options['prefix'] . static::QUEUE_DELAYED, $now + $delay, $package_str);
128129
}
129130
}
130131

@@ -138,7 +139,7 @@ public function subscribe($queue, callable $callback)
138139
{
139140
$queue = (array)$queue;
140141
foreach ($queue as $q) {
141-
$redis_key = static::QUEUE_WAITING . $q;
142+
$redis_key = $this->_options['prefix'] . static::QUEUE_WAITING . $q;
142143
$this->_subscribeQueues[$redis_key] = $callback;
143144
}
144145
$this->pull();
@@ -154,7 +155,7 @@ public function unsubscribe($queue)
154155
{
155156
$queue = (array)$queue;
156157
foreach ($queue as $q) {
157-
$redis_key = static::QUEUE_WAITING . $q;
158+
$redis_key = $this->_options['prefix'] . static::QUEUE_WAITING . $q;
158159
unset($this->_subscribeQueues[$redis_key]);
159160
}
160161
}
@@ -171,21 +172,21 @@ protected function tryToPullDelayQueue()
171172
$retry_timer = Timer::add(1, function () {
172173
$now = time();
173174
$options = ['LIMIT', 0, 128];
174-
$this->_redisSend->zrevrangebyscore(static::QUEUE_DELAYED, $now, '-inf', $options, function ($items) {
175+
$this->_redisSend->zrevrangebyscore($this->_options['prefix'] . static::QUEUE_DELAYED, $now, '-inf', $options, function ($items) {
175176
if ($items === false) {
176177
throw new RuntimeException($this->_redisSend->error());
177178
}
178179
foreach ($items as $package_str) {
179-
$this->_redisSend->zRem(static::QUEUE_DELAYED, $package_str, function ($result) use ($package_str) {
180+
$this->_redisSend->zRem($this->_options['prefix'] . static::QUEUE_DELAYED, $package_str, function ($result) use ($package_str) {
180181
if ($result !== 1) {
181182
return;
182183
}
183184
$package = \json_decode($package_str, true);
184185
if (!$package) {
185-
$this->_redisSend->lPush(static::QUEUE_FAILED, $package_str);
186+
$this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_FAILED, $package_str);
186187
return;
187188
}
188-
$this->_redisSend->lPush(static::QUEUE_WAITING . $package['queue'], $package_str);
189+
$this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_WAITING . $package['queue'], $package_str);
189190
});
190191
}
191192
});
@@ -208,7 +209,7 @@ public function pull()
208209
$package_str = $data[1];
209210
$package = json_decode($package_str, true);
210211
if (!$package) {
211-
$this->_redisSend->lPush(static::QUEUE_FAILED, $package_str);
212+
$this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_FAILED, $package_str);
212213
} else {
213214
if (!isset($this->_subscribeQueues[$redis_key])) {
214215
// 取消订阅,放回队列
@@ -252,14 +253,14 @@ public function pull()
252253
protected function retry($package)
253254
{
254255
$delay = time() + $this->_options['retry_seconds'] * ($package['attempts']);
255-
$this->_redisSend->zAdd(static::QUEUE_DELAYED, $delay, \json_encode($package));
256+
$this->_redisSend->zAdd($this->_options['prefix'] . static::QUEUE_DELAYED, $delay, \json_encode($package));
256257
}
257258

258259
/**
259260
* @param $package
260261
*/
261262
protected function fail($package)
262263
{
263-
$this->_redisSend->lPush(static::QUEUE_FAILED, \json_encode($package));
264+
$this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_FAILED, \json_encode($package));
264265
}
265266
}

0 commit comments

Comments
 (0)