Skip to main content

NATS JetStream Transport

Cloud-native transport using NATS JetStream for persistent messaging.

Installation​

npm install @saga-bus/transport-nats nats

Basic Usage​

import { NatsTransport } from '@saga-bus/transport-nats';

const transport = new NatsTransport({
servers: ['localhost:4222'],
});

const bus = createBus({
transport,
store,
sagas: [{ definition: orderSaga }],
});

await bus.start();

Configuration​

OptionTypeDefaultDescription
serversstring[]RequiredNATS server addresses
userstring-Username for auth
passstring-Password for auth
tokenstring-Token for auth
nkeystring-NKey seed for auth
streamPrefixstring'saga-bus'Prefix for stream names
consumerPrefixstring'saga-bus'Prefix for consumers
ackWaitnumber30000Ack wait time (ms)
maxDelivernumber3Max delivery attempts

Full Configuration Example​

import { NatsTransport } from '@saga-bus/transport-nats';

const transport = new NatsTransport({
// Connection
servers: [
'nats://nats-1.example.com:4222',
'nats://nats-2.example.com:4222',
'nats://nats-3.example.com:4222',
],

// Authentication
user: process.env.NATS_USER,
pass: process.env.NATS_PASSWORD,

// Or token auth
token: process.env.NATS_TOKEN,

// TLS
tls: {
caFile: '/path/to/ca.pem',
certFile: '/path/to/cert.pem',
keyFile: '/path/to/key.pem',
},

// JetStream settings
streamPrefix: 'orders',
consumerPrefix: 'order-service',

// Delivery settings
ackWait: 30000,
maxDeliver: 5,
});

Stream Naming​

JetStream streams and consumers are created automatically:

Streams:
{streamPrefix}_{SagaName}

Subjects:
{streamPrefix}.{SagaName}.{MessageType}

Consumers:
{consumerPrefix}_{SagaName}

For example:

  • Stream: orders_OrderSaga
  • Subject: orders.OrderSaga.OrderSubmitted
  • Consumer: order-service_OrderSaga

JetStream Features​

Persistence​

Messages are persisted to disk:

const transport = new NatsTransport({
servers: ['localhost:4222'],
storage: 'file', // 'file' or 'memory'
replicas: 3, // Number of replicas
});

Message Replay​

Replay messages from a specific point:

// Built-in support for:
// - Replay by sequence number
// - Replay by time
// - Replay all messages

Exactly-Once Delivery​

Enable exactly-once semantics:

const transport = new NatsTransport({
servers: ['localhost:4222'],
deliverPolicy: 'exactly_once',
});

Authentication Methods​

Username/Password​

const transport = new NatsTransport({
servers: ['localhost:4222'],
user: 'myuser',
pass: 'mypassword',
});

Token​

const transport = new NatsTransport({
servers: ['localhost:4222'],
token: process.env.NATS_TOKEN,
});

NKey​

import { fromSeed } from 'nats';

const transport = new NatsTransport({
servers: ['localhost:4222'],
authenticator: nkeyAuthenticator(
new TextEncoder().encode(process.env.NATS_NKEY_SEED)
),
});

JWT/Creds File​

const transport = new NatsTransport({
servers: ['localhost:4222'],
credsFile: '/path/to/user.creds',
});

Docker Setup​

# docker-compose.yml
services:
nats:
image: nats:latest
ports:
- "4222:4222"
- "8222:8222" # Monitoring
command:
- "--jetstream"
- "--store_dir=/data"
volumes:
- nats_data:/data

volumes:
nats_data:

NATS Cluster​

For clustered deployments:

# docker-compose.yml
services:
nats-1:
image: nats:latest
command:
- "--cluster_name=saga-cluster"
- "--cluster=nats://0.0.0.0:6222"
- "--routes=nats://nats-2:6222,nats://nats-3:6222"
- "--jetstream"

nats-2:
image: nats:latest
command:
- "--cluster_name=saga-cluster"
- "--cluster=nats://0.0.0.0:6222"
- "--routes=nats://nats-1:6222,nats://nats-3:6222"
- "--jetstream"

nats-3:
image: nats:latest
command:
- "--cluster_name=saga-cluster"
- "--cluster=nats://0.0.0.0:6222"
- "--routes=nats://nats-1:6222,nats://nats-2:6222"
- "--jetstream"

Message Acknowledgment​

Configure acknowledgment behavior:

const transport = new NatsTransport({
servers: ['localhost:4222'],

// Wait time before redelivery
ackWait: 30000, // 30 seconds

// Ack policy
ackPolicy: 'explicit', // 'none', 'all', 'explicit'
});

Dead Letter Queue​

Handle failed messages:

const transport = new NatsTransport({
servers: ['localhost:4222'],
maxDeliver: 3, // Move to DLQ after 3 attempts
});

// Failed messages go to:
// {streamPrefix}.dlq

Best Practices​

Use Multiple Servers​

// Connect to multiple servers for HA
const transport = new NatsTransport({
servers: [
'nats://nats-1:4222',
'nats://nats-2:4222',
'nats://nats-3:4222',
],
});

Configure Appropriate Replicas​

// For production, use 3+ replicas
const transport = new NatsTransport({
servers: ['localhost:4222'],
replicas: 3,
});

Monitor with NATS CLI​

# Check stream info
nats stream info orders_OrderSaga

# Check consumer info
nats consumer info orders_OrderSaga order-service_OrderSaga

# View pending messages
nats consumer next orders_OrderSaga order-service_OrderSaga --count 10

Performance Tuning​

const transport = new NatsTransport({
servers: ['localhost:4222'],

// Batch fetching
maxMessages: 100,
maxBytes: 1024 * 1024, // 1MB

// Connection settings
maxReconnectAttempts: -1, // Infinite
reconnectTimeWait: 2000,
});

Leafnodes​

For edge deployments:

// Connect to leafnode
const transport = new NatsTransport({
servers: ['nats://leafnode:4222'],
});

See Also​