Advanced Features
Table of Contents
- Shared Client (Connection Reuse)
- Job Schedulers (Repeatable / Cron Jobs)
- LIFO Mode
- Job TTL
- Pluggable Serializers
- Ordering and Group Concurrency
- Custom Job IDs
- Deduplication
- Token Bucket Rate Limiting
- Global Concurrency
- Global Rate Limiting
- Job Revocation (Cooperative Cancellation)
- Transparent Compression
- Retries and Backoff
- Dead Letter Queues
Shared Client
By default, each glide-mq component creates its own GLIDE client (one TCP connection). You can optionally inject a shared client to reduce connection count.
Default behavior (dedicated connections)
const connection = { addresses: [{ host: 'localhost', port: 6379 }] };
const queue = new Queue('jobs', { connection }); // 1 connection
const flow = new FlowProducer({ connection }); // 1 connection
const worker = new Worker('jobs', handler, { connection });// 2 connections (command + blocking)
const events = new QueueEvents('jobs', { connection }); // 1 connection
// Total: 5 TCP connectionsShared client (opt-in)
import { GlideClient } from '@glidemq/speedkey';
const client = await GlideClient.createClient({ addresses: [{ host: 'localhost' }] });
const connection = { addresses: [{ host: 'localhost' }] };
const queue = new Queue('jobs', { client });
const flow = new FlowProducer({ client });
const worker = new Worker('jobs', handler, { connection, commandClient: client });
const events = new QueueEvents('jobs', { connection });
// Total: 2 TCP connections (shared + Worker's blocking client)What can share
Queue, FlowProducer, and Worker's command client all perform non-blocking operations (FCALL, HGET, ZADD, etc.) and can safely share a single GLIDE client. GLIDE's Rust core multiplexes commands over one TCP connection with up to 1000 concurrent in-flight requests.
What cannot share
Worker's blocking client (XREADGROUP BLOCK) and QueueEvents (XREAD BLOCK) tie up the connection's read loop. These always get their own dedicated connection - you cannot inject a shared client into them.
QueueEvents will throw if you try to pass a client:
// Throws: "QueueEvents does not accept an injected `client`"
new QueueEvents('jobs', { connection, client } as any);Tradeoffs
| Dedicated (default) | Shared | |
|---|---|---|
| Connections | N+2 per setup (1 per Queue/FlowProducer + 2 per Worker + 1 per QueueEvents) | 2 (shared + blocking) |
| Throughput | Baseline | Same or slightly better (fewer NAPI wake callbacks) |
| Latency | Baseline | Same (p50/p95/p99 identical in benchmarks) |
| Isolation | Each component has its own connection - failures are independent | All components sharing a client are affected by a disconnect |
| Reconnection | Each component reconnects independently | Worker emits error if shared client is unreachable - you manage reconnection |
| Lifecycle | Component creates and closes its own client | You create the client, you close it. close() on a component does not destroy the shared client. |
| Simplicity | Pass connection - done | Must create client upfront, pass it around, close in correct order |
| Memory | Slightly higher (N client objects + Rust state machines) | Lower (1 client object shared) |
When to use shared
- Many Queue instances pointing to different queue names (e.g., multi-tenant routing)
- Queue + FlowProducer on the same process - saves 1 connection
- Connection count is a concern (cloud Valkey with connection limits)
When to stick with dedicated
- Simple setup with one Queue and one Worker - the default is fine
- You want full isolation between components
- You don't want to manage client lifecycle manually
Constraints
- Worker always requires
connectioneven whencommandClientis provided, because the blocking client must be auto-created. - Don't close the shared client while components are alive. Close components first, then the client.
- Don't mutate shared client state externally (e.g.,
SELECTto change database). commandClientandclientare aliases on Worker - provide one or the other, not both.
Close order
// Correct: close components first, then shared client
await queue.close(); // detaches from shared client (does not close it)
await worker.close(); // closes only the auto-created blocking client
await flow.close(); // detaches from shared client
client.close(); // now safe - no components using itProducer with an external client
Producer also supports external client injection. When opts.client is provided the Producer borrows the connection without taking ownership - close() will not destroy it. This is the recommended pattern for serverless environments where the connection lifecycle must align with the request lifecycle:
import { GlideClient } from '@glidemq/speedkey';
import { Producer } from 'glide-mq';
export async function handler(event) {
const client = await GlideClient.createClient({ addresses: [{ host: process.env.VALKEY_HOST }] });
const producer = new Producer('jobs', { client });
for (const job of event.jobs) {
await producer.add(job.name, job.data);
}
// producer.close() does NOT close the client when client was injected
await producer.close();
client.close(); // caller owns lifecycle
}For connection reuse across warm invocations, use ServerlessPool instead - see Serverless.
inflightRequestsLimit
GLIDE defaults to 1000 concurrent in-flight requests per client. For high-concurrency setups, you can tune this:
const connection = {
addresses: [{ host: 'localhost' }],
inflightRequestsLimit: 2000,
};At Worker concurrency=50, peak inflight is ~55 commands. The 1000 default supports up to ~950 concurrent job activations across all components sharing one client.
Job Schedulers
Use upsertJobScheduler to define repeatable jobs driven by a cron expression or a fixed interval. Schedulers survive worker restarts - the next run time is stored in Valkey.
const queue = new Queue('tasks', { connection });
// Cron: run "daily-report" every day at 08:00 UTC
await queue.upsertJobScheduler(
'daily-report',
{ pattern: '0 8 * * *' },
{ name: 'generate-report', data: { type: 'daily' } },
);
// Bound a scheduler to a campaign window and stop after 36 runs
await queue.upsertJobScheduler(
'black-friday-deals',
{
pattern: '0 */2 * * *',
startDate: new Date('2026-11-28T00:00:00Z'),
endDate: new Date('2026-12-01T00:00:00Z'),
limit: 36,
},
{ name: 'promote-deal', data: { campaign: 'black-friday' } },
);
// Interval: run "cleanup" every 5 minutes
await queue.upsertJobScheduler(
'cleanup',
{ every: 5 * 60 * 1_000 }, // ms
{ name: 'cleanup-old-records', data: {} },
);
// List all registered schedulers
const schedulers = await queue.getRepeatableJobs();
// Remove a scheduler (does not cancel jobs already in flight)
await queue.removeJobScheduler('cleanup');Repeat-after-complete mode
repeatAfterComplete schedules the next job only after the current one completes (or terminally fails). Unlike every, which fires at fixed intervals regardless of processing time, repeatAfterComplete ensures no overlap between successive runs.
// Poll a sensor every 5 seconds after the previous poll finishes
await queue.upsertJobScheduler('sensor-poll', {
repeatAfterComplete: 5000, // 5s after previous job completes
}, { name: 'poll', data: { sensor: 'temp-1' } });This mode is useful for:
- Polling - avoid stacking requests when the upstream is slow.
- Sequential pipelines - each step must finish before the next begins.
- Adaptive intervals - combine with a custom processor that adjusts
repeatAfterCompleteviaupsertJobSchedulerbased on results.
repeatAfterComplete is mutually exclusive with pattern and every. Bounded options (startDate, endDate, limit) work normally with this mode.
Bounded schedulers
All three scheduler modes (pattern, every, repeatAfterComplete) support bounding via startDate, endDate, and limit:
| Option | Type | Effect |
|---|---|---|
startDate | Date | number | Defer the first eligible run until this time. |
endDate | Date | number | Auto-remove the scheduler when the next scheduled time would exceed this date. |
limit | number | Auto-remove the scheduler after creating this many jobs. |
// Run a cron job during a specific campaign window, max 36 runs
await queue.upsertJobScheduler(
'black-friday-deals',
{
pattern: '0 */2 * * *',
startDate: new Date('2026-11-28T00:00:00Z'),
endDate: new Date('2026-12-01T00:00:00Z'),
limit: 36,
},
{ name: 'promote-deal', data: { campaign: 'black-friday' } },
);
// Interval with a delayed start and a hard stop after 100 iterations
await queue.upsertJobScheduler(
'warmup-cache',
{
every: 30_000,
startDate: Date.now() + 60_000, // first run delayed 1 minute
endDate: new Date('2026-12-31'), // stop scheduling after this date
limit: 100, // auto-remove after 100 runs
},
{ name: 'warmup', data: { region: 'us-east' } },
);getJobScheduler() / getRepeatableJobs() expose the stored bounds together with iterationCount so you can inspect how many runs have already fired.
The internal Scheduler class fires a promotion loop that converts due scheduler entries into real jobs, then re-registers the next occurrence.
LIFO Mode
Set lifo: true in JobOptions to process jobs in last-in-first-out order. The most recently added job is picked up first.
await queue.add('render', { frame: 100 }, { lifo: true });
await queue.add('render', { frame: 101 }, { lifo: true });
await queue.add('render', { frame: 102 }, { lifo: true });
// Processing order: 102, 101, 100Ordering precedence
Workers check sources in this order: priority > LIFO > FIFO. Priority jobs (those with priority > 0) are always fetched first. Among non-priority jobs, LIFO jobs are fetched before FIFO jobs sitting in the stream.
Constraints
- Cannot combine with
ordering.key. Throws at enqueue time:Error: lifo and ordering.key cannot be used together - LIFO jobs are stored in a dedicated Valkey LIST (
glide:{queueName}:lifo), separate from the main stream. This means LIFO and FIFO jobs in the same queue coexist - LIFO jobs are drained first. - Under
concurrency > 1, multiple LIFO jobs may run in parallel; strict reverse ordering is only guaranteed withconcurrency: 1. - Works with all job types: delayed jobs return to the LIFO list after their delay expires, and schedulers can produce LIFO jobs via the template
opts.
See also: Adding jobs for the full JobOptions reference.
Job TTL
Set ttl in JobOptions to auto-expire jobs that are not processed within a time window. The value is in milliseconds.
// Expire if not processed within 30 seconds
await queue.add('time-sensitive', { alert: 'server-down' }, {
ttl: 30_000,
});
// TTL works with delayed jobs — the clock starts at creation time
await queue.add('offer', { code: 'FLASH50' }, {
delay: 5_000,
ttl: 60_000, // must be processed within 60s of creation, not of becoming active
});
// TTL works with priority jobs
await queue.add('urgent', data, {
priority: 1,
ttl: 10_000,
});When a job's TTL elapses, it is failed with the reason 'expired' during the next activation check. Jobs that are already active are not interrupted - TTL is checked at fetch time, not mid-processing. Use timeout in JobOptions to limit active processing time.
See also: Adding jobs for other per-job options.
Pluggable Serializers
By default, glide-mq uses JSON.stringify / JSON.parse for job data, return values, and progress payloads. You can replace this with any synchronous serializer.
The Serializer interface
import type { Serializer } from 'glide-mq';
interface Serializer {
/** Serialize a value to a string for storage in Valkey. */
serialize(data: unknown): string;
/** Deserialize a string from Valkey back to a value. */
deserialize(raw: string): unknown;
}Both methods must be synchronous. If serialize throws, the job is treated as a processor failure (in Worker) or skipped (in Scheduler).
Example: MessagePack serializer
import { Queue, Worker } from 'glide-mq';
import { encode, decode } from '@msgpack/msgpack';
const msgpackSerializer: Serializer = {
serialize: (data) => Buffer.from(encode(data)).toString('base64'),
deserialize: (raw) => decode(Buffer.from(raw, 'base64')),
};
const queue = new Queue('tasks', {
connection,
serializer: msgpackSerializer,
});
const worker = new Worker('tasks', processor, {
connection,
serializer: msgpackSerializer, // must match the Queue
});What is serialized
The serializer is applied to:
data- the job payload passed toqueue.add().returnvalue- the value returned by the processor.progress- the value passed tojob.updateProgress().
Consistency requirement
The same serializer must be configured on every Queue, Worker, and FlowProducer instance that operates on the same queue. A mismatch causes silent data corruption - the consumer will see {} and the job's deserializationFailed flag will be true.
Default export
The built-in JSON serializer is exported for use in conditional logic or testing:
import { JSON_SERIALIZER } from 'glide-mq';
const serializer = process.env.USE_MSGPACK === '1'
? msgpackSerializer
: JSON_SERIALIZER;
const queue = new Queue('tasks', { connection, serializer });See also: Worker and Queue for where serializer appears in options.
Ordering and Group Concurrency
Sequential processing (concurrency=1)
Add ordering.key to a job to guarantee that all jobs with the same key are processed one at a time, in the order they were added.
// All jobs with ordering.key = 'user:42' are processed sequentially
await queue.add('process-payment', { userId: 42, amount: 100 }, {
ordering: { key: 'user:42' },
});
await queue.add('send-receipt', { userId: 42 }, {
ordering: { key: 'user:42' },
});Group concurrency (concurrency > 1)
Set ordering.concurrency to allow up to N jobs per key to run in parallel across all workers:
// Max 3 concurrent jobs for tenant-42, regardless of worker count
await queue.add('process', data, {
ordering: { key: 'tenant-42', concurrency: 3 },
});Jobs exceeding the group limit are parked in a per-group wait list and automatically released when a slot opens.
// Multi-tenant isolation: each client gets max 2 concurrent jobs
for (const job of jobs) {
await queue.add('task', job.data, {
ordering: { key: `client-${job.clientId}`, concurrency: 2 },
});
}Per-group rate limiting
Limit how many jobs per ordering key can start within a time window, independent of concurrency:
// Max 10 jobs per 60 seconds for each tenant
await queue.add('sync', data, {
ordering: {
key: `tenant-${tenantId}`,
concurrency: 3,
rateLimit: { max: 10, duration: 60_000 },
},
});When both concurrency and rateLimit are set, both gates apply - a job must have a free concurrency slot and remaining rate capacity to start. Jobs that hit the rate limit are parked in a scheduler-managed promotion queue and released when the window resets.
- Promotion latency: rate-limited jobs are promoted by the scheduler loop. Worst-case latency is one
promotionInterval(default 5 s). LowerpromotionIntervalon the worker if tighter latency is needed. - Retried jobs consume rate slots - a retried job counts against the rate window like any new job.
Token bucket rate limiting
Use ordering.tokenBucket to enforce cost-based rate limiting per ordering key. Unlike the sliding window (rateLimit), which counts jobs, the token bucket assigns a cost to each job and deducts from a refilling bucket:
// Each API call costs 1 token (default), bulk exports cost 10
await queue.add('api-call', data, {
ordering: {
key: `tenant-${tenantId}`,
concurrency: 5,
tokenBucket: { capacity: 100, refillRate: 10 }, // 100 tokens max, 10 tokens/s
},
cost: 1,
});
await queue.add('bulk-export', data, {
ordering: {
key: `tenant-${tenantId}`,
concurrency: 5,
tokenBucket: { capacity: 100, refillRate: 10 },
},
cost: 10, // consumes 10 tokens
});How it works: tokens refill at refillRate tokens per second up to capacity. When a job is activated, its cost is deducted from the bucket. If insufficient tokens remain, the job is parked and promoted once enough tokens have refilled. Internally, tokens are tracked as millitokens (1 token = 1000 millitokens) for sub-integer precision.
Check order: when both concurrency, token bucket, and sliding window are configured, the gates are checked in order: concurrency -> token bucket -> sliding window. All applicable limits must pass. Strict FIFO is maintained - jobs never skip ahead of earlier jobs in the same group.
Cost validation: a job with cost greater than capacity is rejected at enqueue time. If a previously valid job becomes invalid (e.g., capacity was lowered), it is moved to the DLQ at activation.
Differences from sliding window (rateLimit):
Sliding window (rateLimit) | Token bucket (tokenBucket) | |
|---|---|---|
| Unit | Job count | Weighted cost per job |
| Config | { max, duration } | { capacity, refillRate } |
| Default cost | 1 job | cost: 1 token |
| Refill | Window resets after duration ms | Continuous refill at refillRate/s |
| Use case | "Max N jobs per window" | "Max N units of work per second" |
- Promotion latency: same as sliding window - worst-case one
promotionInterval(default 5 s). - Composition: token bucket composes with concurrency, sliding window, and global rate limits. All gates are enforced.
Runtime group rate limiting
The static rate limits above (rateLimit, tokenBucket) are set at enqueue time. For dynamic scenarios - like a crawler hitting a 429 response - use runtime rate limiting to pause a specific group from inside or outside the processor.
From inside the processor
const worker = new Worker('crawl', async (job) => {
const res = await fetch(job.data.url);
if (res.status === 429) {
const retryAfter = parseInt(res.headers.get('retry-after') || '60') * 1000;
// Pause this domain group - other domains keep processing
await job.rateLimitGroup(retryAfter);
}
return { html: await res.text() };
}, { connection });job.rateLimitGroup(duration, opts?) re-parks the current job in the group queue and pauses the entire group for duration milliseconds. The job resumes automatically when the duration expires.
Throw-style sugar
import { GroupRateLimitError } from 'glide-mq';
const worker = new Worker('crawl', async (job) => {
const res = await fetch(job.data.url);
if (res.status === 429) {
throw new GroupRateLimitError(60_000);
}
return res.text();
}, { connection });From outside the processor
// From a webhook, health check, or admin API
await queue.rateLimitGroup('example.com', 60_000);queue.rateLimitGroup(groupKey, duration, opts?) registers the group as rate-limited. Jobs already in the group queue are held until the duration expires.
Options
All three APIs accept the same options:
| Option | Values | Default | Description |
|---|---|---|---|
currentJob | 'requeue' | 'fail' | 'requeue' | Re-park the job (no retry consumed) or fail it |
requeuePosition | 'front' | 'back' | 'front' | Where to place the re-parked job in the group queue |
extend | 'max' | 'replace' | 'max' | Never shorten an existing pause, or overwrite it |
await job.rateLimitGroup(30_000, {
currentJob: 'requeue', // default: re-park without consuming retry
requeuePosition: 'front', // default: this job resumes first
extend: 'max', // default: if already paused for longer, keep the longer pause
});How it works
- The current job is atomically re-parked in the per-group ZSET queue
- The group is registered in the
ratelimitedsorted set with a resume timestamp - The scheduler's promotion loop (
promoteRateLimited) checks this set on every cycle - When the resume timestamp passes, queued jobs are promoted back to the stream
- The re-parked job resumes as a "returning" activation - no ordering violations
Notes
- Jobs with different ordering keys (or no ordering key) are processed concurrently as normal.
- Ordering keys are limited to 256 characters.
concurrency=1(or omitted) preserves strict FIFO ordering per key.concurrency > 1caps parallelism but does not guarantee FIFO within the group.- Group concurrency and global concurrency (
setGlobalConcurrency) compose: both limits are enforced. - Per-group rate limiting, token bucket, group concurrency, and global concurrency all compose: all applicable limits are enforced.
- Group slots are released on job complete, fail, retry, DLQ move, and stall recovery.
Custom Job IDs
By default glide-mq assigns a monotonically increasing integer ID to each job. You can supply your own ID via opts.jobId to get deterministic, idempotent job creation:
// Deterministic job: safe to call multiple times
const job = await queue.add('send-email', { to: 'user@example.com' }, {
jobId: 'email-user-42',
});
// job is null if a job with this ID already exists (silent skip)Constraints
- Max 256 characters.
- Must not contain control characters (U+0000-U+001F, U+007F), curly braces (
{,}), or colons (:). - Violating either constraint throws synchronously before the network call.
Duplicate behaviour by surface
| Surface | Behaviour on duplicate ID |
|---|---|
Queue.add | Returns null (silent skip) |
Queue.addBulk | Silently omits the duplicate from the returned array |
FlowProducer.add | Throws - flows cannot be partially created |
TestQueue.add | Returns null (mirrors production) |
Interaction with deduplication
opts.jobId and opts.deduplication are independent mechanisms. When both are set the deduplication check runs first; if the job is deduplicated, the custom ID is never stored. If the dedup check passes, the custom ID collision check runs next.
Deduplication
Prevent duplicate jobs from entering the queue using deduplication.id. Three modes are supported:
| Mode | Behaviour |
|---|---|
simple | Skip the new job if any job with the same ID already exists (any state). |
throttle | Accept only the first job in a TTL window; later arrivals are dropped. |
debounce | Accept only the last job in a TTL window; earlier arrivals are cancelled. |
// Simple: skip if a job with this ID is already queued / active / completed
await queue.add('send-welcome', { userId: 99 }, {
deduplication: { id: 'welcome-99', mode: 'simple' },
});
// Throttle: at most one "sync" job per 10 s
await queue.add('sync', { region: 'eu' }, {
deduplication: { id: 'sync-eu', mode: 'throttle', ttl: 10_000 },
});
// Debounce: only the last "search" job within 500 ms is actually queued
await queue.add('search', { query: 'hello' }, {
deduplication: { id: 'search-user-1', mode: 'debounce', ttl: 500 },
});queue.add() returns null when a job is skipped by deduplication.
Global Concurrency
Limit the total number of concurrently active jobs across all workers sharing a queue, regardless of per-worker concurrency settings.
const queue = new Queue('tasks', { connection });
// Allow at most 20 active jobs across all workers at once
await queue.setGlobalConcurrency(20);
// Remove the limit
await queue.setGlobalConcurrency(0);Workers check this limit atomically before picking up each job via the checkConcurrency server function.
Global Rate Limiting
Cap the total job throughput across all workers sharing a queue. The config is stored in the Valkey meta hash and picked up dynamically by workers within one scheduler tick.
const queue = new Queue('tasks', { connection });
// Max 500 jobs per minute across all workers
await queue.setGlobalRateLimit({ max: 500, duration: 60_000 });
// Read current config
const limit = await queue.getGlobalRateLimit();
// { max: 500, duration: 60000 } or null if not set
// Remove the limit
await queue.removeGlobalRateLimit();- Global rate limit takes precedence over
WorkerOptions.limiter. When both are set, the stricter limit wins. - Changes are picked up by workers within one scheduler tick (no restart needed).
Job Revocation
Cooperatively cancel a job that is waiting, delayed, or currently being processed.
const job = await queue.add('long-task', { input: 'data' });
// Later...
const result = await queue.revoke(job.id);
// 'revoked' — job was waiting/delayed and is now in the failed set
// 'flagged' — job is active; the worker will abort it cooperatively
// 'not_found'— job does not existIn your processor, use job.abortSignal to react to revocation:
const worker = new Worker('tasks', async (job) => {
for (const chunk of largeDataset) {
if (job.abortSignal?.aborted) {
throw new Error('Job revoked');
}
await processChunk(chunk);
}
return { done: true };
}, { connection });job.abortSignal is an AbortSignal. You can pass it directly to fetch, axios, or any AbortSignal-aware API.
Transparent Compression
Enable gzip compression at the queue level. Workers decompress automatically - no changes required in processors.
const queue = new Queue('tasks', {
connection,
compression: 'gzip',
});
// Payload is gzip-compressed before storing in Valkey
await queue.add('process-large', { report: '... 15 KB of data ...' });
// Stored size: ~300 bytes (98% savings on repetitive data)Payload size limit: job data must be ≤ 1 MB after serialisation but before compression. Larger payloads throw immediately:
Error: Job data exceeds maximum size (1234567 bytes > 1MB).
Use smaller payloads or store large data externally.Store large blobs in S3/GCS/object storage and pass a reference URL in the job data instead.
Retries and Backoff
Configure retry behaviour per job via attempts and backoff:
await queue.add('send-email', data, {
attempts: 5,
backoff: { type: 'exponential', delay: 1_000 },
// delay sequence: 1s, 2s, 4s, 8s (capped at attempts)
});
// Fixed delay
await queue.add('webhook', data, {
attempts: 3,
backoff: { type: 'fixed', delay: 2_000 },
});
// Exponential with jitter (avoids thundering herd)
await queue.add('poll', data, {
attempts: 10,
backoff: { type: 'exponential', delay: 500, jitter: 0.1 },
});
// Custom strategy — register on the Worker
const worker = new Worker('tasks', processor, {
connection,
backoffStrategies: {
'rate-limited': (attemptsMade, err) => {
// Respect Retry-After header
if (err.retryAfter) return err.retryAfter * 1_000;
return attemptsMade * 3_000;
},
},
});
await queue.add('api-call', data, {
attempts: 5,
backoff: { type: 'rate-limited', delay: 0 },
});When attempts is exhausted the job moves to the failed state (or the DLQ if configured).
Dead Letter Queues
Route permanently failed jobs to a separate queue for later inspection or manual retry.
const worker = new Worker('tasks', processor, {
connection,
deadLetterQueue: { name: 'tasks-dlq' },
});
// Inspect DLQ contents
const dlqQueue = new Queue('tasks-dlq', { connection });
const failedJobs = await dlqQueue.getJobs('waiting');
// Or use the convenience method on the original queue
const dlqJobs = await queue.getDeadLetterJobs(0, 49);Jobs in the DLQ are ordinary jobs - you can inspect, retry, or remove them like any other job.