
Serverless & Infrastructure

Serverless producers, IAM authentication, HTTP proxy, and cluster mode.

Serverless Producer

Demonstrates Producer and ServerlessPool for lightweight job enqueueing in serverless/edge environments. Producer is a minimal alternative to Queue (no EventEmitter; returns string IDs instead of Job instances), and ServerlessPool caches connections so warm Lambda/Edge invocations can reuse them.

|          | Producer                          | Queue               |
|----------|-----------------------------------|---------------------|
| Weight   | Minimal (no EventEmitter)         | Full-featured       |
| Returns  | String ID or null                 | Job instance        |
| Methods  | add(), addBulk(), close()         | 25+ methods         |
| Workers  | No                                | Yes                 |
| Events   | No                                | Yes                 |
| Use case | Serverless, edge, fire-and-forget | Application servers |
typescript
import { Producer, ServerlessPool, serverlessPool, Worker } from 'glide-mq';
import type { Job, ProducerOptions } from 'glide-mq';
import { setTimeout } from 'timers/promises';

const connection = { addresses: [{ host: 'localhost', port: 6379 }] };

// Producer is a lightweight alternative to Queue for serverless/edge environments.
// No EventEmitter, no Job instances, no state tracking - just add() and addBulk().
// Returns plain string IDs instead of full Job objects.
//
// ServerlessPool caches Producer instances by connection fingerprint so warm
// Lambda/Edge invocations reuse connections instead of creating new ones.

// --- 1. Direct Producer usage ---

const producer = new Producer('tasks', { connection });

console.log('--- Direct Producer ---\n');

const id1 = await producer.add('process-image', { url: 'https://example.com/photo.jpg', width: 800 });
console.log(`Added job: ${id1}`);

const id2 = await producer.add('send-email', { to: 'user@example.com', subject: 'Welcome' });
console.log(`Added job: ${id2}`);

// Custom job ID for idempotency
const id3 = await producer.add('charge', { amount: 99.99 }, { jobId: 'order-123' });
console.log(`Added job: ${id3} (custom ID)`);

// Duplicate returns null
const dup = await producer.add('charge', { amount: 99.99 }, { jobId: 'order-123' });
console.log(`Duplicate: ${dup} (null = skipped)\n`);

// --- 2. Bulk enqueue ---

console.log('--- Bulk enqueue ---\n');

const ids = await producer.addBulk([
  { name: 'resize', data: { file: 'a.png' } },
  { name: 'resize', data: { file: 'b.png' } },
  { name: 'resize', data: { file: 'c.png' } },
  { name: 'resize', data: { file: 'd.png' } },
  { name: 'resize', data: { file: 'e.png' } },
]);
console.log(`Bulk added ${ids.length} jobs: [${ids.join(', ')}]\n`);

// --- 3. ServerlessPool (connection reuse for Lambda/Edge) ---

console.log('--- ServerlessPool ---\n');

const opts: ProducerOptions = { connection };

// Simulating multiple Lambda invocations - same pool, same cached connection
const p1 = serverlessPool.getProducer('notifications', opts);
const p2 = serverlessPool.getProducer('notifications', opts);
console.log(`Same instance: ${p1 === p2}`); // true - cached by fingerprint

const p3 = serverlessPool.getProducer('analytics', opts);
console.log(`Different queue: ${p1 === p3}`); // false - different queue name

await p1.add('push', { token: 'device-abc', message: 'New order!' });
await p3.add('track', { event: 'page_view', page: '/checkout' });
console.log('Enqueued via pooled producers\n');

// --- 4. Worker consumes the jobs ---

console.log('--- Worker processing ---\n');

const worker = new Worker('tasks', async (job: Job) => {
  console.log(`[worker] ${job.name}: ${JSON.stringify(job.data)}`);
  return { done: true };
}, { connection, concurrency: 5 });

worker.on('error', () => {});

await setTimeout(1500);

const notifWorker = new Worker('notifications', async (job: Job) => {
  console.log(`[notifications] ${job.name}: ${JSON.stringify(job.data)}`);
  return { sent: true };
}, { connection, concurrency: 2 });

notifWorker.on('error', () => {});

await setTimeout(1000);

// --- Shutdown ---
await Promise.all([
  producer.close(),
  serverlessPool.closeAll(),
  worker.close(),
  notifWorker.close(),
]);
console.log('\nDone.');
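The pool's connection reuse hinges on deriving a stable cache key from the queue name and connection options. The sketch below shows one plausible fingerprinting scheme; glide-mq's actual key derivation is internal and may differ, and the cached object here is a stand-in for a real Producer.

```typescript
// Sketch: cache instances by a deterministic fingerprint of (queue, options).
// Illustrative only - not glide-mq's actual ServerlessPool implementation.
type Address = { host: string; port: number };
type ConnOpts = { addresses: Address[]; useTLS?: boolean };

function fingerprint(queue: string, opts: ConnOpts): string {
  // Sort addresses so option ordering doesn't produce distinct cache entries.
  const addrs = [...opts.addresses]
    .map((a) => `${a.host}:${a.port}`)
    .sort()
    .join(',');
  return `${queue}|${addrs}|tls=${opts.useTLS ?? false}`;
}

const cache = new Map<string, object>();

function getCached(queue: string, opts: ConnOpts): object {
  const key = fingerprint(queue, opts);
  let producer = cache.get(key);
  if (!producer) {
    producer = { queue }; // stand-in for `new Producer(queue, opts)`
    cache.set(key, producer);
  }
  return producer;
}
```

Because the fingerprint is value-based, two warm invocations that construct equivalent option objects still hit the same cache entry.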



IAM Auth

AWS IAM authentication for ElastiCache and MemoryDB with glide-mq. The client generates SigV4 tokens automatically using the default AWS credentials chain -- no AWS SDK dependency needed in your app.

typescript
import { Queue, Worker } from 'glide-mq';
import type { Job, ConnectionOptions, IamCredentials } from 'glide-mq';
import { setTimeout } from 'timers/promises';

// --- AWS IAM Authentication for ElastiCache / MemoryDB ---
//
// glide-mq supports IAM auth natively via the valkey-glide client.
// The client generates SigV4 tokens automatically using the default
// AWS credentials chain (env vars, ~/.aws/credentials, EC2 instance role, etc.).
//
// TLS is REQUIRED for IAM auth - AWS enforces encrypted connections.

const REGION = process.env.AWS_REGION ?? 'us-east-1';

// --- 1. ElastiCache Serverless with IAM ---

const elasticacheConnection: ConnectionOptions = {
  addresses: [{ host: 'my-cache.serverless.use1.cache.amazonaws.com', port: 6379 }],
  useTLS: true, // Required for IAM auth
  credentials: {
    type: 'iam',
    serviceType: 'elasticache',
    region: REGION,
    userId: 'my-iam-user',               // IAM user created in ElastiCache
    clusterName: 'my-cache',             // ElastiCache cluster name
    refreshIntervalSeconds: 300,          // Token refresh interval (default: 300s)
  } satisfies IamCredentials,
};

// --- 2. MemoryDB with IAM ---

const memorydbConnection: ConnectionOptions = {
  addresses: [{ host: 'my-memorydb.abc123.memorydb.us-east-1.amazonaws.com', port: 6379 }],
  useTLS: true,
  credentials: {
    type: 'iam',
    serviceType: 'memorydb',
    region: REGION,
    userId: 'my-iam-user',
    clusterName: 'my-memorydb',
  } satisfies IamCredentials,
};

// --- 3. ElastiCache cluster mode with IAM + read replicas ---

const clusterConnection: ConnectionOptions = {
  addresses: [
    { host: 'my-cluster.abc123.clustercfg.use1.cache.amazonaws.com', port: 6379 },
  ],
  useTLS: true,
  clusterMode: true,
  credentials: {
    type: 'iam',
    serviceType: 'elasticache',
    region: REGION,
    userId: 'my-iam-user',
    clusterName: 'my-cluster',
  } satisfies IamCredentials,
  readFrom: 'preferReplica', // Route reads to replicas for lower latency
};

// --- Pick which connection to use ---
// For this demo, we default to the ElastiCache serverless connection.
// Change this to memorydbConnection or clusterConnection as needed.

const connection = elasticacheConnection;

// --- Queue + Worker (same API as always) ---
// IAM auth is transparent - once the connection is configured, everything
// works exactly the same as password auth or no auth.

const queue = new Queue('iam-demo', { connection });

const worker = new Worker('iam-demo', async (job: Job) => {
  console.log(`[worker] Processing ${job.name}: ${JSON.stringify(job.data)}`);
  await setTimeout(50);
  const creds = connection.credentials;
  const region = creds && 'type' in creds && creds.type === 'iam' ? creds.region : 'unknown';
  return { processed: true, region };
}, { connection, concurrency: 2 });

worker.on('completed', (job, result) => {
  console.log(`[worker] Completed ${job.id}:`, result);
});
worker.on('error', (err) => console.error('[worker] Error:', err));

// --- Produce jobs ---

console.log('Adding 3 jobs via IAM-authenticated connection...\n');
await queue.add('compute', { value: 1 });
await queue.add('compute', { value: 2 });
await queue.add('compute', { value: 3 });

await setTimeout(1000);

const counts = await queue.getJobCounts();
console.log('\nJob counts:', counts);

// --- Shutdown ---
await worker.close();
await queue.close();
console.log('Done.');
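The refreshIntervalSeconds behaviour can be pictured as a small time-based token cache: the client keeps serving the last signed token until the interval elapses, then re-signs. This is a self-contained illustration of the semantics only; the real SigV4 signing and refresh happen inside valkey-glide, and `sign` here is a hypothetical stand-in.

```typescript
// Sketch: time-based token cache mirroring refreshIntervalSeconds semantics.
// `sign` stands in for the SigV4 presigning the client does internally.
type Clock = () => number; // seconds

class RefreshingToken {
  private token = '';
  private issuedAt = -Infinity;

  constructor(
    private readonly sign: () => string,
    private readonly refreshIntervalSeconds = 300, // documented default
    private readonly now: Clock = () => Date.now() / 1000,
  ) {}

  get(): string {
    // Re-sign only when the current token is older than the refresh interval.
    const t = this.now();
    if (t - this.issuedAt >= this.refreshIntervalSeconds) {
      this.token = this.sign();
      this.issuedAt = t;
    }
    return this.token;
  }
}
```

Note that IAM tokens for ElastiCache/MemoryDB are short-lived by design, which is why the client refreshes them on an interval rather than signing once at startup.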



HTTP Proxy

Cross-language job enqueuing via the glide-mq HTTP proxy. Any HTTP client (Python, Go, Ruby, curl) can enqueue jobs through a simple REST API. The proxy is an Express app that maps HTTP requests to queue operations.

typescript
import { createProxyServer } from 'glide-mq/proxy';
import { Worker } from 'glide-mq';
import type { Job } from 'glide-mq';
import { setTimeout } from 'timers/promises';

const connection = { addresses: [{ host: 'localhost', port: 6379 }] };

// --- 1. Start the HTTP proxy ---
// createProxyServer returns an Express app that maps HTTP requests to queue ops.
// Any language that can make HTTP calls can enqueue and query jobs.
// NOTE: Add authentication middleware (API key, JWT) before production use.

const proxy = createProxyServer({
  connection,
  queues: ['emails', 'orders'], // optional allowlist
});

const PORT = 3456;
const server = await new Promise<ReturnType<typeof proxy.app.listen>>((resolve) => {
  const s = proxy.app.listen(PORT, () => {
    console.log(`HTTP proxy listening on http://localhost:${PORT}`);
    console.log('Endpoints:');
console.log('  POST   /queues/:name/jobs        - add a job');
    console.log('  POST   /queues/:name/jobs/bulk   - add jobs in bulk');
    console.log('  GET    /queues/:name/jobs/:id    - get job by ID');
    console.log('  GET    /queues/:name/counts      - get job counts');
    console.log('  GET    /health                   - health check\n');
    resolve(s);
  });
});

// --- 2. Workers process jobs as usual ---
// The proxy only enqueues - workers are separate Node.js processes.

const emailWorker = new Worker('emails', async (job: Job) => {
  console.log(`[email] Sending to ${job.data.to}: ${job.data.subject}`);
  return { sent: true };
}, { connection, concurrency: 2 });

emailWorker.on('error', (err) => console.error('[email] Error:', err));

const orderWorker = new Worker('orders', async (job: Job) => {
  console.log(`[order] Processing ${job.data.orderId}: $${job.data.amount}`);
  return { processed: true, orderId: job.data.orderId };
}, { connection, concurrency: 2 });

orderWorker.on('error', (err) => console.error('[order] Error:', err));

// --- 3. Demo: enqueue via HTTP (simulating a Python/Go/Ruby client) ---

async function httpPost(path: string, body: unknown): Promise<unknown> {
  const res = await fetch(`http://localhost:${PORT}${path}`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
  });
  return res.json();
}

async function httpGet(path: string): Promise<unknown> {
  const res = await fetch(`http://localhost:${PORT}${path}`);
  return res.json();
}

// Add a single job
const emailResult = await httpPost('/queues/emails/jobs', {
  name: 'welcome',
  data: { to: 'alice@example.com', subject: 'Welcome!' },
});
console.log('POST /queues/emails/jobs:', emailResult);

// Add jobs in bulk
const bulkResult = await httpPost('/queues/orders/jobs/bulk', {
  jobs: [
    { name: 'process', data: { orderId: 'ORD-001', amount: 99.99 } },
    { name: 'process', data: { orderId: 'ORD-002', amount: 24.50 } },
    { name: 'process', data: { orderId: 'ORD-003', amount: 150.00 } },
  ],
});
console.log('POST /queues/orders/jobs/bulk:', bulkResult);

// Wait for processing
await setTimeout(500);

// Query job counts
const emailCounts = await httpGet('/queues/emails/counts');
console.log('\nGET /queues/emails/counts:', emailCounts);

const orderCounts = await httpGet('/queues/orders/counts');
console.log('GET /queues/orders/counts:', orderCounts);

// Health check
const health = await httpGet('/health');
console.log('GET /health:', health);

// Queue not in allowlist
const forbidden = await fetch(`http://localhost:${PORT}/queues/secret/jobs`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ name: 'hack' }),
});
console.log(`\nPOST /queues/secret/jobs: ${forbidden.status}`, await forbidden.json());

// --- Shutdown ---
await new Promise<void>((resolve) => server.close(() => resolve()));
await emailWorker.close();
await orderWorker.close();
await proxy.close();
console.log('\nDone.');
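From a non-Node client the same operations are plain HTTP. A curl sketch against the demo proxy above on port 3456 (response shapes not shown; add your own auth header before exposing the proxy anywhere):

```shell
# Add a single job
curl -X POST http://localhost:3456/queues/emails/jobs \
  -H 'Content-Type: application/json' \
  -d '{"name":"welcome","data":{"to":"alice@example.com","subject":"Welcome!"}}'

# Add jobs in bulk
curl -X POST http://localhost:3456/queues/orders/jobs/bulk \
  -H 'Content-Type: application/json' \
  -d '{"jobs":[{"name":"process","data":{"orderId":"ORD-001","amount":99.99}}]}'

# Query counts and health
curl http://localhost:3456/queues/orders/counts
curl http://localhost:3456/health
```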



Valkey Cluster

Running glide-mq with a Valkey/Redis cluster. The only code change vs. standalone is clusterMode: true in ConnectionOptions. All glide-mq keys are hash-tagged (glide:{queueName}:*) so they always land in the same hash slot -- no cross-slot issues.
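The hash-tag mechanics can be verified standalone: cluster key slots are CRC16/XMODEM modulo 16384, and when a key contains a `{...}` segment only that segment is hashed, per the Redis/Valkey Cluster spec. This sketch is independent of glide-mq internals.

```typescript
// CRC16/XMODEM (poly 0x1021, init 0), the checksum Valkey/Redis Cluster
// uses for key slots.
function crc16(bytes: Uint8Array): number {
  let crc = 0;
  for (const b of bytes) {
    crc ^= b << 8;
    for (let i = 0; i < 8; i++) {
      crc = crc & 0x8000 ? ((crc << 1) ^ 0x1021) & 0xffff : (crc << 1) & 0xffff;
    }
  }
  return crc;
}

// If the key has a non-empty {...} hash tag, hash only the tag's contents,
// then reduce to one of the 16384 slots.
function hashSlot(key: string): number {
  const open = key.indexOf('{');
  if (open !== -1) {
    const close = key.indexOf('}', open + 1);
    if (close > open + 1) key = key.slice(open + 1, close);
  }
  return crc16(new TextEncoder().encode(key)) % 16384;
}

// All keys for one queue share the {cluster-demo} tag, hence one slot:
const slots = [
  'glide:{cluster-demo}:wait',
  'glide:{cluster-demo}:active',
  'glide:{cluster-demo}:meta',
].map(hashSlot);
console.log(slots); // three identical slot numbers
```

Because every per-queue key hashes to the same slot, multi-key Lua functions and transactions over a single queue never cross node boundaries.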

typescript
import { Queue, Worker } from 'glide-mq';
import type { Job, ConnectionOptions } from 'glide-mq';
import { setTimeout } from 'timers/promises';

// --- Valkey/Redis Cluster Connection ---
// glide-mq uses hash-tagged keys (glide:{queueName}:*) so all keys for a
// queue land in the same hash slot - no cross-slot issues.

const clusterConnection: ConnectionOptions = {
  addresses: [
    { host: 'localhost', port: 7000 },
    { host: 'localhost', port: 7001 },
    { host: 'localhost', port: 7002 },
  ],
  clusterMode: true,
};

// --- Queue + Worker (same API as standalone) ---
// The only difference is `clusterMode: true` in the connection options.
// All queue operations, Lua functions, and streams work identically.

const queue = new Queue('cluster-demo', { connection: clusterConnection });

const worker = new Worker('cluster-demo', async (job: Job) => {
  console.log(`[worker] Processing ${job.name} on cluster: ${JSON.stringify(job.data)}`);
  await setTimeout(50);
  return { processed: true, node: 'some-cluster-node' };
}, { connection: clusterConnection, concurrency: 3 });

worker.on('completed', (job, result) => {
  console.log(`[worker] Completed ${job.id}:`, result);
});
worker.on('error', (err) => console.error('[worker] Error:', err));

// --- Produce jobs ---

console.log('Adding 5 jobs to cluster queue...\n');
for (let i = 0; i < 5; i++) {
  await queue.add('compute', { index: i, payload: `data-${i}` });
}

// Wait for processing
await setTimeout(1000);

const counts = await queue.getJobCounts();
console.log('\nJob counts:', counts);

// --- Scheduler on cluster ---
// Schedulers also work on cluster mode - keys are hash-tagged.

await queue.upsertJobScheduler(
  'cluster-heartbeat',
  { every: 500 },
  { name: 'heartbeat', data: { source: 'cluster' } },
);
console.log('Scheduler added on cluster.');

await setTimeout(2000);

const schedulerCounts = await queue.getJobCounts();
console.log('After scheduler:', schedulerCounts);

await queue.removeJobScheduler('cluster-heartbeat');

// --- Shutdown ---
await worker.close();
await queue.close();
console.log('\nDone.');


Released under the Apache-2.0 License.