
GroupMQ - Redis Group Queue

A fast, reliable Redis-backed per-group FIFO queue for Node + TypeScript with guaranteed job ordering and parallel processing across groups.

Website · Created by OpenPanel.dev



Install

npm i groupmq ioredis

Quick start

import Redis from "ioredis";
import { Queue, Worker } from "groupmq";

const redis = new Redis("redis://127.0.0.1:6379");

const queue = new Queue({
  redis,
  namespace: "orders", // Will be prefixed with 'groupmq:'
  jobTimeoutMs: 30_000, // How long before job times out
  logger: true, // Enable logging (optional)
});

await queue.add({
  groupId: "user:42",
  data: { type: "charge", amount: 999 },
  orderMs: Date.now(), // or event.createdAtMs
  maxAttempts: 5,
});

const worker = new Worker({
  queue,
  concurrency: 1, // Process 1 job at a time (can increase for parallel processing)
  handler: async (job) => {
    console.log(`Processing:`, job.data);
  },
});

worker.run();

Key Features

  • Per-group FIFO ordering - Jobs within the same group process in strict order, perfect for user workflows, data pipelines, and sequential operations
  • Parallel processing across groups - Process multiple groups simultaneously while maintaining order within each group
  • BullMQ-compatible API - Familiar interface with enhanced group-based capabilities
  • High performance - High throughput with low latency (see benchmarks)
  • Built-in ordering strategies - Handle out-of-order job arrivals with 'none', 'scheduler', or 'in-memory' methods
  • Automatic recovery - Stalled job detection and connection error handling with exponential backoff
  • Production ready - Atomic operations, graceful shutdown, and comprehensive logging
  • Zero polling - Efficient blocking operations prevent wasteful Redis calls

Inspiration from BullMQ

GroupMQ is heavily inspired by BullMQ, one of the most popular Redis-based job queue libraries for Node.js. We've taken many core concepts and design patterns from BullMQ while adapting them for our specific use case of per-group FIFO processing.

Key differences from BullMQ:

  • Per-group FIFO ordering - jobs within the same group are processed in strict order
  • Group-based concurrency - only one job per group can be active at a time
  • Ordered processing - built-in support for orderMs timestamp-based ordering
  • Cross-group parallelism - multiple groups can be processed simultaneously
  • No job types - simplified to a single job shape; use union-typed data instead, e.g. { type: 'paint', data: { ... } } | { type: 'repair', data: { ... } } (see the sketch below)
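
A minimal sketch of the union-typed approach (the JobData name and payloads here are illustrative; redis is the ioredis instance from the quick start):

type JobData =
  | { type: 'paint'; data: { color: string } }
  | { type: 'repair'; data: { part: string } };

const queue = new Queue<JobData>({ redis, namespace: 'garage' });

const worker = new Worker<JobData>({
  queue,
  handler: async (job) => {
    // TypeScript narrows job.data by the discriminant
    switch (job.data.type) {
      case 'paint': /* use job.data.data.color */ break;
      case 'repair': /* use job.data.data.part */ break;
    }
  },
});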

We're grateful to the BullMQ team for their excellent work and the foundation they've provided for the Redis job queue ecosystem.

Queue Options

type QueueOptions = {
  redis: Redis;                    // Redis client instance (required)
  namespace: string;                // Unique queue name, gets 'groupmq:' prefix (required)
  logger?: boolean | LoggerInterface; // Enable logging (default: false)
  jobTimeoutMs?: number;            // Job processing timeout (default: 30000ms)
  maxAttempts?: number;             // Default max retry attempts (default: 3)
  reserveScanLimit?: number;        // Groups to scan when reserving (default: 20)
  keepCompleted?: number;           // Number of completed jobs to retain (default: 0)
  keepFailed?: number;              // Number of failed jobs to retain (default: 0)
  schedulerLockTtlMs?: number;      // Scheduler lock TTL (default: 1500ms)
  orderingMethod?: OrderingMethod;  // Ordering strategy (default: 'none')
  orderingWindowMs?: number;        // Time window for ordering (required for non-'none' methods)
  orderingMaxWaitMultiplier?: number; // Max grace period multiplier for in-memory (default: 3)
  orderingGracePeriodDecay?: number;  // Grace period decay factor for in-memory (default: 1.0)
  orderingMaxBatchSize?: number;      // Max jobs to collect in batch for in-memory (default: 10)
};

type OrderingMethod = 'none' | 'scheduler' | 'in-memory';

Ordering Methods:

  • 'none' - No ordering guarantees (fastest, zero overhead, no extra latency)
  • 'scheduler' - Redis buffering for large windows (≥1000ms, requires scheduler, adds latency)
  • 'in-memory' - Worker collection for small windows (50-500ms, no scheduler, adds latency per batch)

See Ordering Methods for detailed comparison.
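
For example, to absorb slightly out-of-order arrivals with a small in-memory window (values are illustrative):

const queue = new Queue({
  redis,
  namespace: 'events',
  orderingMethod: 'in-memory',
  orderingWindowMs: 200, // required for any method other than 'none'
});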

Worker Options

type WorkerOptions<T> = {
  queue: Queue<T>;                           // Queue instance to process jobs from (required)
  handler: (job: ReservedJob<T>) => Promise<unknown>; // Job processing function (required)
  name?: string;                             // Worker name for logging (default: queue.name)
  logger?: boolean | LoggerInterface;        // Enable logging (default: false)
  concurrency?: number;                      // Number of jobs to process in parallel (default: 1)
  heartbeatMs?: number;                      // Heartbeat interval (default: Math.max(1000, jobTimeoutMs/3))
  onError?: (err: unknown, job?: ReservedJob<T>) => void; // Error handler
  maxAttempts?: number;                      // Max retry attempts (default: queue.maxAttempts)
  backoff?: BackoffStrategy;                 // Retry backoff function (default: exponential with jitter)
  enableCleanup?: boolean;                   // Periodic cleanup (default: true)
  cleanupIntervalMs?: number;                // Cleanup frequency (default: 60000ms)
  schedulerIntervalMs?: number;              // Scheduler frequency (default: adaptive)
  blockingTimeoutSec?: number;               // Blocking reserve timeout (default: 5s)
  atomicCompletion?: boolean;                // Atomic completion + next reserve (default: true)
  stalledInterval?: number;                  // Check if stalled every N ms (default: 30000)
  maxStalledCount?: number;                  // Fail after N stalls (default: 1)
  stalledGracePeriod?: number;               // Grace period before considering stalled (default: 0)
};

type BackoffStrategy = (attempt: number) => number; // returns delay in ms
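
For example, a custom exponential backoff with jitter (a sketch, assuming the BackoffStrategy type is exported from 'groupmq'; the default strategy is already exponential with jitter):

const backoff: BackoffStrategy = (attempt) => {
  const base = Math.min(30_000, 1_000 * 2 ** (attempt - 1)); // cap at 30s
  return base + Math.floor(Math.random() * 250); // jitter spreads out retries
};

const worker = new Worker({
  queue,
  backoff,
  handler: async (job) => { /* ... */ },
});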

Job Options

When adding a job to the queue:

await queue.add({
  groupId: string;           // Required: Group ID for FIFO processing
  data: T;                   // Required: Job payload data
  orderMs?: number;          // Timestamp for ordering (default: Date.now())
  maxAttempts?: number;      // Max retry attempts (default: queue.maxAttempts)
  jobId?: string;            // Custom job ID (default: auto-generated UUID)
  delay?: number;            // Delay in ms before job becomes available
  runAt?: Date | number;     // Specific time to run the job
  repeat?: RepeatOptions;    // Repeating job configuration (cron or interval)
});

type RepeatOptions = 
  | { every: number }                    // Repeat every N milliseconds
  | { pattern: string };                 // Cron pattern (standard 5-field format)

Example with delay:

await queue.add({
  groupId: 'user:123',
  data: { action: 'send-reminder' },
  delay: 3600000, // Run in 1 hour
});

Example with specific time:

await queue.add({
  groupId: 'user:123',
  data: { action: 'scheduled-report' },
  runAt: new Date('2025-12-31T23:59:59Z'),
});

Worker Concurrency

Workers support configurable concurrency to process multiple jobs in parallel from different groups:

const worker = new Worker({
  queue,
  concurrency: 8, // Process up to 8 jobs simultaneously
  handler: async (job) => {
    // Jobs from different groups can run in parallel
    // Jobs from the same group still run sequentially
  },
});

Benefits:

  • Higher throughput for multi-group workloads
  • Efficient resource utilization
  • Still maintains per-group FIFO ordering

Considerations:

  • Each job consumes memory and resources
  • Set concurrency based on job duration and system resources
  • Monitor Redis connection pool (ioredis default: 10 connections)

Logging

Both Queue and Worker support optional logging for debugging and monitoring:

// Enable default logger
const queue = new Queue({
  redis,
  namespace: 'orders',
  logger: true, // Logs to console with queue name prefix
});

const worker = new Worker({
  queue,
  logger: true, // Logs to console with worker name prefix
  handler: async (job) => { /* ... */ },
});

Custom logger:

Works out of the box with both pino and winston

import type { LoggerInterface } from 'groupmq';

const customLogger: LoggerInterface = {
  debug: (msg: string, ...args: any[]) => { /* custom logging */ },
  info: (msg: string, ...args: any[]) => { /* custom logging */ },
  warn: (msg: string, ...args: any[]) => { /* custom logging */ },
  error: (msg: string, ...args: any[]) => { /* custom logging */ },
};

const queue = new Queue({
  redis,
  namespace: 'orders',
  logger: customLogger,
});
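
For example, a pino instance can be passed directly, since its debug/info/warn/error methods match LoggerInterface:

import pino from 'pino';

const queue = new Queue({
  redis,
  namespace: 'orders',
  logger: pino({ level: 'debug' }),
});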

What gets logged:

  • Job reservation and completion
  • Error handling and retries
  • Scheduler runs and delayed job promotions
  • Group locking and unlocking
  • Redis connection events
  • Performance warnings

Repeatable jobs (cron/interval)

GroupMQ supports simple repeatable jobs using either a fixed interval (every) or a basic cron pattern (pattern). Repeats are materialized by a lightweight scheduler that runs as part of the worker's periodic cleanup cycle.

Add a repeating job (every N ms)

await queue.add({
  groupId: 'reports',
  data: { type: 'daily-summary' },
  repeat: { every: 5000 }, // run every 5 seconds
});

const worker = new Worker({
  queue,
  handler: async (job) => {
    // process...
  },
  // IMPORTANT: For timely repeats, run the scheduler frequently
  cleanupIntervalMs: 1000, // <= repeat.every (recommended 1–2s for 5s repeats)
});

worker.run();

Add a repeating job (cron pattern)

await queue.add({
  groupId: 'emails',
  data: { type: 'weekly-digest' },
  repeat: { pattern: '0 9 * * 1-5' }, // 09:00 Mon–Fri
});

Remove a repeating job

await queue.removeRepeatingJob('reports', { every: 5000 });
// or
await queue.removeRepeatingJob('emails', { pattern: '0 9 * * 1-5' });

Scheduler behavior and best practices

  • The worker's periodic cycle runs: cleanup(), promoteDelayedJobs(), and processRepeatingJobs().
  • Repeating jobs are enqueued during this cycle via a distributed scheduler with lock coordination.
  • Minimum practical repeat interval: ~1.5-2 seconds (controlled by schedulerLockTtlMs, default: 1500ms)
  • For sub-second repeats (not recommended in production):
    const queue = new Queue({
      redis,
      namespace: 'fast',
      schedulerLockTtlMs: 50, // Allow fast scheduler lock
    });
    
    const worker = new Worker({
      queue,
      schedulerIntervalMs: 10,   // Check every 10ms
      cleanupIntervalMs: 100,    // Cleanup every 100ms
      handler: async (job) => { /* ... */ },
    });
    ⚠️ Fast repeats (< 1s) increase Redis load and should be used sparingly.
  • The scheduler is idempotent: it updates the next run time before enqueueing to prevent double runs.
  • Each occurrence is a normal job with a fresh jobId, preserving per-group FIFO semantics.
  • You can monitor repeated runs via BullBoard using the provided adapter.

Graceful Shutdown

// Stop worker gracefully - waits for current job to finish
await worker.close(gracefulTimeoutMs);

// Wait for queue to be empty
const isEmpty = await queue.waitForEmpty(timeoutMs);

// Recover groups that might be stuck due to ordering delays
const recoveredCount = await queue.recoverDelayedGroups();
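
A typical shutdown hook tying these together (a sketch; the 30s timeout is illustrative):

process.on('SIGTERM', async () => {
  await worker.close(30_000); // let in-flight jobs finish, up to 30s
  await queue.close();
  redis.disconnect();
  process.exit(0);
});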

Additional Methods

Queue Methods

// Job counts and status
const counts = await queue.getJobCounts();
// { active: 5, waiting: 12, delayed: 3, total: 20, uniqueGroups: 8 }

const activeCount = await queue.getActiveCount();
const waitingCount = await queue.getWaitingCount();
const delayedCount = await queue.getDelayedCount();
const completedCount = await queue.getCompletedCount();
const failedCount = await queue.getFailedCount();

// Get job IDs by status
const activeJobIds = await queue.getActiveJobs();
const waitingJobIds = await queue.getWaitingJobs();
const delayedJobIds = await queue.getDelayedJobs();

// Get Job instances by status
const completedJobs = await queue.getCompletedJobs(limit); // returns Job[]
const failedJobs = await queue.getFailedJobs(limit);

// Group information
const groups = await queue.getUniqueGroups(); // ['user:123', 'order:456']
const groupCount = await queue.getUniqueGroupsCount();
const jobsInGroup = await queue.getGroupJobCount('user:123');

// Get specific job
const job = await queue.getJob(jobId); // returns Job instance

// Job manipulation
await queue.remove(jobId);
await queue.retry(jobId); // Re-enqueue a failed job
await queue.promote(jobId); // Promote delayed job to waiting
await queue.changeDelay(jobId, newDelayMs);
await queue.updateData(jobId, newData);

// Scheduler operations
await queue.runSchedulerOnce(); // Manual scheduler run
await queue.promoteDelayedJobs(); // Promote delayed jobs
await queue.recoverDelayedGroups(); // Recover stuck groups

// Cleanup and shutdown
await queue.waitForEmpty(timeoutMs);
await queue.close();

Job Instance Methods

Jobs returned from queue.getJob(), queue.getCompletedJobs(), etc. have these methods:

const job = await queue.getJob(jobId);

// Manipulate the job
await job.remove();
await job.retry();
await job.promote();
await job.changeDelay(newDelayMs);
await job.updateData(newData);
await job.update(newData); // Alias for updateData

// Get job state
const state = await job.getState(); // 'active' | 'waiting' | 'delayed' | 'completed' | 'failed'

// Serialize job
const json = job.toJSON();

Worker Methods

// Check worker status
const isProcessing = worker.isProcessing();

// Get current job(s) being processed
const currentJob = worker.getCurrentJob();
// { job: ReservedJob, processingTimeMs: 1500 } | null

// For concurrency > 1
const currentJobs = worker.getCurrentJobs();
// [{ job: ReservedJob, processingTimeMs: 1500 }, ...]

// Get worker metrics
const metrics = worker.getWorkerMetrics();
// { jobsInProgress: 2, lastJobPickupTime: 1234567890, ... }

// Graceful shutdown
await worker.close(gracefulTimeoutMs);

Worker Events

Workers emit events that you can listen to:

worker.on('ready', () => {
  console.log('Worker is ready');
});

worker.on('completed', (job: Job) => {
  console.log('Job completed:', job.id);
});

worker.on('failed', (job: Job) => {
  console.log('Job failed:', job.id, job.failedReason);
});

worker.on('error', (error: Error) => {
  console.error('Worker error:', error);
});

worker.on('closed', () => {
  console.log('Worker closed');
});

worker.on('graceful-timeout', (job: Job) => {
  console.log('Job exceeded graceful timeout:', job.id);
});

// Remove event listeners
worker.off('completed', handler);
worker.removeAllListeners();

BullBoard Integration

GroupMQ provides a BullBoard adapter for visual monitoring and management:

import { createBullBoard } from '@bull-board/api';
import { ExpressAdapter } from '@bull-board/express';
import { BullBoardGroupMQAdapter } from 'groupmq';
import express from 'express';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullBoardGroupMQAdapter(queue, {
      displayName: 'Order Processing',
      description: 'Processes customer orders',
      readOnlyMode: false, // Allow job manipulation through UI
    }),
  ],
  serverAdapter,
});

const app = express();
app.use('/admin/queues', serverAdapter.getRouter());
app.listen(3000, () => {
  console.log('BullBoard running at http://localhost:3000/admin/queues');
});

Detailed Architecture

Redis Data Structures

GroupMQ uses these Redis keys (all prefixed with groupmq:{namespace}:):

  • :g:{groupId} - sorted set of job IDs in a group, ordered by score (derived from orderMs and seq)
  • :ready - sorted set of group IDs that have jobs available, ordered by lowest job score
  • :job:{jobId} - hash containing job data (id, groupId, data, attempts, status, etc.)
  • :lock:{groupId} - string with job ID that currently owns the group lock (with TTL)
  • :processing - sorted set of active job IDs, ordered by deadline
  • :processing:{jobId} - hash with processing metadata (groupId, deadlineAt)
  • :delayed - sorted set of delayed jobs, ordered by runAt timestamp
  • :completed - sorted set of completed job IDs (for retention)
  • :failed - sorted set of failed job IDs (for retention)
  • :repeats - hash of repeating job definitions (groupId → config)
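
For debugging, these structures can be inspected with a plain ioredis client (a sketch; key names follow the layout above, 'groupmq:orders' and 'user:42' are example values):

const ns = 'groupmq:orders';

// Groups with available jobs, lowest score first
const ready = await redis.zrange(`${ns}:ready`, 0, -1, 'WITHSCORES');

// Job IDs queued in one group, in processing order
const jobIds = await redis.zrange(`${ns}:g:user:42`, 0, -1);

// Raw hash for a single job
const job = await redis.hgetall(`${ns}:job:${jobIds[0]}`);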

Job Lifecycle States

  1. Waiting - job is in :g:{groupId} and group is in :ready
  2. Delayed - job is in :delayed (scheduled for future)
  3. Active - job is in :processing and group is locked
  4. Completed - job is in :completed (retention)
  5. Failed - job exceeded maxAttempts, moved to :failed (retention)

Worker Loop

The worker runs a continuous loop optimized for both single and concurrent processing:

For concurrency = 1 (sequential):

while (!stopping) {
  // 1. Blocking reserve (waits for job, efficient)
  const job = await queue.reserveBlocking(timeoutSec);
  
  // 2. Process job synchronously
  if (job) {
    await processOne(job);
  }
  
  // 3. Periodic scheduler run (every schedulerIntervalMs)
  await queue.runSchedulerOnce(); // Promotes delayed jobs, processes repeats
}

For concurrency > 1 (parallel):

while (!stopping) {
  // 1. Run lightweight scheduler periodically
  await queue.runSchedulerOnce();
  
  // 2. Try batch reservation if we have capacity
  const capacity = concurrency - jobsInProgress.size;
  if (capacity > 0) {
    const jobs = await queue.reserveBatch(capacity);
    // Process all jobs concurrently (fire and forget)
    for (const job of jobs) {
      void processOne(job);
    }
  }
  
  // 3. Blocking reserve for remaining capacity
  const job = await queue.reserveBlocking(blockingTimeoutSec);
  if (job) {
    void processOne(job); // Process async
  }
}

Key optimizations:

  • Batch reservation reduces Redis round-trips for concurrent workers
  • Blocking operations prevent wasteful polling
  • Heartbeat mechanism keeps jobs alive during long processing
  • Atomic completion + next reservation reduces latency

Atomic Operations (Lua Scripts)

All critical operations use Lua scripts for atomicity:

  • enqueue.lua - adds job to group queue, adds group to ready set
  • reserve.lua - finds ready group, pops head job, locks group
  • reserve-batch.lua - reserves one job from multiple groups atomically
  • complete.lua - marks job complete, unlocks group, re-adds group to ready if more jobs remain
  • complete-and-reserve-next.lua - atomic completion + reservation from the same group
  • retry.lua - increments attempts, re-adds job to group with backoff delay
  • remove.lua - removes job from all data structures

Job Reservation Flow

When a worker reserves a job:

  1. Find Ready Group: ZRANGE :ready 0 0 gets lowest-score group
  2. Check Lock: PTTL :lock:{groupId} ensures group isn't locked
  3. Pop Job: ZPOPMIN :g:{groupId} 1 gets head job atomically
  4. Lock Group: SET :lock:{groupId} {jobId} PX {timeout}
  5. Mark Processing: Add to :processing sorted set with deadline
  6. Re-add Group: If more jobs exist, ZADD :ready {score} {groupId}
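
A non-atomic TypeScript sketch of the same logic, for illustration only (in GroupMQ this runs inside reserve.lua as a single atomic script; key names follow the layout above):

import Redis from 'ioredis';

async function reserveSketch(redis: Redis, ns: string, timeoutMs: number) {
  // 1. Lowest-score group with available jobs
  const [groupId] = await redis.zrange(`${ns}:ready`, 0, 0);
  if (!groupId) return null;

  // 2. Skip the group if another job still holds its lock
  if ((await redis.pttl(`${ns}:lock:${groupId}`)) > 0) return null;

  // 3. Pop the head job of the group
  const [jobId] = await redis.zpopmin(`${ns}:g:${groupId}`, 1);
  if (!jobId) return null;

  // 4. Lock the group and record the processing deadline
  await redis.set(`${ns}:lock:${groupId}`, jobId, 'PX', timeoutMs);
  await redis.zadd(`${ns}:processing`, Date.now() + timeoutMs, jobId);

  // 5. Re-add the group to :ready if it still has jobs
  const next = await redis.zrange(`${ns}:g:${groupId}`, 0, 0, 'WITHSCORES');
  if (next.length > 0) await redis.zadd(`${ns}:ready`, next[1], groupId);

  return jobId;
}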

Job Completion Flow

When a job completes successfully:

  1. Remove from Processing: DEL :processing:{jobId}, ZREM :processing {jobId}
  2. Mark Completed: HSET :job:{jobId} status completed
  3. Add to Retention: ZADD :completed {now} {jobId}
  4. Unlock Group: DEL :lock:{groupId} (only if this job owns the lock)
  5. Check for More Jobs: ZCARD :g:{groupId}
  6. Re-add to Ready: If jobs remain, ZADD :ready {nextScore} {groupId}

Step 6 is the critical one: after a job completes, the group is re-added to :ready so other workers can pick up the next job in the queue.

Ordering and Scoring

Jobs are ordered using a composite score:

score = (orderMs - baseEpoch) * 1000 + seq

  • orderMs - user-provided timestamp for event ordering
  • baseEpoch - fixed epoch timestamp (1704067200000) to keep scores manageable
  • seq - auto-incrementing sequence for tiebreaking (resets daily to prevent overflow)

This ensures:

  • Jobs with earlier orderMs process first
  • Jobs with same orderMs process in submission order
  • Score is stable and sortable
  • Daily sequence reset prevents integer overflow
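
A quick illustration of the formula (baseEpoch as above; in practice seq is assigned by Redis):

const BASE_EPOCH = 1704067200000; // 2024-01-01T00:00:00Z

function score(orderMs: number, seq: number): number {
  return (orderMs - BASE_EPOCH) * 1000 + seq;
}

score(1704067201000, 0); // 1_000_000
score(1704067201000, 1); // 1_000_001 -> same ms, submitted later, runs second
score(1704067202000, 0); // 2_000_000 -> later orderMs always sorts after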

Concurrency Modes

concurrency = 1 (Sequential):

  • Worker processes one job at a time
  • Uses blocking reserve with synchronous processing
  • Simplest mode, lowest memory, lowest Redis overhead
  • Best for: CPU-intensive jobs, resource-constrained environments

concurrency > 1 (Parallel):

  • Worker attempts batch reservation first (lower latency)
  • Processes multiple jobs concurrently (from different groups only)
  • Each job runs in parallel with its own heartbeat
  • Falls back to blocking reserve when batch is empty
  • Higher throughput, efficient for I/O-bound workloads
  • Best for: Network calls, database operations, API requests

Important: Per-group FIFO ordering is maintained regardless of concurrency level. Multiple jobs from the same group never run in parallel.

Error Handling and Retries

When a job fails:

  1. Increment Attempts: HINCRBY :job:{jobId} attempts 1
  2. Check Max Attempts: If attempts >= maxAttempts, mark as failed
  3. Calculate Backoff: Use exponential backoff strategy
  4. Re-enqueue: Add job back to :g:{groupId} with delay
  5. Unlock Group: Release lock so next job can process

If a job times out (visibility timeout expires):

  • Heartbeat mechanism extends the lock: SET :lock:{groupId} {jobId} PX {timeout}
  • If heartbeat fails, job remains locked until TTL expires
  • Cleanup cycle detects expired locks and recovers jobs
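
These behaviors are tuned through the options documented above; for example, for long-running jobs (values are illustrative):

const queue = new Queue({
  redis,
  namespace: 'video',
  jobTimeoutMs: 120_000, // lock TTL; the heartbeat extends it while work continues
});

const worker = new Worker({
  queue,
  heartbeatMs: 30_000,     // renew the lock well before it expires
  stalledInterval: 60_000, // scan for expired locks every minute
  maxStalledCount: 2,      // fail a job after two detected stalls
  handler: async (job) => { /* long-running work */ },
});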

Cleanup and Recovery

Periodic cleanup runs:

  1. Promote Delayed Jobs: Move jobs from :delayed to waiting when runAt arrives
  2. Process Repeats: Enqueue next occurrence of repeating jobs
  3. Recover Stale Locks: Find expired locks in :processing and unlock groups
  4. Recover Delayed Groups: Handle groups stuck due to ordering delays
  5. Trim Completed/Failed: Remove old completed and failed jobs per retention policy
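
Retention for step 5 is controlled by the queue options documented above; for example:

const queue = new Queue({
  redis,
  namespace: 'orders',
  keepCompleted: 1_000, // keep the last 1000 completed jobs for inspection
  keepFailed: 5_000,    // keep more failures for debugging
});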

Performance Characteristics

Latest Benchmarks (MacBook M2, 4 workers, multi-process):

GroupMQ Performance

  • Throughput: 68-73 jobs/sec (500 jobs), 80-86 jobs/sec (5000 jobs)
  • Latency: P95 pickup ~5-5.5s, P95 processing ~45-50ms
  • Memory: ~120-145 MB per worker process
  • CPU: <1% average, <70% peak

vs BullMQ Comparison

GroupMQ maintains competitive performance while adding per-group FIFO ordering guarantees:

  • Similar throughput for group-based workloads
  • Better job ordering with guaranteed per-group FIFO processing
  • Atomic operations reduce race conditions and improve reliability

For detailed benchmark results and comparisons over time, see our Performance Benchmarks page.

Optimizations:

  • Batch Operations: reserveBatch reduces round-trips for concurrent workers
  • Blocking Operations: Efficient Redis BLPOP-style blocking prevents wasteful polling
  • Lua Scripts: All critical paths are atomic, avoiding race conditions
  • Atomic Completion: Complete job + reserve next in single operation
  • Minimal Data: Jobs store only essential fields, keeps memory low
  • Score-Based Ordering: O(log N) insertions and retrievals via sorted sets
  • Adaptive Behavior: Scheduler intervals adjust based on ordering configuration

Contributing

Contributions are welcome! When making changes:

  1. Run tests and benchmarks before and after your changes to verify everything works correctly
  2. Add tests for any new features

Testing

Requires a local Redis at 127.0.0.1:6379 (no auth).

npm i
npm run build
npm test

Optionally:

docker run --rm -p 6379:6379 redis:7