Skip to main content

Redis Streams Transport

Low-latency transport using Redis Streams with consumer groups.

Installation​

npm install @saga-bus/transport-redis ioredis

Basic Usage​

import { RedisTransport } from '@saga-bus/transport-redis';

const transport = new RedisTransport({
url: 'redis://localhost:6379',
});

const bus = createBus({
transport,
store,
sagas: [{ definition: orderSaga }],
});

await bus.start();

Configuration​

OptionTypeDefaultDescription
urlstringRequiredRedis connection URL
hoststring'localhost'Redis host (alternative to URL)
portnumber6379Redis port
passwordstring-Redis password
dbnumber0Database number
streamPrefixstring'saga-bus'Prefix for stream names
consumerGroupstring'saga-bus'Consumer group name
blockTimeoutnumber5000Block timeout for reads (ms)
batchSizenumber10Messages per read

Full Configuration Example​

import { RedisTransport } from '@saga-bus/transport-redis';

const transport = new RedisTransport({
// Connection
url: 'redis://:password@redis-host:6379/0',

// Or individual settings
host: 'redis-host',
port: 6379,
password: process.env.REDIS_PASSWORD,
db: 0,

// TLS
tls: {
rejectUnauthorized: true,
},

// Streams settings
streamPrefix: 'orders',
consumerGroup: 'order-service',

// Processing
blockTimeout: 5000,
batchSize: 10,
});

Stream Naming​

Streams are created automatically:

{streamPrefix}:{SagaName}:{MessageType}

For example:

  • orders:OrderSaga:OrderSubmitted
  • orders:OrderSaga:PaymentCaptured

Consumer Groups​

Messages are distributed across consumers using Redis consumer groups:

// Multiple instances share the same consumer group
const transport = new RedisTransport({
url: 'redis://localhost:6379',
consumerGroup: 'order-service', // Same group = shared consumption
consumerName: `consumer-${process.env.INSTANCE_ID}`,
});

Message Acknowledgment​

Messages are acknowledged after successful processing:

// Automatic acknowledgment on success
// Manual acknowledgment available for complex scenarios

// XACK is called automatically after handler completes

Pending Messages​

Handle pending (unacknowledged) messages:

const transport = new RedisTransport({
url: 'redis://localhost:6379',

// Claim pending messages after timeout
claimTimeout: 30000, // 30 seconds

// Process pending on startup
processPendingOnStart: true,
});

Redis Cluster​

For Redis Cluster deployments:

import { RedisTransport } from '@saga-bus/transport-redis';
import Redis from 'ioredis';

const cluster = new Redis.Cluster([
{ host: 'node1', port: 6379 },
{ host: 'node2', port: 6379 },
{ host: 'node3', port: 6379 },
]);

const transport = new RedisTransport({
client: cluster,
streamPrefix: 'orders',
});

Redis Sentinel​

For high availability with Sentinel:

import Redis from 'ioredis';

const client = new Redis({
sentinels: [
{ host: 'sentinel1', port: 26379 },
{ host: 'sentinel2', port: 26379 },
{ host: 'sentinel3', port: 26379 },
],
name: 'mymaster',
});

const transport = new RedisTransport({
client,
streamPrefix: 'orders',
});

Docker Setup​

# docker-compose.yml
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
command: redis-server --appendonly yes
volumes:
- redis_data:/data

volumes:
redis_data:

Stream Trimming​

Configure automatic stream trimming:

const transport = new RedisTransport({
url: 'redis://localhost:6379',

// Trim strategy
trimStrategy: 'MAXLEN',
trimThreshold: 10000, // Keep last 10k messages

// Or approximate trimming (faster)
trimApproximate: true,
});

Best Practices​

Use Appropriate Block Timeout​

// Balance between latency and CPU usage
const transport = new RedisTransport({
url: 'redis://localhost:6379',
blockTimeout: 5000, // 5 seconds is a good default
});

Configure Consumer Names​

// Unique consumer names for tracking
const transport = new RedisTransport({
url: 'redis://localhost:6379',
consumerName: `${hostname}-${pid}`,
});

Monitor Stream Length​

# Check stream length
redis-cli XLEN saga-bus:OrderSaga:OrderSubmitted

# Check pending messages
redis-cli XPENDING saga-bus:OrderSaga:OrderSubmitted order-service

Error Handling​

Failed messages remain in pending:

// Messages are automatically retried
// After max retries, moved to dead letter stream

const transport = new RedisTransport({
url: 'redis://localhost:6379',
maxRetries: 3,
deadLetterStream: 'saga-bus:dlq',
});

Performance Tuning​

const transport = new RedisTransport({
url: 'redis://localhost:6379',

// Batch processing for throughput
batchSize: 100,

// Pipeline commands
enablePipelining: true,

// Connection pool
maxConnections: 10,
});

See Also​