Migrating from BullMQ
This guide covers migrating a Node.js application from BullMQ to glide-mq. It documents every API surface difference, gaps with workarounds, and what you gain by switching.
Table of Contents
- Multi-tenant job queue pattern
- Why migrate
- Prerequisites
- Install
- Connection config
- Quick start
- API mapping table
- Breaking differences
- Gaps and workarounds
- What glide-mq adds
- Testing without a server
- Workflow helpers
- NestJS
- Migration checklist
Multi-tenant job queue pattern
A common BullMQ Pro pattern: per-client job isolation, concurrency caps per client, retries with accumulated data, and worker-level rate limiting. This section walks through a full migration of that setup.
Job grouping and per-client isolation
BullMQ Pro groups jobs by group.id so jobs for the same client do not run in parallel:
// BullMQ Pro
await queue.add('process', { clientId, payload }, {
group: { id: `client-${clientId}` },
});

glide-mq provides opts.ordering.key for the same isolation guarantee - jobs sharing a key run sequentially, one at a time, regardless of worker concurrency:
// glide-mq
await queue.add('process', { clientId, payload }, {
ordering: { key: `client-${clientId}` },
});

What you get: jobs for client-123 always run one at a time, in enqueue order. Jobs for different clients run in parallel. No changes needed to the worker.
Group-level concurrency limit (max N per client)
BullMQ Pro lets you cap concurrent jobs per group:
// BullMQ Pro - max 2 concurrent jobs per client
await queue.add('process', { clientId, payload }, {
group: { id: `client-${clientId}`, limit: { max: 2, duration: 0 } },
});

glide-mq supports this natively via ordering.concurrency:
// glide-mq - max 2 concurrent jobs per client (open source, no Pro license)
await queue.add('process', { clientId, payload }, {
ordering: { key: `client-${clientId}`, concurrency: 2 },
});

Jobs exceeding the limit are automatically parked in a per-group wait list and released when a slot opens. No thundering herd - exactly one waiting job is promoted per slot freed.
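Conceptually, the per-group gate behaves like the following in-memory sketch. This is illustrative only - glide-mq does this accounting server-side in Valkey, and the class and method names here are invented:

```typescript
// Illustrative model of a per-group concurrency gate: at most `limit` jobs
// run per key; excess callers are parked, and exactly one parked waiter is
// released per slot freed. (In-memory sketch; glide-mq keeps this in Valkey.)
class GroupGate {
  private active = new Map<string, number>();
  private waiting = new Map<string, Array<() => void>>();

  constructor(private limit: number) {}

  // Resolves immediately if a slot is free, otherwise parks the caller.
  acquire(key: string): Promise<void> {
    const running = this.active.get(key) ?? 0;
    if (running < this.limit) {
      this.active.set(key, running + 1);
      return Promise.resolve();
    }
    return new Promise((resolve) => {
      const q = this.waiting.get(key) ?? [];
      q.push(resolve);
      this.waiting.set(key, q);
    });
  }

  // Frees a slot and promotes exactly one parked waiter, if any.
  release(key: string): void {
    const q = this.waiting.get(key) ?? [];
    const next = q.shift();
    if (next) {
      next(); // slot handed directly to one waiter; active count unchanged
    } else {
      this.active.set(key, (this.active.get(key) ?? 1) - 1);
    }
  }
}
```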
For strict serialization (1 at a time), omit concurrency or set it to 1:
// glide-mq - one job at a time per client
await queue.add('process', { clientId, payload }, {
ordering: { key: `client-${clientId}` },
});

Exponential backoff
Identical API - no changes needed:
// BullMQ
await queue.add('process', { clientId, payload }, {
attempts: 5,
backoff: { type: 'exponential', delay: 1000 },
});

// glide-mq - identical
await queue.add('process', { clientId, payload }, {
attempts: 5,
backoff: { type: 'exponential', delay: 1000 },
});

job.attemptsMade is also identical - same property name, same semantics.
Accumulating partial results across retries with job.updateData()
job.updateData() works the same way. Use it to accumulate results across retry attempts:
// BullMQ
const worker = new Worker('process', async (job) => {
const results = job.data.partialResults ?? [];
const chunk = await processNextChunk(job.data.clientId, job.attemptsMade);
results.push(chunk);
await job.updateData({ ...job.data, partialResults: results });
if (results.length < job.data.totalChunks) {
throw new Error('more chunks needed'); // triggers retry
}
return results;
});

// glide-mq - identical, no changes
const worker = new Worker('process', async (job) => {
const results = job.data.partialResults ?? [];
const chunk = await processNextChunk(job.data.clientId, job.attemptsMade);
results.push(chunk);
await job.updateData({ ...job.data, partialResults: results });
if (results.length < job.data.totalChunks) {
throw new Error('more chunks needed');
}
return results;
}, { connection });

For custom attempt tracking in job data:
// BullMQ
await job.updateData({ ...job.data, customAttemptsMade: job.data.customAttemptsMade + 1 });

// glide-mq - identical
await job.updateData({ ...job.data, customAttemptsMade: (job.data.customAttemptsMade ?? 0) + 1 });

Worker-level rate limiting
Identical API - no changes needed:
// BullMQ - max 2 jobs per 100ms across this worker
const worker = new Worker('process', processor, {
connection,
limiter: { max: 2, duration: 100 },
});

// glide-mq - identical
const worker = new Worker('process', processor, {
connection: { addresses: [{ host: 'localhost', port: 6379 }] },
limiter: { max: 2, duration: 100 },
});

Dynamic rate limiting from inside the processor also works the same way:
// Both BullMQ and glide-mq
const worker = new Worker('process', async (job) => {
const retryAfter = await checkUpstreamRateLimit(job.data.clientId);
if (retryAfter > 0) {
await worker.rateLimit(retryAfter);
throw new Worker.RateLimitError(); // re-queues the job, not counted as failure
}
return process(job.data);
}, { connection, limiter: { max: 100, duration: 1000 } });

Full before/after for the described setup
// BullMQ Pro - original setup
import { Queue, Worker } from 'bullmq';
const connection = { host: 'localhost', port: 6379 };
const queue = new Queue('jobs', { connection });
// Enqueue with per-client isolation, max 2 concurrent
await queue.add('task', { clientId: 'acme', payload: {} }, {
group: { id: 'acme', limit: { max: 2, duration: 0 } },
attempts: 5,
backoff: { type: 'exponential', delay: 1000 },
});
const worker = new Worker('jobs', async (job) => {
const results = job.data.partialResults ?? [];
results.push(await doWork(job.data));
await job.updateData({ ...job.data, partialResults: results });
return results;
}, {
connection,
concurrency: 20,
limiter: { max: 2, duration: 100 },
});

// glide-mq - migrated (direct equivalent, open source)
import { Queue, Worker } from 'glide-mq';
const connection = { addresses: [{ host: 'localhost', port: 6379 }] };
const queue = new Queue('jobs', { connection });
// ordering.key + concurrency: 2 gives exact BullMQ Pro group behavior
await queue.add('task', { clientId: 'acme', payload: {} }, {
ordering: { key: 'acme', concurrency: 2 },
attempts: 5,
backoff: { type: 'exponential', delay: 1000 },
});
const worker = new Worker('jobs', async (job) => {
const results = job.data.partialResults ?? [];
results.push(await doWork(job.data));
await job.updateData({ ...job.data, partialResults: results });
return results;
}, {
connection,
concurrency: 20,
limiter: { max: 2, duration: 100 },
});

This is a direct equivalent. No behavioral differences.
Why migrate
| Feature | BullMQ | glide-mq |
|---|---|---|
| Redis client | ioredis (JS) | valkey-glide (Rust NAPI) |
| RTT per job | 2-3 (fetch + ack + complete) | 1 (completeAndFetchNext FCALL) |
| Server-side logic | 53 EVAL Lua scripts | 1 FUNCTION LOAD library |
| Cluster support | Works, not hash-tagged by default | Built-in, all keys glide:{name}:* |
| AZ-affinity routing | No | Yes - pin reads to your AZ |
| IAM auth (ElastiCache/MemoryDB) | No | Yes |
| Compression | No | gzip transparent compression |
| Per-key ordering + group rate limit | No (BullMQ Pro groups only) | Yes, opts.ordering.key with concurrency, rateLimit, and tokenBucket |
| In-memory test mode | No | Yes, TestQueue / TestWorker |
glide-mq is a strict superset of BullMQ's core job queue semantics. At-least-once delivery, consumer groups, stall detection, retries, DLQ, flows, and schedulers all work the same way. The differences are in API shape and some missing conveniences listed in Gaps and workarounds.
Prerequisites
- Node.js 20+
- Valkey 7.0+ or Redis 7.0+ (both are supported)
- TypeScript 5+ recommended
Install
npm remove bullmq
npm install glide-mq

Update your import paths:
// Before
import { Queue, Worker, Job, QueueEvents, FlowProducer } from 'bullmq';
// After
import { Queue, Worker, Job, QueueEvents, FlowProducer } from 'glide-mq';

Connection config
This is the most common source of errors when migrating. BullMQ uses ioredis's flat connection format; glide-mq uses valkey-glide's addresses array.
// BullMQ
const connection = { host: 'localhost', port: 6379 };
const queue = new Queue('tasks', { connection });
const worker = new Worker('tasks', processor, { connection });

// glide-mq
const connection = { addresses: [{ host: 'localhost', port: 6379 }] };
const queue = new Queue('tasks', { connection });
const worker = new Worker('tasks', processor, { connection });

For TLS, password, and cluster mode:
// BullMQ
const connection = {
host: 'my-cluster.cache.amazonaws.com',
port: 6379,
tls: {},
password: 'secret',
};

// glide-mq
const connection = {
addresses: [{ host: 'my-cluster.cache.amazonaws.com', port: 6379 }],
useTLS: true,
credentials: { password: 'secret' },
clusterMode: true, // set to true for Redis Cluster / ElastiCache cluster
};

For IAM auth on AWS ElastiCache / MemoryDB (no equivalent in BullMQ):
// glide-mq only
const connection = {
addresses: [{ host: 'my-cluster.cache.amazonaws.com', port: 6379 }],
useTLS: true,
clusterMode: true,
credentials: {
type: 'iam',
serviceType: 'elasticache',
region: 'us-east-1',
userId: 'my-iam-user',
clusterName: 'my-cluster',
},
};

Quick start
// BullMQ
import { Queue, Worker } from 'bullmq';
const connection = { host: 'localhost', port: 6379 };
const queue = new Queue('tasks', { connection });
await queue.add('send-email', { to: 'user@example.com' });
const worker = new Worker('tasks', async (job) => {
await sendEmail(job.data.to);
}, { connection, concurrency: 10 });
worker.on('completed', (job) => console.log(job.id, 'done'));
worker.on('failed', (job, err) => console.error(job.id, err.message));

// glide-mq
import { Queue, Worker } from 'glide-mq';
const connection = { addresses: [{ host: 'localhost', port: 6379 }] };
const queue = new Queue('tasks', { connection });
await queue.add('send-email', { to: 'user@example.com' });
const worker = new Worker('tasks', async (job) => {
await sendEmail(job.data.to);
}, { connection, concurrency: 10 });
worker.on('completed', (job) => console.log(job.id, 'done'));
worker.on('failed', (job, err) => console.error(job.id, err.message));

The processor function signature is identical. The only change is the connection format.
API mapping table
Full = identical API. Changed = available with different shape. Gap = not yet implemented (see Gaps and workarounds).
Queue methods
| BullMQ | glide-mq | Status |
|---|---|---|
new Queue(name, { connection, prefix, defaultJobOptions }) | new Queue(name, { connection, prefix, deadLetterQueue, compression }) | Changed |
queue.add(name, data, opts) | queue.add(name, data, opts) | Full |
queue.addBulk(jobs) | queue.addBulk(jobs) | Full |
queue.pause() | queue.pause() | Full |
queue.resume() | queue.resume() | Full |
queue.isPaused() | queue.isPaused() | Full |
queue.obliterate({ force }) | queue.obliterate({ force }) | Full |
queue.getJob(id) | queue.getJob(id) | Full |
queue.getJobs(types, start, end, asc) | queue.getJobs(type, start, end) | Changed |
queue.getJobCounts(...types) | queue.getJobCounts() | Changed |
queue.getJobCountByTypes(...types) | queue.getJobCountByTypes() | Full |
queue.count() | queue.count() | Full |
queue.getMetrics(type, start, end) | queue.getMetrics(type) | Changed |
queue.getJobLogs(id, start, end) | queue.getJobLogs(id, start, end) | Full |
queue.setGlobalConcurrency(n) | queue.setGlobalConcurrency(n) | Full |
queue.upsertJobScheduler(id, opts, template) | queue.upsertJobScheduler(name, scheduleOpts, template) | Full |
queue.getJobScheduler(id) | queue.getJobScheduler(name) | Full |
queue.getJobSchedulers() | queue.getRepeatableJobs() | Changed |
queue.removeJobScheduler(id) | queue.removeJobScheduler(name) | Full |
queue.getWorkers() | queue.getWorkers() | Full |
queue.drain(delayed?) | queue.drain(delayed?) | Full |
queue.clean(grace, limit, type) | queue.clean(grace, limit, type) | Full |
queue.retryJobs(opts) | queue.retryJobs(opts?) | Full |
queue.close() | queue.close() | Full |
| BullMQ request-reply pattern | queue.addAndWait(name, data, { waitTimeout }) | glide-mq extension |
| - | queue.revoke(jobId) | glide-mq only |
| - | queue.getDeadLetterJobs(start, end) | glide-mq only |
| - | queue.searchJobs(opts) | glide-mq only |
| - | queue.setGlobalRateLimit({ max, duration }) | glide-mq only |
| - | queue.getGlobalRateLimit() | glide-mq only |
| - | queue.removeGlobalRateLimit() | glide-mq only |
Worker methods and options
| BullMQ | glide-mq | Status |
|---|---|---|
new Worker(name, processor, { connection, concurrency, limiter, stalledInterval, maxStalledCount, lockDuration, settings }) | new Worker(name, processor, { connection, concurrency, globalConcurrency, limiter, stalledInterval, maxStalledCount, lockDuration, backoffStrategies }) | Changed |
worker.pause(doNotWaitActive?) | worker.pause(force?) | Full |
worker.resume() | worker.resume() | Full |
worker.close(force?) | worker.close(force?) | Full |
worker.drain() | worker.drain() | Full |
worker.rateLimit(ms) | worker.rateLimit(ms) | Full |
worker.on('completed', (job, result)) | worker.on('completed', (job, result)) | Full |
worker.on('failed', (job, err)) | worker.on('failed', (job, err)) | Full |
worker.on('error', (err)) | worker.on('error', (err)) | Full |
worker.on('stalled', (jobId)) | worker.on('stalled', (jobId)) | Full |
worker.on('closing') | worker.on('closing') | Full |
worker.on('closed') | worker.on('closed') | Full |
worker.on('active', (job, prev)) | worker.on('active', (job, jobId)) | Changed |
worker.on('drained') | worker.on('drained') | Full |
Worker.RateLimitError | Worker.RateLimitError | Full |
| Sandboxed processor (file path string) | new Worker('q', './processor.js', { connection, sandbox: {} }) | Full |
Job methods
| BullMQ | glide-mq | Status |
|---|---|---|
job.id, job.name, job.data, job.opts | Same | Full |
job.attemptsMade, job.timestamp, job.processedOn, job.finishedOn | Same | Full |
job.returnvalue, job.failedReason, job.progress | Same | Full |
job.updateData(data) | job.updateData(data) | Full |
job.updateProgress(progress) | job.updateProgress(progress) | Full |
job.log(message) | job.log(message) | Full |
job.getState() | job.getState() | Full |
job.isCompleted(), job.isFailed(), job.isActive(), job.isWaiting(), job.isDelayed() | Same | Full |
job.waitUntilFinished(queueEvents, ttl) | job.waitUntilFinished(pollIntervalMs, timeoutMs) | Changed |
job.retry(state?) | job.retry() | Changed |
job.remove(opts?) | job.remove() | Full |
job.getChildrenValues() | job.getChildrenValues() | Full |
job.promote() | job.promote() | Full |
job.changeDelay(delay) | job.changeDelay(newDelay) | Full |
job.changePriority(opts) | job.changePriority(newPriority) | Full |
job.discard() | job.discard() | Full |
| BullMQ process-step pattern | job.moveToDelayed(timestampMs, nextStep?) | glide-mq extension |
job.toJSON() | - | Use job.data, job.opts, etc. directly |
| - | job.abortSignal | glide-mq only |
| - | job.isRevoked() | glide-mq only |
JobOptions
| BullMQ field | glide-mq field | Status |
|---|---|---|
delay | delay | Full |
priority | priority | Full |
attempts | attempts | Full |
backoff | backoff | Full |
timeout | timeout | Full |
removeOnComplete | removeOnComplete | Full |
removeOnFail | removeOnFail | Full |
deduplication | deduplication | Changed (see Deduplication) |
parent | parent | Full |
jobId (custom ID) | jobId | Full |
lifo | lifo | Same |
repeat | - | Gap - use queue.upsertJobScheduler() |
sizeLimit | - | 1 MB hard limit enforced internally (JSON string character length) |
| - | ordering.key | glide-mq only |
| - | ordering.concurrency | glide-mq only |
| - | ordering.rateLimit | glide-mq only |
| - | ordering.tokenBucket | glide-mq only |
| - | cost | glide-mq only |
QueueEvents events
| BullMQ event | glide-mq event | Status |
|---|---|---|
'added' | 'added' | Full |
'completed' | 'completed' | Full |
'failed' | 'failed' | Full |
'stalled' | 'stalled' | Full |
'progress' | 'progress' | Full |
'paused' | 'paused' | Full |
'resumed' | 'resumed' | Full |
'removed' | 'removed' | Full |
'retries-exhausted' | 'failed' | Changed - check job.attemptsMade >= job.opts.attempts |
'waiting' | - | Gap |
'active' | - | Gap |
'delayed' | - | Gap |
'drained' | - | Gap |
'cleaned' | - | Gap |
'deduplicated' | - | Gap |
'waiting-children' | - | Gap |
Breaking differences
Queue
defaultJobOptions removed - BullMQ accepts defaultJobOptions on QueueOptions to set per-queue defaults. glide-mq does not. Set options explicitly on each queue.add() call, or wrap queue.add in a helper:
// BullMQ
const queue = new Queue('tasks', {
connection,
defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
});// glide-mq - wrap add() with your defaults
const DEFAULTS = { attempts: 3, backoff: { type: 'exponential', delay: 1000 } } as const;
const add = (name: string, data: unknown, opts?: JobOptions) =>
queue.add(name, data, { ...DEFAULTS, ...opts });

queue.getJobs() takes a single type - BullMQ accepts an array of types; glide-mq takes one type at a time:
// BullMQ
const jobs = await queue.getJobs(['waiting', 'active'], 0, 99);

// glide-mq
const [waiting, active] = await Promise.all([
queue.getJobs('waiting', 0, 99),
queue.getJobs('active', 0, 99),
]);
const jobs = [...waiting, ...active];

queue.getJobCounts() returns all states - BullMQ accepts a variadic list of state names; glide-mq always returns all five (waiting, active, delayed, completed, failed).
// BullMQ
const { waiting, active } = await queue.getJobCounts('waiting', 'active');

// glide-mq
const { waiting, active } = await queue.getJobCounts(); // always returns all

Worker
settings.backoffStrategy renamed - BullMQ v3+ uses a single settings.backoffStrategy function. glide-mq uses backoffStrategies as a named-strategy map:
// BullMQ
const worker = new Worker('q', processor, {
connection,
settings: {
backoffStrategy: (attemptsMade, type, delay, err) => {
if (type === 'jitter') return delay + Math.random() * delay;
return delay * attemptsMade;
},
},
});

// glide-mq
const worker = new Worker('q', processor, {
connection,
backoffStrategies: {
jitter: (attemptsMade, err) => 1000 + Math.random() * 1000,
linear: (attemptsMade, err) => 1000 * attemptsMade,
},
});
// Reference by name in job options:
await queue.add('job', data, { backoff: { type: 'jitter', delay: 1000 } });

globalConcurrency on WorkerOptions - BullMQ sets global concurrency via queue.setGlobalConcurrency(n) or WorkerOptions.concurrency combined with queue-level limits. glide-mq exposes both:
// glide-mq
const worker = new Worker('q', processor, {
connection,
concurrency: 10, // per-worker concurrency
globalConcurrency: 50, // queue-wide cap across all workers (set once, stored in Valkey)
});
// Or set it on the queue separately:
await queue.setGlobalConcurrency(50);

Sandboxed processors - Both BullMQ and glide-mq support passing a file path string as the processor. The processor runs in a worker thread (default) or child process, isolating crashes from the main process.
// BullMQ
const worker = new Worker('q', './processor.js', { connection, useWorkerThreads: true });

// glide-mq
const worker = new Worker('q', './processor.js', { connection, sandbox: { useWorkerThreads: true } });

The processor file must export a function, either via CommonJS (module.exports = async (job) => { ... }) or as an ESM default export (export default async (job) => { ... }). Inside the sandbox, job.log(), job.updateProgress(), and job.updateData() work normally via IPC proxy. Methods that require direct Valkey access (job.getState(), job.remove(), etc.) are not available.
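A minimal processor file for a sandboxed worker might look like this (a sketch; the job data shape is whatever your queue enqueues, and the field names here are illustrative):

```typescript
// ./processor.ts - sandboxed processor exported as an ESM default export.
// The resolved return value becomes job.returnvalue in the parent process.
const processor = async (job: { data: { to: string } }) => {
  // Inside the sandbox, job.log() / job.updateProgress() would also be
  // available via the IPC proxy (not typed in this minimal sketch).
  return { sent: true, to: job.data.to };
};

export default processor;
```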
worker.on('active') and worker.on('drained') are fully supported. The active event fires with (job, jobId) when a job starts processing - note that BullMQ passes (job, prev) where prev is the previous job state, while glide-mq passes the job ID as the second argument (jobId is equivalent to job.id). If your BullMQ code depends on prev, you will need to derive state via job.getState(). The drained event fires when the queue transitions from non-empty to empty.
Job
job.waitUntilFinished() takes different arguments - BullMQ requires a QueueEvents instance. glide-mq polls the job hash directly:
// BullMQ
const qe = new QueueEvents('tasks', { connection });
const state = await job.waitUntilFinished(qe, 30000);

// glide-mq - no QueueEvents needed
const state = await job.waitUntilFinished(500, 30000);
// args: pollIntervalMs (default 500), timeoutMs (default 30000)Custom jobId - glide-mq supports custom job IDs, matching BullMQ's opts.jobId. Max 256 characters, must not contain control characters, curly braces ({, }), or colons (:). Adding a job with a duplicate custom ID returns null (silent skip) from Queue.add; FlowProducer.add throws on duplicates since flows cannot be partially created:
// BullMQ
await queue.add('job', data, { jobId: 'my-deterministic-id' });
// glide-mq - direct equivalent
const job = await queue.add('job', data, { jobId: 'my-deterministic-id' });
// job is null if a job with that ID already exists

LIFO support - Last-in-first-out ordering is supported via the lifo: true option. Jobs are processed in reverse-chronological order (newest first). Priority and delayed jobs take precedence over LIFO. LIFO cannot be combined with ordering keys (per-key sequencing).
job.promote() is now implemented - call job.promote() to move a delayed job to waiting immediately. Throws if the job is not in the delayed state.
job.discard() is now implemented - call job.discard() inside a processor to immediately fail the job without consuming any remaining retry attempts. Alternatively, throw UnrecoverableError to trigger the same behavior declaratively. Both are exported from glide-mq.
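For example, a processor can map permanent upstream errors to UnrecoverableError so they skip retries. This is a sketch: in real code UnrecoverableError is imported from glide-mq; it is stubbed locally here (and the helper name is ours) so the snippet is self-contained:

```typescript
// Local stub standing in for glide-mq's exported UnrecoverableError.
class UnrecoverableError extends Error {}

// Map an HTTP status to a job error: 4xx responses will not improve on
// retry, so fail immediately; 5xx responses are transient and retryable.
function toJobError(status: number, message: string): Error {
  return status >= 400 && status < 500
    ? new UnrecoverableError(message)
    : new Error(message);
}

// Inside a processor: throw toJobError(res.status, 'upstream rejected job');
```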
QueueEvents
The QueueEvents class API is compatible, but the available events differ. glide-mq emits fewer events from the stream.
'retries-exhausted' renamed to 'failed' - BullMQ emits a separate retries-exhausted event when all attempts are used up. In glide-mq, a job exhausting all retries emits 'failed'. Check attemptsMade vs attempts to detect exhaustion:
// BullMQ
qe.on('retries-exhausted', ({ jobId, attemptsMade }) => {
console.log(`Job ${jobId} gave up after ${attemptsMade} attempts`);
});

// glide-mq
qe.on('failed', async ({ jobId }) => {
const job = await queue.getJob(jobId);
if (job && job.attemptsMade >= (job.opts.attempts ?? 1)) {
console.log(`Job ${jobId} gave up after ${job.attemptsMade} attempts`);
}
});

job.waitUntilFinished() does not need QueueEvents - see Job section above.
lastEventId: '0' for historical replay - Same as BullMQ:
// Replay all events from the beginning
const qe = new QueueEvents('tasks', {
connection,
lastEventId: '0',
});

FlowProducer
The FlowProducer API is compatible. FlowJob type shape differs slightly:
// BullMQ FlowJob
interface FlowJob {
name: string;
queueName: string;
data?: any;
opts?: Omit<JobsOptions, 'repeat'>;
children?: FlowChildJob[];
}
// glide-mq FlowJob
interface FlowJob {
name: string;
queueName: string;
data: any; // required in glide-mq (not optional)
opts?: JobOptions;
children?: FlowJob[]; // same type for parent and children
}

The behavior is the same: the parent job waits in waiting-children state until all children complete. If a child fails without retries, the parent is permanently stuck (same as BullMQ's default behavior - BullMQ offers failParentOnFailure to change this; glide-mq does not yet expose that option).
// Both BullMQ and glide-mq - same usage
const flow = await flowProducer.add({
name: 'parent',
queueName: 'tasks',
data: { step: 'final' },
children: [
{ name: 'child-1', queueName: 'tasks', data: { step: '1' } },
{ name: 'child-2', queueName: 'tasks', data: { step: '2' } },
],
});

glide-mq also provides higher-level workflow helpers - see Workflow helpers.
Scheduling / repeatable jobs
BullMQ v5 introduced Job Schedulers (upsertJobScheduler) to replace opts.repeat. glide-mq uses the same upsertJobScheduler API.
Do not use opts.repeat in queue.add() - pass schedule options to upsertJobScheduler instead:
// BullMQ (old repeat API - deprecated in v5)
await queue.add('daily-report', data, {
repeat: { pattern: '0 9 * * *', tz: 'America/New_York' },
});// glide-mq (and BullMQ v5+) - upsertJobScheduler
await queue.upsertJobScheduler(
'daily-report', // scheduler name
{ pattern: '0 9 * * *', tz: 'America/New_York' }, // direct replacement for repeat
{ name: 'daily-report', data: { v: 1 } }, // job template
);

Retries and backoff
JobOptions.backoff is compatible:
// Both BullMQ and glide-mq
await queue.add('job', data, {
attempts: 5,
backoff: { type: 'exponential', delay: 1000 },
});

glide-mq adds a jitter field to spread retries under load:
// glide-mq only
await queue.add('job', data, {
attempts: 5,
backoff: { type: 'exponential', delay: 1000, jitter: 0.25 }, // ±25% random jitter
});Custom backoff strategies moved from settings.backoffStrategy to backoffStrategies map - see Worker section.
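Numerically, exponential backoff with jitter resolves to something like the following sketch of the conventional formula (verify against glide-mq's source if the exact rounding or jitter distribution matters to you):

```typescript
// Conventional exponential backoff: the base delay doubles each attempt,
// then an optional +/- jitter fraction randomizes it so that a burst of
// failures does not retry in lockstep.
function backoffDelay(baseMs: number, attemptsMade: number, jitter = 0): number {
  const delay = baseMs * 2 ** (attemptsMade - 1); // attempt 1 -> baseMs
  const spread = delay * jitter;                  // e.g. jitter 0.25 -> +/-25%
  return Math.round(delay - spread + Math.random() * 2 * spread);
}

// With { delay: 1000 }: attempt 1 -> 1000ms, attempt 2 -> 2000ms, attempt 3 -> 4000ms.
```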
Deduplication
Both libraries support deduplication. The option shape differs:
// BullMQ
await queue.add('job', data, {
deduplication: { id: 'my-dedup-key' },
// or with TTL:
deduplication: { id: 'my-dedup-key', ttl: 60000 },
});

// glide-mq - same shape, plus explicit mode
await queue.add('job', data, {
deduplication: { id: 'my-dedup-key', ttl: 60000 },
// optional mode:
deduplication: { id: 'my-dedup-key', ttl: 60000, mode: 'simple' },
// mode: 'throttle' - drop duplicates
// mode: 'debounce' - reset window on each add
});

Default mode is 'simple' (drop duplicate if a job with that ID already exists in any active state).
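The difference between the two TTL'd modes can be pictured with this in-memory model (conceptual only - glide-mq tracks dedup keys in Valkey, and the class name here is invented):

```typescript
// throttle: the first add opens a window; duplicates are dropped until it
// expires. debounce: every duplicate add restarts the window, so a steady
// stream of adds keeps the window open indefinitely.
class DedupWindow {
  private expiresAt = new Map<string, number>();

  constructor(private ttlMs: number, private mode: 'throttle' | 'debounce') {}

  // Returns true if the add is accepted, false if it is a dropped duplicate.
  tryAdd(id: string, now: number): boolean {
    const exp = this.expiresAt.get(id);
    if (exp !== undefined && now < exp) {
      if (this.mode === 'debounce') this.expiresAt.set(id, now + this.ttlMs); // reset window
      return false;
    }
    this.expiresAt.set(id, now + this.ttlMs);
    return true;
  }
}
```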
Rate limiting
Both libraries support the limiter option and Worker.RateLimitError. The usage is identical:
// Both BullMQ and glide-mq
const worker = new Worker('q', async (job) => {
if (shouldThrottle()) {
throw new Worker.RateLimitError(); // re-queues job, does not count as failure
}
return process(job);
}, {
connection,
limiter: { max: 100, duration: 1000 }, // 100 jobs per second
});

Global rate limiting - glide-mq supports a queue-wide rate limit stored in Valkey, dynamically picked up by all workers:
// glide-mq only - global rate limit across all workers
await queue.setGlobalRateLimit({ max: 500, duration: 60_000 });
const limit = await queue.getGlobalRateLimit(); // { max, duration } or null
await queue.removeGlobalRateLimit();

Priorities
Priority values work the same way. Lower number = higher priority (0 is default, highest priority):
// Both BullMQ and glide-mq - identical
await queue.add('urgent', data, { priority: 1 });
await queue.add('normal', data, { priority: 10 });
await queue.add('background', data, { priority: 100 });

Job ordering / per-key sequencing
BullMQ Pro offers group-level concurrency (max N parallel jobs per group key). glide-mq provides per-key sequential ordering (concurrency = 1 per key) as a built-in feature:
// BullMQ Pro only
await queue.add('job', data, {
group: { id: 'tenant-123', limit: 1 }, // requires BullMQ Pro license
});

// glide-mq (open source)
await queue.add('job', data, {
ordering: { key: 'tenant-123' }, // jobs for this key run one at a time, in order
});

Group concurrency > 1 is also supported via ordering.concurrency.
Dead letter queues
BullMQ does not have a native DLQ - failed jobs stay in the failed state. glide-mq has first-class DLQ support configured at the queue level:
// glide-mq only
const queue = new Queue('tasks', {
connection,
deadLetterQueue: {
name: 'tasks-dlq', // separate queue for permanently failed jobs
maxRetries: 3, // override job's own attempts setting
},
});

Job revocation
BullMQ has no equivalent. glide-mq supports revoking a job from outside the worker:
// glide-mq only
await queue.revoke(jobId); // signals abort to the processor via job.abortSignal

Global concurrency
Both libraries support queue-level global concurrency to cap total parallel jobs across all workers:
// BullMQ
await queue.setGlobalConcurrency(50);

// glide-mq - same, plus a WorkerOptions shorthand
await queue.setGlobalConcurrency(50);
// Or set it once at worker startup:
const worker = new Worker('q', processor, {
connection,
globalConcurrency: 50,
});

Compression
BullMQ has no built-in compression. glide-mq supports transparent gzip compression of job payloads:
// glide-mq only
const queue = new Queue('tasks', {
connection,
compression: 'gzip', // compress all job payloads on write, decompress on read
});
// No change needed in worker or job code - transparent.

Resolved gaps
These BullMQ features have been implemented in glide-mq.
| Feature | glide-mq API | Issue |
|---|---|---|
job.promote() | job.promote() - move delayed job to waiting | #11 |
job.changeDelay(delay) | job.changeDelay(newDelay) | #12 |
job.changePriority(opts) | job.changePriority(newPriority) | #13 |
job.discard() | job.discard() or throw UnrecoverableError | #14 |
queue.drain(delayed?) | queue.drain() or queue.drain(true) | #15 |
queue.clean(grace, limit, type) | Same signature | #16 |
queue.retryJobs(opts) | queue.retryJobs({ count: 100 }) | #17 |
queue.getWorkers() | Same signature | #18 |
queue.getJobScheduler(name) | Same signature | #19 |
worker.on('active') | Emits (job, jobId) - note: BullMQ passes (job, prev) | #20 |
worker.on('drained') | Same signature | #20 |
| Sandboxed processor | new Worker('q', './processor.js', { connection, sandbox: {} }) | - |
Current gaps
These BullMQ features are not yet implemented.
| Missing feature | Workaround |
|---|---|
QueueEvents 'waiting', 'active', 'delayed', 'drained', 'deduplicated' events | Use worker-level events or poll getJobCounts() |
failParentOnFailure in FlowJob | Implement manually in the worker's failed handler |
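A hedged sketch of the failParentOnFailure workaround: listen for failures, and when a child has exhausted its attempts, remove the parent that would otherwise wait forever. The queue-like interface below mirrors glide-mq methods documented in this guide (getJob, job.remove); the helper name and the parent opts shape are assumptions:

```typescript
// Emulating failParentOnFailure in a 'failed' handler: when a child job
// exhausts its retries, its parent stays in waiting-children forever, so
// remove the parent (or route it wherever your app needs).
interface JobLike {
  attemptsMade: number;
  opts: { attempts?: number; parent?: { id: string } };
  remove(): Promise<void>;
}
interface QueueLike {
  getJob(id: string): Promise<JobLike | null>;
}

// Returns the removed parent id, or null when the child will still retry
// or has no parent.
async function removeParentIfChildExhausted(queue: QueueLike, childId: string): Promise<string | null> {
  const child = await queue.getJob(childId);
  if (!child) return null;
  const exhausted = child.attemptsMade >= (child.opts.attempts ?? 1);
  const parentId = child.opts.parent?.id;
  if (!exhausted || !parentId) return null;
  const parent = await queue.getJob(parentId);
  if (!parent) return null;
  await parent.remove();
  return parentId;
}
```

Wire it to a QueueEvents 'failed' listener so it runs for every terminal failure.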
What glide-mq adds
Beyond BullMQ parity, glide-mq provides:
1 RTT per job - completeAndFetchNext is a single FCALL that atomically marks the current job complete and fetches the next one. BullMQ uses 2-3 round-trips for the same operation.
Cluster-native from day one - All keys use glide:{queueName}:* hash tags. Cross-slot operations (flows, global concurrency, ordering) work correctly in Valkey Cluster without any configuration.
AZ-affinity routing - Pin worker reads to replicas in your availability zone to reduce cross-AZ network cost and latency.
IAM authentication - Native AWS ElastiCache and MemoryDB IAM auth with automatic token refresh.
Transparent compression - compression: 'gzip' on QueueOptions compresses all payloads server-side transparently.
Built-in DLQ - deadLetterQueue on QueueOptions routes permanently failed jobs to a named queue without any application-level code.
Job revocation - queue.revoke(jobId) and job.abortSignal allow in-flight jobs to be cancelled cooperatively.
Per-key ordering - opts.ordering.key guarantees sequential execution per key across any number of workers without a separate lock system. Group concurrency and per-group rate limiting are also supported.
Cost-based token bucket - opts.ordering.tokenBucket with per-job opts.cost enables weighted rate limiting per ordering key.
Global rate limiting - queue.setGlobalRateLimit() caps queue-wide throughput across all workers.
Broadcast / BroadcastWorker - Pub/sub fan-out pattern where every connected BroadcastWorker receives every message.
DAG workflows - FlowProducer.addDAG() and the dag() helper define arbitrary directed acyclic graphs where a job can depend on multiple parents.
Batch processing - batch: { size, timeout } worker option to receive multiple jobs in a single processor invocation.
Step jobs - job.moveToDelayed(timestampMs, nextStep?) accepts an optional nextStep token so a processor can implement multi-step state machines.
repeatAfterComplete - Scheduler mode where the next job is enqueued only after the previous one completes, guaranteeing no overlap.
Pluggable serializers - Pass a custom { serialize, deserialize } object to use MessagePack, Protobuf, or any format instead of JSON.
Job TTL - opts.ttl auto-expires a job after the given number of milliseconds.
excludeData - queue.getJobs(type, start, end, { excludeData: true }) returns jobs without their data field, useful for lightweight dashboard listings.
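The cost-based token bucket mentioned above can be pictured with this sketch of the accounting only - glide-mq's actual ordering.tokenBucket option shape is defined by its API, and the names here are illustrative:

```typescript
// Weighted rate limiting: the bucket refills at `refillPerMs` tokens per
// millisecond up to `capacity`; a job may run only when it can pay its cost,
// so expensive jobs (higher opts.cost) consume more of the budget.
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(private capacity: number, private refillPerMs: number, now = 0) {
    this.tokens = capacity;
    this.lastRefill = now;
  }

  // Returns true and deducts `cost` tokens if enough are available.
  tryConsume(cost: number, now: number): boolean {
    const elapsed = now - this.lastRefill;
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.refillPerMs);
    this.lastRefill = now;
    if (this.tokens < cost) return false;
    this.tokens -= cost;
    return true;
  }
}
```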
Testing without a server
glide-mq ships an in-memory test mode that requires no Valkey or Redis instance:
import { TestQueue, TestWorker } from 'glide-mq/testing';
const queue = new TestQueue<{ email: string }, { sent: boolean }>('tasks');
const worker = new TestWorker(queue, async (job) => {
return { sent: true };
});
await queue.add('send-email', { email: 'user@example.com' });
const jobs = await queue.getJobs('completed');
// [{ data: { email: 'user@example.com' }, returnvalue: { sent: true } }]

BullMQ has no equivalent. You would typically use a real Redis instance or ioredis-mock.
Workflow helpers
Beyond FlowProducer, glide-mq provides higher-level helpers:
import { chain, group, chord } from 'glide-mq';
const connection = { addresses: [{ host: 'localhost', port: 6379 }] };
// chain: sequential pipeline - each job is a child of the next
await chain('tasks', [
{ name: 'step-1', data: {} },
{ name: 'step-2', data: {} },
{ name: 'step-3', data: {} },
], connection);
// group: parallel fan-out - all jobs run concurrently, synthetic parent waits
await group('tasks', [
{ name: 'shard-1', data: {} },
{ name: 'shard-2', data: {} },
], connection);
// chord: group then callback - callback fires after all group jobs complete
await chord('tasks', [
{ name: 'task-1', data: {} },
{ name: 'task-2', data: {} },
], { name: 'aggregate', data: {} }, connection);

NestJS
@glidemq/nestjs provides a drop-in NestJS integration. If you are migrating from @nestjs/bullmq, see the @glidemq/nestjs package README for the decorator-based API (@Processor, @OnWorkerEvent, InjectQueue).
You can also use glide-mq directly with NestJS by creating queue and worker instances in providers:
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { Queue, Worker } from 'glide-mq';
const connection = { addresses: [{ host: 'localhost', port: 6379 }] };
@Injectable()
export class TaskQueue implements OnModuleInit, OnModuleDestroy {
private queue = new Queue('tasks', { connection });
private worker = new Worker('tasks', async (job) => {
// process job
}, { connection, concurrency: 10 });
async add(data: unknown) {
return this.queue.add('task', data);
}
async onModuleDestroy() {
await this.worker.close();
await this.queue.close();
}
}

Migration checklist
Work through this after completing your migration:
- [ ] Replace bullmq with glide-mq in package.json
- [ ] Update all imports from 'bullmq' to 'glide-mq'
- [ ] Convert all connection configs from { host, port } to { addresses: [{ host, port }] }
- [ ] Replace opts.repeat with queue.upsertJobScheduler()
- [ ] opts.jobId works directly - duplicate custom IDs return null instead of the existing job
- [ ] Replace settings.backoffStrategy with the backoffStrategies map in WorkerOptions
- [ ] Remove QueueScheduler instantiation (not needed)
- [ ] Remove defaultJobOptions from QueueOptions; apply options per job or via a wrapper
- [ ] Replace queue.getJobs([...types]) with per-type calls
- [ ] Update any worker.on('active') handlers: glide-mq emits (job, jobId) instead of BullMQ's (job, prev)
- [ ] Replace job.waitUntilFinished(queueEvents, ttl) with job.waitUntilFinished(pollMs, timeoutMs)
- [ ] Check QueueEvents listeners for removed events ('waiting', 'active', 'delayed', 'drained')
- [ ] Run your test suite: npm test
- [ ] Confirm queue counts look correct: await queue.getJobCounts()
- [ ] Confirm no jobs are stuck in the active state: stall detection is running