From 5be01a54d86d9d02c84eb55c488ad5486d5522a6 Mon Sep 17 00:00:00 2001 From: theMage Date: Fri, 11 Mar 2016 17:41:44 +0100 Subject: [PATCH 1/7] add support to request the global size of a queue themage --- src/blocked.c | 4 ++ src/cluster.c | 74 ++++++++++++++++++++++++++++++++++ src/cluster.h | 8 +++- src/queue.c | 108 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/queue.h | 5 +++ src/server.c | 1 + src/server.h | 2 + 7 files changed, 201 insertions(+), 1 deletion(-) diff --git a/src/blocked.c b/src/blocked.c index 01811fd..856292a 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -138,6 +138,8 @@ void unblockClient(client *c) { unblockClientWaitingJobRepl(c); } else if (c->btype == BLOCKED_GETJOB) { unblockClientBlockedForJobs(c); + } else if (c->btype == BLOCKED_GLOBAL_QLEN) { + unblockClientBlockedForQLen(c); } else { serverPanic("Unknown btype in unblockClient()."); } @@ -164,6 +166,8 @@ void replyToBlockedClientTimedOut(client *c) { return; } else if (c->btype == BLOCKED_GETJOB) { addReply(c,shared.nullmultibulk); + } else if (c->btype == BLOCKED_GLOBAL_QLEN) { + /* Do nothing - unblock client will send the best value we got */ } else { serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); } diff --git a/src/cluster.c b/src/cluster.c index ad7e0a0..34f2b74 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -69,6 +69,8 @@ void clusterSendGotJob(clusterNode *node, job *j); void clusterSendGotAck(clusterNode *node, char *jobid, int known); void clusterBroadcastQueued(job *j, unsigned char flags); void clusterBroadcastDelJob(job *j); +void clusterSendGetQLen(robj *qname, dict *nodes); +void clusterSendMyQLen(clusterNode *node, robj *qname, uint32_t qlen); sds representClusterNodeFlags(sds ci, uint16_t flags); /* ----------------------------------------------------------------------------- @@ -1493,6 +1495,29 @@ int clusterProcessPacket(clusterLink *link) { else receivePauseQueue(qname,count); decrRefCount(qname); + } else if (type == CLUSTERMSG_TYPE_GETQLEN) { + if (!sender) return 1; + uint32_t qnamelen = ntohl(hdr->data.queueop.about.qnamelen); + robj *qname = createStringObject(hdr->data.queueop.about.qname, + qnamelen); + + serverLog(LL_VERBOSE,"RECEIVED GETQLEN FOR QUEUE %s", + (char*)qname->ptr); + + clusterSendMyQLen(sender, qname, queueNameLength(qname)); + + } else if (type == CLUSTERMSG_TYPE_MYQLEN) { + if (!sender) return 1; + uint32_t qnamelen = ntohl(hdr->data.queueop.about.qnamelen); + robj *qname = createStringObject(hdr->data.queueop.about.qname, + qnamelen); + uint32_t myqlen = ntohl(hdr->data.queueop.about.aux); + + serverLog(LL_VERBOSE,"RECEIVED MYQLEN FOR QUEUE %s FROM %s: %d", + (char*)qname->ptr, sender->name, myqlen); + + myQLenForQueueName(qname, myqlen); + } else { serverLog(LL_WARNING,"Received unknown packet type: %d", type); } @@ -2092,6 +2117,55 @@ void clusterSendYourJobs(clusterNode *node, job **jobs, uint32_t count) { if (payload != buf) zfree(payload); } +/* broadcasts a requst for the qlen to the whole cluster. + * Used by GLOBALQLEN. It replies with the loca queue. */ +void clusterSendGetQLen(robj *qname, dict *nodes) { + uint32_t totlen, qnamelen = sdslen(qname->ptr); + uint32_t alloclen; + clusterMsg *hdr; + + serverLog(LL_VERBOSE, "Sending GETQLEN for %s, %d nodes", + (char *)qname->ptr, (int)dictSize(nodes)); + + totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + totlen += sizeof(clusterMsgDataQueueOp) - 8 + qnamelen; + alloclen = totlen; + if (alloclen < (int)sizeof(clusterMsg)) alloclen = sizeof(clusterMsg); + hdr = zmalloc(alloclen); + + clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_GETQLEN); + hdr->data.queueop.about.qnamelen = htonl(qnamelen); + memcpy(hdr->data.queueop.about.qname, qname->ptr, qnamelen); + hdr->totlen = htonl(totlen); + clusterBroadcastMessage(nodes,hdr,totlen); + zfree(hdr); +} + +/* send a reply to a GETQLEN request */ +void clusterSendMyQLen(clusterNode *node, robj *qname, uint32_t qlen) { + uint32_t totlen, qnamelen = sdslen(qname->ptr); + uint32_t alloclen; + + if (!node->link) return; + + serverLog(LL_VERBOSE, "Sending QLEN for %s of %d items to %s", + (char *)qname->ptr, (int)qlen, (char *)node->name); + + totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + totlen += sizeof(clusterMsgDataQueueOp) - 8 + qnamelen; + alloclen = totlen; + if (alloclen < (int)sizeof(clusterMsg)) alloclen = sizeof(clusterMsg); + unsigned char buf[alloclen]; + clusterMsg *hdr = (clusterMsg*) buf; + + clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MYQLEN); + hdr->data.queueop.about.aux = htonl(qlen); + hdr->data.queueop.about.qnamelen = htonl(qnamelen); + memcpy(hdr->data.queueop.about.qname, qname->ptr, qnamelen); + hdr->totlen = htonl(totlen); + clusterSendMessage(node->link,buf,totlen); +} + /* ----------------------------------------------------------------------------- * CLUSTER cron job * -------------------------------------------------------------------------- */ diff --git a/src/cluster.h b/src/cluster.h index fc0c9f3..9b02a9c 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -113,6 +113,8 @@ typedef struct clusterState { #define CLUSTERMSG_TYPE_YOURJOBS 13 /* NEEDJOBS reply with jobs. */ #define CLUSTERMSG_TYPE_WORKING 14 /* Postpone re-queueing & dequeue */ #define CLUSTERMSG_TYPE_PAUSE 15 /* Change queue paused state. */ +#define CLUSTERMSG_TYPE_GETQLEN 16 /* request the node qlen */ +#define CLUSTERMSG_TYPE_MYQLEN 17 /* reply to node qlen request */ /* Initially we don't know our "name", but we'll find it once we connect * to the first node, using the getsockname() function. Then we'll use this @@ -161,7 +163,10 @@ typedef struct { * the PAUSE command to specify the queue to change the paused state. */ typedef struct { uint32_t aux; /* For NEEDJOB, how many jobs we request. - * FOR PAUSE, the pause flags to set on the queue. */ + * FOR PAUSE, the pause flags to set on the queue. + * For MYQLEN, the numer of items in the queue + */ + uint32_t qnamelen; /* Queue name total length. */ char qname[8]; /* Defined as 8 bytes just for alignment. */ } clusterMsgDataQueueOp; @@ -241,5 +246,6 @@ void clusterSendNeedJobs(robj *qname, int numjobs, dict *nodes); void clusterSendYourJobs(clusterNode *node, job **jobs, uint32_t count); void clusterBroadcastJobIDMessage(dict *nodes, char *id, int type, uint32_t aux, unsigned char flags); void clusterBroadcastPause(robj *qname, uint32_t flags); +void clusterSendGetQLen(robj *qname, dict *nodes); #endif /* __CLUSTER_H */ diff --git a/src/queue.c b/src/queue.c index 81c9f4c..d2955d3 100644 --- a/src/queue.c +++ b/src/queue.c @@ -251,6 +251,7 @@ int GCQueue(queue *q, time_t max_idle_time) { time_t idle = server.unixtime - q->atime; if (idle < max_idle_time) return C_ERR; if (q->clients && listLength(q->clients) != 0) return C_ERR; + if (q->qlenclients && listLength(q->qlenclients) != 0) return C_ERR; if (skiplistLength(q->sl)) return C_ERR; if (q->flags & QUEUE_FLAG_PAUSED_ALL) return C_ERR; destroyQueue(q->name); @@ -341,6 +342,50 @@ void unblockClientBlockedForJobs(client *c) { dictEmpty(c->bpop.queues,NULL); } +/* -------------------------- Blocking on global qlen ---------------------------- */ + +/* Handle blocking if GLOBALQLEN didn't have fresh information + * + * 1) We set q->qlenclients to the list of clients blocking for this queue. + * 2) We set client->bpop.queues as well, as a dictionary of queues a client + * is blocked for. So we can resolve queues from clients. + * 3) When we get myqlen from all the nodes, we unblock the clients + * waiting for the length. */ +void blockForQLen(client *c, queue *q, mstime_t timeout) { + c->bpop.timeout = timeout; + c->bpop.flags = 0; + dictAdd(c->bpop.queues, q->name, NULL); + incrRefCount( q->name ); + if (q->qlenclients == NULL) q->qlenclients = listCreate(); + listAddNodeTail(q->qlenclients, c); + blockClient(c, BLOCKED_GLOBAL_QLEN); +} + +/* Unblock client waiting for globalqlen in queue. + * Never call this directly, call unblockClient() instead. */ +void unblockClientBlockedForQLen(client *c) { + dictEntry *de; + dictIterator *di; + + di = dictGetIterator(c->bpop.queues); + while((de = dictNext(di)) != NULL) { + robj *qname = dictGetKey(de); + queue *q = lookupQueue(qname); + serverAssert(q != NULL); + + addReplyLongLong(c,q->globalqlen); + + listDelNode(q->qlenclients,listSearchKey(q->qlenclients,c)); + if (listLength(q->qlenclients) == 0) { + listRelease(q->qlenclients); + q->qlenclients = NULL; + GCQueue(q,QUEUE_MAX_IDLE_TIME); + } + } + dictReleaseIterator(di); + dictEmpty(c->bpop.queues,NULL); +} + /* Add the specified queue to server.ready_queues if there is at least * one client blocked for this queue. Otherwise no operation is performed. */ void signalQueueAsReady(queue *q) { @@ -1229,3 +1274,66 @@ void pauseCommand(client *c) { reply = sdscatlen(reply,"\r\n",2); addReplySds(c,reply); } + +/* GLOBALQLEN + * + * Returns the size of the queue across the full cluster, by broadcasting + * a request for the queue size, and waiting for the reply + * + * */ +#define GLOBALQLEN_MAX_AGE 1000 +#define GLOBALQLEN_CLIENT_TIMEOUT 500 +void globalqlenCommand(client *c) { + queue *q = lookupQueue(c->argv[1]); + + /* Create the queue if it does not exist. We need the queue structure + * to store meta-data needed to broadcast the QLEN request and keep + * the replies */ + if (!q) q = createQueue(c->argv[1]); + + mstime_t msnow = mstime(); + + if (q->last_globalqlen_time<(msnow-GLOBALQLEN_MAX_AGE)) { + q->last_globalqlen_time = msnow; + q->globalqlen = queueLength(q); + q->globalqlen_nodes = 1; + if (server.cluster->size > 1) + clusterSendGetQLen(q->name, server.cluster->nodes); + } + + if (q->last_globalqlen_time>(msnow-GLOBALQLEN_MAX_AGE) + && q->globalqlen_nodes == server.cluster->size ) { + addReplyLongLong(c,q->globalqlen); + return; + } + + blockForQLen(c, q, msnow + GLOBALQLEN_MAX_AGE); +} + +void myQLenForQueue(queue *q, uint32_t qlen) { + if (!q) return; + + q->globalqlen = q->globalqlen + qlen; + q->globalqlen_nodes++; + + if (!q->qlenclients) + return; + + if (q->globalqlen_nodes == server.cluster->size) { + int numclients = listLength(q->qlenclients); + while(numclients--) { + listNode *ln = listFirst(q->qlenclients); + client *c = ln->value; + /* This will remove it from q->qlenclients. + * and send the qlen to the client */ + unblockClient(c); + } + } +} + +void myQLenForQueueName(robj *qname, uint32_t qlen) { + queue *q = lookupQueue(qname); + if (!q) return; /* no queue, no need to keep this */ + + myQLenForQueue(q, qlen); +} diff --git a/src/queue.h b/src/queue.h index aa22d42..183b656 100644 --- a/src/queue.h +++ b/src/queue.h @@ -45,6 +45,7 @@ typedef struct queue { queued or when a new client fetches elements or blocks for elements to arrive. */ list *clients; /* Clients blocked here. */ + list *qlenclients;/* Clients waiting for globalqlen. */ /* === Federation related fields === */ mstime_t needjobs_bcast_time; /* Last NEEDJOBS cluster broadcast. */ @@ -76,6 +77,10 @@ typedef struct queue { uint32_t needjobs_bcast_attempt; /* Num of tries without new nodes. */ uint32_t needjobs_adhoc_attempt; /* Num of tries without new jobs. */ uint64_t jobs_in, jobs_out; /* Num of jobs enqueued and dequeued. */ + mstime_t last_globalqlen_time; /* time of last globalqlen request. */ + uint32_t globalqlen; /* best guess of the globalqlen */ + uint32_t globalqlen_nodes; /* nodes who reported myqlen */ + uint32_t flags; /* Queue flags. QUEUE_FLAG_* macros. */ uint32_t padding; /* Not used. Makes alignment obvious. */ } queue; diff --git a/src/server.c b/src/server.c index fbdf563..0c825cd 100644 --- a/src/server.c +++ b/src/server.c @@ -137,6 +137,7 @@ struct serverCommand serverCommandTable[] = { /* Queues */ {"qlen",qlenCommand,2,"rF",0,NULL,0,0,0,0,0}, + {"globalqlen",globalqlenCommand,2,"rF",0,NULL,0,0,0,0,0}, {"qpeek",qpeekCommand,3,"r",0,NULL,0,0,0,0,0}, {"qstat",qstatCommand,2,"rF",0,NULL,0,0,0,0,0}, {"qscan",qscanCommand,-1,"r",0,NULL,0,0,0,0,0}, diff --git a/src/server.h b/src/server.h index 0a292c2..98f512c 100644 --- a/src/server.h +++ b/src/server.h @@ -201,6 +201,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define BLOCKED_NONE 0 /* Not blocked, no CLIENT_BLOCKED flag set. */ #define BLOCKED_JOB_REPL 1 /* Wait job synchronous replication. */ #define BLOCKED_GETJOB 2 /* Wait for new jobs in a set of queues. */ +#define BLOCKED_GLOBAL_QLEN 3 /* Wait for all the nodes to send MYQLEN */ /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -854,6 +855,7 @@ void commandCommand(client *c); void latencyCommand(client *c); void addjobCommand(client *c); void qlenCommand(client *c); +void globalqlenCommand(client *c); void getjobCommand(client *c); void showCommand(client *c); void ackjobCommand(client *c); From b9bcf9906648625247e6237d5c15f9e405f274bd Mon Sep 17 00:00:00 2001 From: theMage Date: Mon, 14 Mar 2016 09:59:05 +0100 Subject: [PATCH 2/7] properly initialize queue->qlenclients, warnings-- themage --- src/cluster.c | 1 + src/queue.c | 5 +++-- src/queue.h | 1 + 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 34f2b74..efebf36 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -31,6 +31,7 @@ #include "server.h" #include "cluster.h" #include "endianconv.h" +#include "queue.h" #include "ack.h" #include diff --git a/src/queue.c b/src/queue.c index d2955d3..99bda6d 100644 --- a/src/queue.c +++ b/src/queue.c @@ -74,6 +74,7 @@ queue *createQueue(robj *name) { q->needjobs_adhoc_attempt = 0; q->needjobs_responders = NULL; /* Created on demand to save memory. */ q->clients = NULL; /* Created on demand to save memory. */ + q->qlenclients = NULL; /* Created on demand to save memory. */ q->current_import_jobs_time = server.mstime; q->current_import_jobs_count = 0; @@ -1302,7 +1303,7 @@ void globalqlenCommand(client *c) { } if (q->last_globalqlen_time>(msnow-GLOBALQLEN_MAX_AGE) - && q->globalqlen_nodes == server.cluster->size ) { + && (int)q->globalqlen_nodes == server.cluster->size ) { addReplyLongLong(c,q->globalqlen); return; } @@ -1319,7 +1320,7 @@ void myQLenForQueue(queue *q, uint32_t qlen) { if (!q->qlenclients) return; - if (q->globalqlen_nodes == server.cluster->size) { + if ((int)q->globalqlen_nodes == server.cluster->size) { int numclients = listLength(q->qlenclients); while(numclients--) { listNode *ln = listFirst(q->qlenclients); diff --git a/src/queue.h b/src/queue.h index 183b656..ec1a891 100644 --- a/src/queue.h +++ b/src/queue.h @@ -109,5 +109,6 @@ void receiveYourJobs(struct clusterNode *node, uint32_t numjobs, unsigned char * void receiveNeedJobs(struct clusterNode *node, robj *qname, uint32_t count); void queueChangePausedState(queue *q, int flag, int set); void receivePauseQueue(robj *qname, uint32_t flags); +void myQLenForQueueName(robj *qname, uint32_t qlen); #endif From 76c736ad50982cb78ce565587f46502c41d02af4 Mon Sep 17 00:00:00 2001 From: theMage Date: Mon, 14 Mar 2016 12:08:28 +0100 Subject: [PATCH 3/7] typos on commments themage --- src/cluster.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index efebf36..6b86ee8 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2118,8 +2118,8 @@ void clusterSendYourJobs(clusterNode *node, job **jobs, uint32_t count) { if (payload != buf) zfree(payload); } -/* broadcasts a requst for the qlen to the whole cluster. - * Used by GLOBALQLEN. It replies with the loca queue. */ +/* broadcasts a request for the qlen to the whole cluster. + * Used by GLOBALQLEN. It replies with the local queue size. */ void clusterSendGetQLen(robj *qname, dict *nodes) { uint32_t totlen, qnamelen = sdslen(qname->ptr); uint32_t alloclen; From a86890967c2a934a0899f2f047698ee0d2783d2f Mon Sep 17 00:00:00 2001 From: theMage Date: Mon, 14 Mar 2016 12:09:43 +0100 Subject: [PATCH 4/7] tests for globalqlen themage --- tests/cluster/tests/13-globallqlen.tcl | 38 ++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 tests/cluster/tests/13-globallqlen.tcl diff --git a/tests/cluster/tests/13-globallqlen.tcl b/tests/cluster/tests/13-globallqlen.tcl new file mode 100644 index 0000000..22fbe61 --- /dev/null +++ b/tests/cluster/tests/13-globallqlen.tcl @@ -0,0 +1,38 @@ +source "../tests/includes/init-tests.tcl" +source "../tests/includes/job-utils.tcl" + +test "Queue jobs into random nodes" { + for {set j 1} {$j <= 10} {incr j} { + set target_id [randomInt $::instances_count] + set body "somejob$j" + set id [D $target_id addjob qlenqueue $body 5000 replicate 3 retry 60] + assert {$id ne {}} + } +} + +test "check globalqlen on target node" { + set gqlen [D 0 globalqlen qlenqueue] + assert {$gqlen == 10} +} + +test "Get jobs from queue" { + for {set j 1} {$j <= 10} {incr j} { + set target_id [randomInt $::instances_count] + set myjob [lindex [D $target_id getjob from qlenqueue] 0] + assert {[lindex $myjob 0] eq "qlenqueue"} + assert {[lindex $myjob 1] ne {}} + assert {[lindex $myjob 2] ne {}} + + set res [D $target_id ackjob [lindex $myjob 1]] + } +} + +test "check globalqlen is now empty" { + wait_for_condition { + [D 1 globalqlen qlenqueue] == 0 + } else { + fail "globalqlen doesn't get empty" + } +} + + From 7ffbe7cdfa5c062f48f60a32174e72053a927bdf Mon Sep 17 00:00:00 2001 From: theMage Date: Mon, 14 Mar 2016 12:25:46 +0100 Subject: [PATCH 5/7] help and documentation for globalqlen themage --- README.md | 13 ++++++++++++- src/help.h | 1 + 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 95f24fe..0605e30 100644 --- a/README.md +++ b/README.md @@ -463,7 +463,18 @@ Disque node. #### `QLEN ` -Return the length of the queue. +Return the length of the queue in the local node. + +#### `GLOBALQLEN ` + +Return the length of the queue across the cluster. + +This is calculated by asking all the nodes their qlen and adding +all the values together. The values is cached for a second. + +The first client to run this command in a node/queue will be blocked +waiting for the answer. If some node is busy and takes more than .5s +to reply its items may not be counted. #### `QSTAT ` diff --git a/src/help.h b/src/help.h index 58b32c4..a57a806 100644 --- a/src/help.h +++ b/src/help.h @@ -46,6 +46,7 @@ struct commandHelp { { "WORKING", "", "Attempt to postpone next delivery of the specified job", 4, "1.0.0" }, { "NACK", " [ ...]", "Negative acknowledge: increment the NACK counter for the job and ask for a next delivery ASAP", 4, "1.0.0" }, { "QLEN", "", "Return number of queued jobs in the specified queue in the local node", 3, "1.0.0" }, + { "GLOBALQLEN", "", "Return number of queued jobs in the specified queue across the cluster - it's cached for a minute", 3, "1.0.0" }, { "QSTAT", "", "Return local node queue statistics", 3, "1.0.0" }, { "QPEEK", " ", "Inspect jobs into a queue without actually fetching them", 3, "1.0.0" }, { "ENQUEUE", " [ ...]", "Force local node to put the specified jobs back into the queue", 3, "1.0.0" }, From 4afd26a090855aefe4c4e3bedcda57f09731eef3 Mon Sep 17 00:00:00 2001 From: theMage Date: Mon, 14 Mar 2016 13:03:29 +0100 Subject: [PATCH 6/7] globalqlen is cached for a second, not a minute themage --- src/help.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/help.h b/src/help.h index a57a806..660f2d7 100644 --- a/src/help.h +++ b/src/help.h @@ -46,7 +46,7 @@ struct commandHelp { { "WORKING", "", "Attempt to postpone next delivery of the specified job", 4, "1.0.0" }, { "NACK", " [ ...]", "Negative acknowledge: increment the NACK counter for the job and ask for a next delivery ASAP", 4, "1.0.0" }, { "QLEN", "", "Return number of queued jobs in the specified queue in the local node", 3, "1.0.0" }, - { "GLOBALQLEN", "", "Return number of queued jobs in the specified queue across the cluster - it's cached for a minute", 3, "1.0.0" }, + { "GLOBALQLEN", "", "Return number of queued jobs in the specified queue across the cluster - it's cached for a second", 3, "1.0.0" }, { "QSTAT", "", "Return local node queue statistics", 3, "1.0.0" }, { "QPEEK", " ", "Inspect jobs into a queue without actually fetching them", 3, "1.0.0" }, { "ENQUEUE", " [ ...]", "Force local node to put the specified jobs back into the queue", 3, "1.0.0" }, From 428edd9b1ae545dd5cfc34b4c8a7ca7085a5e24c Mon Sep 17 00:00:00 2001 From: theMage Date: Wed, 16 Mar 2016 20:46:30 +0100 Subject: [PATCH 7/7] initialize globalqlen vars themage --- src/queue.c | 4 ++++ tests/cluster/tests/13-globallqlen.tcl | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/src/queue.c b/src/queue.c index 99bda6d..def1ba8 100644 --- a/src/queue.c +++ b/src/queue.c @@ -75,6 +75,10 @@ queue *createQueue(robj *name) { q->needjobs_responders = NULL; /* Created on demand to save memory. */ q->clients = NULL; /* Created on demand to save memory. */ q->qlenclients = NULL; /* Created on demand to save memory. */ + + q->globalqlen = 0; + q->last_globalqlen_time = 0; + q->globalqlen_nodes = 0; q->current_import_jobs_time = server.mstime; q->current_import_jobs_count = 0; diff --git a/tests/cluster/tests/13-globallqlen.tcl b/tests/cluster/tests/13-globallqlen.tcl index 22fbe61..87d140b 100644 --- a/tests/cluster/tests/13-globallqlen.tcl +++ b/tests/cluster/tests/13-globallqlen.tcl @@ -1,6 +1,11 @@ source "../tests/includes/init-tests.tcl" source "../tests/includes/job-utils.tcl" +test "Globalqlen before anything else" { + set gqlen [D 2 globalqlen qlenqueue] + assert { $gqlen == 0 } +} + test "Queue jobs into random nodes" { for {set j 1} {$j <= 10} {incr j} { set target_id [randomInt $::instances_count]