Examples

Practical examples of using Frontrun to test concurrent code.

Bank Account Transfer (Lost Update)

Two concurrent transfers each read the balance before either writes. The second write overwrites the first, losing one transfer:

from frontrun.common import Schedule, Step
from frontrun.trace_markers import TraceExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance

    def transfer(self, amount):
        current = self.balance  # frontrun: read_balance
        new_balance = current + amount
        self.balance = new_balance  # frontrun: write_balance
        return new_balance

account = BankAccount(balance=100)

# Both threads read before either writes
schedule = Schedule([
    Step("transfer1", "read_balance"),    # T1 reads 100
    Step("transfer2", "read_balance"),    # T2 reads 100
    Step("transfer1", "write_balance"),   # T1 writes 150
    Step("transfer2", "write_balance"),   # T2 writes 150
])

executor = TraceExecutor(schedule)
executor.run("transfer1", lambda: account.transfer(50))
executor.run("transfer2", lambda: account.transfer(50))
executor.wait(timeout=5.0)

assert account.balance == 150  # should be 200; one transfer lost

Correct Synchronization with Locks

The same code with a lock eliminates the race. Frontrun’s trace markers still control the scheduling, but the lock serializes the critical section regardless of which thread arrives first:

import threading
from frontrun.common import Schedule, Step
from frontrun.trace_markers import TraceExecutor

class SyncedBankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self.lock = threading.Lock()

    def transfer(self, amount):
        with self.lock:
            current = self.balance  # frontrun: read_balance
            new_balance = current + amount
            self.balance = new_balance  # frontrun: write_balance
        return new_balance

account = SyncedBankAccount(balance=100)

schedule = Schedule([
    Step("transfer1", "read_balance"),
    Step("transfer2", "read_balance"),
    Step("transfer1", "write_balance"),
    Step("transfer2", "write_balance"),
])

executor = TraceExecutor(schedule)
executor.run("transfer1", lambda: account.transfer(50))
executor.run("transfer2", lambda: account.transfer(50))
executor.wait(timeout=5.0)

assert account.balance == 200  # both transfers applied correctly

Automatic Race Finding with DPOR

DPOR systematically explores all meaningfully different interleavings. No markers needed — it detects shared-memory conflicts automatically:

from frontrun.dpor import explore_dpor

class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        temp = self.value
        self.value = temp + 1

result = explore_dpor(
    setup=Counter,
    threads=[lambda c: c.increment(), lambda c: c.increment()],
    invariant=lambda c: c.value == 2,
)

assert result.property_holds, result.explanation

Output when the race is found:

Race condition found after 2 interleavings.

  Write-write conflict: threads 0 and 1 both wrote to value.

  Thread 0 | counter.py:7             temp = self.value
           | [read Counter.value]
  Thread 0 | counter.py:8             self.value = temp + 1
           | [write Counter.value]
  Thread 1 | counter.py:7             temp = self.value
           | [read Counter.value]
  Thread 1 | counter.py:8             self.value = temp + 1
           | [write Counter.value]

  Reproduced 10/10 times (100%)

Automatic Race Finding with Bytecode Exploration

Bytecode exploration generates random opcode-level schedules. It often finds races very quickly and can catch races invisible to DPOR (e.g. shared state in C extensions), but the error traces are less interpretable:

from frontrun.bytecode import explore_interleavings

class Counter:
    def __init__(self, value=0):
        self.value = value

    def increment(self):
        temp = self.value
        self.value = temp + 1

result = explore_interleavings(
    setup=lambda: Counter(value=0),
    threads=[
        lambda c: c.increment(),
        lambda c: c.increment(),
    ],
    invariant=lambda c: c.value == 2,
    max_attempts=200,
    max_ops=200,
    seed=42,
)

assert result.property_holds, result.explanation

Output:

Race condition found after 1 interleavings.

  Lost update: threads 0 and 1 both read value before either wrote it back.

  Thread 1 | counter.py:7             temp = self.value
           | [read value]
  Thread 0 | counter.py:7             temp = self.value
           | [read value]
  Thread 1 | counter.py:8             self.value = temp + 1
           | [write value]
  Thread 0 | counter.py:8             self.value = temp + 1
           | [write value]

  Reproduced 10/10 times (100%)

Async Concurrency Control

Async trace markers let you control interleaving at await boundaries:

from frontrun import TraceExecutor
from frontrun.common import Schedule, Step

class AsyncBankAccount:
    def __init__(self, balance=0):
        self.balance = balance

    async def get_balance(self):
        return self.balance

    async def set_balance(self, value):
        self.balance = value

    async def transfer(self, amount):
        # frontrun: read_balance
        current = await self.get_balance()
        new_balance = current + amount
        # frontrun: write_balance
        await self.set_balance(new_balance)

account = AsyncBankAccount(balance=100)

schedule = Schedule([
    Step("task1", "read_balance"),
    Step("task2", "read_balance"),
    Step("task1", "write_balance"),
    Step("task2", "write_balance"),
])

executor = TraceExecutor(schedule)
executor.run({
    "task1": lambda: account.transfer(50),
    "task2": lambda: account.transfer(50),
})

assert account.balance == 150  # one transfer lost

Interactive HTML Exploration Reports

explore_dpor() can write a self-contained interactive HTML report that lets you step through every explored execution, inspect thread switch-points, and see the conflicting attribute accesses that caused each reordering.

Generating a report from pytest — pass --frontrun-report PATH to the frontrun test runner:

frontrun pytest tests/ --frontrun-report dpor_report.html

Generating a report from a script — set _global_report_path before calling explore_dpor():

import frontrun._report
from frontrun.dpor import explore_dpor

class Accounts:
    def __init__(self) -> None:
        self.a = 100
        self.b = 100
        self.c = 100

def transfer_a_to_b(accounts: Accounts) -> None:
    if accounts.a >= 60:
        accounts.a -= 60
        accounts.b += 60

def transfer_b_to_c(accounts: Accounts) -> None:
    if accounts.b >= 80:
        accounts.b -= 80
        accounts.c += 80

frontrun._report._global_report_path = "dpor_report.html"
try:
    result = explore_dpor(
        setup=Accounts,
        threads=[transfer_a_to_b, transfer_b_to_c],
        invariant=lambda accs: accs.a + accs.b + accs.c == 300,
        preemption_bound=2,
    )
    print(result.explanation)
finally:
    frontrun._report._global_report_path = None

The race here is a classic lost update on account B: both threads read-modify-write accounts.b without any lock. When one thread’s write overwrites the other’s, the total balance drifts away from 300.

The report shows every explored interleaving as a timeline. Executions where the invariant holds are shown in green; failing ones in red. Click any execution button or use the arrow keys to step through them. Each switch-point panel shows the source line and opcode where the scheduler switched threads, making it easy to pinpoint exactly which access caused the conflict.

Example reports (generated at documentation build time):

Run any example directly to regenerate its report:

python examples/dpor_bank_transfer.py my_report.html
python examples/dpor_bank_transfer_locked.py my_report.html
python examples/dpor_sqlite_counter.py my_report.html
python examples/dpor_sqlite_counter.py my_report.html fixed
python examples/dpor_dining_philosophers.py my_report.html

Locking and path reduction

The locked bank transfer illustrates an important property of DPOR: when operations are protected by a lock, DPOR explores far fewer interleavings because the only meaningful ordering question is which thread acquires the lock first. Compare the racy report against the locked report to see this directly.

The stop_on_first=False parameter (used in all the examples above) tells DPOR to continue exploring after the first failure. The default stop_on_first=True stops as soon as a violation is found, which is usually what you want in a test suite.

Real-World Case Study: SQLAlchemy ORM

For a walkthrough of a real lost-update race in SQLAlchemy ORM code running against PostgreSQL, see SQLAlchemy Lost-Update Race Condition. That case study demonstrates detection with trace markers, bytecode exploration, and DPOR with C-level I/O interception via LD_PRELOAD.