Recap
This is a small continuation of my earlier work on sagas, in which I first attempted to implement an orchestration-based saga but fell back to a choreography-based one. In another post, I outlined the patterns and techniques for robust microservice communication, along with some of the challenges I hit along the way. Although that implementation worked, I was not satisfied with either the organization of the code or the choreography-based approach. For this implementation, I've kept all of the same robustness techniques but built a cleaner, more modular orchestration-based saga.
General Patterns, High Level Overview
This post will not go into great detail about the design patterns used, although I will give a brief overview for clarity. The main patterns used to communicate between microservices in this project are saga, idempotent consumer, and transactional outbox. I also use some simple OOP patterns, such as the factory and builder.
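To make the two messaging patterns concrete, here is a minimal in-memory sketch. It is not the project's code: `InMemoryStore`, `handle`, and the message shape are hypothetical names, and the set and array stand in for the inbox and outbox tables that the real services keep in Postgres.

```typescript
// Hypothetical message shape, for illustration only.
type Message = { id: string; orderId: string }

// In-memory stand-ins for database tables; a real service would use Postgres.
class InMemoryStore {
  processedIds = new Set<string>() // plays the role of the inbox table
  outbox: Message[] = []           // plays the role of the outbox table

  // Idempotent consumer: a message id that was already recorded is skipped.
  // Returns true when the message was handled, false when it was a duplicate.
  handle(message: Message, nextMessageId: string): boolean {
    if (this.processedIds.has(message.id)) return false
    // In a real database these two writes would share one transaction,
    // so either both are persisted or neither is (transactional outbox).
    this.processedIds.add(message.id)
    this.outbox.push({ id: nextMessageId, orderId: message.orderId })
    return true
  }
}

const store = new InMemoryStore()
const first = store.handle({ id: "m-1", orderId: "o-1" }, "m-2")
const duplicate = store.handle({ id: "m-1", orderId: "o-1" }, "m-3")
console.log(first, duplicate, store.outbox.length)
```

The second delivery of message m-1 is ignored, so only one outgoing message is queued no matter how many times the broker redelivers.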
Infrastructure
This project depends on a few different infrastructure technologies.
- Postgres: Used to persist the saga, the inbox and outbox messages, and the overall "work" that individual microservices need to accomplish as part of the saga.
- RabbitMQ: Facilitates messaging between microservices
- Docker: Used in order to easily spin up the infrastructure dependencies
Orders, Inventory, and Shipping services
The overall goal of this project is to use a crash-resilient, orchestration-based saga to create a microservice system that is eventually consistent, regardless of service crashes at any instant, whether in the orchestrator or in the worker nodes. The simulated environment for this is an oversimplified e-commerce system with three microservices: orders, inventory, and shipping.
Orders Service
The orders service acts as the entry point for new requests, and is therefore the saga orchestrator. Requests reach the orders service via HTTP. For simplicity, requests only contain a productId and a quantity field in a JSON body. After receiving the request, the service creates an order in a pending state and a new orders saga. Throughout the entire process, the orders service continues to relay messages between the other services until the order is shipped, after which it finalizes the order status. If anything goes wrong along the way, the orders service is responsible for canceling the order by updating its local database and sending messages to the microservices that need to roll back any changes they successfully performed.
Inventory Service
The inventory service is responsible for managing how much of a specific product is available. In theory, it would validate that there is enough product for the requested order and remove the inventory from the system. In reality, most of this project is centered on the orchestrator, so the inventory service performs "fake" work and fails 10% of the time.
Shipping Service
The shipping service is responsible for shipping the product once the inventory has been removed. This is the last step in the process, apart from finalizing the order in the orders service. Once again, this service doesn't do much of anything besides sending messages back to the orchestrator and failing 10% of the time.
Saga Step Interface
To walk through the implementation, we will work our way up from the smallest building block to the largest, starting with the saga step interface. Thankfully, it is quite minimal: there is an invoke function and a compensate function (spelled compenstate in the code). When implemented, the invoke function performs the work of the step, and the compensate function reverts whatever invoke did, for rollback purposes. There is also a step field, which is helpful in restoring sagas from the database. More on this later.
import { STEP } from "../../../db/entities/types"
export interface SagaStepInterface<T, U> {
step: STEP
invoke(data: T): Promise<void>
compenstate(data: U): Promise<void>
}
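As an illustration of the contract, here is a sketch of a step that satisfies the interface without touching a database. Everything in it is an assumption made for the example: `ReserveStockStep`, `StockData`, the single-member `STEP` enum, and the in-memory map are hypothetical stand-ins, and the interface is redeclared locally only so the snippet compiles on its own.

```typescript
// Local stand-ins so the sketch compiles on its own; the real STEP enum
// lives in db/entities/types and may differ.
enum STEP { REMOVE_INVENTORY = "REMOVE_INVENTORY" }

interface SagaStepInterface<T, U> {
  step: STEP
  invoke(data: T): Promise<void>
  compenstate(data: U): Promise<void> // spelled as in the interface above
}

// Hypothetical payload for this example.
type StockData = { orderId: string; productId: number; quantity: number }

// Hypothetical step: reserves stock on invoke and returns it on compensation.
class ReserveStockStep implements SagaStepInterface<StockData, StockData> {
  public step: STEP = STEP.REMOVE_INVENTORY
  private applied = new Set<string>() // order ids that already reserved stock

  constructor(private stock: Map<number, number>) {}

  async invoke(data: StockData): Promise<void> {
    if (this.applied.has(data.orderId)) return // repeated invoke is a no-op
    const available = this.stock.get(data.productId) ?? 0
    if (available < data.quantity) throw new Error("insufficient stock")
    this.stock.set(data.productId, available - data.quantity)
    this.applied.add(data.orderId)
  }

  async compenstate(data: StockData): Promise<void> {
    // delete() returns false when there was nothing to undo
    if (!this.applied.delete(data.orderId)) return
    const available = this.stock.get(data.productId) ?? 0
    this.stock.set(data.productId, available + data.quantity)
  }
}
```

Calling invoke or compenstate twice with the same data leaves the stock unchanged after the first call, which is the property every real step needs in order to survive redelivered messages.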
OrderSaga Steps
In order to implement the saga, some concrete steps are required. The following are the steps created for the order saga. All steps must be idempotent, and all actions within a step have to occur in the same database transaction in order to achieve consistency.
Create Order
export class CreateOrderStep implements SagaStepInterface<OrderSagaStepData, OrderSagaStepData> {
public step: STEP = STEP.CREATE_ORDER
constructor(private datasource: DataSource) {}
async invoke(data: OrderSagaStepData): Promise<void> {
console.log(`creating order with id ${data.orderId}`)
await this.datasource.transaction(async manager => {
const orderRepository = manager.getRepository(Order)
const outboxRepository = manager.getRepository(OutboxMessage)
const orderSagaRepository = manager.getRepository(OrderSagaEntity)
const existingOrder = await orderRepository.findOneBy({orderId: data.orderId})
if (existingOrder) {
console.log(`already created order with id ${data.orderId}, skipping`)
return
}
const order = new Order(data.orderId, data.productId, data.quantity)
const outboxMessage = new OutboxMessage(data.orderId, data.productId, data.quantity, OUTBOX_MESSAGE_TYPE.REMOVE_INVENTORY_LOCAL)
// Argument order matches every other OrderSagaEntity call: orderId, productId, quantity, step
const orderSagaEntity = new OrderSagaEntity(data.orderId, data.productId, data.quantity, STEP.CREATE_ORDER)
await orderRepository.save(order)
await outboxRepository.save(outboxMessage)
await orderSagaRepository.save(orderSagaEntity)
})
}
async compenstate(data: OrderSagaStepData): Promise<void> {
console.log(`compensating create order step`)
await this.datasource.transaction(async manager => {
const orderRepository = manager.getRepository(Order)
const sagaRepository = manager.getRepository(OrderSagaEntity)
const sagaEntity = new OrderSagaEntity(data.orderId, data.productId, data.quantity, STEP.COMPENSATE)
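const existingOrder = await orderRepository.findOneBy({orderId: data.orderId})
await sagaRepository.save(sagaEntity)
// findOneBy returns null when no row matches; in that case there is no order to cancel
if (!existingOrder) return
existingOrder.status = ORDER_STATUS.CANCELED
await orderRepository.save(existingOrder)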
})
}
}
Remove Inventory
export class RemoveInventoryStep implements SagaStepInterface<OrderSagaStepData, OrderSagaStepData> {
public step: STEP = STEP.REMOVE_INVENTORY
constructor(private datasource: DataSource) {}
async invoke(data: OrderSagaStepData): Promise<void> {
console.log(`removing inventory for order with id ${data.orderId}`)
await this.datasource.transaction(async manager => {
const inboxRepository = manager.getRepository(InboxMessage)
const existingMessage = await inboxRepository.findOneBy({id: data.messageId, messageType: INBOX_MESSAGE_TYPE.REMOVE_INVENTORY_LOCAL})
if (existingMessage) {
console.log(`already removed inventory for order with id ${data.orderId}, skipping`)
return
}
const outboxRepository = manager.getRepository(OutboxMessage)
const sagaRepository = manager.getRepository(OrderSagaEntity)
const inboxMessage = new InboxMessage(data.messageId, data.orderId, INBOX_MESSAGE_TYPE.REMOVE_INVENTORY_LOCAL, true)
const outboxMessage = new OutboxMessage(data.orderId, data.productId, data.quantity, OUTBOX_MESSAGE_TYPE.REMOVE_INVENTORY)
const sagaEntity = new OrderSagaEntity(data.orderId, data.productId, data.quantity, STEP.REMOVE_INVENTORY)
await inboxRepository.save(inboxMessage)
await outboxRepository.save(outboxMessage)
await sagaRepository.save(sagaEntity)
})
}
async compenstate(data: OrderSagaStepData): Promise<void> {
console.log(`compensating remove inventory step`)
await this.datasource.transaction(async manager => {
const outboxRepository = manager.getRepository(OutboxMessage)
const existingMessage = await outboxRepository.findOneBy({orderId: data.orderId, messageType: OUTBOX_MESSAGE_TYPE.RESTORE_INVENTORY})
if (existingMessage) {
console.log('already compensated, skipping')
return
}
const outboxMessage = new OutboxMessage(data.orderId, data.productId, data.quantity, OUTBOX_MESSAGE_TYPE.RESTORE_INVENTORY)
await outboxRepository.save(outboxMessage)
})
}
}
Ship Order
export class ShipOrderStep implements SagaStepInterface<OrderSagaStepData, OrderSagaStepData> {
public step: STEP = STEP.SHIP_ORDER
constructor(private datasource: DataSource) {}
async invoke(data: OrderSagaStepData): Promise<void> {
console.log(`shipping order for order with id ${data.orderId}`)
await this.datasource.transaction(async manager => {
const inboxRepository = manager.getRepository(InboxMessage)
const existingMessage = await inboxRepository.findOneBy({id: data.messageId, messageType: INBOX_MESSAGE_TYPE.INVENTORY_RESPONSE})
if (existingMessage) {
console.log(`already shipped order for order with id ${data.orderId}, skipping`)
return
}
const outboxRepository = manager.getRepository(OutboxMessage)
const sagaRepository = manager.getRepository(OrderSagaEntity)
const inboxMessage = new InboxMessage(data.messageId, data.orderId, INBOX_MESSAGE_TYPE.INVENTORY_RESPONSE, true)
const outboxMessage = new OutboxMessage(data.orderId, data.productId, data.quantity, OUTBOX_MESSAGE_TYPE.SHIP_ORDER)
const sagaEntity = new OrderSagaEntity(data.orderId, data.productId, data.quantity, STEP.SHIP_ORDER)
await inboxRepository.save(inboxMessage)
await outboxRepository.save(outboxMessage)
await sagaRepository.save(sagaEntity)
})
}
async compenstate(data: OrderSagaStepData): Promise<void> {
console.log(`compensating ship order step`)
await this.datasource.transaction(async manager => {
const outboxRepository = manager.getRepository(OutboxMessage)
const existingMessage = await outboxRepository.findOneBy({orderId: data.orderId, messageType: OUTBOX_MESSAGE_TYPE.SHIP_ORDER_CANCEL})
if (existingMessage) {
console.log('already compensated, skipping')
return
}
const outboxMessage = new OutboxMessage(data.orderId, data.productId, data.quantity, OUTBOX_MESSAGE_TYPE.SHIP_ORDER_CANCEL)
await outboxRepository.save(outboxMessage)
})
}
}
Finalizing Order
export class FinalizeOrderStep implements SagaStepInterface<OrderSagaStepData, OrderSagaStepData> {
public step: STEP = STEP.FINALIZE_ORDER
constructor(private datasource: DataSource) {}
async invoke(data: OrderSagaStepData): Promise<void> {
console.log(`finalizing order with id ${data.orderId}`)
await this.datasource.transaction(async manager => {
const inboxRepository = manager.getRepository(InboxMessage)
const existingMessage = await inboxRepository.findOneBy({id: data.messageId, messageType: INBOX_MESSAGE_TYPE.SHIPPING_RESPONSE})
if (existingMessage) {
console.log(`already finalized order with id ${data.orderId}, skipping`)
return;
}
const sagaRepository = manager.getRepository(OrderSagaEntity)
const orderRepository = manager.getRepository(Order)
const inboxMessage = new InboxMessage(data.messageId, data.orderId, INBOX_MESSAGE_TYPE.SHIPPING_RESPONSE, true)
const sagaEntity = new OrderSagaEntity(data.orderId, data.productId, data.quantity, STEP.FINALIZE_ORDER)
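const order = await orderRepository.findOneBy({orderId: data.orderId})
if (!order) {
// Throwing inside the transaction callback rolls the whole step back
throw new Error(`cannot finalize: no order with id ${data.orderId}`)
}
order.status = ORDER_STATUS.FULFILLED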
await inboxRepository.save(inboxMessage)
await sagaRepository.save(sagaEntity)
await orderRepository.save(order)
})
}
async compenstate(): Promise<void> {
// last step, no compensation
}
}
Saga Class
In order to string these steps together, I created a Saga class which contains all of the aforementioned steps. The class object contains an array of all steps, keeps track of the next step to be completed via an index, and keeps track of which steps have completed so far in another array. It might be cleaner to implement this as a linked list to better represent an ordering of the steps, although I'm not convinced it is critically important. The saga additionally keeps information about the order that it is coordinating, such as orderId, productId, and quantity.
export class OrderSaga {
public steps: SagaStepInterface<OrderSagaStepData, OrderSagaStepData>[]
public completed: SagaStepInterface<OrderSagaStepData, OrderSagaStepData>[]
public index: number
public orderId: UUID
public productId: number
public quantity: number
constructor(orderId: UUID, productId: number, quantity: number) {
this.orderId = orderId
this.productId = productId
this.quantity = quantity
this.index = 0
this.steps = []
this.completed = []
}
async invokeNext(messageId?: UUID) {
if (this.index === this.steps.length) {
console.log('No more steps left, saga is completed')
return
}
await this.steps[this.index].invoke({messageId, orderId: this.orderId, productId: this.productId, quantity: this.quantity, orderSaga: this})
this.completed.push(this.steps[this.index])
this.index+=1
}
async compensate(messageId?: UUID) {
console.log(`compensating saga with orderId: ${this.orderId}`)
const stepsToRollback = this.completed.slice(0, -1).reverse()
for (const step of stepsToRollback) {
await step.compenstate({messageId, orderId: this.orderId, productId: this.productId, quantity: this.quantity, orderSaga: this})
}
}
}
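The ordering of compensation is easiest to see in a small run-through. The sketch below is a simplified stand-in rather than the class above: `MiniSaga`, `makeStep`, and the string log are hypothetical, and the steps only record what was called. It keeps the same `slice(0, -1).reverse()` logic, so the most recently completed step is skipped and the remaining ones are rolled back newest first.

```typescript
// Minimal stand-ins: a step is a name plus two async callbacks.
type Step = { name: string; invoke(): Promise<void>; compensate(): Promise<void> }

const log: string[] = []
const makeStep = (name: string): Step => ({
  name,
  invoke: async () => { log.push(`invoke ${name}`) },
  compensate: async () => { log.push(`compensate ${name}`) },
})

class MiniSaga {
  private index = 0
  private completed: Step[] = []
  constructor(private steps: Step[]) {}

  async invokeNext(): Promise<void> {
    if (this.index === this.steps.length) return
    await this.steps[this.index].invoke()
    this.completed.push(this.steps[this.index])
    this.index += 1
  }

  async compensate(): Promise<void> {
    // Same slicing as OrderSaga.compensate: the most recently completed step
    // is left out, and the rest are rolled back newest first.
    for (const step of this.completed.slice(0, -1).reverse()) {
      await step.compensate()
    }
  }
}

// Runs three steps forward, then compensates, and returns the call log.
async function demo(): Promise<string[]> {
  const saga = new MiniSaga([makeStep("create"), makeStep("inventory"), makeStep("ship")])
  await saga.invokeNext()
  await saga.invokeNext()
  await saga.invokeNext()
  await saga.compensate()
  return log
}
```

In this run the "ship" step is never compensated. That fits a flow where the last completed step only queued a request and the remote service then reported failure, so only the earlier steps have effects to undo.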
Builder Pattern
In order to easily create a saga, I'm using the builder pattern to add steps incrementally.
export class OrderSagaBuilder {
private orderSaga: OrderSaga
constructor(orderId: UUID, productId: number, quantity: number) {
this.orderSaga = new OrderSaga(orderId, productId, quantity)
}
addStep(step: SagaStepInterface<OrderSagaStepData, OrderSagaStepData>) {
this.orderSaga.steps.push(step)
return this
}
setStep(lastCompletedStep: STEP) {
if (!lastCompletedStep) return
for (const step of this.orderSaga.steps) {
if (step.step === lastCompletedStep) {
this.orderSaga.completed.push(step)
this.orderSaga.index+=1
return
}
this.orderSaga.completed.push(step)
this.orderSaga.index+=1
}
}
build() {
return this.orderSaga
}
}
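The effect of setStep can be reduced to one small function: given the last completed step, where does the saga resume? The sketch below assumes the four step names used in this post and a hypothetical helper called `nextIndex`; it is a simplification for illustration, not part of the builder.

```typescript
// Stand-in for the STEP enum; the string values are assumptions.
const STEP_ORDER = ["CREATE_ORDER", "REMOVE_INVENTORY", "SHIP_ORDER", "FINALIZE_ORDER"] as const
type StepName = typeof STEP_ORDER[number]

// Returns the index of the next step to run given the last completed one,
// mirroring what setStep does to the saga's index.
function nextIndex(lastCompleted?: StepName): number {
  if (!lastCompleted) return 0 // brand-new saga: start at the first step
  return STEP_ORDER.indexOf(lastCompleted) + 1
}

const resumeAt = nextIndex("REMOVE_INVENTORY")
console.log(resumeAt, STEP_ORDER[resumeAt])
```

A saga whose last persisted step was REMOVE_INVENTORY resumes at index 2, which is SHIP_ORDER, while a saga with no persisted step starts at index 0.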
Factory Pattern
I utilize the builder pattern within my factory pattern. The main reason I'm using the factory pattern is so that I do not have to worry about adding steps in the correct order each time I want to instantiate a saga. The pattern becomes very useful when recreating sagas from persistence.
export class OrderSagaFactory {
constructor(private datasource: DataSource) {}
createSaga(orderId: UUID, productId: number, quantity: number, lastCompletedStep?: STEP) {
const builder = new OrderSagaBuilder(orderId, productId, quantity)
builder.addStep(new CreateOrderStep(this.datasource))
.addStep(new RemoveInventoryStep(this.datasource))
.addStep(new ShipOrderStep(this.datasource))
.addStep(new FinalizeOrderStep(this.datasource))
builder.setStep(lastCompletedStep)
return builder.build()
}
}
Saga Orchestrator
The orchestrator is the class which manages the life cycle of all sagas. For each order there exists an instance of the saga class which manages the life cycle of the order. This orchestrator keeps track of all of these via a map from unique orderId to saga. It then is trivial to call functions related to a saga given an orderId.
export class OrderSagaOrchestrator {
private sagas = new Map<UUID, OrderSaga>();
constructor(private orderSagaFactory: OrderSagaFactory, private datasource: DataSource) {}
async newSaga(productId: number, quantity: number) {
const orderId = randomUUID()
const saga = this.orderSagaFactory.createSaga(orderId, productId, quantity)
this.sagas.set(orderId, saga)
await this.invokeNext(orderId)
}
async invokeNext(orderId: UUID, messageId?: UUID) {
const saga = this.sagas.get(orderId)
if (!saga) {
console.log(`No saga with orderId ${orderId} found`)
return
}
await saga.invokeNext(messageId)
}
async compensateSaga(orderId: UUID, messageId?: UUID) {
const saga = this.sagas.get(orderId)
if (!saga) {
console.log(`No saga with orderId ${orderId} found`)
return
}
await saga.compensate(messageId)
}
async restoreFromDb() {
const sagaRepository = this.datasource.getRepository(OrderSagaEntity)
const sagaEntities = await sagaRepository.find()
const sagas = sagaEntities.filter(entity => entity.lastCompletedStep !== STEP.FINALIZE_ORDER && entity.lastCompletedStep !== STEP.COMPENSATE).map(entity => {
console.log(`restoring saga with order id ${entity.orderId}. Last completed step was ${entity.lastCompletedStep}`)
return this.orderSagaFactory.createSaga(
entity.orderId,
entity.productId,
entity.quantity,
entity.lastCompletedStep
)
})
sagas.forEach(saga => {
this.sagas.set(saga.orderId, saga)
})
}
}
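Because the orchestrator keeps every saga in a map, the map grows with each order unless finished sagas are removed. One possible way to handle that is sketched below. `SagaRegistry` and `TrackedSaga` are hypothetical names, and the saga is reduced to a step counter, so treat this as an idea rather than the project's implementation.

```typescript
// Hypothetical, trimmed-down saga: only what the cleanup logic needs.
type TrackedSaga = { orderId: string; index: number; stepCount: number }

class SagaRegistry {
  private sagas = new Map<string, TrackedSaga>()

  add(saga: TrackedSaga): void {
    this.sagas.set(saga.orderId, saga)
  }

  // Advances the saga by one step and evicts it once every step has run.
  advance(orderId: string): void {
    const saga = this.sagas.get(orderId)
    if (!saga) return
    saga.index += 1
    if (saga.index >= saga.stepCount) this.sagas.delete(orderId)
  }

  size(): number {
    return this.sagas.size
  }
}

const registry = new SagaRegistry()
registry.add({ orderId: "o-1", index: 0, stepCount: 2 })
registry.advance("o-1")
const sizeMidway = registry.size()
registry.advance("o-1")
const sizeAfterLastStep = registry.size()
console.log(sizeMidway, sizeAfterLastStep)
```

The same eviction would apply after a compensation finishes, since a rolled-back saga will never be advanced again.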
Orchestrator Recovery
The most interesting piece of this class is the restoreFromDb method. It uses the order saga factory and a TypeORM repository to fetch all sagas that have been persisted to the database, recreate them in memory, and put them at the right step. Sagas that have already finished, successfully or not, are not instantiated again. Completed sagas should also eventually be removed from the in-memory map, to avoid a memory leak.
HTTP Server
There is a basic HTTP server, built with Express, that accepts incoming requests to the orders service and creates a new saga.
export class Server {
private app: express.Express
constructor(private port: number, private orderSagaOrchestrator: OrderSagaOrchestrator) {}
init() {
this.app = express()
this.app.use(bodyParser.json())
this.registerRoute('/', HTTP_METHOD.POST, this.placeOrder)
this.listen()
}
private placeOrder = async (req: express.Request, res: express.Response) => {
const {productId, quantity}: {productId: number, quantity: number} = req.body
if (!productId || !quantity) {
res.status(400).send('No product or quantity')
// Stop here; otherwise a saga is still created and a second response is sent
return
}
await this.orderSagaOrchestrator.newSaga(productId, quantity)
res.send()
}
private registerRoute = (path: string, method: HTTP_METHOD, func: (req: express.Request, res: express.Response) => any) => {
switch(method) {
case HTTP_METHOD.GET:
this.app.get(path, func)
break;
case HTTP_METHOD.POST:
this.app.post(path, func)
break;
default:
throw new Error('not supported')
}
}
private listen = () => {
this.app.listen(this.port, () => {
console.log(`Example app listening on port ${this.port}`)
})
}
}
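The request check in placeOrder relies on truthiness, which also rejects a productId of 0 and accepts non-numeric values. A stricter check could look like the sketch below. `parseOrderRequest` and `OrderRequest` are hypothetical names, and the rules (integer productId, positive integer quantity, 0 allowed as an id) are assumptions rather than requirements stated by the project.

```typescript
// Hypothetical request shape, matching the two fields the post describes.
type OrderRequest = { productId: number; quantity: number }

// Returns a typed request, or null when the body is not a valid order.
function parseOrderRequest(body: unknown): OrderRequest | null {
  if (typeof body !== "object" || body === null) return null
  const { productId, quantity } = body as Record<string, unknown>
  // Assumption: product ids are integers, and 0 is an acceptable id.
  if (typeof productId !== "number" || !Number.isInteger(productId)) return null
  // Assumption: an order must ask for at least one unit.
  if (typeof quantity !== "number" || !Number.isInteger(quantity) || quantity <= 0) return null
  return { productId, quantity }
}

const valid = parseOrderRequest({ productId: 1, quantity: 2 })
const missingQuantity = parseOrderRequest({ productId: 1 })
const wrongType = parseOrderRequest({ productId: "1", quantity: 2 })
console.log(valid, missingQuantity, wrongType)
```

A handler would respond with status 400 when the function returns null and only create a saga otherwise.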
RabbitMQ
The RabbitMQ service is what listens to the channels and routes messages back to the saga orchestrator class. There is also a critical method called pollOutbox, which is used to send the outbox messages that have been persisted to the database to the downstream services.
export class RabbitMQService {
private channel: amqplib.Channel
constructor(private datasource: DataSource, private orderSagaOrchestrator: OrderSagaOrchestrator) {
}
async init() {
const connection = await amqplib.connect({
username: process.env.RABBITMQ_USER,
password: process.env.RABBITMQ_PASSWORD
});
const channel = await connection.createChannel();
this.channel = channel;
for (const queue of Object.values(OUTBOX_MESSAGE_TYPE)) {
await channel.assertQueue(queue)
}
for (const queue of Object.values(INBOX_MESSAGE_TYPE)) {
await channel.assertQueue(queue)
this.listenForMessage(queue)
}
this.pollOutbox()
}
listenForMessage = async (queue: INBOX_MESSAGE_TYPE) => {
await this.channel.consume(queue, async (msg) => {
if (msg !== null) {
const message: ResponseMessage = JSON.parse(msg.content.toString())
console.log(`Received message with orderId ${message.orderId} and status ${message.success}`);
if (message.success) {
await this.orderSagaOrchestrator.invokeNext(message.orderId, message.id)
} else {
await this.orderSagaOrchestrator.compensateSaga(message.orderId)
}
this.channel.ack(msg)
}
})
}
pollOutbox = () => {
const outboxRepository = this.datasource.getRepository(OutboxMessage)
setInterval(async () => {
const outboxMessages = await outboxRepository.find()
// Send one message at a time and await each delete; forEach would not wait for the async callback
for (const outboxMessage of outboxMessages) {
const json = outboxMessage.toJson();
const buffer = Buffer.from(JSON.stringify(json))
console.log(`sending outbox message to ${outboxMessage.messageType} queue`)
this.channel.sendToQueue(outboxMessage.messageType, buffer)
await outboxRepository.remove(outboxMessage)
}
}, 5000)
}
}
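The outbox relay is the piece that turns "saved to the database" into "delivered to the queue", so its ordering matters. Below is a minimal sketch of one polling pass, written against hypothetical interfaces (`OutboxStore`, `Publish`, `OutboxRelay`) instead of the TypeORM repository and amqplib channel used above. It assumes a row should be deleted only after it has been published, and it adds a guard against overlapping passes, which the timer-based version does not have.

```typescript
type OutboxRow = { id: number; queue: string; payload: string }

// Hypothetical ports: the real project uses a TypeORM repository and an
// amqplib channel; these interfaces only capture what the relay needs.
interface OutboxStore {
  findAll(): Promise<OutboxRow[]>
  remove(row: OutboxRow): Promise<void>
}
type Publish = (queue: string, payload: string) => Promise<void>

class OutboxRelay {
  private running = false
  constructor(private store: OutboxStore, private publish: Publish) {}

  // Sends every pending row in order and returns how many were sent.
  // A row is deleted only after it was published, so a crash in between
  // causes a resend (handled by idempotent consumers), never a loss.
  async drainOnce(): Promise<number> {
    if (this.running) return 0 // a previous pass is still in flight
    this.running = true
    try {
      const rows = await this.store.findAll()
      for (const row of rows) {
        await this.publish(row.queue, row.payload)
        await this.store.remove(row)
      }
      return rows.length
    } finally {
      this.running = false
    }
  }
}
```

A timer could call drainOnce every few seconds. Because delivery is at-least-once, the receiving services still need the idempotency checks described earlier.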
OrderId on Messages
On each message between services, there is an orderId. This is passed to the saga orchestrator, allowing it to find the saga that corresponds to that message.
Crash Mitigation Walk Through
Finally, I will briefly go over the main checkpoints at which the orchestrator may crash, and how the architecture of the application mitigates the concern.