Kafka Transport

High-throughput transport using Apache Kafka with consumer groups.

Installation

npm install @saga-bus/transport-kafka kafkajs

Basic Usage

import { KafkaTransport } from '@saga-bus/transport-kafka';

const transport = new KafkaTransport({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
});

// createBus, store, and orderSaga come from your existing saga-bus setup
const bus = createBus({
  transport,
  store,
  sagas: [{ definition: orderSaga }],
});

await bus.start();

Configuration

Option              Type               Default       Description
clientId            string             Required      Kafka client identifier
brokers             string[]           Required      Kafka broker addresses
groupId             string             'saga-bus'    Consumer group ID
topicPrefix         string             'saga-bus'    Prefix for topic names
ssl                 boolean | object   false         SSL/TLS configuration
sasl                object             -             SASL authentication
connectionTimeout   number             10000         Connection timeout (ms)
requestTimeout      number             30000         Request timeout (ms)

Full Configuration Example

import fs from 'node:fs';

const transport = new KafkaTransport({
  clientId: 'order-service',
  brokers: [
    'kafka-1.example.com:9092',
    'kafka-2.example.com:9092',
    'kafka-3.example.com:9092',
  ],
  groupId: 'order-saga-consumers',
  topicPrefix: 'orders',

  // SSL configuration
  ssl: {
    rejectUnauthorized: true,
    ca: [fs.readFileSync('/path/to/ca.pem', 'utf-8')],
  },

  // SASL authentication
  sasl: {
    mechanism: 'scram-sha-256',
    username: process.env.KAFKA_USERNAME,
    password: process.env.KAFKA_PASSWORD,
  },

  // Timeouts
  connectionTimeout: 10000,
  requestTimeout: 30000,
});

Consumer Groups

Kafka uses consumer groups for parallel processing:

// Multiple instances share the same group
const transport = new KafkaTransport({
  clientId: `order-service-${process.env.INSTANCE_ID}`,
  brokers: ['localhost:9092'],
  groupId: 'order-saga-group', // Same group = shared consumption
});
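
Kafka assigns each partition of a topic to exactly one consumer in a group, so running more instances with the same groupId spreads the partitions across them; instances beyond the topic's partition count will sit idle.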

Topic Naming

Topics are created automatically with the pattern:

{topicPrefix}.{sagaName}.{messageType}

For example:

  • orders.OrderSaga.OrderSubmitted
  • orders.OrderSaga.PaymentCaptured
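
Knowing the pattern makes it easy to compute topic names for tooling or monitoring. A tiny illustrative helper (not part of the package):

// Illustrative only: builds a topic name following the documented pattern
function topicName(topicPrefix: string, sagaName: string, messageType: string): string {
  return `${topicPrefix}.${sagaName}.${messageType}`;
}

console.log(topicName('orders', 'OrderSaga', 'OrderSubmitted')); // orders.OrderSaga.OrderSubmitted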

Partitioning

Messages are partitioned by correlation ID for ordering:

// Messages with same orderId go to same partition
// Ensures in-order processing per saga instance
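
To illustrate the mechanics with plain kafkajs (this is not the transport's internal code): Kafka routes messages with the same key to the same partition, so keying by the correlation ID, for example an orderId, keeps each saga instance's messages ordered.

import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'example', brokers: ['localhost:9092'] });
const producer = kafka.producer();

await producer.connect();
await producer.send({
  topic: 'orders.OrderSaga.OrderSubmitted',
  messages: [
    // Same key -> same partition -> in-order delivery for this saga instance
    { key: 'order-123', value: JSON.stringify({ orderId: 'order-123' }) },
  ],
});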

Docker Setup

# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
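
Start the broker with docker compose up -d; the basic usage example above can then connect to localhost:9092.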

AWS MSK

For Amazon MSK:

const transport = new KafkaTransport({
  clientId: 'my-app',
  brokers: [
    'b-1.mycluster.kafka.us-east-1.amazonaws.com:9092',
    'b-2.mycluster.kafka.us-east-1.amazonaws.com:9092',
  ],
  ssl: true,
  sasl: {
    mechanism: 'aws',
    // kafkajs expects the IAM UserId or RoleId here, not the access key
    // (AWS_IAM_ROLE_ID is a placeholder env var name)
    authorizationIdentity: process.env.AWS_IAM_ROLE_ID,
    accessKeyId: process.env.AWS_ACCESS_KEY_ID,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
  },
});

Confluent Cloud

const transport = new KafkaTransport({
  clientId: 'my-app',
  brokers: ['pkc-xxxxx.us-east-1.aws.confluent.cloud:9092'],
  ssl: true,
  sasl: {
    mechanism: 'plain',
    username: process.env.CONFLUENT_API_KEY,
    password: process.env.CONFLUENT_API_SECRET,
  },
});

Best Practices

Use Meaningful Client IDs

// Include service name and instance
clientId: `${serviceName}-${hostname}-${pid}`
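
For example, the host name and process ID can come from Node.js built-ins (serviceName below is just a placeholder for your own service name):

import os from 'node:os';

const serviceName = 'order-service'; // placeholder
const clientId = `${serviceName}-${os.hostname()}-${process.pid}`;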

Configure Appropriate Timeouts

// For high-latency networks
connectionTimeout: 30000,
requestTimeout: 60000,

Monitor Consumer Lag

Use Kafka tools to monitor consumer group lag:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-saga-group --describe
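
Lag that keeps growing means the consumers are not keeping up; add instances to the group (up to the topic's partition count) or look for slow saga handlers.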

Error Handling

Failed messages are sent to a dead letter topic:

{topicPrefix}.{sagaName}.dlq
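
The dead letter topic can be read like any other Kafka topic for triage. A minimal kafkajs sketch, assuming the orders prefix and OrderSaga from the examples above:

import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'dlq-inspector', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'dlq-inspector' });

await consumer.connect();
await consumer.subscribe({ topics: ['orders.OrderSaga.dlq'], fromBeginning: true });
await consumer.run({
  eachMessage: async ({ message }) => {
    // Log the failed message for manual inspection or replay
    console.log(message.key?.toString(), message.value?.toString());
  },
});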

See Also