Skip to main content

NestJS Integration

Full dependency injection support with decorators for NestJS applications.

Installation​

npm install @saga-bus/nestjs @saga-bus/core

Basic Usage​

import { Module } from '@nestjs/common';
import { SagaBusModule } from '@saga-bus/nestjs';

@Module({
imports: [
SagaBusModule.forRoot({
transport: {
type: 'rabbitmq',
url: process.env.RABBITMQ_URL,
},
store: {
type: 'postgres',
connectionString: process.env.DATABASE_URL,
},
}),
],
})
export class AppModule {}

Configuration​

Synchronous Configuration​

SagaBusModule.forRoot({
transport: {
type: 'rabbitmq',
url: 'amqp://localhost:5672',
},
store: {
type: 'postgres',
connectionString: 'postgresql://localhost/sagas',
},
middleware: [
{ type: 'logging', level: 'info' },
{ type: 'tracing', serviceName: 'order-service' },
],
})

Async Configuration​

SagaBusModule.forRootAsync({
imports: [ConfigModule],
useFactory: (config: ConfigService) => ({
transport: {
type: 'rabbitmq',
url: config.get('RABBITMQ_URL'),
},
store: {
type: 'postgres',
connectionString: config.get('DATABASE_URL'),
},
}),
inject: [ConfigService],
})

Defining Sagas​

Using Decorators​

import { Injectable } from '@nestjs/common';
import { Saga, SagaHandler, InitialState, CorrelatedBy } from '@saga-bus/nestjs';

interface OrderState {
orderId: string;
status: string;
customerId: string;
}

@Injectable()
@Saga('OrderSaga')
export class OrderSaga {
@InitialState()
createInitialState(): OrderState {
return {
orderId: '',
status: 'pending',
customerId: '',
};
}

@CorrelatedBy('orderId')
@SagaHandler('OrderSubmitted', { initiates: true })
async handleOrderSubmitted(context: SagaContext<OrderState, OrderSubmitted>) {
context.setState({
orderId: context.message.orderId,
status: 'submitted',
customerId: context.message.customerId,
});

await context.publish({
type: 'RequestPayment',
orderId: context.message.orderId,
amount: context.message.total,
});
}

@SagaHandler('PaymentCaptured')
async handlePaymentCaptured(context: SagaContext<OrderState, PaymentCaptured>) {
context.setState({ status: 'paid' });

await context.publish({
type: 'ReserveInventory',
orderId: context.state.orderId,
});
}

@SagaHandler('InventoryReserved')
async handleInventoryReserved(context: SagaContext<OrderState, InventoryReserved>) {
context.setState({ status: 'completed' });
context.complete();
}
}

Registering Sagas​

@Module({
imports: [
SagaBusModule.forRoot({ ... }),
SagaBusModule.forFeature([OrderSaga, PaymentSaga]),
],
providers: [OrderSaga, PaymentSaga],
})
export class OrderModule {}

Publishing Messages​

Inject Bus Service​

import { Injectable } from '@nestjs/common';
import { SagaBusService } from '@saga-bus/nestjs';

@Injectable()
export class OrderService {
constructor(private readonly sagaBus: SagaBusService) {}

async createOrder(data: CreateOrderDto) {
await this.sagaBus.publish({
type: 'OrderSubmitted',
orderId: generateId(),
customerId: data.customerId,
items: data.items,
total: data.total,
});
}
}

In Controllers​

import { Controller, Post, Body } from '@nestjs/common';
import { SagaBusService } from '@saga-bus/nestjs';

@Controller('orders')
export class OrderController {
constructor(private readonly sagaBus: SagaBusService) {}

@Post()
async createOrder(@Body() dto: CreateOrderDto) {
const orderId = generateId();

await this.sagaBus.publish({
type: 'OrderSubmitted',
orderId,
...dto,
});

return { orderId };
}
}

Health Checks​

Built-in Health Indicator​

import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';
import { SagaBusModule, SagaBusHealthIndicator } from '@saga-bus/nestjs';

@Module({
imports: [
TerminusModule,
SagaBusModule.forRoot({ ... }),
],
providers: [SagaBusHealthIndicator],
})
export class HealthModule {}

Health Controller​

import { Controller, Get } from '@nestjs/common';
import { HealthCheck, HealthCheckService } from '@nestjs/terminus';
import { SagaBusHealthIndicator } from '@saga-bus/nestjs';

@Controller('health')
export class HealthController {
constructor(
private health: HealthCheckService,
private sagaBus: SagaBusHealthIndicator,
) {}

@Get()
@HealthCheck()
check() {
return this.health.check([
() => this.sagaBus.isHealthy('saga-bus'),
]);
}
}

Dependency Injection in Sagas​

@Injectable()
@Saga('OrderSaga')
export class OrderSaga {
constructor(
private readonly orderService: OrderService,
private readonly paymentGateway: PaymentGateway,
private readonly notificationService: NotificationService,
) {}

@SagaHandler('OrderSubmitted', { initiates: true })
async handleOrderSubmitted(context: SagaContext<OrderState, OrderSubmitted>) {
// Use injected services
const order = await this.orderService.create(context.message);
await this.notificationService.sendOrderConfirmation(order);

context.setState({ orderId: order.id, status: 'submitted' });
}
}

Testing​

import { Test } from '@nestjs/testing';
import { SagaBusModule, TestSagaBusModule } from '@saga-bus/nestjs';

describe('OrderSaga', () => {
let module: TestingModule;
let sagaBus: SagaBusService;

beforeEach(async () => {
module = await Test.createTestingModule({
imports: [
TestSagaBusModule.forRoot(), // Uses in-memory transport/store
],
providers: [OrderSaga],
}).compile();

sagaBus = module.get(SagaBusService);
});

it('processes order flow', async () => {
await sagaBus.publish({ type: 'OrderSubmitted', orderId: '123' });

// Wait for processing
await sagaBus.waitForSaga('OrderSaga', '123');

const state = await sagaBus.getSagaState('OrderSaga', '123');
expect(state.status).toBe('submitted');
});
});

See Also​