Examples¶
This section contains practical examples of using Frontrun to test concurrent code.
Example 1: Bank Account Transfer¶
A classic example showing how a race condition can cause lost updates:
from frontrun.trace_markers import Schedule, Step, TraceExecutor
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
def transfer(self, amount):
"""Transfer funds (has a race condition)."""
# frontrun: after_read
current = self.balance
# Simulate processing time
new_balance = current + amount
# frontrun: before_write
self.balance = new_balance
return new_balance
# Create an account with $100
account = BankAccount(balance=100)
# Define a schedule that triggers the race condition
# Both threads read before either writes
schedule = Schedule([
Step("transfer1", "after_read"), # T1 reads 100
Step("transfer2", "after_read"), # T2 reads 100
Step("transfer1", "before_write"), # T1 writes 150
Step("transfer2", "before_write"), # T2 writes 150
])
# Execute with controlled interleaving
executor = TraceExecutor(schedule)
executor.run("transfer1", lambda: account.transfer(50))
executor.run("transfer2", lambda: account.transfer(50))
executor.wait(timeout=5.0)
# Race condition detected: both deposits were counted as 150
# instead of 200 (100 + 50 + 50)
assert account.balance == 150, f"Expected 150, got {account.balance}"
print("Race condition: one $50 transfer was lost!")
Example 2: Correct Synchronization with Locks¶
Comparing the same code with proper synchronization:
import threading
from frontrun.trace_markers import Schedule, Step, TraceExecutor
class SyncedBankAccount:
def __init__(self, balance=0):
self.balance = balance
self.lock = threading.Lock()
def transfer(self, amount):
"""Transfer funds with proper synchronization."""
with self.lock:
# frontrun: after_read
current = self.balance
new_balance = current + amount
# frontrun: before_write
self.balance = new_balance
return new_balance
# Create an account with $100
account = SyncedBankAccount(balance=100)
# Same schedule as before
schedule = Schedule([
Step("transfer1", "after_read"),
Step("transfer2", "after_read"),
Step("transfer1", "before_write"),
Step("transfer2", "before_write"),
])
# Execute with controlled interleaving
executor = TraceExecutor(schedule)
executor.run("transfer1", lambda: account.transfer(50))
executor.run("transfer2", lambda: account.transfer(50))
executor.wait(timeout=5.0)
# With proper locking, the balance is correct
assert account.balance == 200, f"Expected 200, got {account.balance}"
print("Synchronization works: both transfers applied correctly!")
Example 3: Producer-Consumer Pattern¶
Testing a simple producer-consumer queue:
from frontrun.trace_markers import Schedule, Step, TraceExecutor
class SimpleQueue:
def __init__(self):
self.items = []
def put(self, item):
"""Add item to queue."""
# frontrun: before_append
self.items.append(item)
# frontrun: after_append
def get(self):
"""Remove and return first item."""
# frontrun: before_check
if not self.items:
return None
# frontrun: after_check
# frontrun: before_pop
item = self.items.pop(0)
# frontrun: after_pop
return item
queue = SimpleQueue()
# Producer puts, consumer gets
schedule = Schedule([
Step("producer", "before_append"),
Step("consumer", "before_check"),
Step("consumer", "after_check"),
Step("producer", "after_append"),
Step("consumer", "before_pop"),
Step("consumer", "after_pop"),
])
executor = TraceExecutor(schedule)
executor.run("producer", lambda: queue.put("data"))
executor.run("consumer", lambda: queue.get())
executor.wait(timeout=5.0)
print("Producer-consumer executed with controlled interleaving")
Example 4: Async Concurrency Control¶
Testing race conditions in async code using async trace markers:
import asyncio
from frontrun.async_trace_markers import AsyncTraceExecutor
from frontrun.common import Schedule, Step
class AsyncBankAccount:
def __init__(self, balance=0):
self.balance = balance
async def transfer(self, amount):
"""Transfer funds (has a race condition at await points)."""
# frontrun: after_read
current = self.balance
await asyncio.sleep(0) # Yield point for marker
new_balance = current + amount
# frontrun: before_write
await asyncio.sleep(0) # Yield point for marker
self.balance = new_balance
account = AsyncBankAccount(balance=100)
# Define a schedule that triggers the race condition
schedule = Schedule([
Step("task1", "after_read"), # Task1 reads 100
Step("task2", "after_read"), # Task2 reads 100
Step("task1", "before_write"), # Task1 writes 150
Step("task2", "before_write"), # Task2 writes 150
])
executor = AsyncTraceExecutor(schedule)
executor.run({
"task1": lambda: account.transfer(50),
"task2": lambda: account.transfer(50),
})
# Race condition: one transfer was lost
assert account.balance == 150, f"Expected 150, got {account.balance}"
print("Async race condition detected!")
Example 5: Finding Race Conditions Automatically¶
Using bytecode instrumentation to automatically explore interleavings:
from frontrun.bytecode import explore_interleavings
class Counter:
def __init__(self, value=0):
self.value = value
def increment(self):
temp = self.value
self.value = temp + 1
# Automatically explore different interleavings
result = explore_interleavings(
setup=lambda: Counter(value=0),
threads=[
lambda c: c.increment(),
lambda c: c.increment(),
],
invariant=lambda c: c.value == 2, # Should hold if no races
max_attempts=200,
max_ops=200,
seed=42,
)
if not result.property_holds:
print(f"Race condition found!")
print(f"Explored {result.num_explored} different interleavings")
print(f"Counterexample: value = {result.counterexample.value}")
else:
print("No race conditions found in explored interleavings")
Real-World Case Study: SQLAlchemy ORM Race Condition¶
For a comprehensive walkthrough of a real-world race condition in SQLAlchemy ORM code running against PostgreSQL, see SQLAlchemy Lost-Update Race Condition. That case study demonstrates how Frontrun detects lost-update race conditions using both trace markers and bytecode exploration approaches.