
Core API Reference

Complete API reference for @saga-bus/core.

Functions

defineSaga<TState, TMessages>(options)

Creates a saga definition with typed state and message handlers.

import { defineSaga } from '@saga-bus/core';

interface OrderState {
  orderId: string;
  status: string;
}

const orderSaga = defineSaga<OrderState>({
  name: 'OrderSaga',
  initialState: () => ({
    orderId: '',
    status: 'initial',
  }),
  correlationId: (message) => message.orderId,
  handlers: {
    OrderSubmitted: async (ctx) => {
      // Handler implementation
    },
  },
});

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| name | string | Unique saga identifier |
| initialState | () => TState | Factory function returning the initial state |
| correlationId | (message: any) => string | Extracts the correlation ID from a message |
| handlers | Record<string, Handler> | Mapping from message type to handler |

Returns: SagaDefinition<TState>


createBus(config)

Creates and configures a message bus instance.

import { createBus } from '@saga-bus/core';

const bus = createBus({
  transport,
  store,
  sagas: [{ definition: orderSaga }],
  middleware: [loggingMiddleware],
  concurrency: 10,
});

Parameters:

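| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| transport | Transport | Yes | Message transport implementation |
| store | SagaStore | No | State persistence store (optional for stateless handlers) |
| sagas | SagaRegistration[] | Yes | Saga definitions to register |
| middleware | Middleware[] | No | Middleware pipeline |
| concurrency | number | No | Maximum number of messages processed concurrently (default: 10) |
| retry | RetryConfig | No | Retry configuration |
| logger | Logger | No | Custom logger |
| metrics | MetricsCollector | No | Metrics collector |
| errorHandler | ErrorHandler | No | Error handler |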

Returns: Bus


createMiddleware(options)

Creates a custom middleware.

import { createMiddleware } from '@saga-bus/core';

const loggingMiddleware = createMiddleware({
  name: 'logging',
  beforeHandle: async ({ message, context }) => {
    console.log('Processing:', message.type);
  },
  afterHandle: async ({ message, context, result }) => {
    console.log('Completed:', message.type);
  },
  onError: async ({ error, message, context }) => {
    console.error('Error:', error.message);
  },
});

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| name | string | Middleware identifier |
| beforeHandle | (ctx) => Promise<void> | Called before handler execution |
| afterHandle | (ctx) => Promise<void> | Called after successful execution |
| onError | (ctx) => Promise<void> | Called on handler error |

Returns: Middleware
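
The hook table above implies an ordering around each handler call. The following is a minimal sketch of that ordering, not the library's implementation: `MiddlewareHooks`, `Message`, and `runWithMiddleware` are hypothetical names introduced for illustration, the `context` argument is omitted for brevity, and the assumption is that `onError` fires only when the handler itself throws.

```typescript
// Hypothetical shapes for illustration; the real types come from @saga-bus/core.
interface Message {
  type: string;
}

interface MiddlewareHooks {
  name: string;
  beforeHandle?: (ctx: { message: Message }) => Promise<void>;
  afterHandle?: (ctx: { message: Message; result: unknown }) => Promise<void>;
  onError?: (ctx: { message: Message; error: Error }) => Promise<void>;
}

// Assumed order: beforeHandle -> handler -> afterHandle, or onError if the handler throws.
async function runWithMiddleware(
  hooks: MiddlewareHooks,
  message: Message,
  handler: (message: Message) => Promise<unknown>,
): Promise<unknown> {
  await hooks.beforeHandle?.({ message });
  try {
    const result = await handler(message);
    await hooks.afterHandle?.({ message, result });
    return result;
  } catch (err) {
    const error = err instanceof Error ? err : new Error(String(err));
    await hooks.onError?.({ message, error });
    throw error; // rethrow so the caller can still apply its own error handling
  }
}

// A tracing middleware expressed as plain hook options; it records each hook call.
const events: string[] = [];
const tracing: MiddlewareHooks = {
  name: 'tracing',
  beforeHandle: async ({ message }) => {
    events.push(`before:${message.type}`);
  },
  afterHandle: async ({ message }) => {
    events.push(`after:${message.type}`);
  },
  onError: async ({ message, error }) => {
    events.push(`error:${message.type}:${error.message}`);
  },
};
```

Keeping each hook free of side effects beyond logging or metrics makes the pipeline easier to reason about, since a throwing `beforeHandle` in this sketch prevents the handler from running at all.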


createHandler(options)

Creates a standalone message handler (not part of a saga).

import { createHandler } from '@saga-bus/core';

const paymentHandler = createHandler({
  messageType: 'PaymentRequested',
  handler: async (ctx) => {
    const result = await paymentService.capture(ctx.message);
    ctx.publish({
      type: 'PaymentCaptured',
      transactionId: result.transactionId,
    });
  },
});

Classes

Bus

Main message bus class.

Methods

start(): Promise<void>

Start the bus and begin processing messages.

await bus.start();

stop(): Promise<void>

Stop the bus and cease message processing.

await bus.stop();

drain(options?): Promise<void>

Wait for in-flight messages to complete before stopping.

await bus.drain({ timeout: 30000 });

isRunning(): boolean

Check if the bus is currently running.

if (bus.isRunning()) {
  console.log('Bus is active');
}

publish<T>(message: T, options?): Promise<void>

Publish a message to the transport.

await bus.publish({
  type: 'OrderSubmitted',
  orderId: '123',
  total: 99.99,
});

// With options
await bus.publish(message, {
  correlationId: 'custom-id',
  metadata: { userId: 'user-123' },
});

Options:

| Option | Type | Description |
| --- | --- | --- |
| correlationId | string | Override correlation ID |
| metadata | Record<string, any> | Additional message metadata |
| delay | number | Delay delivery (ms) |

getSagaState(sagaName: string, correlationId: string): Promise<TState | null>

Get the current state of a saga instance.

const state = await bus.getSagaState('OrderSaga', 'order-123');
if (state) {
  console.log('Order status:', state.status);
}
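
The lifecycle methods above (`isRunning`, `drain`, `stop`) can be combined into a shutdown routine. The sketch below is one possible arrangement, assuming that `drain()` should be called before `stop()`; the reference does not state whether `stop()` drains on its own. `BusLifecycle` and `shutdownGracefully` are hypothetical names, and the structural type only mirrors the method signatures listed in this section.

```typescript
// Minimal structural type covering only the Bus methods used here.
interface BusLifecycle {
  isRunning(): boolean;
  drain(options?: { timeout: number }): Promise<void>;
  stop(): Promise<void>;
}

// Waits for in-flight messages, then stops the bus.
// Returns false when the bus was not running, true once it has been stopped.
async function shutdownGracefully(
  bus: BusLifecycle,
  timeoutMs: number = 30000,
): Promise<boolean> {
  if (!bus.isRunning()) {
    return false; // nothing to shut down
  }
  try {
    await bus.drain({ timeout: timeoutMs });
  } finally {
    // Stop even if draining failed or timed out, so the process can exit.
    await bus.stop();
  }
  return true;
}
```

In a Node.js service this would typically be wired to a termination signal, for example by calling `shutdownGracefully(bus)` from a `SIGTERM` handler.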

Interfaces

SagaContext<TState>

Context passed to saga handlers.

interface SagaContext<TState> {
  // Current saga state
  state: TState;

  // Incoming message
  message: any;

  // Update saga state
  setState(newState: TState): void;

  // Publish a message
  publish(message: any): void;

  // Mark saga as completed
  complete(): void;

  // Schedule a timeout
  scheduleTimeout(options: TimeoutOptions): void;

  // Cancel a timeout
  cancelTimeout(timeoutId: string): void;

  // Correlation ID for this saga instance
  correlationId: string;

  // Saga name
  sagaName: string;

  // Message metadata
  metadata: Map<string, any>;

  // Logger instance
  logger: Logger;
}

Methods

setState(newState: TState): void

Update the saga's persisted state.

ctx.setState({
  ...ctx.state,
  status: 'paid',
  transactionId: ctx.message.transactionId,
});

publish(message: any): void

Publish a new message.

ctx.publish({
  type: 'PaymentRequested',
  orderId: ctx.state.orderId,
  amount: ctx.state.total,
});

complete(): void

Mark the saga as completed (no more messages will be processed).

ctx.setState({ ...ctx.state, status: 'completed' });
ctx.complete();

scheduleTimeout(options): void

Schedule a delayed message.

ctx.scheduleTimeout({
  type: 'PaymentTimeout',
  orderId: ctx.state.orderId,
  delay: 15 * 60 * 1000, // 15 minutes
});

Transport

Interface for message transport implementations.

interface Transport {
  // Connect to the transport
  connect(): Promise<void>;

  // Disconnect from the transport
  disconnect(): Promise<void>;

  // Subscribe to messages
  subscribe(
    handler: MessageHandler,
    options?: SubscribeOptions
  ): Promise<Subscription>;

  // Publish a message
  publish(message: any, options?: PublishOptions): Promise<void>;

  // Check connection status
  isConnected(): boolean;
}
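
To make the contract above concrete, here is a minimal in-memory sketch of the same method set, suitable at most for tests. `MessageHandler` and `Subscription` are referenced by the interface but not defined in this reference, so the shapes below are assumptions; the subscribe and publish options are ignored, and `InMemoryTransport` is a hypothetical name.

```typescript
// Assumed shapes: the reference names these types but does not define them.
type MessageHandler = (message: unknown) => Promise<void>;

interface Subscription {
  unsubscribe(): Promise<void>;
}

class InMemoryTransport {
  private connected = false;
  private handlers = new Set<MessageHandler>();

  async connect(): Promise<void> {
    this.connected = true;
  }

  async disconnect(): Promise<void> {
    this.connected = false;
    this.handlers.clear();
  }

  async subscribe(handler: MessageHandler): Promise<Subscription> {
    this.handlers.add(handler);
    return {
      unsubscribe: async () => {
        this.handlers.delete(handler);
      },
    };
  }

  // Delivers to each subscriber in turn before resolving; no delay or partitioning.
  async publish(message: unknown): Promise<void> {
    if (!this.connected) {
      throw new Error('Transport is not connected');
    }
    // Copy the set so handlers may unsubscribe during delivery.
    for (const handler of [...this.handlers]) {
      await handler(message);
    }
  }

  isConnected(): boolean {
    return this.connected;
  }
}
```

A real transport would hand messages to a broker and acknowledge them separately; delivering inline as done here keeps tests deterministic at the cost of realism.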

SagaStore<TState>

Interface for saga state persistence.

interface SagaStore<TState> {
  // Get saga state by correlation ID
  getByCorrelationId(
    sagaName: string,
    correlationId: string
  ): Promise<SagaRecord<TState> | null>;

  // Insert new saga state
  insert(
    sagaName: string,
    correlationId: string,
    state: TState
  ): Promise<void>;

  // Update existing saga state
  update(
    sagaName: string,
    correlationId: string,
    state: TState,
    version: number
  ): Promise<void>;

  // Mark saga as completed
  complete(
    sagaName: string,
    correlationId: string
  ): Promise<void>;

  // Close store connection
  close(): Promise<void>;
}
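
The `version` parameter on `update` suggests optimistic concurrency control. The in-memory sketch below illustrates that reading under stated assumptions: `SagaRecord` is not defined in this reference, so its fields are hypothetical, and `version` is taken to be the version the caller last read. `InMemorySagaStore` is an illustrative name, not part of the package.

```typescript
// Assumed record shape; SagaRecord is referenced above but not defined here.
interface SagaRecord<TState> {
  state: TState;
  version: number;
  completed: boolean;
}

class InMemorySagaStore<TState> {
  private records = new Map<string, SagaRecord<TState>>();

  private key(sagaName: string, correlationId: string): string {
    return `${sagaName}:${correlationId}`;
  }

  async getByCorrelationId(
    sagaName: string,
    correlationId: string,
  ): Promise<SagaRecord<TState> | null> {
    return this.records.get(this.key(sagaName, correlationId)) ?? null;
  }

  async insert(sagaName: string, correlationId: string, state: TState): Promise<void> {
    const k = this.key(sagaName, correlationId);
    if (this.records.has(k)) {
      throw new Error(`Saga already exists: ${k}`);
    }
    this.records.set(k, { state, version: 1, completed: false });
  }

  // Assumption: `version` is the version the caller read; a mismatch means a concurrent write.
  async update(
    sagaName: string,
    correlationId: string,
    state: TState,
    version: number,
  ): Promise<void> {
    const k = this.key(sagaName, correlationId);
    const current = this.records.get(k);
    if (!current) {
      throw new Error(`Saga not found: ${k}`);
    }
    if (current.version !== version) {
      throw new Error(`Version conflict on ${k}: expected ${version}, found ${current.version}`);
    }
    this.records.set(k, { ...current, state, version: version + 1 });
  }

  async complete(sagaName: string, correlationId: string): Promise<void> {
    const k = this.key(sagaName, correlationId);
    const current = this.records.get(k);
    if (!current) {
      throw new Error(`Saga not found: ${k}`);
    }
    this.records.set(k, { ...current, completed: true });
  }

  async close(): Promise<void> {
    this.records.clear();
  }
}
```

Rejecting stale versions forces the caller to reload and retry, which prevents two handlers for the same saga instance from silently overwriting each other's state.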

BusConfig

Configuration options for createBus().

interface BusConfig {
  // Message transport
  transport: Transport;

  // State store (optional for stateless handlers)
  store?: SagaStore;

  // Saga registrations
  sagas: SagaRegistration[];

  // Middleware pipeline
  middleware?: Middleware[];

  // Concurrent message processing limit
  concurrency?: number;

  // Retry configuration
  retry?: RetryConfig;

  // Custom logger
  logger?: Logger;

  // Metrics collector
  metrics?: MetricsCollector;

  // Error handler
  errorHandler?: ErrorHandler;
}

RetryConfig

Configuration for automatic retries.

interface RetryConfig {
  // Maximum retry attempts
  maxAttempts: number;

  // Initial retry delay (ms)
  initialDelay: number;

  // Maximum retry delay (ms)
  maxDelay: number;

  // Backoff multiplier
  backoffMultiplier: number;

  // Add random jitter to delays
  jitter: boolean;
}

Defaults:

const DEFAULT_RETRY_CONFIG: RetryConfig = {
  maxAttempts: 5,
  initialDelay: 1000,
  maxDelay: 60000,
  backoffMultiplier: 2,
  jitter: true,
};
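
As an illustration of how these fields might combine, the sketch below computes the delay before a given retry attempt. The formula is an assumption (exponential growth capped at `maxDelay`, with "full jitter" choosing a random value below the cap), not a statement of what the library does internally; `retryDelay` is a hypothetical helper, and the interface is redeclared so the example is self-contained.

```typescript
interface RetryConfig {
  maxAttempts: number;
  initialDelay: number;
  maxDelay: number;
  backoffMultiplier: number;
  jitter: boolean;
}

// Values copied from the defaults listed above.
const DEFAULT_RETRY_CONFIG: RetryConfig = {
  maxAttempts: 5,
  initialDelay: 1000,
  maxDelay: 60000,
  backoffMultiplier: 2,
  jitter: true,
};

// Assumed formula: initialDelay * backoffMultiplier^(attempt - 1), capped at maxDelay.
// With jitter enabled, a random delay in [0, capped) is returned instead.
// `random` is injectable so the result can be made deterministic in tests.
function retryDelay(
  config: RetryConfig,
  attempt: number,
  random: () => number = Math.random,
): number {
  const exponential = config.initialDelay * Math.pow(config.backoffMultiplier, attempt - 1);
  const capped = Math.min(exponential, config.maxDelay);
  return config.jitter ? Math.floor(random() * capped) : capped;
}

// Delay before each attempt, with jitter disabled for readability.
const noJitter: RetryConfig = { ...DEFAULT_RETRY_CONFIG, jitter: false };
const schedule: number[] = [];
for (let attempt = 1; attempt <= noJitter.maxAttempts; attempt++) {
  schedule.push(retryDelay(noJitter, attempt));
}
// schedule is [1000, 2000, 4000, 8000, 16000]
```

Jitter spreads retries from many failing messages over time, so they do not all hit a recovering dependency at the same instant.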

PublishOptions

Options for publishing messages.

interface PublishOptions {
  // Override correlation ID
  correlationId?: string;

  // Additional metadata
  metadata?: Record<string, any>;

  // Delay delivery (ms)
  delay?: number;

  // Partition key (for partitioned transports)
  partitionKey?: string;

  // Message priority
  priority?: number;
}

Constants

DEFAULT_RETRY_POLICY

const DEFAULT_RETRY_POLICY = {
  maxRetries: 5,
  initialDelayMs: 1000,
  maxDelayMs: 30000,
  backoffMultiplier: 2,
};

DEFAULT_TIMEOUT_BOUNDS

const DEFAULT_TIMEOUT_BOUNDS = {
  minTimeoutMs: 1000,
  maxTimeoutMs: 7 * 24 * 60 * 60 * 1000, // 7 days
};
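
One way to read these bounds is as the valid range for a timeout delay. The sketch below clamps a requested delay into that range before it is passed on, for example to `scheduleTimeout`. Whether the library clamps, rejects, or otherwise handles out-of-range values is not stated in this reference, so treat the clamping behaviour and the `clampTimeout` helper as assumptions.

```typescript
// Values copied from the constant listed above.
const DEFAULT_TIMEOUT_BOUNDS = {
  minTimeoutMs: 1000,
  maxTimeoutMs: 7 * 24 * 60 * 60 * 1000, // 7 days
};

// Hypothetical helper: forces a requested delay into [minTimeoutMs, maxTimeoutMs].
function clampTimeout(
  delayMs: number,
  bounds: { minTimeoutMs: number; maxTimeoutMs: number } = DEFAULT_TIMEOUT_BOUNDS,
): number {
  return Math.min(Math.max(delayMs, bounds.minTimeoutMs), bounds.maxTimeoutMs);
}

const tooShort = clampTimeout(500); // raised to the 1 second minimum
const withinRange = clampTimeout(15 * 60 * 1000); // 15 minutes, unchanged
const tooLong = clampTimeout(8 * 24 * 60 * 60 * 1000); // lowered to the 7 day maximum
```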

DEFAULT_CONCURRENCY

const DEFAULT_CONCURRENCY = 10;
