How to Propegate Transactions Across Repositories
May 7, 2025
Often times when handling requests within a backend application, multiple database queries need to occur in order to acomplish some business functionality. Dealing with multiple queries to the database often means that a transaction is required in order to ensure data consistency. Assuming a basic three layer architecture, an application typically has some form of repositories or DAO living in the infrastructure layer which is used to interface with databases. When utilizing multiple repositories within a single request, one database connection which starts a transaction must be shared between multiple separate repositories. There are multiple methods to achieve this, but depending on how the underlying data is structured, it might not be necessary at all.
DDD
With a smart domain model emphasized by DDD transactions which span repositories is actually a code smell. Repositories fetch aggregate roots, which contains enough information to make decisions about business functionality which must be transactionally consistent. With this approach, if transactions must span mutliple repositories the domain model must be reconsidered.
Repository Functions Accepting DB Connections
The most naive approach to managing consistency across repositories is have each repository method accept an optional database connection. If one is provided, the method will utilize it to query to the database, and otherwise it can grab a connection from a database pool to make an isolated request. The following example comes from a golang distributed backend system for a website that mimics the functionality of letterboxd, but for music.
func(p *PostsDAO) LikePost(executor db.QueryExecutor, spotifyID string, posterSpotifyID string, songID string) error { query := `INSERT INTO post_votes (voterspotifyid, posterspotifyid, postsongid, createdat, updatedat, liked) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (voterspotifyid, posterspotifyid, postsongid) DO UPDATE SET updatedat=$5, liked=$6` res, err := executor.Exec(query, spotifyID, posterSpotifyID, songID, time.Now().UTC(), time.Now().UTC(), true) if err != nil { return customerrors.WrapBasicError(err) } rows, err := res.RowsAffected() if err != nil { return customerrors.WrapBasicError(err) } if rows < 1 { return customerrors.WrapBasicError(sql.ErrNoRows) } return nil }The first argument to the function is a db.QueryExectutor, an interface that wraps a database connection so that the underlying database connection can be replaced with minimal changes. This implementation works, although it creates unnecssary complexity within the application layer. Services must manage the database connection which causes an abstraction leak between the application layer and infrastructure layer.
Context/Async/Thread Local Storage and Interceptors
A more sophisticated solution of sharing a database connection amongst repositories involves utilizing thread local storage. Also known as Async and Context local storage, thread local storage helps to store information isolated to a single execution thread. This makes it possible to initilize a transaction and expose the database connection via thread local storage, and for each repository check if there is an avaialble connection in the thread local storage. If one exists, it will be used to execute any given query. If not, the repository can grab a connection from the pool. In order to create the initial database connection and start the transaction, it must be known whether or not a service method requires a transaction.
Interceptors allow for code execution before service methods. Most backend frameworks have some form of interceptors, although for this example I'll be using NestJS. Interceptors are implemented as decorators, and can help to grab a database connection from a pool, begin a transaction, and store the connection into context local storage. The connection can then be accessed by the repositories used in the method itself. The Interceptor manages the entire lifecyle of the transaction, starting the transaction and observing the http response. If an error is detected, it will rollback the transaction.
export class TransactionInterceptor implements NestInterceptor { constructor( private dataSource: DataSource, private cls: ClsService, ) {} async intercept( context: ExecutionContext, next: CallHandler, ): Promise> { const queryRunner = this.dataSource.createQueryRunner(); queryRunner.connect(); queryRunner.startTransaction(); const manager = queryRunner.manager; this.cls.set('connection', manager); return next.handle().pipe( concatMap(async (data) => { await queryRunner.commitTransaction(); return data; }), catchError(async (error) => { await queryRunner.rollbackTransaction(); throw error; }), finalize(async () => { await queryRunner.release(); }), ); } }
@Injectable({ scope: Scope.REQUEST }) export class InventoryDAO { private queryRunner: EntityManager; constructor(cls: ClsService, dataSource: DataSource) { constructor( private cls: ClsService, private dataSource: DataSource, ) { this.queryRunner = cls.get('connection') ?? dataSource.manager; } // return the id of the item if in stock async getStock(itemName: string): Promise{ this.queryRunner = this.cls.get('connection') ?? this.dataSource.manager; const res = await this.queryRunner.query ( `SELECT count(*) FROM inventory WHERE "itemName" = $1`, [itemName], ); return res[0].count; }
export class OrdersService { constructor( private paymentService: PaymentService, private inventoryService: InventoryService, private invoiceService: InvoiceService, ) {} @UseInterceptors(TransactionInterceptor) async placeOrder(itemName: string, user: string, price: number) { await this.inventoryService.checkAndReduceStock(itemName); await this.paymentService.checkAndChargePayment(user, price); await this.invoiceService.provideInvoice(user, price); } }
@Injectable() export class InventoryService { constructor(private inventoryDAO: InventoryDAO) {} async checkAndReduceStock(itemName: string) { const stock = await this.inventoryDAO.getStock(itemName); if (stock <= 0) { throw new NotFoundException( 'unable to process request. Not enough inventory', ); } this.inventoryDAO.reduceStock(itemName); } }
The individual services utilize repositories in order to interact with the database, and those repositories will utilize the shared connection created by the interceptor. With frameworks that implement decorators such as Springboot's @Transactional(), this is typically how it is implemented. However I wanted a more foundational understanding of how this could be achieved, and discovered the Unit of Work Pattern.
Unit of Work Pattern
The Unit of Work pattern is a way to abstract all functionality that occurs within the database across the lifetime of a request. It allows an application to make changes across multiple different repositories commit them all at once. At its core, the Unit of Work pattern manages request scoped instance of repositories. Upon a new request, it starts a database transaction and provides the request scoped repositories with an instance of the database connection. Methods exist on the Unit of Work class to fetch these repositories. There are also typically methods to commit and rollback the overarching transaction shared between repositories. This implementation has many benefits. The Unit of Work is injected into services so it can be mocked for testing, it eliminates the leaky abstraction by encapsulating the database connection, hiding it away from the application service. It can also be implemented in a slightly different manner, returning the connection to the user and having them manually begin, commit, and rollback connections. This does leak the abstraction, but it was the implementation that I went with as I was inspired by Typeorm. The Unit of Work class is implemented below as the TransactionManager
import { Injectable } from '@nestjs/common'; import { Pool, PoolClient } from 'pg'; import { DAO } from '../types'; Injectable(); export class TransactionManager { constructor(private pool: Pool) {} /* the end user must call repository.close() when done with use on transaction, that way the connection is closed! */ async getDAOsForTransaction( ...constructors: DAOGiven the names of repositories, the TransactionManager instantiates the list of repositories with the individual connection taken from the pool. It returns the repositories and the connection to the user.[] ): Promise<[any[], PoolClient]> { const connection = await this.pool.connect(); return [ constructors.map((constructor) => new constructor(connection)), connection, ]; } }
@Injectable() export class OrdersService { constructor( private paymentService: PaymentService, private inventoryService: InventoryService, private invoiceService: InvoiceService, private transactionManager: TransactionManager, ) {} async placeOrder(itemName: string, user: string) { const price = await this.inventoryService.getPrice(itemName); await this.paymentService.checkAndChargePayment(user, price); await this.inventoryService.checkAndReduceStock(itemName); await this.invoiceService.provideInvoice(user, price); } async placeOrderTransaction(itemName: string, user: string) { const [daosForTransaction, connection] = await this.transactionManager.getDAOsForTransaction( InventoryDAO, InvoiceDAO, AccountBalanceDAO, ); const inventoryService = new InventoryService(daosForTransaction[0]); const invoiceService = new InvoiceService(daosForTransaction[1]); const paymentService = new PaymentService(daosForTransaction[2]); connection.query('BEGIN TRANSACTION'); try { const price = await this.inventoryService.getPrice(itemName); await paymentService.checkAndChargePayment(user, price); await inventoryService.checkAndReduceStock(itemName); await invoiceService.provideInvoice(user, price); } catch (e: unknown) { connection.query('ROLLBACK'); throw e; } finally { connection.release(); } } }The OrdersServiceinjects the TransactionManager, fetches repositories from it, and then starts a transaction. It executes multiple repository functions all which utilize the same transaction, and then performs a commit or rollback accordingly. It also releases the connection back to the pool.
Two example repos were used in this article. The first is a Golang repository which is a distributed backend service. The second is an isolated NestJS repository specifically created to examine this problem