MongoDB Store
Document-oriented store using MongoDB with flexible schema support.
Installation​
npm install @saga-bus/store-mongodb mongodb
Basic Usage​
import { MongoSagaStore } from '@saga-bus/store-mongodb';
const store = new MongoSagaStore({
connectionString: 'mongodb://localhost:27017',
database: 'sagas',
});
const bus = createBus({
transport,
store,
sagas: [{ definition: orderSaga }],
});
await bus.start();
Configuration​
| Option | Type | Default | Description |
|---|---|---|---|
connectionString | string | Required | MongoDB connection string |
database | string | Required | Database name |
collection | string | 'sagas' | Collection name |
client | MongoClient | - | Existing MongoClient |
Full Configuration Example​
import { MongoSagaStore, createIndexes } from '@saga-bus/store-mongodb';
import { MongoClient } from 'mongodb';
// Option 1: Connection string
const store = new MongoSagaStore({
connectionString: process.env.MONGODB_URL,
database: 'myapp',
collection: 'saga_instances',
});
// Option 2: Existing client
const client = new MongoClient(process.env.MONGODB_URL, {
maxPoolSize: 50,
minPoolSize: 10,
retryWrites: true,
w: 'majority',
});
await client.connect();
const store = new MongoSagaStore({
client,
database: 'myapp',
collection: 'saga_instances',
});
// Create indexes
await createIndexes(client.db('myapp').collection('saga_instances'));
Index Setup​
Automatic​
import { createIndexes } from '@saga-bus/store-mongodb';
await createIndexes(store.collection);
Manual​
// Create indexes via mongosh
db.sagas.createIndex(
{ sagaName: 1, sagaId: 1 },
{ unique: true }
);
db.sagas.createIndex(
{ sagaName: 1, correlationId: 1 }
);
db.sagas.createIndex(
{ sagaName: 1, isCompleted: 1 }
);
db.sagas.createIndex(
{ createdAt: 1 },
{ expireAfterSeconds: 2592000 } // 30 days TTL
);
Document Schema​
interface SagaDocument {
_id: ObjectId;
sagaName: string;
sagaId: string;
correlationId: string;
version: number;
state: Record<string, unknown>;
isCompleted: boolean;
createdAt: Date;
updatedAt: Date;
}
MongoDB Atlas​
For MongoDB Atlas:
const store = new MongoSagaStore({
connectionString: 'mongodb+srv://user:password@cluster.mongodb.net/?retryWrites=true&w=majority',
database: 'sagas',
});
Replica Set​
For replica sets:
const store = new MongoSagaStore({
connectionString: 'mongodb://mongo1:27017,mongo2:27017,mongo3:27017/sagas?replicaSet=rs0',
database: 'sagas',
});
Docker Setup​
# docker-compose.yml
services:
mongodb:
image: mongo:7
ports:
- "27017:27017"
environment:
MONGO_INITDB_DATABASE: sagas
volumes:
- mongo_data:/data/db
volumes:
mongo_data:
Replica Set for Development​
services:
mongo1:
image: mongo:7
command: mongod --replSet rs0 --bind_ip_all
ports:
- "27017:27017"
mongo-init:
image: mongo:7
depends_on:
- mongo1
command: >
mongosh --host mongo1:27017 --eval "rs.initiate({_id: 'rs0', members: [{_id: 0, host: 'mongo1:27017'}]})"
Optimistic Concurrency​
Version-based updates with findOneAndUpdate:
// Atomic update with version check
const result = await collection.findOneAndUpdate(
{
sagaName: 'OrderSaga',
sagaId: '123',
version: expectedVersion,
},
{
$set: { state, updatedAt: new Date() },
$inc: { version: 1 },
}
);
if (!result.value) {
throw new ConcurrencyError();
}
TTL for Completed Sagas​
Automatically remove completed sagas:
// Create TTL index on completed sagas
db.sagas.createIndex(
{ completedAt: 1 },
{ expireAfterSeconds: 2592000, partialFilterExpression: { isCompleted: true } }
);
Best Practices​
Use Write Concern​
const client = new MongoClient(url, {
w: 'majority',
journal: true,
});
Configure Read Preference​
const client = new MongoClient(url, {
readPreference: 'primaryPreferred',
});
Enable Compression​
const client = new MongoClient(url, {
compressors: ['zstd', 'snappy'],
});