What Is 2PC
2PC, or two-phase commit, is a protocol for achieving robust distributed transactions. Sometimes it is necessary to achieve atomicity across multiple database nodes, and 2PC makes this possible. The following is a rough outline of the protocol and how it operates at a high level:
- A coordinator node first sends a request to every node participating in the transaction, asking whether it can perform the required work. Each participating node must respond and indicate whether it can or cannot do the work
- After receiving all responses, the coordinator can determine whether the transaction can succeed. If every participant voted yes, the coordinator sends a follow-up message to each node to commit the changes. If any participating node cannot perform the required work, the coordinator instead sends a message telling each node to discard the changes prepared in the prior step
These two bullets represent the two main phases of two-phase commit: a prepare phase, and a commit/rollback phase. With that, the core control flow of the protocol can be understood.
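The control flow above can be sketched in a few lines of TypeScript. The `Participant` shape and function names here are purely illustrative, not part of the real implementation shown later:

```typescript
// Illustrative participant: votes in phase 1, then commits or rolls back in phase 2
type Participant = {
  prepare: () => Promise<boolean>
  commit: () => Promise<void>
  rollback: () => Promise<void>
}

async function twoPhaseCommit(participants: Participant[]): Promise<'committed' | 'rolled back'> {
  // Phase 1: ask every participant whether it can perform the work
  const votes = await Promise.all(participants.map(p => p.prepare()))

  // Phase 2: commit only if every participant voted yes, otherwise roll back
  if (votes.every(v => v)) {
    await Promise.all(participants.map(p => p.commit()))
    return 'committed'
  }
  await Promise.all(participants.map(p => p.rollback()))
  return 'rolled back'
}
```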
Possible Issues With 2PC: Coordinator Failures and Synchronous Nature
With the basics understood, it becomes clear that two-phase commit is a synchronous, blocking protocol, which in and of itself is quite limiting. But if we dive a bit deeper, we can identify some specific issues that might arise and how to mitigate them. First, a participant node can crash. This can happen in a couple of places:
- Before receiving the initial prepare transaction message
- After responding to the initial prepare transaction message, but before receiving a commit/rollback message
WALs
To combat coordinator failures, it is first important to understand what a WAL, or write-ahead log, is. A write-ahead log is a technique used in distributed systems to ensure that if a server says it will perform some action, it will do so even if it crashes. A write-ahead log can be implemented in many different ways, although the core principles stay the same. Upon receiving a message, the server first writes the action or list of actions it needs to perform to persistent storage, and ensures it remains able to perform those actions until they are actually done (by acquiring locks, or by other means). It can then perform the actions at some point in the future. If the server crashes before an action is performed, the WAL tells it which outstanding actions still need to be completed when it comes back up.
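As a toy illustration of the log-then-act pattern (not how a production WAL is built; a real one appends to durable storage and syncs to disk before acknowledging), the principle looks roughly like this:

```typescript
// Toy write-ahead log: record intent before acting, replay unfinished work on restart.
// The in-memory array here stands in for durable storage, purely for illustration.
class WriteAheadLog {
  private entries: { action: string; done: boolean }[] = []

  // Step 1: log the action BEFORE performing it, and get back its index
  append(action: string): number {
    this.entries.push({ action, done: false })
    return this.entries.length - 1
  }

  // Step 2: mark the entry complete once the action has actually happened
  markDone(index: number): void {
    this.entries[index].done = true
  }

  // On restart: everything logged but never marked done must still be performed
  pendingActions(): string[] {
    return this.entries.filter(e => !e.done).map(e => e.action)
  }
}
```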
In our case, we will use a modified write-ahead log to track what stage of coordination we are at, so that if the coordinator dies, it knows what actions to perform for specific transactions when it comes back up. Here are the failure scenarios for the coordinator:
- Coordinator goes offline after sending the initial prepare message to all nodes, but before receiving one or more responses
- Coordinator goes offline after receiving the responses from one or more participating nodes, but before it can send the commit/rollback message
- Coordinator goes offline after sending some, but not all, of the commit/rollback messages
For each step it goes through, the coordinator writes to the write-ahead log what it is about to perform, so that if it ever goes down it can pick right back up where it left off. A key point is that all actions done by the coordinator must be idempotent, so that a crash before writing the next step to the write-ahead log does not negatively impact the state of the transaction. The process of the WAL + coordinator roughly looks like so:
- The coordinator generates a unique transaction ID and writes it to the WAL
- The coordinator sends prepare statements to each participating node with the previously generated ID, and waits for responses. If the coordinator crashes during this step, upon restart it first checks whether a prepared transaction with that ID exists on each participating node before sending the prepare message again, achieving idempotency.
- The coordinator receives responses from each node, and writes the final outcome to the WAL: either "commit" or "rollback", indicating what should be done in phase 2. If the coordinator crashes before it receives all responses, it reruns the idempotent stage 2. If a prepared transaction exists on a node, the coordinator knows that node was able to perform the work. If one does not, the coordinator either crashed before sending the prepare message to that node, or the node failed to prepare the transaction; in either case we assume the message was never sent, and retry.
- Based on the commit/rollback value, the coordinator sends either commit or rollback messages to each node involved. If the coordinator crashes during this phase, it can simply re-read the commit/rollback value from the WAL and continue. If some messages were already sent before the crash, it is as simple as sending them again: if a node says that no transaction with the provided ID can be found, the coordinator knows that node already processed the commit/rollback message.
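The recovery decision on restart follows directly from the last value written to the WAL. A small sketch, assuming the status values described above (the names and return strings are illustrative):

```typescript
// Maps a transaction's last recorded WAL state to the coordinator's next move.
// A null status means we crashed before the phase 1 outcome was recorded.
type WalStatus = 'commit' | 'rollback' | 'done' | null

function nextAction(status: WalStatus): string {
  if (status === null) return 'rerun phase 1'        // outcome never decided: redo the idempotent prepares
  if (status === 'commit') return 'resend commits'   // outcome decided: finish phase 2 with commits
  if (status === 'rollback') return 'resend rollbacks'
  return 'nothing'                                   // transaction already completed
}
```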
PostgreSQL "Prepare Transaction", "Commit Prepared", "Rollback Prepared"
To achieve this with PostgreSQL databases as the participant nodes, we can use a feature called prepared transactions. This gives Postgres the ability to "reserve" the rows that would be impacted by a transaction, persisting its state and holding its locks until a commit prepared or rollback prepared statement arrives. With this functionality, we can implement phase 1 using the PREPARE TRANSACTION statement, and phase 2 using COMMIT PREPARED or ROLLBACK PREPARED respectively.
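Concretely, the lifecycle looks like this in SQL (the table, values, and transaction ID here are made up for illustration; note also that prepared transactions are disabled by default, so `max_prepared_transactions` must be set above 0 in postgresql.conf):

```sql
BEGIN;
UPDATE persons SET firstname = 'New Firstname' WHERE id = 1;
-- Phase 1: persist the transaction's state and keep its locks held
PREPARE TRANSACTION 'my-tx-id';

-- Phase 2, possibly issued later from a different session:
COMMIT PREPARED 'my-tx-id';
-- ...or, to abort instead:
-- ROLLBACK PREPARED 'my-tx-id';
```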
Basic Contrived Example
In order to implement and demonstrate 2PC in the simplest way possible, I've created a very basic repository which has a horribly designed set of databases. In this project, there are three databases. One for the coordinator, one for storing information about a person such as their name and age, and one for storing address information for the same person, such as city and state. In order to update both personal and address information atomically, 2PC will be used.
Code Walk
To create this project, I'm using Express.js in tandem with TypeORM. I've got a few different entities which represent the person, address, and WAL discussed before. Again, these are quite basic, and only for demonstration purposes:
@Entity('addresses')
export class Address {
  @PrimaryGeneratedColumn()
  id: number

  @Column()
  city: string

  @Column()
  state: string

  @Column()
  zip: string
}

@Entity('persons')
export class Person {
  @PrimaryGeneratedColumn()
  id: number

  @Column()
  firstname: string

  @Column()
  lastname: string

  @Column()
  age: number
}

// Status values the coordinator records in its WAL
export enum STATUS {
  COMMIT = 'commit',
  ROLLBACK = 'rollback',
  DONE = 'done'
}

@Entity('coordinator_logs')
export class CoordinatorLog {
  constructor(transactionId: UUID, status?: STATUS) {
    this.transactionId = transactionId
    this.status = status ?? null
  }

  @Column({ type: 'enum', enum: STATUS, nullable: true })
  status: STATUS | null

  @Column({ type: 'uuid', primary: true })
  transactionId: UUID
}
The coordinator log entity will act as our write-ahead log for any given transaction. To keep the implementation extensible, I created an interface which each participating node in the 2PC protocol must implement:
export interface ICoordinatableService {
  prepare(txid: UUID): Promise<boolean>
  commit(txid: UUID): Promise<boolean>
  rollback(txid: UUID): Promise<boolean>
}
The implementations for the addresses service and persons service have hard-coded update statements in the prepare function, to keep things simple and focused on the 2PC protocol itself:
export class AddressesService implements ICoordinatableService {
  constructor(private datasource: DataSource) {}

  async prepare(txid: UUID): Promise<boolean> {
    // Idempotency: if this transaction is already prepared, don't prepare it again
    const txExists = await this.txExists(txid)
    if (txExists) {
      return true
    }
    try {
      const query = `BEGIN TRANSACTION; UPDATE ADDRESSES SET CITY = 'New City' WHERE id = 1; PREPARE TRANSACTION '${txid}';`
      await this.datasource.query(query)
      return true
    } catch (e) {
      console.log(e.message)
      return false
    }
  }

  async commit(txid: UUID): Promise<boolean> {
    // If no prepared transaction exists, the commit was already processed
    const txExists = await this.txExists(txid)
    if (!txExists) {
      return true
    }
    const query = `COMMIT PREPARED '${txid}'`
    await this.datasource.query(query)
    return true
  }

  async rollback(txid: UUID): Promise<boolean> {
    const txExists = await this.txExists(txid)
    if (!txExists) {
      return true
    }
    const query = `ROLLBACK PREPARED '${txid}'`
    await this.datasource.query(query)
    return true
  }

  private async txExists(txid: UUID): Promise<boolean> {
    // pg_prepared_xacts lists every currently prepared transaction
    const query = `
      SELECT EXISTS (
        SELECT 1
        FROM pg_prepared_xacts
        WHERE gid = '${txid}'
      );
    `
    const result = await this.datasource.query(query)
    return result[0].exists
  }
}
export class PersonsService implements ICoordinatableService {
  constructor(private datasource: DataSource) {}

  async prepare(txid: UUID): Promise<boolean> {
    // Idempotency: if this transaction is already prepared, don't prepare it again
    const txExists = await this.txExists(txid)
    if (txExists) {
      return true
    }
    try {
      const query = `BEGIN TRANSACTION; UPDATE PERSONS SET FIRSTNAME = 'New Firstname' WHERE id = 1; PREPARE TRANSACTION '${txid}';`
      await this.datasource.query(query)
      return true
    } catch (e) {
      console.log(e.message)
      return false
    }
  }

  async commit(txid: UUID): Promise<boolean> {
    const txExists = await this.txExists(txid)
    if (!txExists) {
      return true
    }
    const query = `COMMIT PREPARED '${txid}'`
    await this.datasource.query(query)
    return true
  }

  async rollback(txid: UUID): Promise<boolean> {
    const txExists = await this.txExists(txid)
    if (!txExists) {
      return true
    }
    const query = `ROLLBACK PREPARED '${txid}'`
    await this.datasource.query(query)
    return true
  }

  private async txExists(txid: UUID): Promise<boolean> {
    const query = `
      SELECT EXISTS (
        SELECT 1
        FROM pg_prepared_xacts
        WHERE gid = '${txid}'
      );
    `
    const result = await this.datasource.query(query)
    return result[0].exists
  }
}
To utilize these functions I created a transaction coordinator service. For now the coordinator service is hardcoded to accept these two sub-services, although it could be refactored to accept an array of ICoordinatableService implementations:
export class TransactionCoordinator {
  constructor(
    private personsService: ICoordinatableService,
    private addressService: ICoordinatableService,
    private coordinatorRepository: Repository<CoordinatorLog>
  ) {}

  async begin() {
    // Write the transaction ID to the WAL before doing any work
    const txId = randomUUID()
    const log = new CoordinatorLog(txId)
    await this.coordinatorRepository.save(log)
    await this.phase1(log)
    await this.phase2(log)
  }

  private async phase1(log: CoordinatorLog) {
    let personResponse = false
    let addressResponse = false
    try {
      personResponse = await this.personsService.prepare(log.transactionId)
    } catch (e) {
      console.log(`Failed to prepare transaction with id ${log.transactionId} for person`)
    }
    try {
      addressResponse = await this.addressService.prepare(log.transactionId)
    } catch (e) {
      console.log(`Failed to prepare transaction with id ${log.transactionId} for address`)
    }
    // Record the outcome of the vote in the WAL before starting phase 2
    log.status = personResponse && addressResponse ? STATUS.COMMIT : STATUS.ROLLBACK
    await this.coordinatorRepository.save(log)
  }

  private async phase2(log: CoordinatorLog) {
    if (log.status === STATUS.COMMIT) {
      await this.commit(log.transactionId)
    } else {
      await this.rollback(log.transactionId)
    }
    await this.coordinatorRepository.update(
      { transactionId: log.transactionId },
      { status: STATUS.DONE }
    )
  }

  async rollback(txid: UUID) {
    await backOff(() => this.personsService.rollback(txid))
    await backOff(() => this.addressService.rollback(txid))
  }

  async commit(txid: UUID) {
    await backOff(() => this.personsService.commit(txid))
    await backOff(() => this.addressService.commit(txid))
  }

  async recover() {
    // Any log not marked DONE represents a transaction interrupted by a crash
    const incompleteTxs = await this.coordinatorRepository.find({
      where: { status: Not(STATUS.DONE) }
    })
    for (const log of incompleteTxs) {
      try {
        if (!log.status) {
          await this.phase1(log)
        }
        await this.phase2(log)
      } catch (error) {
        console.error(`Recovery failed for transaction ${log.transactionId}:`, error)
      }
    }
  }
}
Finally, there is the main Server class which sets up the Express server, and the main.ts which bootstraps the entire project:
export class Server {
  private app
  private port

  constructor(private transactionCoordinator: TransactionCoordinator) {
    this.app = express()
    this.port = 3000
  }

  async init() {
    this.app.get('/', (req: Request, res: Response) => {
      res.send('Hello World!')
    })

    this.app.post('/update', async (req: Request, res: Response) => {
      await this.transactionCoordinator.begin()
      res.status(200).send()
    })

    this.app.listen(this.port, () => {
      console.log(`Example app listening on port ${this.port}`)
    })
  }
}
const main = async () => {
  await personDatasource.initialize()
  await addressDatasource.initialize()
  await coordinatorDatasource.initialize()

  const coordinatorRepository = coordinatorDatasource.getRepository(CoordinatorLog)
  const addressService = new AddressesService(addressDatasource)
  const personService = new PersonsService(personDatasource)
  const transactionCoordinator = new TransactionCoordinator(personService, addressService, coordinatorRepository)

  const server = new Server(transactionCoordinator)
  await server.init()
}
Link to the GitHub repository