
Advanced Features


Shared Client

By default, each glide-mq component creates its own GLIDE client (one TCP connection). You can optionally inject a shared client to reduce connection count.

Default behavior (dedicated connections)

typescript
const connection = { addresses: [{ host: 'localhost', port: 6379 }] };

const queue  = new Queue('jobs', { connection });        // 1 connection
const flow   = new FlowProducer({ connection });          // 1 connection
const worker = new Worker('jobs', handler, { connection });// 2 connections (command + blocking)
const events = new QueueEvents('jobs', { connection });   // 1 connection
// Total: 5 TCP connections

Shared client (opt-in)

typescript
import { GlideClient } from '@glidemq/speedkey';

const client = await GlideClient.createClient({ addresses: [{ host: 'localhost' }] });
const connection = { addresses: [{ host: 'localhost' }] };

const queue  = new Queue('jobs', { client });
const flow   = new FlowProducer({ client });
const worker = new Worker('jobs', handler, { connection, commandClient: client });
const events = new QueueEvents('jobs', { connection });
// Total: 3 TCP connections (shared client + Worker's blocking client + QueueEvents)

What can share

Queue, FlowProducer, and Worker's command client all perform non-blocking operations (FCALL, HGET, ZADD, etc.) and can safely share a single GLIDE client. GLIDE's Rust core multiplexes commands over one TCP connection with up to 1000 concurrent in-flight requests.

What cannot share

Worker's blocking client (XREADGROUP BLOCK) and QueueEvents (XREAD BLOCK) tie up the connection's read loop. These always get their own dedicated connection - you cannot inject a shared client into them.

QueueEvents will throw if you try to pass a client:

typescript
// Throws: "QueueEvents does not accept an injected `client`"
new QueueEvents('jobs', { connection, client } as any);

Tradeoffs

| | Dedicated (default) | Shared |
| --- | --- | --- |
| Connections | N+2 per setup (1 per Queue/FlowProducer + 2 per Worker + 1 per QueueEvents) | 2 (shared + blocking), plus 1 per QueueEvents |
| Throughput | Baseline | Same or slightly better (fewer NAPI wake callbacks) |
| Latency | Baseline | Same (p50/p95/p99 identical in benchmarks) |
| Isolation | Each component has its own connection - failures are independent | All components sharing a client are affected by a disconnect |
| Reconnection | Each component reconnects independently | Worker emits error if shared client is unreachable - you manage reconnection |
| Lifecycle | Component creates and closes its own client | You create the client, you close it. close() on a component does not destroy the shared client. |
| Simplicity | Pass connection - done | Must create client upfront, pass it around, close in correct order |
| Memory | Slightly higher (N client objects + Rust state machines) | Lower (1 client object shared) |
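
The connection arithmetic in the table can be sketched as a small helper. This is an illustration only and not part of glide-mq: the `Setup` shape and the `countConnections` name are hypothetical. It applies the rules stated in this section: a Queue or FlowProducer holds one connection, a Worker holds a command connection plus a blocking one, and QueueEvents always holds its own.

```typescript
// Hypothetical helper, not a glide-mq API: counts TCP connections for a setup.
interface Setup {
  queues: number;
  flowProducers: number;
  workers: number;
  queueEvents: number;
}

function countConnections(setup: Setup, shared: boolean): number {
  // Blocking connections can never be shared: one per Worker, one per QueueEvents.
  const blocking = setup.workers + setup.queueEvents;
  // Non-blocking roles: Queue, FlowProducer, and each Worker's command client.
  const commands = setup.queues + setup.flowProducers + setup.workers;
  if (shared) {
    // One injected client serves every non-blocking role.
    return (commands > 0 ? 1 : 0) + blocking;
  }
  // Dedicated: each non-blocking role opens its own connection.
  return commands + blocking;
}

// Multi-tenant routing: many Queue instances, one Worker.
const multiTenant: Setup = { queues: 20, flowProducers: 1, workers: 1, queueEvents: 0 };
console.log(countConnections(multiTenant, false)); // 23
console.log(countConnections(multiTenant, true));  // 2
```

The saving grows with the number of Queue and FlowProducer instances, which is why the multi-tenant case benefits most.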

When to use shared

  • Many Queue instances pointing to different queue names (e.g., multi-tenant routing)
  • Queue + FlowProducer on the same process - saves 1 connection
  • Connection count is a concern (cloud Valkey with connection limits)

When to stick with dedicated

  • Simple setup with one Queue and one Worker - the default is fine
  • You want full isolation between components
  • You don't want to manage client lifecycle manually

Constraints

  • Worker always requires connection even when commandClient is provided, because the blocking client must be auto-created.
  • Don't close the shared client while components are alive. Close components first, then the client.
  • Don't mutate shared client state externally (e.g., SELECT to change database).
  • commandClient and client are aliases on Worker - provide one or the other, not both.

Close order

typescript
// Correct: close components first, then shared client
await queue.close();    // detaches from shared client (does not close it)
await worker.close();   // closes only the auto-created blocking client
await flow.close();     // detaches from shared client
client.close();         // now safe - no components using it

Producer with an external client

Producer also supports external client injection. When opts.client is provided the Producer borrows the connection without taking ownership - close() will not destroy it. This is the recommended pattern for serverless environments where the connection lifecycle must align with the request lifecycle:

typescript
import { GlideClient } from '@glidemq/speedkey';
import { Producer } from 'glide-mq';

export async function handler(event) {
  const client = await GlideClient.createClient({ addresses: [{ host: process.env.VALKEY_HOST }] });
  const producer = new Producer('jobs', { client });

  for (const job of event.jobs) {
    await producer.add(job.name, job.data);
  }

  // producer.close() does NOT close the client when client was injected
  await producer.close();
  client.close(); // caller owns lifecycle
}

For connection reuse across warm invocations, use ServerlessPool instead - see Serverless.

inflightRequestsLimit

GLIDE defaults to 1000 concurrent in-flight requests per client. For high-concurrency setups, you can tune this:

typescript
const connection = {
  addresses: [{ host: 'localhost' }],
  inflightRequestsLimit: 2000,
};

At Worker concurrency=50, peak inflight is ~55 commands. The 1000 default supports up to ~950 concurrent job activations across all components sharing one client.


Job Schedulers

Use upsertJobScheduler to define repeatable jobs driven by a cron expression or a fixed interval. Schedulers survive worker restarts - the next run time is stored in Valkey.

typescript
const queue = new Queue('tasks', { connection });

// Cron: run "daily-report" every day at 08:00 UTC
await queue.upsertJobScheduler(
  'daily-report',
  { pattern: '0 8 * * *' },
  { name: 'generate-report', data: { type: 'daily' } },
);

// Bound a scheduler to a campaign window and stop after 36 runs
await queue.upsertJobScheduler(
  'black-friday-deals',
  {
    pattern: '0 */2 * * *',
    startDate: new Date('2026-11-28T00:00:00Z'),
    endDate: new Date('2026-12-01T00:00:00Z'),
    limit: 36,
  },
  { name: 'promote-deal', data: { campaign: 'black-friday' } },
);

// Interval: run "cleanup" every 5 minutes
await queue.upsertJobScheduler(
  'cleanup',
  { every: 5 * 60 * 1_000 },  // ms
  { name: 'cleanup-old-records', data: {} },
);

// List all registered schedulers
const schedulers = await queue.getRepeatableJobs();

// Remove a scheduler (does not cancel jobs already in flight)
await queue.removeJobScheduler('cleanup');

Repeat-after-complete mode

repeatAfterComplete schedules the next job only after the current one completes (or terminally fails). Unlike every, which fires at fixed intervals regardless of processing time, repeatAfterComplete ensures no overlap between successive runs.

typescript
// Poll a sensor every 5 seconds after the previous poll finishes
await queue.upsertJobScheduler('sensor-poll', {
  repeatAfterComplete: 5000, // 5s after previous job completes
}, { name: 'poll', data: { sensor: 'temp-1' } });

This mode is useful for:

  • Polling - avoid stacking requests when the upstream is slow.
  • Sequential pipelines - each step must finish before the next begins.
  • Adaptive intervals - combine with a custom processor that adjusts repeatAfterComplete via upsertJobScheduler based on results.

repeatAfterComplete is mutually exclusive with pattern and every. Bounded options (startDate, endDate, limit) work normally with this mode.
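
The difference between every and repeatAfterComplete is easiest to see on a timeline. The simulation below is a minimal sketch that does not call glide-mq; the function names are hypothetical, and it assumes the first run starts at time 0.

```typescript
// Illustrative simulation only; it does not call glide-mq.
// durations[i] is how long run i takes, in milliseconds.

// `every`: run i is created at i * interval, regardless of processing time.
function everyStartTimes(durations: number[], interval: number): number[] {
  return durations.map((_, i) => i * interval);
}

// `repeatAfterComplete`: run i+1 starts `interval` ms after run i finishes.
function repeatAfterCompleteStartTimes(durations: number[], interval: number): number[] {
  const starts: number[] = [];
  let t = 0;
  for (const d of durations) {
    starts.push(t);
    t += d + interval;
  }
  return starts;
}

const durations = [8_000, 2_000, 2_000];
console.log(everyStartTimes(durations, 5_000));               // [0, 5000, 10000]
console.log(repeatAfterCompleteStartTimes(durations, 5_000)); // [0, 13000, 20000]
```

With every, the second run is created at 5 s while the first is still running until 8 s. With repeatAfterComplete, the second run waits until 13 s, so the two never overlap.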

Bounded schedulers

All three scheduler modes (pattern, every, repeatAfterComplete) support bounding via startDate, endDate, and limit:

| Option | Type | Effect |
| --- | --- | --- |
| startDate | Date \| number | Defer the first eligible run until this time. |
| endDate | Date \| number | Auto-remove the scheduler when the next scheduled time would exceed this date. |
| limit | number | Auto-remove the scheduler after creating this many jobs. |

typescript
// Run a cron job during a specific campaign window, max 36 runs
await queue.upsertJobScheduler(
  'black-friday-deals',
  {
    pattern: '0 */2 * * *',
    startDate: new Date('2026-11-28T00:00:00Z'),
    endDate: new Date('2026-12-01T00:00:00Z'),
    limit: 36,
  },
  { name: 'promote-deal', data: { campaign: 'black-friday' } },
);

// Interval with a delayed start and a hard stop after 100 iterations
await queue.upsertJobScheduler(
  'warmup-cache',
  {
    every: 30_000,
    startDate: Date.now() + 60_000,  // first run delayed 1 minute
    endDate: new Date('2026-12-31'), // stop scheduling after this date
    limit: 100,                       // auto-remove after 100 runs
  },
  { name: 'warmup', data: { region: 'us-east' } },
);

getJobScheduler() / getRepeatableJobs() expose the stored bounds together with iterationCount so you can inspect how many runs have already fired.
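
How the three bounds interact can be sketched as a single decision function. This is a hypothetical model of the rules in the table, not glide-mq internals: `decideNextRun`, `Bounds`, and `Decision` are invented names, and pushing an early run to startDate is a simplification (a cron scheduler would more likely pick the first matching time at or after startDate).

```typescript
// Hypothetical sketch of the bounding rules; not glide-mq internals.
interface Bounds {
  startDate?: number; // epoch ms
  endDate?: number;   // epoch ms
  limit?: number;     // max number of jobs to create
}

type Decision = { action: 'run'; at: number } | { action: 'remove' };

function decideNextRun(nextTime: number, iterationCount: number, bounds: Bounds): Decision {
  // limit: remove the scheduler once this many jobs have been created.
  if (bounds.limit !== undefined && iterationCount >= bounds.limit) {
    return { action: 'remove' };
  }
  // startDate: defer the first eligible run until this time (simplified).
  const at = bounds.startDate !== undefined ? Math.max(nextTime, bounds.startDate) : nextTime;
  // endDate: remove the scheduler when the next scheduled time would exceed it.
  if (bounds.endDate !== undefined && at > bounds.endDate) {
    return { action: 'remove' };
  }
  return { action: 'run', at };
}

const bounds: Bounds = { startDate: 1_000, endDate: 5_000, limit: 3 };
console.log(decideNextRun(500, 0, bounds));   // { action: 'run', at: 1000 }
console.log(decideNextRun(6_000, 1, bounds)); // { action: 'remove' }  (past endDate)
console.log(decideNextRun(2_000, 3, bounds)); // { action: 'remove' }  (limit reached)
```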

The internal Scheduler class runs a promotion loop that converts due scheduler entries into real jobs, then re-registers the next occurrence.


LIFO Mode

Set lifo: true in JobOptions to process jobs in last-in-first-out order. The most recently added job is picked up first.

typescript
await queue.add('render', { frame: 100 }, { lifo: true });
await queue.add('render', { frame: 101 }, { lifo: true });
await queue.add('render', { frame: 102 }, { lifo: true });
// Processing order: 102, 101, 100

Ordering precedence

Workers check sources in this order: priority > LIFO > FIFO. Priority jobs (those with priority > 0) are always fetched first. Among non-priority jobs, LIFO jobs are fetched before FIFO jobs sitting in the stream.
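
The fetch order can be modelled in memory to make the precedence concrete. The sketch below is illustrative only: `MiniQueue` is a hypothetical class, the real sources live in Valkey, and ordering among priority jobs is simplified here to arrival order.

```typescript
// Illustrative in-memory model of the fetch order; glide-mq does this in Valkey.
class MiniQueue {
  private priority: string[] = []; // jobs with priority > 0 (arrival order, simplified)
  private lifo: string[] = [];     // stack: most recent job is taken first
  private fifo: string[] = [];     // stands in for the main stream

  add(id: string, opts: { priority?: number; lifo?: boolean } = {}): void {
    if ((opts.priority ?? 0) > 0) this.priority.push(id);
    else if (opts.lifo) this.lifo.push(id);
    else this.fifo.push(id);
  }

  // Check sources in the documented order: priority > LIFO > FIFO.
  next(): string | undefined {
    return this.priority.shift() ?? this.lifo.pop() ?? this.fifo.shift();
  }
}

const q = new MiniQueue();
q.add('fifo-1');
q.add('lifo-1', { lifo: true });
q.add('lifo-2', { lifo: true });
q.add('urgent', { priority: 1 });
q.add('fifo-2');

const order: string[] = [];
let id = q.next();
while (id !== undefined) {
  order.push(id);
  id = q.next();
}
console.log(order); // ['urgent', 'lifo-2', 'lifo-1', 'fifo-1', 'fifo-2']
```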

Constraints

  • Cannot combine with ordering.key. Throws at enqueue time:
    Error: lifo and ordering.key cannot be used together
  • LIFO jobs are stored in a dedicated Valkey LIST (glide:{queueName}:lifo), separate from the main stream. This means LIFO and FIFO jobs in the same queue coexist - LIFO jobs are drained first.
  • Under concurrency > 1, multiple LIFO jobs may run in parallel; strict reverse ordering is only guaranteed with concurrency: 1.
  • Works with all job types: delayed jobs return to the LIFO list after their delay expires, and schedulers can produce LIFO jobs via the template opts.

See also: Adding jobs for the full JobOptions reference.


Job TTL

Set ttl in JobOptions to auto-expire jobs that are not processed within a time window. The value is in milliseconds.

typescript
// Expire if not processed within 30 seconds
await queue.add('time-sensitive', { alert: 'server-down' }, {
  ttl: 30_000,
});

// TTL works with delayed jobs — the clock starts at creation time
await queue.add('offer', { code: 'FLASH50' }, {
  delay: 5_000,
  ttl: 60_000, // must be processed within 60s of creation, not of becoming active
});

// TTL works with priority jobs
await queue.add('urgent', data, {
  priority: 1,
  ttl: 10_000,
});

When a job's TTL elapses, it is failed with the reason 'expired' during the next activation check. Jobs that are already active are not interrupted - TTL is checked at fetch time, not mid-processing. Use timeout in JobOptions to limit active processing time.
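
The fetch-time check can be sketched as follows. This is a hypothetical helper that mirrors the rule described above, not the library's implementation; `JobTiming` and `isExpiredAtFetch` are invented names, and treating a job as expired only when the elapsed time strictly exceeds ttl is an assumption.

```typescript
// Hypothetical helper mirroring the TTL rule; not the library's implementation.
interface JobTiming {
  createdAt: number; // epoch ms when the job was added
  ttl?: number;      // ms; undefined means the job never expires
}

// Evaluated when a worker fetches the job, never while it is running.
function isExpiredAtFetch(job: JobTiming, now: number): boolean {
  return job.ttl !== undefined && now - job.createdAt > job.ttl;
}

// A delayed job: the TTL clock starts at creation, not when the delay ends.
const offer: JobTiming = { createdAt: 0, ttl: 60_000 };
console.log(isExpiredAtFetch(offer, 5_000));  // false: delay elapsed, TTL has not
console.log(isExpiredAtFetch(offer, 61_000)); // true: 61 s since creation
```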

See also: Adding jobs for other per-job options.


Pluggable Serializers

By default, glide-mq uses JSON.stringify / JSON.parse for job data, return values, and progress payloads. You can replace this with any synchronous serializer.

The Serializer interface

typescript
import type { Serializer } from 'glide-mq';

interface Serializer {
  /** Serialize a value to a string for storage in Valkey. */
  serialize(data: unknown): string;
  /** Deserialize a string from Valkey back to a value. */
  deserialize(raw: string): unknown;
}

Both methods must be synchronous. If serialize throws, the job is treated as a processor failure (in Worker) or skipped (in Scheduler).
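
As a minimal sketch of what a custom implementation might look like, the serializer below keeps Map values intact, which plain JSON.stringify would flatten to {}. The interface is redeclared locally so the snippet is self-contained; `mapSerializer` and the `$map` wrapper key are arbitrary choices for this example, not glide-mq conventions.

```typescript
// Local copy of the interface shown above so the sketch is self-contained.
interface Serializer {
  serialize(data: unknown): string;
  deserialize(raw: string): unknown;
}

// Hypothetical serializer that round-trips Map values through JSON.
const mapSerializer: Serializer = {
  serialize: (data) =>
    JSON.stringify(data, (_key, value) =>
      value instanceof Map ? { $map: Array.from(value.entries()) } : value,
    ),
  deserialize: (raw) =>
    JSON.parse(raw, (_key, value) =>
      value !== null && typeof value === 'object' && Array.isArray(value.$map)
        ? new Map(value.$map)
        : value,
    ),
};

const raw = mapSerializer.serialize({ counts: new Map([['a', 1], ['b', 2]]) });
const back = mapSerializer.deserialize(raw) as { counts: Map<string, number> };
console.log(raw);                  // {"counts":{"$map":[["a",1],["b",2]]}}
console.log(back.counts.get('b')); // 2
```

Both functions return immediately without awaiting anything, which is what the synchronous requirement asks for.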

Example: MessagePack serializer

typescript
import { Queue, Worker } from 'glide-mq';
import type { Serializer } from 'glide-mq';
import { encode, decode } from '@msgpack/msgpack';

const msgpackSerializer: Serializer = {
  serialize: (data) => Buffer.from(encode(data)).toString('base64'),
  deserialize: (raw) => decode(Buffer.from(raw, 'base64')),
};

const queue = new Queue('tasks', {
  connection,
  serializer: msgpackSerializer,
});

const worker = new Worker('tasks', processor, {
  connection,
  serializer: msgpackSerializer, // must match the Queue
});

What is serialized

The serializer is applied to:

  • data - the job payload passed to queue.add().
  • returnvalue - the value returned by the processor.
  • progress - the value passed to job.updateProgress().

Consistency requirement

The same serializer must be configured on every Queue, Worker, and FlowProducer instance that operates on the same queue. A mismatch causes silent data corruption - the consumer will see {} and the job's deserializationFailed flag will be true.

Default export

The built-in JSON serializer is exported for use in conditional logic or testing:

typescript
import { JSON_SERIALIZER } from 'glide-mq';

const serializer = process.env.USE_MSGPACK === '1'
  ? msgpackSerializer
  : JSON_SERIALIZER;

const queue = new Queue('tasks', { connection, serializer });

See also: Worker and Queue for where serializer appears in options.


Ordering and Group Concurrency

Sequential processing (concurrency=1)

Add ordering.key to a job to guarantee that all jobs with the same key are processed one at a time, in the order they were added.

typescript
// All jobs with ordering.key = 'user:42' are processed sequentially
await queue.add('process-payment', { userId: 42, amount: 100 }, {
  ordering: { key: 'user:42' },
});
await queue.add('send-receipt', { userId: 42 }, {
  ordering: { key: 'user:42' },
});

Group concurrency (concurrency > 1)

Set ordering.concurrency to allow up to N jobs per key to run in parallel across all workers:

typescript
// Max 3 concurrent jobs for tenant-42, regardless of worker count
await queue.add('process', data, {
  ordering: { key: 'tenant-42', concurrency: 3 },
});

Jobs exceeding the group limit are parked in a per-group wait list and automatically released when a slot opens.

typescript
// Multi-tenant isolation: each client gets max 2 concurrent jobs
for (const job of jobs) {
  await queue.add('task', job.data, {
    ordering: { key: `client-${job.clientId}`, concurrency: 2 },
  });
}
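
The park-and-release behaviour can be modelled in memory. The sketch below is an illustration under stated assumptions, not glide-mq internals: `GroupGate` is a hypothetical class, and it assumes a parked job takes over the slot of the job that just finished.

```typescript
// Illustrative in-memory model of per-group concurrency; not glide-mq internals.
class GroupGate {
  private active = new Map<string, number>();
  private waiting = new Map<string, string[]>();

  // Returns true if the job may start now, false if it was parked.
  tryStart(key: string, jobId: string, limit: number): boolean {
    const running = this.active.get(key) ?? 0;
    if (running < limit) {
      this.active.set(key, running + 1);
      return true;
    }
    const parked = this.waiting.get(key) ?? [];
    parked.push(jobId);
    this.waiting.set(key, parked);
    return false;
  }

  // Called when a job in the group finishes.
  // Returns the parked job released into the freed slot, if any.
  release(key: string): string | undefined {
    const next = this.waiting.get(key)?.shift();
    if (next === undefined) {
      // Nothing is waiting: the slot becomes free.
      this.active.set(key, Math.max(0, (this.active.get(key) ?? 0) - 1));
    }
    // Otherwise the parked job takes over the slot, so the count is unchanged.
    return next;
  }
}

const gate = new GroupGate();
console.log(gate.tryStart('client-1', 'a', 2)); // true
console.log(gate.tryStart('client-1', 'b', 2)); // true
console.log(gate.tryStart('client-1', 'c', 2)); // false (parked)
console.log(gate.tryStart('client-2', 'd', 2)); // true (different group)
console.log(gate.release('client-1'));          // 'c'
```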

Per-group rate limiting

Limit how many jobs per ordering key can start within a time window, independent of concurrency:

typescript
// Max 10 jobs per 60 seconds for each tenant
await queue.add('sync', data, {
  ordering: {
    key: `tenant-${tenantId}`,
    concurrency: 3,
    rateLimit: { max: 10, duration: 60_000 },
  },
});

When both concurrency and rateLimit are set, both gates apply - a job must have a free concurrency slot and remaining rate capacity to start. Jobs that hit the rate limit are parked in a scheduler-managed promotion queue and released when the window resets.

  • Promotion latency: rate-limited jobs are promoted by the scheduler loop. Worst-case latency is one promotionInterval (default 5 s). Lower promotionInterval on the worker if tighter latency is needed.
  • Retried jobs consume rate slots - a retried job counts against the rate window like any new job.

Token bucket rate limiting

Use ordering.tokenBucket to enforce cost-based rate limiting per ordering key. Unlike the sliding window (rateLimit), which counts jobs, the token bucket assigns a cost to each job and deducts from a refilling bucket:

typescript
// Each API call costs 1 token (default), bulk exports cost 10
await queue.add('api-call', data, {
  ordering: {
    key: `tenant-${tenantId}`,
    concurrency: 5,
    tokenBucket: { capacity: 100, refillRate: 10 }, // 100 tokens max, 10 tokens/s
  },
  cost: 1,
});

await queue.add('bulk-export', data, {
  ordering: {
    key: `tenant-${tenantId}`,
    concurrency: 5,
    tokenBucket: { capacity: 100, refillRate: 10 },
  },
  cost: 10, // consumes 10 tokens
});

How it works: tokens refill at refillRate tokens per second up to capacity. When a job is activated, its cost is deducted from the bucket. If insufficient tokens remain, the job is parked and promoted once enough tokens have refilled. Internally, tokens are tracked as millitokens (1 token = 1000 millitokens) for sub-integer precision.
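
The refill-and-deduct cycle can be sketched with a small class that tracks millitokens. This is an illustration, not the library's code: `TokenBucket` and `tryConsume` are hypothetical names, and starting with a full bucket is an assumption.

```typescript
// Illustrative token bucket using millitokens; not the library's code.
class TokenBucket {
  private readonly capacityMilli: number;
  private readonly refillRate: number; // tokens per second
  private millitokens: number;
  private lastRefill: number;          // epoch ms

  constructor(capacity: number, refillRate: number, now: number) {
    this.capacityMilli = capacity * 1000;
    this.refillRate = refillRate;
    this.millitokens = this.capacityMilli; // start full (assumption)
    this.lastRefill = now;
  }

  // Returns true if the job may start; false means the caller parks it.
  tryConsume(cost: number, now: number): boolean {
    // refillRate tokens/s is the same as refillRate millitokens/ms.
    const elapsedMs = now - this.lastRefill;
    this.millitokens = Math.min(this.capacityMilli, this.millitokens + elapsedMs * this.refillRate);
    this.lastRefill = now;

    const needed = cost * 1000;
    if (needed > this.millitokens) return false;
    this.millitokens -= needed;
    return true;
  }
}

const bucket = new TokenBucket(100, 10, 0);      // 100 tokens max, 10 tokens/s
for (let i = 0; i < 10; i++) bucket.tryConsume(10, 0); // ten bulk exports drain it
console.log(bucket.tryConsume(1, 0));      // false: bucket is empty
console.log(bucket.tryConsume(10, 500));   // false: only 5 tokens refilled
console.log(bucket.tryConsume(10, 1_000)); // true: 10 tokens available after 1 s
```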

Check order: when concurrency, token bucket, and sliding window are all configured, the gates are checked in this order: concurrency -> token bucket -> sliding window. All applicable limits must pass. Strict FIFO is maintained - jobs never skip ahead of earlier jobs in the same group.

Cost validation: a job with cost greater than capacity is rejected at enqueue time. If a previously valid job becomes invalid (e.g., capacity was lowered), it is moved to the DLQ at activation.

Differences from sliding window (rateLimit):

| | Sliding window (rateLimit) | Token bucket (tokenBucket) |
| --- | --- | --- |
| Unit | Job count | Weighted cost per job |
| Config | { max, duration } | { capacity, refillRate } |
| Default cost | 1 job | cost: 1 token |
| Refill | Window resets after duration ms | Continuous refill at refillRate/s |
| Use case | "Max N jobs per window" | "Max N units of work per second" |

  • Promotion latency: same as sliding window - worst-case one promotionInterval (default 5 s).
  • Composition: token bucket composes with concurrency, sliding window, and global rate limits. All gates are enforced.

Runtime group rate limiting

The static rate limits above (rateLimit, tokenBucket) are set at enqueue time. For dynamic scenarios - like a crawler hitting a 429 response - use runtime rate limiting to pause a specific group from inside or outside the processor.

From inside the processor

typescript
const worker = new Worker('crawl', async (job) => {
  const res = await fetch(job.data.url);
  if (res.status === 429) {
    const retryAfter = parseInt(res.headers.get('retry-after') || '60', 10) * 1000;
    // Pause this domain group - other domains keep processing
    await job.rateLimitGroup(retryAfter);
  }
  return { html: await res.text() };
}, { connection });

job.rateLimitGroup(duration, opts?) re-parks the current job in the group queue and pauses the entire group for duration milliseconds. The job resumes automatically when the duration expires.

Throw-style sugar

typescript
import { GroupRateLimitError } from 'glide-mq';

const worker = new Worker('crawl', async (job) => {
  const res = await fetch(job.data.url);
  if (res.status === 429) {
    throw new GroupRateLimitError(60_000);
  }
  return res.text();
}, { connection });

From outside the processor

typescript
// From a webhook, health check, or admin API
await queue.rateLimitGroup('example.com', 60_000);

queue.rateLimitGroup(groupKey, duration, opts?) registers the group as rate-limited. Jobs already in the group queue are held until the duration expires.

Options

All three APIs accept the same options:

| Option | Values | Default | Description |
| --- | --- | --- | --- |
| currentJob | 'requeue' \| 'fail' | 'requeue' | Re-park the job (no retry consumed) or fail it |
| requeuePosition | 'front' \| 'back' | 'front' | Where to place the re-parked job in the group queue |
| extend | 'max' \| 'replace' | 'max' | Never shorten an existing pause, or overwrite it |

typescript
await job.rateLimitGroup(30_000, {
  currentJob: 'requeue',     // default: re-park without consuming retry
  requeuePosition: 'front',  // default: this job resumes first
  extend: 'max',             // default: if already paused for longer, keep the longer pause
});

How it works

  1. The current job is atomically re-parked in the per-group ZSET queue
  2. The group is registered in the ratelimited sorted set with a resume timestamp
  3. The scheduler's promotion loop (promoteRateLimited) checks this set on every cycle
  4. When the resume timestamp passes, queued jobs are promoted back to the stream
  5. The re-parked job resumes as a "returning" activation - no ordering violations

Notes

  • Jobs with different ordering keys (or no ordering key) are processed concurrently as normal.
  • Ordering keys are limited to 256 characters.
  • concurrency=1 (or omitted) preserves strict FIFO ordering per key.
  • concurrency > 1 caps parallelism but does not guarantee FIFO within the group.
  • Group concurrency and global concurrency (setGlobalConcurrency) compose: both limits are enforced.
  • Per-group rate limiting, token bucket, group concurrency, and global concurrency all compose: all applicable limits are enforced.
  • Group slots are released on job complete, fail, retry, DLQ move, and stall recovery.

Custom Job IDs

By default glide-mq assigns a monotonically increasing integer ID to each job. You can supply your own ID via opts.jobId to get deterministic, idempotent job creation:

typescript
// Deterministic job: safe to call multiple times
const job = await queue.add('send-email', { to: 'user@example.com' }, {
  jobId: 'email-user-42',
});
// job is null if a job with this ID already exists (silent skip)

Constraints

  • Max 256 characters.
  • Must not contain control characters (U+0000-U+001F, U+007F), curly braces ({, }), or colons (:).
  • Violating either constraint throws synchronously before the network call.
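
If you want to check IDs before they reach the queue, the two constraints can be expressed as a small validator. This is a sketch of the rules listed above, not glide-mq's own check: `validateJobId` and its error messages are hypothetical, and length is measured in UTF-16 code units as an assumption.

```typescript
// Hypothetical validator for the listed constraints; not glide-mq's own check.
function validateJobId(jobId: string): void {
  if (jobId.length > 256) {
    throw new Error('jobId must be at most 256 characters');
  }
  // Control characters U+0000-U+001F and U+007F, curly braces, and colons.
  if (/[\u0000-\u001f\u007f{}:]/.test(jobId)) {
    throw new Error('jobId must not contain control characters, curly braces, or colons');
  }
}

validateJobId('email-user-42'); // passes silently

try {
  validateJobId('email:user:42');
} catch (err) {
  console.log((err as Error).message);
  // jobId must not contain control characters, curly braces, or colons
}
```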

Duplicate behaviour by surface

| Surface | Behaviour on duplicate ID |
| --- | --- |
| Queue.add | Returns null (silent skip) |
| Queue.addBulk | Silently omits the duplicate from the returned array |
| FlowProducer.add | Throws - flows cannot be partially created |
| TestQueue.add | Returns null (mirrors production) |

Interaction with deduplication

opts.jobId and opts.deduplication are independent mechanisms. When both are set the deduplication check runs first; if the job is deduplicated, the custom ID is never stored. If the dedup check passes, the custom ID collision check runs next.


Deduplication

Prevent duplicate jobs from entering the queue using deduplication.id. Three modes are supported:

| Mode | Behaviour |
| --- | --- |
| simple | Skip the new job if any job with the same ID already exists (any state). |
| throttle | Accept only the first job in a TTL window; later arrivals are dropped. |
| debounce | Accept only the last job in a TTL window; earlier arrivals are cancelled. |

typescript
// Simple: skip if a job with this ID is already queued / active / completed
await queue.add('send-welcome', { userId: 99 }, {
  deduplication: { id: 'welcome-99', mode: 'simple' },
});

// Throttle: at most one "sync" job per 10 s
await queue.add('sync', { region: 'eu' }, {
  deduplication: { id: 'sync-eu', mode: 'throttle', ttl: 10_000 },
});

// Debounce: only the last "search" job within 500 ms is actually queued
await queue.add('search', { query: 'hello' }, {
  deduplication: { id: 'search-user-1', mode: 'debounce', ttl: 500 },
});

queue.add() returns null when a job is skipped by deduplication.
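
Throttle mode can be modelled in a few lines. The sketch below covers throttle only and is illustrative: `ThrottleDedup` is a hypothetical class, glide-mq tracks this state in Valkey, and it assumes the window opens when a job is accepted and closes exactly ttl milliseconds later.

```typescript
// Illustrative in-memory model of throttle mode; not glide-mq internals.
class ThrottleDedup {
  private windowEnds = new Map<string, number>();

  // Returns true if the job is accepted, false if it is dropped
  // (the case where queue.add() would return null).
  accept(id: string, ttl: number, now: number): boolean {
    const end = this.windowEnds.get(id);
    if (end !== undefined && now < end) return false; // still inside the TTL window
    this.windowEnds.set(id, now + ttl);               // accepted job opens a new window
    return true;
  }
}

const dedup = new ThrottleDedup();
console.log(dedup.accept('sync-eu', 10_000, 0));      // true: first job in the window
console.log(dedup.accept('sync-eu', 10_000, 5_000));  // false: dropped
console.log(dedup.accept('sync-eu', 10_000, 10_000)); // true: window has closed
```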


Global Concurrency

Limit the total number of concurrently active jobs across all workers sharing a queue, regardless of per-worker concurrency settings.

typescript
const queue = new Queue('tasks', { connection });

// Allow at most 20 active jobs across all workers at once
await queue.setGlobalConcurrency(20);

// Remove the limit
await queue.setGlobalConcurrency(0);

Workers check this limit atomically before picking up each job via the checkConcurrency server function.


Global Rate Limiting

Cap the total job throughput across all workers sharing a queue. The config is stored in the Valkey meta hash and picked up dynamically by workers within one scheduler tick.

typescript
const queue = new Queue('tasks', { connection });

// Max 500 jobs per minute across all workers
await queue.setGlobalRateLimit({ max: 500, duration: 60_000 });

// Read current config
const limit = await queue.getGlobalRateLimit();
// { max: 500, duration: 60000 } or null if not set

// Remove the limit
await queue.removeGlobalRateLimit();

  • Global rate limit takes precedence over WorkerOptions.limiter. When both are set, the stricter limit wins.
  • Changes are picked up by workers within one scheduler tick (no restart needed).

Job Revocation

Cooperatively cancel a job that is waiting, delayed, or currently being processed.

typescript
const job = await queue.add('long-task', { input: 'data' });

// Later...
const result = await queue.revoke(job.id);
// 'revoked'   - job was waiting/delayed and is now in the failed set
// 'flagged'   - job is active; the worker will abort it cooperatively
// 'not_found' - job does not exist

In your processor, use job.abortSignal to react to revocation:

typescript
const worker = new Worker('tasks', async (job) => {
  for (const chunk of largeDataset) {
    if (job.abortSignal?.aborted) {
      throw new Error('Job revoked');
    }
    await processChunk(chunk);
  }
  return { done: true };
}, { connection });

job.abortSignal is an AbortSignal. You can pass it directly to fetch, axios, or any AbortSignal-aware API.


Transparent Compression

Enable gzip compression at the queue level. Workers decompress automatically - no changes required in processors.

typescript
const queue = new Queue('tasks', {
  connection,
  compression: 'gzip',
});

// Payload is gzip-compressed before storing in Valkey
await queue.add('process-large', { report: '... 15 KB of data ...' });
// Stored size: ~300 bytes (98% savings on repetitive data)

Payload size limit: job data must be ≤ 1 MB after serialisation but before compression. Larger payloads throw immediately:

Error: Job data exceeds maximum size (1234567 bytes > 1MB).
       Use smaller payloads or store large data externally.

Store large blobs in S3/GCS/object storage and pass a reference URL in the job data instead.


Retries and Backoff

Configure retry behaviour per job via attempts and backoff:

typescript
await queue.add('send-email', data, {
  attempts: 5,
  backoff: { type: 'exponential', delay: 1_000 },
  // delay sequence: 1s, 2s, 4s, 8s (4 retries after the first attempt)
});

// Fixed delay
await queue.add('webhook', data, {
  attempts: 3,
  backoff: { type: 'fixed', delay: 2_000 },
});

// Exponential with jitter (avoids thundering herd)
await queue.add('poll', data, {
  attempts: 10,
  backoff: { type: 'exponential', delay: 500, jitter: 0.1 },
});

// Custom strategy — register on the Worker
const worker = new Worker('tasks', processor, {
  connection,
  backoffStrategies: {
    'rate-limited': (attemptsMade, err) => {
      // Respect Retry-After header
      if (err.retryAfter) return err.retryAfter * 1_000;
      return attemptsMade * 3_000;
    },
  },
});

await queue.add('api-call', data, {
  attempts: 5,
  backoff: { type: 'rate-limited', delay: 0 },
});

When attempts is exhausted the job moves to the failed state (or the DLQ if configured).
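
The delay sequences quoted in the comments above can be reproduced with a small calculator. This is a sketch under assumptions, not glide-mq's implementation: `backoffDelay` is a hypothetical name, the doubling formula is inferred from the 1s, 2s, 4s, 8s sequence, and jitter is assumed to spread the delay uniformly within plus or minus that fraction of the base value.

```typescript
// Hypothetical delay calculator; the formulas are assumptions, not library code.
interface Backoff {
  type: 'fixed' | 'exponential';
  delay: number;   // base delay in ms
  jitter?: number; // fraction of the delay to randomize, e.g. 0.1 (assumed meaning)
}

function backoffDelay(
  backoff: Backoff,
  attemptsMade: number,                 // 1 for the first retry
  random: () => number = Math.random,   // injectable for deterministic tests
): number {
  const base = backoff.type === 'fixed'
    ? backoff.delay
    : backoff.delay * Math.pow(2, attemptsMade - 1); // first retry = delay, then doubles
  const jitter = backoff.jitter ?? 0;
  // Spread the delay uniformly within +/- jitter of the base value.
  const factor = 1 + jitter * (random() * 2 - 1);
  return Math.round(base * factor);
}

const exponential: Backoff = { type: 'exponential', delay: 1_000 };
console.log([1, 2, 3, 4].map((n) => backoffDelay(exponential, n))); // [1000, 2000, 4000, 8000]
```

Passing random as a parameter keeps the jittered case testable: with jitter 0.1 and a base of 500 ms, the result always falls between 450 and 550.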


Dead Letter Queues

Route permanently failed jobs to a separate queue for later inspection or manual retry.

typescript
const worker = new Worker('tasks', processor, {
  connection,
  deadLetterQueue: { name: 'tasks-dlq' },
});

// Inspect DLQ contents
const dlqQueue = new Queue('tasks-dlq', { connection });
const failedJobs = await dlqQueue.getJobs('waiting');

// Or use the convenience method on the original queue
const dlqJobs = await queue.getDeadLetterJobs(0, 49);

Jobs in the DLQ are ordinary jobs - you can inspect, retry, or remove them like any other job.

Released under the Apache-2.0 License.