In a distributed system, ensuring data consistency among multiple concurrent processes is critical. Race conditions can cause data corruption or financial discrepancies when processing sensitive operations such as account withdrawals. This article describes how to use mutexes (mutex locks) with RabbitMQ and Node.js to safely handle concurrent operations while maintaining efficient memory management.
The problem
Imagine a scenario where multiple RabbitMQ workers are processing withdrawal requests for the same account at the same time. Without proper synchronization, two workers could read the same initial balance, each subtract its own amount, and then write the result back, leaving the account with an incorrect balance.
Race condition example:
- Initial balance: $1000
- Worker A reads the balance: $1000
- Worker B reads the balance: $1000
- Worker A subtracts $300 and updates the balance: $700
- Worker B subtracts $200 and updates the balance: $800 (overwrites Worker A’s update)
Final balance: $800 (should be $500).
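The lost update above can be reproduced in a few lines. This is a minimal sketch, not part of the original system: `unsafeWithdraw` and `raceDemo` are hypothetical names, and the `setTimeout` delay stands in for a database round trip between the read and the write.

```javascript
// Resolves after the given number of milliseconds.
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Read-modify-write with an await in the middle: other handlers can
// run between the read and the write, so the write may use a stale value.
async function unsafeWithdraw(account, amount, latencyMs) {
  const balance = account.balance;     // read
  await delay(latencyMs);              // simulated I/O
  account.balance = balance - amount;  // write based on the earlier read
}

async function raceDemo() {
  const account = { balance: 1000 };
  // Worker A (300) writes first, Worker B (200) writes last.
  await Promise.all([
    unsafeWithdraw(account, 300, 10),
    unsafeWithdraw(account, 200, 20),
  ]);
  return account.balance;
}

raceDemo().then((balance) => console.log(`Final balance: ${balance}`)); // 800
```

Both calls read 1000 before either one writes, so the later write discards the earlier one.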
Solution: Use a mutex
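Mutexes ensure that only one worker can handle operations on a specific account at any time. Note that an async-mutex lock lives in the memory of a single Node.js process, so it serializes the concurrent message handlers inside one consumer process; it does not coordinate separate processes. Here’s how we implemented a mutex-based solution in a RabbitMQ-powered Node.js system.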
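To see why a mutex fixes the race, here is a minimal sketch that uses a hand-rolled promise-chain lock instead of async-mutex, so it runs without installing anything. `SimpleMutex`, `safeWithdraw`, and `mutexDemo` are hypothetical names for illustration only; the real implementation in this article uses the `Mutex` class from async-mutex.

```javascript
// Hypothetical stand-in for a mutex: each caller waits for the previous holder.
class SimpleMutex {
  constructor() {
    this.tail = Promise.resolve();
  }

  // Resolves to a release function once the lock belongs to the caller.
  acquire() {
    let release;
    const next = new Promise((resolve) => { release = resolve; });
    const ready = this.tail.then(() => release);
    this.tail = next;
    return ready;
  }
}

const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Same read-modify-write as before, but wrapped in acquire/release.
async function safeWithdraw(account, mutex, amount, latencyMs) {
  const release = await mutex.acquire();
  try {
    const balance = account.balance;
    await delay(latencyMs); // simulated I/O inside the critical section
    account.balance = balance - amount;
  } finally {
    release(); // always release, even if the body throws
  }
}

async function mutexDemo() {
  const account = { balance: 1000 };
  const mutex = new SimpleMutex();
  await Promise.all([
    safeWithdraw(account, mutex, 300, 10),
    safeWithdraw(account, mutex, 200, 20),
  ]);
  return account.balance;
}

mutexDemo().then((balance) => console.log(`Final balance: ${balance}`)); // 500
```

The second withdrawal only reads the balance after the first one has written it, so both deductions are applied.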
Step-by-step implementation
- Prerequisites
Install necessary packages:
npm install async-mutex amqplib
- async-mutex: Provides easy-to-use mutexes for asynchronous operations.
- amqplib: Enables interaction with RabbitMQ.
- Code implementation
Below is the complete implementation, including memory management for mutex cleanup.
a. Mutex management
We use a map to maintain a mutex for each account. Mutexes are created as needed and deleted when no longer needed.
b. Memory optimization
To avoid memory bloat, we implement:
- Idle timeout: Automatically delete mutexes for inactive accounts after 5 minutes.
- Periodic cleanup: A background timer runs every minute and removes stale entries that are no longer referenced.
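One way to picture the two cleanup rules is a registry that records when each account was last used and how many operations are still in flight. The sketch below is a simplified variation, not the implementation that follows: `IdleRegistry`, `begin`, `end`, and `sweep` are hypothetical names, and the clock is injected so the example runs instantly instead of waiting five minutes.

```javascript
// Hypothetical registry: tracks last use and in-flight operations per account.
class IdleRegistry {
  constructor(idleTimeoutMs, now = Date.now) {
    this.idleTimeoutMs = idleTimeoutMs;
    this.now = now;
    this.entries = new Map(); // accountId -> { lastUsed, inFlight }
  }

  // Call when an operation starts on an account.
  begin(accountId) {
    const entry = this.entries.get(accountId) ?? { lastUsed: 0, inFlight: 0 };
    entry.inFlight += 1;
    entry.lastUsed = this.now();
    this.entries.set(accountId, entry);
  }

  // Call when the operation finishes.
  end(accountId) {
    const entry = this.entries.get(accountId);
    if (!entry) return;
    entry.inFlight -= 1;
    entry.lastUsed = this.now();
  }

  // Remove entries that are idle and have no in-flight operations.
  sweep() {
    const cutoff = this.now() - this.idleTimeoutMs;
    let removed = 0;
    for (const [accountId, entry] of this.entries) {
      if (entry.inFlight === 0 && entry.lastUsed <= cutoff) {
        this.entries.delete(accountId);
        removed += 1;
      }
    }
    return removed;
  }
}

// Usage with a fake clock: account 123 is idle, account 456 is still busy.
let clock = 0;
const registry = new IdleRegistry(300000, () => clock);
registry.begin('123');
registry.end('123');
registry.begin('456');
clock = 300000; // five minutes later
console.log(registry.sweep()); // 1
```

The point of the in-flight counter is that an entry is never evicted while an operation still depends on it, whatever its age.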
c. Full implementation
const { Mutex } = require('async-mutex');
const amqp = require('amqplib');
// Mutex for account operations with automatic cleanup
const accountMutexes = new Map(); // Store one mutex per account
const accountTimeouts = new Map(); // Store timeout references for cleanup
const CLEANUP_INTERVAL_MS = 60000; // 1-minute cleanup interval
const IDLE_TIMEOUT_MS = 300000; // 5-minute idle timeout per account
// Function to get or create a mutex for a specific account
function getAccountMutex(accountId) {
  if (!accountMutexes.has(accountId)) {
    const mutex = new Mutex();
    accountMutexes.set(accountId, mutex);
    resetAccountTimeout(accountId); // Start idle timeout cleanup
  }
  return accountMutexes.get(accountId);
}
// Function to reset idle timeout for an account
function resetAccountTimeout(accountId) {
  if (accountTimeouts.has(accountId)) {
    clearTimeout(accountTimeouts.get(accountId));
  }
  const timeout = setTimeout(() => {
    const mutex = accountMutexes.get(accountId);
    if (mutex && mutex.isLocked()) {
      // Never drop a mutex that is still held; check again later
      resetAccountTimeout(accountId);
      return;
    }
    accountMutexes.delete(accountId);
    accountTimeouts.delete(accountId);
    console.log(`Mutex for account ${accountId} removed due to inactivity.`);
  }, IDLE_TIMEOUT_MS);
  accountTimeouts.set(accountId, timeout);
}
// Periodic cleanup process: drop timeout entries whose mutex no longer exists
function startPeriodicCleanup() {
  setInterval(() => {
    accountTimeouts.forEach((timeout, accountId) => {
      if (!accountMutexes.has(accountId)) {
        clearTimeout(timeout); // Cancel the orphaned timer before forgetting it
        accountTimeouts.delete(accountId);
      }
    });
  }, CLEANUP_INTERVAL_MS);
  console.log(`Periodic cleanup started: checking every ${CLEANUP_INTERVAL_MS / 1000} seconds.`);
}
// Simulated database of accounts
const accounts = {
  "123": { balance: 1000 },
  "456": { balance: 2000 },
};
// Process withdrawal
async function processWithdrawal(accountId, amount) {
  const mutex = getAccountMutex(accountId);
  const release = await mutex.acquire();
  try {
    console.log(`Processing withdrawal for account ${accountId}`);
    const account = accounts[accountId];
    if (!account) {
      throw new Error('Account not found');
    }
    if (account.balance < amount) {
      throw new Error('Insufficient funds');
    }
    account.balance -= amount;
    console.log(`Withdrawal successful! New balance for account ${accountId}: ${account.balance}`);
  } catch (error) {
    console.error(`Error processing withdrawal for account ${accountId}:`, error.message);
  } finally {
    release();
    resetAccountTimeout(accountId);
  }
}
// RabbitMQ message handler
async function handleMessage(message) {
  const { accountId, amount } = JSON.parse(message.content.toString());
  await processWithdrawal(accountId, amount);
}
// Connect to RabbitMQ and consume messages
(async () => {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queueName = 'withdrawals';
  await channel.assertQueue(queueName, { durable: true });
  console.log(`Waiting for messages in queue: ${queueName}`);
  channel.consume(queueName, async (msg) => {
    if (!msg) return; // null means the consumer was cancelled
    try {
      await handleMessage(msg);
      channel.ack(msg); // Acknowledge message after processing
    } catch (error) {
      // Malformed payloads (e.g. invalid JSON) are rejected without requeueing
      console.error('Failed to handle message:', error.message);
      channel.nack(msg, false, false);
    }
  });
  startPeriodicCleanup(); // Start periodic cleanup
})();
How it works
- Account-specific mutex:
  - Each account has its own mutex, stored in accountMutexes, so operations on different accounts can run concurrently.
  - The mutex is created dynamically on first access.
- Critical section:
  - The processWithdrawal function acquires the mutex so that only one worker can modify the account’s balance at a time.
- Memory management:
  - Idle timeout: The mutex is deleted after 5 minutes of inactivity.
  - Periodic cleanup: A background task runs every minute and removes timeout entries left behind for accounts whose mutex no longer exists.
Advantages
- Race condition prevention:
  - Only one worker handles withdrawals from a given account at a time.
- Efficient memory management:
  - Mutexes for inactive accounts are deleted automatically to prevent memory bloat.
- High throughput:
  - Different accounts are still processed concurrently, which preserves the scalability of the system.
- Robust error handling:
  - Account errors are caught and the lock is released in a finally block, so a failed withdrawal never leaves an account locked.
Sample output
Input queue messages:
{ "accountId": "123", "amount": 100 }
{ "accountId": "456", "amount": 200 }
{ "accountId": "123", "amount": 300 }
Console output:
Waiting for messages in queue: withdrawals
Periodic cleanup started: checking every 60 seconds.
Processing withdrawal for account 123
Withdrawal successful! New balance for account 123: 900
Processing withdrawal for account 456
Withdrawal successful! New balance for account 456: 1800
Processing withdrawal for account 123
Withdrawal successful! New balance for account 123: 600
Mutex for account 123 removed due to inactivity.
Mutex for account 456 removed due to inactivity.
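To reproduce a run like the one above, the three input messages could be published with something like the following sketch. `publishWithdrawals` is a hypothetical helper, not part of the original code; it assumes it is given an amqplib channel and uses only `assertQueue` and `sendToQueue`, with the same queue name and durability settings as the consumer.

```javascript
// Publishes withdrawal requests to the queue the consumer reads from.
// `channel` is assumed to be an amqplib channel, for example obtained via
//   const connection = await require('amqplib').connect('amqp://localhost');
//   const channel = await connection.createChannel();
async function publishWithdrawals(channel, requests, queueName = 'withdrawals') {
  // Must match the consumer's queue declaration (durable: true).
  await channel.assertQueue(queueName, { durable: true });
  for (const request of requests) {
    const body = Buffer.from(JSON.stringify(request));
    // persistent: true asks the broker to store the message on disk.
    channel.sendToQueue(queueName, body, { persistent: true });
  }
  return requests.length;
}

// The three sample messages from the input above.
const sampleRequests = [
  { accountId: '123', amount: 100 },
  { accountId: '456', amount: 200 },
  { accountId: '123', amount: 300 },
];
```

With a live channel, `await publishWithdrawals(channel, sampleRequests)` would enqueue the messages in the order shown.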
Conclusion
By combining mutexes with RabbitMQ, you can safely handle concurrent operations in Node.js systems.
The idle timeouts and periodic cleanup keep memory usage bounded, making this solution scalable and robust for real-world use cases.