Skip to main content

Express Integration

Middleware and utilities for Express.js applications.

Installation​

npm install @saga-bus/express @saga-bus/core express

Basic Usage​

import express from 'express';
import { createBus } from '@saga-bus/core';
import { sagaBusMiddleware, createHealthRouter, setupGracefulShutdown } from '@saga-bus/express';

const app = express();
app.use(express.json());

// Create bus
const bus = createBus({
transport,
store,
sagas: [{ definition: orderSaga }],
});

// Add middleware
app.use(sagaBusMiddleware({ bus }));

// Add health check routes
app.use('/health', createHealthRouter(bus));

// Start server
const server = app.listen(3000, async () => {
await bus.start();
console.log('Server started');
});

// Graceful shutdown
setupGracefulShutdown(bus, server);

Middleware​

Request Context​

import { sagaBusMiddleware, getBus } from '@saga-bus/express';

app.use(sagaBusMiddleware({ bus }));

app.post('/orders', async (req, res) => {
const bus = getBus(req);

await bus.publish({
type: 'OrderSubmitted',
orderId: generateId(),
...req.body,
});

res.json({ success: true });
});

Correlation ID​

import { correlationMiddleware } from '@saga-bus/express';

// Automatically extracts/generates correlation ID
app.use(correlationMiddleware());

app.post('/orders', async (req, res) => {
// Correlation ID available in req.correlationId
await bus.publish({
type: 'OrderSubmitted',
correlationId: req.correlationId,
...req.body,
});
});

Health Checks​

Basic Health Router​

import { createHealthRouter } from '@saga-bus/express';

app.use('/health', createHealthRouter(bus));

// Endpoints:
// GET /health - Overall health
// GET /health/live - Liveness probe
// GET /health/ready - Readiness probe

Custom Health Checks​

import { createHealthRouter } from '@saga-bus/express';

app.use('/health', createHealthRouter(bus, {
checks: {
database: async () => {
await pool.query('SELECT 1');
return { status: 'healthy' };
},
redis: async () => {
await redis.ping();
return { status: 'healthy' };
},
},
includeDetails: process.env.NODE_ENV !== 'production',
}));

Health Response​

{
"status": "healthy",
"timestamp": "2024-01-15T10:30:00.000Z",
"checks": {
"transport": { "status": "healthy" },
"store": { "status": "healthy" },
"database": { "status": "healthy" }
}
}

Graceful Shutdown​

import { setupGracefulShutdown } from '@saga-bus/express';

const server = app.listen(3000);

setupGracefulShutdown(bus, server, {
timeout: 30000, // Max 30s for shutdown
signals: ['SIGTERM', 'SIGINT'],
onShutdown: async () => {
console.log('Shutting down...');
await closeOtherConnections();
},
});

Manual Shutdown​

import { gracefulShutdown } from '@saga-bus/express';

process.on('SIGTERM', async () => {
await gracefulShutdown(bus, server, { timeout: 30000 });
process.exit(0);
});

Error Handling​

Error Middleware​

import { sagaErrorHandler } from '@saga-bus/express';

// Add after all routes
app.use(sagaErrorHandler({
logger: console,
includeStack: process.env.NODE_ENV !== 'production',
}));

Custom Error Handler​

app.post('/orders', async (req, res, next) => {
try {
await bus.publish({ type: 'OrderSubmitted', ...req.body });
res.json({ success: true });
} catch (error) {
next(error);
}
});

app.use((error, req, res, next) => {
if (error.name === 'SagaPublishError') {
return res.status(503).json({
error: 'Service temporarily unavailable',
});
}
next(error);
});

Full Example​

import express from 'express';
import { createBus } from '@saga-bus/core';
import { RabbitMqTransport } from '@saga-bus/transport-rabbitmq';
import { PostgresSagaStore } from '@saga-bus/store-postgres';
import {
sagaBusMiddleware,
correlationMiddleware,
createHealthRouter,
setupGracefulShutdown,
sagaErrorHandler,
} from '@saga-bus/express';

const app = express();
app.use(express.json());
app.use(correlationMiddleware());

// Create bus
const transport = new RabbitMqTransport({ url: process.env.RABBITMQ_URL });
const store = new PostgresSagaStore({ connectionString: process.env.DATABASE_URL });

const bus = createBus({
transport,
store,
sagas: [
{ definition: orderSaga },
{ definition: paymentSaga },
],
});

// Middleware
app.use(sagaBusMiddleware({ bus }));

// Health
app.use('/health', createHealthRouter(bus));

// Routes
app.post('/orders', async (req, res) => {
const orderId = crypto.randomUUID();

await bus.publish({
type: 'OrderSubmitted',
orderId,
correlationId: req.correlationId,
...req.body,
});

res.status(201).json({ orderId });
});

app.get('/orders/:id', async (req, res) => {
const state = await store.getByCorrelationId('OrderSaga', req.params.id);

if (!state) {
return res.status(404).json({ error: 'Not found' });
}

res.json(state);
});

// Error handling
app.use(sagaErrorHandler());

// Start
const server = app.listen(3000, async () => {
await bus.start();
console.log('Server running on port 3000');
});

setupGracefulShutdown(bus, server);

Docker Setup​

FROM node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci --only=production
COPY . .
EXPOSE 3000
CMD ["node", "dist/index.js"]

See Also​