Examples¶
Practical examples of using Frontrun to test concurrent code.
Bank Account Transfer (Lost Update)¶
Two concurrent transfers each read the balance before either writes. The second write overwrites the first, losing one transfer:
from frontrun.common import Schedule, Step
from frontrun.trace_markers import TraceExecutor
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
def transfer(self, amount):
current = self.balance # frontrun: read_balance
new_balance = current + amount
self.balance = new_balance # frontrun: write_balance
return new_balance
account = BankAccount(balance=100)
# Both threads read before either writes
schedule = Schedule([
Step("transfer1", "read_balance"), # T1 reads 100
Step("transfer2", "read_balance"), # T2 reads 100
Step("transfer1", "write_balance"), # T1 writes 150
Step("transfer2", "write_balance"), # T2 writes 150
])
executor = TraceExecutor(schedule)
executor.run("transfer1", lambda: account.transfer(50))
executor.run("transfer2", lambda: account.transfer(50))
executor.wait(timeout=5.0)
assert account.balance == 150 # should be 200; one transfer lost
Correct Synchronization with Locks¶
The same code with a lock eliminates the race. Frontrun’s trace markers still control the scheduling, but the lock serializes the critical section regardless of which thread arrives first:
import threading
from frontrun.common import Schedule, Step
from frontrun.trace_markers import TraceExecutor
class SyncedBankAccount:
def __init__(self, balance=0):
self.balance = balance
self.lock = threading.Lock()
def transfer(self, amount):
with self.lock:
current = self.balance # frontrun: read_balance
new_balance = current + amount
self.balance = new_balance # frontrun: write_balance
return new_balance
account = SyncedBankAccount(balance=100)
schedule = Schedule([
Step("transfer1", "read_balance"),
Step("transfer2", "read_balance"),
Step("transfer1", "write_balance"),
Step("transfer2", "write_balance"),
])
executor = TraceExecutor(schedule)
executor.run("transfer1", lambda: account.transfer(50))
executor.run("transfer2", lambda: account.transfer(50))
executor.wait(timeout=5.0)
assert account.balance == 200 # both transfers applied correctly
Automatic Race Finding with DPOR¶
DPOR systematically explores all meaningfully different interleavings. No markers needed — it detects shared-memory conflicts automatically:
from frontrun.dpor import explore_dpor
class Counter:
def __init__(self):
self.value = 0
def increment(self):
temp = self.value
self.value = temp + 1
result = explore_dpor(
setup=Counter,
threads=[lambda c: c.increment(), lambda c: c.increment()],
invariant=lambda c: c.value == 2,
)
assert result.property_holds, result.explanation
Output when the race is found:
Race condition found after 2 interleavings.
Write-write conflict: threads 0 and 1 both wrote to value.
Thread 0 | counter.py:7 temp = self.value
| [read Counter.value]
Thread 0 | counter.py:8 self.value = temp + 1
| [write Counter.value]
Thread 1 | counter.py:7 temp = self.value
| [read Counter.value]
Thread 1 | counter.py:8 self.value = temp + 1
| [write Counter.value]
Reproduced 10/10 times (100%)
Automatic Race Finding with Bytecode Exploration¶
Bytecode exploration generates random opcode-level schedules. It often finds races very quickly and can catch races invisible to DPOR (e.g. shared state in C extensions), but the error traces are less interpretable:
from frontrun.bytecode import explore_interleavings
class Counter:
def __init__(self, value=0):
self.value = value
def increment(self):
temp = self.value
self.value = temp + 1
result = explore_interleavings(
setup=lambda: Counter(value=0),
threads=[
lambda c: c.increment(),
lambda c: c.increment(),
],
invariant=lambda c: c.value == 2,
max_attempts=200,
max_ops=200,
seed=42,
)
assert result.property_holds, result.explanation
Output:
Race condition found after 1 interleavings.
Lost update: threads 0 and 1 both read value before either wrote it back.
Thread 1 | counter.py:7 temp = self.value
| [read value]
Thread 0 | counter.py:7 temp = self.value
| [read value]
Thread 1 | counter.py:8 self.value = temp + 1
| [write value]
Thread 0 | counter.py:8 self.value = temp + 1
| [write value]
Reproduced 10/10 times (100%)
Async Concurrency Control¶
Async trace markers let you control interleaving at await boundaries:
from frontrun.async_trace_markers import AsyncTraceExecutor
from frontrun.common import Schedule, Step
class AsyncBankAccount:
def __init__(self, balance=0):
self.balance = balance
async def get_balance(self):
return self.balance
async def set_balance(self, value):
self.balance = value
async def transfer(self, amount):
# frontrun: read_balance
current = await self.get_balance()
new_balance = current + amount
# frontrun: write_balance
await self.set_balance(new_balance)
account = AsyncBankAccount(balance=100)
schedule = Schedule([
Step("task1", "read_balance"),
Step("task2", "read_balance"),
Step("task1", "write_balance"),
Step("task2", "write_balance"),
])
executor = AsyncTraceExecutor(schedule)
executor.run({
"task1": lambda: account.transfer(50),
"task2": lambda: account.transfer(50),
})
assert account.balance == 150 # one transfer lost
Real-World Case Study: SQLAlchemy ORM¶
For a walkthrough of a real lost-update race in SQLAlchemy ORM code running against PostgreSQL, see SQLAlchemy Lost-Update Race Condition. That case study demonstrates detection with trace markers and bytecode exploration, and discusses why DPOR cannot detect this particular race (the shared state lives in the database, not in Python memory).