Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,18 @@ Disque node.

#### `QLEN <queue-name>`

Return the length of the queue.
Return the length of the queue in the local node.

#### `GLOBALQLEN <queue-name>`

Return the length of the queue across the cluster.

This is calculated by asking all the nodes their qlen and adding
all the values together. The values is cached for a second.

The first client to run this command in a node/queue will be blocked
waiting for the answer. If some node is busy and takes more than .5s
to reply its items may not be counted.

#### `QSTAT <queue-name>`

Expand Down
4 changes: 4 additions & 0 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ void unblockClient(client *c) {
unblockClientWaitingJobRepl(c);
} else if (c->btype == BLOCKED_GETJOB) {
unblockClientBlockedForJobs(c);
} else if (c->btype == BLOCKED_GLOBAL_QLEN) {
unblockClientBlockedForQLen(c);
} else {
serverPanic("Unknown btype in unblockClient().");
}
Expand All @@ -164,6 +166,8 @@ void replyToBlockedClientTimedOut(client *c) {
return;
} else if (c->btype == BLOCKED_GETJOB) {
addReply(c,shared.nullmultibulk);
} else if (c->btype == BLOCKED_GLOBAL_QLEN) {
/* Do nothing - unblock client will send the best value we got */
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
Expand Down
75 changes: 75 additions & 0 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "server.h"
#include "cluster.h"
#include "endianconv.h"
#include "queue.h"
#include "ack.h"

#include <sys/types.h>
Expand Down Expand Up @@ -69,6 +70,8 @@ void clusterSendGotJob(clusterNode *node, job *j);
void clusterSendGotAck(clusterNode *node, char *jobid, int known);
void clusterBroadcastQueued(job *j, unsigned char flags);
void clusterBroadcastDelJob(job *j);
void clusterSendGetQLen(robj *qname, dict *nodes);
void clusterSendMyQLen(clusterNode *node, robj *qname, uint32_t qlen);
sds representClusterNodeFlags(sds ci, uint16_t flags);

/* -----------------------------------------------------------------------------
Expand Down Expand Up @@ -1493,6 +1496,29 @@ int clusterProcessPacket(clusterLink *link) {
else
receivePauseQueue(qname,count);
decrRefCount(qname);
} else if (type == CLUSTERMSG_TYPE_GETQLEN) {
if (!sender) return 1;
uint32_t qnamelen = ntohl(hdr->data.queueop.about.qnamelen);
robj *qname = createStringObject(hdr->data.queueop.about.qname,
qnamelen);

serverLog(LL_VERBOSE,"RECEIVED GETQLEN FOR QUEUE %s",
(char*)qname->ptr);

clusterSendMyQLen(sender, qname, queueNameLength(qname));

} else if (type == CLUSTERMSG_TYPE_MYQLEN) {
if (!sender) return 1;
uint32_t qnamelen = ntohl(hdr->data.queueop.about.qnamelen);
robj *qname = createStringObject(hdr->data.queueop.about.qname,
qnamelen);
uint32_t myqlen = ntohl(hdr->data.queueop.about.aux);

serverLog(LL_VERBOSE,"RECEIVED MYQLEN FOR QUEUE %s FROM %s: %d",
(char*)qname->ptr, sender->name, myqlen);

myQLenForQueueName(qname, myqlen);

} else {
serverLog(LL_WARNING,"Received unknown packet type: %d", type);
}
Expand Down Expand Up @@ -2092,6 +2118,55 @@ void clusterSendYourJobs(clusterNode *node, job **jobs, uint32_t count) {
if (payload != buf) zfree(payload);
}

/* broadcasts a request for the qlen to the whole cluster.
* Used by GLOBALQLEN. It replies with the local queue size. */
void clusterSendGetQLen(robj *qname, dict *nodes) {
uint32_t totlen, qnamelen = sdslen(qname->ptr);
uint32_t alloclen;
clusterMsg *hdr;

serverLog(LL_VERBOSE, "Sending GETQLEN for %s, %d nodes",
(char *)qname->ptr, (int)dictSize(nodes));

totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataQueueOp) - 8 + qnamelen;
alloclen = totlen;
if (alloclen < (int)sizeof(clusterMsg)) alloclen = sizeof(clusterMsg);
hdr = zmalloc(alloclen);

clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_GETQLEN);
hdr->data.queueop.about.qnamelen = htonl(qnamelen);
memcpy(hdr->data.queueop.about.qname, qname->ptr, qnamelen);
hdr->totlen = htonl(totlen);
clusterBroadcastMessage(nodes,hdr,totlen);
zfree(hdr);
}

/* send a reply to a GETQLEN request */
void clusterSendMyQLen(clusterNode *node, robj *qname, uint32_t qlen) {
uint32_t totlen, qnamelen = sdslen(qname->ptr);
uint32_t alloclen;

if (!node->link) return;

serverLog(LL_VERBOSE, "Sending QLEN for %s of %d items to %s",
(char *)qname->ptr, (int)qlen, (char *)node->name);

totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataQueueOp) - 8 + qnamelen;
alloclen = totlen;
if (alloclen < (int)sizeof(clusterMsg)) alloclen = sizeof(clusterMsg);
unsigned char buf[alloclen];
clusterMsg *hdr = (clusterMsg*) buf;

clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MYQLEN);
hdr->data.queueop.about.aux = htonl(qlen);
hdr->data.queueop.about.qnamelen = htonl(qnamelen);
memcpy(hdr->data.queueop.about.qname, qname->ptr, qnamelen);
hdr->totlen = htonl(totlen);
clusterSendMessage(node->link,buf,totlen);
}

/* -----------------------------------------------------------------------------
* CLUSTER cron job
* -------------------------------------------------------------------------- */
Expand Down
8 changes: 7 additions & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ typedef struct clusterState {
#define CLUSTERMSG_TYPE_YOURJOBS 13 /* NEEDJOBS reply with jobs. */
#define CLUSTERMSG_TYPE_WORKING 14 /* Postpone re-queueing & dequeue */
#define CLUSTERMSG_TYPE_PAUSE 15 /* Change queue paused state. */
#define CLUSTERMSG_TYPE_GETQLEN 16 /* request the node qlen */
#define CLUSTERMSG_TYPE_MYQLEN 17 /* reply to node qlen request */

/* Initially we don't know our "name", but we'll find it once we connect
* to the first node, using the getsockname() function. Then we'll use this
Expand Down Expand Up @@ -161,7 +163,10 @@ typedef struct {
* the PAUSE command to specify the queue to change the paused state. */
typedef struct {
uint32_t aux; /* For NEEDJOB, how many jobs we request.
* FOR PAUSE, the pause flags to set on the queue. */
* FOR PAUSE, the pause flags to set on the queue.
* For MYQLEN, the numer of items in the queue
*/

uint32_t qnamelen; /* Queue name total length. */
char qname[8]; /* Defined as 8 bytes just for alignment. */
} clusterMsgDataQueueOp;
Expand Down Expand Up @@ -241,5 +246,6 @@ void clusterSendNeedJobs(robj *qname, int numjobs, dict *nodes);
void clusterSendYourJobs(clusterNode *node, job **jobs, uint32_t count);
void clusterBroadcastJobIDMessage(dict *nodes, char *id, int type, uint32_t aux, unsigned char flags);
void clusterBroadcastPause(robj *qname, uint32_t flags);
void clusterSendGetQLen(robj *qname, dict *nodes);

#endif /* __CLUSTER_H */
1 change: 1 addition & 0 deletions src/help.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct commandHelp {
{ "WORKING", "<jobid>", "Attempt to postpone next delivery of the specified job", 4, "1.0.0" },
{ "NACK", "<jobid> [<jobid> <jobid> ...]", "Negative acknowledge: increment the NACK counter for the job and ask for a next delivery ASAP", 4, "1.0.0" },
{ "QLEN", "<queue>", "Return number of queued jobs in the specified queue in the local node", 3, "1.0.0" },
{ "GLOBALQLEN", "<queue>", "Return number of queued jobs in the specified queue across the cluster - it's cached for a second", 3, "1.0.0" },
{ "QSTAT", "<queue>", "Return local node queue statistics", 3, "1.0.0" },
{ "QPEEK", "<queue> <count>", "Inspect jobs into a queue without actually fetching them", 3, "1.0.0" },
{ "ENQUEUE", "<jobid> [<jobid> <jobid> ...]", "Force local node to put the specified jobs back into the queue", 3, "1.0.0" },
Expand Down
113 changes: 113 additions & 0 deletions src/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ queue *createQueue(robj *name) {
q->needjobs_adhoc_attempt = 0;
q->needjobs_responders = NULL; /* Created on demand to save memory. */
q->clients = NULL; /* Created on demand to save memory. */
q->qlenclients = NULL; /* Created on demand to save memory. */

q->globalqlen = 0;
q->last_globalqlen_time = 0;
q->globalqlen_nodes = 0;

q->current_import_jobs_time = server.mstime;
q->current_import_jobs_count = 0;
Expand Down Expand Up @@ -251,6 +256,7 @@ int GCQueue(queue *q, time_t max_idle_time) {
time_t idle = server.unixtime - q->atime;
if (idle < max_idle_time) return C_ERR;
if (q->clients && listLength(q->clients) != 0) return C_ERR;
if (q->qlenclients && listLength(q->qlenclients) != 0) return C_ERR;
if (skiplistLength(q->sl)) return C_ERR;
if (q->flags & QUEUE_FLAG_PAUSED_ALL) return C_ERR;
destroyQueue(q->name);
Expand Down Expand Up @@ -341,6 +347,50 @@ void unblockClientBlockedForJobs(client *c) {
dictEmpty(c->bpop.queues,NULL);
}

/* -------------------------- Blocking on global qlen ---------------------------- */

/* Handle blocking if GLOBALQLEN didn't have fresh information
*
* 1) We set q->qlenclients to the list of clients blocking for this queue.
* 2) We set client->bpop.queues as well, as a dictionary of queues a client
* is blocked for. So we can resolve queues from clients.
* 3) When we get myqlen from all the nodes, we unblock the clients
* waiting for the length. */
void blockForQLen(client *c, queue *q, mstime_t timeout) {
c->bpop.timeout = timeout;
c->bpop.flags = 0;
dictAdd(c->bpop.queues, q->name, NULL);
incrRefCount( q->name );
if (q->qlenclients == NULL) q->qlenclients = listCreate();
listAddNodeTail(q->qlenclients, c);
blockClient(c, BLOCKED_GLOBAL_QLEN);
}

/* Unblock client waiting for globalqlen in queue.
* Never call this directly, call unblockClient() instead. */
void unblockClientBlockedForQLen(client *c) {
dictEntry *de;
dictIterator *di;

di = dictGetIterator(c->bpop.queues);
while((de = dictNext(di)) != NULL) {
robj *qname = dictGetKey(de);
queue *q = lookupQueue(qname);
serverAssert(q != NULL);

addReplyLongLong(c,q->globalqlen);

listDelNode(q->qlenclients,listSearchKey(q->qlenclients,c));
if (listLength(q->qlenclients) == 0) {
listRelease(q->qlenclients);
q->qlenclients = NULL;
GCQueue(q,QUEUE_MAX_IDLE_TIME);
}
}
dictReleaseIterator(di);
dictEmpty(c->bpop.queues,NULL);
}

/* Add the specified queue to server.ready_queues if there is at least
* one client blocked for this queue. Otherwise no operation is performed. */
void signalQueueAsReady(queue *q) {
Expand Down Expand Up @@ -1229,3 +1279,66 @@ void pauseCommand(client *c) {
reply = sdscatlen(reply,"\r\n",2);
addReplySds(c,reply);
}

/* GLOBALQLEN <queuename>
*
* Returns the size of the queue across the full cluster, by broadcasting
* a request for the queue size, and waiting for the reply
*
* */
#define GLOBALQLEN_MAX_AGE 1000
#define GLOBALQLEN_CLIENT_TIMEOUT 500
void globalqlenCommand(client *c) {
queue *q = lookupQueue(c->argv[1]);

/* Create the queue if it does not exist. We need the queue structure
* to store meta-data needed to broadcast the QLEN request and keep
* the replies */
if (!q) q = createQueue(c->argv[1]);

mstime_t msnow = mstime();

if (q->last_globalqlen_time<(msnow-GLOBALQLEN_MAX_AGE)) {
q->last_globalqlen_time = msnow;
q->globalqlen = queueLength(q);
q->globalqlen_nodes = 1;
if (server.cluster->size > 1)
clusterSendGetQLen(q->name, server.cluster->nodes);
}

if (q->last_globalqlen_time>(msnow-GLOBALQLEN_MAX_AGE)
&& (int)q->globalqlen_nodes == server.cluster->size ) {
addReplyLongLong(c,q->globalqlen);
return;
}

blockForQLen(c, q, msnow + GLOBALQLEN_MAX_AGE);
}

void myQLenForQueue(queue *q, uint32_t qlen) {
if (!q) return;

q->globalqlen = q->globalqlen + qlen;
q->globalqlen_nodes++;

if (!q->qlenclients)
return;

if ((int)q->globalqlen_nodes == server.cluster->size) {
int numclients = listLength(q->qlenclients);
while(numclients--) {
listNode *ln = listFirst(q->qlenclients);
client *c = ln->value;
/* This will remove it from q->qlenclients.
* and send the qlen to the client */
unblockClient(c);
}
}
}

void myQLenForQueueName(robj *qname, uint32_t qlen) {
queue *q = lookupQueue(qname);
if (!q) return; /* no queue, no need to keep this */

myQLenForQueue(q, qlen);
}
6 changes: 6 additions & 0 deletions src/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ typedef struct queue {
queued or when a new client fetches elements or
blocks for elements to arrive. */
list *clients; /* Clients blocked here. */
list *qlenclients;/* Clients waiting for globalqlen. */

/* === Federation related fields === */
mstime_t needjobs_bcast_time; /* Last NEEDJOBS cluster broadcast. */
Expand Down Expand Up @@ -76,6 +77,10 @@ typedef struct queue {
uint32_t needjobs_bcast_attempt; /* Num of tries without new nodes. */
uint32_t needjobs_adhoc_attempt; /* Num of tries without new jobs. */
uint64_t jobs_in, jobs_out; /* Num of jobs enqueued and dequeued. */
mstime_t last_globalqlen_time; /* time of last globalqlen request. */
uint32_t globalqlen; /* best guess of the globalqlen */
uint32_t globalqlen_nodes; /* nodes who reported myqlen */

uint32_t flags; /* Queue flags. QUEUE_FLAG_* macros. */
uint32_t padding; /* Not used. Makes alignment obvious. */
} queue;
Expand Down Expand Up @@ -104,5 +109,6 @@ void receiveYourJobs(struct clusterNode *node, uint32_t numjobs, unsigned char *
void receiveNeedJobs(struct clusterNode *node, robj *qname, uint32_t count);
void queueChangePausedState(queue *q, int flag, int set);
void receivePauseQueue(robj *qname, uint32_t flags);
void myQLenForQueueName(robj *qname, uint32_t qlen);

#endif
1 change: 1 addition & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ struct serverCommand serverCommandTable[] = {

/* Queues */
{"qlen",qlenCommand,2,"rF",0,NULL,0,0,0,0,0},
{"globalqlen",globalqlenCommand,2,"rF",0,NULL,0,0,0,0,0},
{"qpeek",qpeekCommand,3,"r",0,NULL,0,0,0,0,0},
{"qstat",qstatCommand,2,"rF",0,NULL,0,0,0,0,0},
{"qscan",qscanCommand,-1,"r",0,NULL,0,0,0,0,0},
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define BLOCKED_NONE 0 /* Not blocked, no CLIENT_BLOCKED flag set. */
#define BLOCKED_JOB_REPL 1 /* Wait job synchronous replication. */
#define BLOCKED_GETJOB 2 /* Wait for new jobs in a set of queues. */
#define BLOCKED_GLOBAL_QLEN 3 /* Wait for all the nodes to send MYQLEN */

/* Client request types */
#define PROTO_REQ_INLINE 1
Expand Down Expand Up @@ -854,6 +855,7 @@ void commandCommand(client *c);
void latencyCommand(client *c);
void addjobCommand(client *c);
void qlenCommand(client *c);
void globalqlenCommand(client *c);
void getjobCommand(client *c);
void showCommand(client *c);
void ackjobCommand(client *c);
Expand Down
Loading