API Reference

Common Data Structures

Shared data structures for frontrun.

class frontrun.common.Step(execution_name, marker_name)[source]

Bases: object

Represents a single step in the execution schedule.

Parameters:
  • execution_name (str)

  • marker_name (str)

execution_name

The name of the execution unit (thread/task) that should execute this step

Type:

str

marker_name

The marker name that identifies this synchronization point

Type:

str

execution_name: str
marker_name: str
class frontrun.common.Schedule(steps)[source]

Bases: object

Defines the execution order for tasks at synchronization points.

A schedule is a linear sequence of steps that specify which task should execute which marker in order.

Initialize a schedule with a list of steps.

Parameters:

steps (list[Step]) – Ordered list of Step objects defining the execution sequence

__init__(steps)[source]

Initialize a schedule with a list of steps.

Parameters:

steps (list[Step]) – Ordered list of Step objects defining the execution sequence
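A Schedule is simply an ordered list of Steps. The shape can be shown with minimal stand-in dataclasses (mirroring the documented fields, not the real frontrun.common classes):

```python
from dataclasses import dataclass

# Stand-ins mirroring the documented Step/Schedule fields, used only to
# make the schedule shape concrete.
@dataclass
class Step:
    execution_name: str
    marker_name: str

@dataclass
class Schedule:
    steps: list

# Force both threads to read before either writes: the classic
# lost-update interleaving.
schedule = Schedule(steps=[
    Step("thread1", "read"),
    Step("thread2", "read"),
    Step("thread1", "write"),
    Step("thread2", "write"),
])
assert [s.execution_name for s in schedule.steps] == [
    "thread1", "thread2", "thread1", "thread2"]
```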

class frontrun.common.InterleavingResult(property_holds, counterexample=None, num_explored=0, unique_interleavings=0, failures=<factory>, explanation=None, reproduction_attempts=0, reproduction_successes=0)[source]

Bases: object

Result of exploring interleavings.

Returned by explore_interleavings() (both the bytecode and async variants) and explore_dpor().

Parameters:
  • property_holds (bool)

  • counterexample (list[int] | None)

  • num_explored (int)

  • unique_interleavings (int)

  • failures (list[tuple[int, list[int]]])

  • explanation (str | None)

  • reproduction_attempts (int)

  • reproduction_successes (int)

property_holds

True if the invariant held under all tested interleavings.

Type:

bool

counterexample

First schedule that violated the invariant (if any).

Type:

list[int] | None

num_explored

How many interleavings were tested.

Type:

int

unique_interleavings

Number of distinct schedule orderings observed. Provides a lower bound on interleaving-space coverage. Relevant for random bytecode exploration; DPOR always explores distinct interleavings so this equals num_explored.

Type:

int

failures

All failing (execution_number, schedule) pairs. Only populated by DPOR when stop_on_first=False.

Type:

list[tuple[int, list[int]]]

explanation

Human-readable explanation of the race condition, showing interleaved source lines and the conflict pattern. None if no race was found.

Type:

str | None

reproduction_attempts

Number of times the counterexample schedule was re-run to test reproducibility. 0 if no counterexample.

Type:

int

reproduction_successes

How many of those re-runs reproduced the invariant violation.

Type:

int

property_holds: bool
counterexample: list[int] | None = None
num_explored: int = 0
unique_interleavings: int = 0
failures: list[tuple[int, list[int]]]
explanation: str | None = None
reproduction_attempts: int = 0
reproduction_successes: int = 0

DPOR (Systematic Exploration)

Bytecode-tracing DPOR (Dynamic Partial Order Reduction) for frontrun.

This module implements systematic interleaving exploration using DPOR, completely separate from the existing bytecode.py random exploration.

The approach:

  1. A Rust DPOR engine (frontrun._dpor) manages the exploration tree, vector clocks, and backtrack set computation.

  2. Python drives execution: runs threads under sys.settrace opcode tracing, uses a shadow stack to detect shared-memory accesses, and feeds access/sync events to the Rust engine.

  3. Cooperative threading primitives (lock, event, etc.) are monkey-patched to yield control back to the DPOR scheduler and report synchronization events for happens-before tracking.

Usage:

from frontrun.dpor import explore_dpor

class Counter:
    def __init__(self):
        self.value = 0
    def increment(self):
        temp = self.value
        self.value = temp + 1

result = explore_dpor(
    setup=lambda: Counter(),
    threads=[lambda c: c.increment(), lambda c: c.increment()],
    invariant=lambda c: c.value == 2,
)
assert result.property_holds, result.explanation  # fails — lost update!
frontrun.dpor.explore_dpor(setup, threads, invariant, max_executions=None, preemption_bound=2, max_branches=100000, timeout_per_run=5.0, stop_on_first=True, detect_io=True, deadlock_timeout=5.0, reproduce_on_failure=10, total_timeout=None)[source]

Systematically explore interleavings using DPOR.

This is the DPOR replacement for explore_interleavings(). Instead of random sampling, it uses the DPOR algorithm to explore only distinct interleavings (modulo independent operation reordering).

Parameters:
  • setup (Callable[[], T]) – Creates fresh shared state for each execution.

  • threads (list[Callable[[T], None]]) – List of callables, each receiving the shared state.

  • invariant (Callable[[T], bool]) – Predicate over shared state; must be True after all threads complete.

  • max_executions (int | None) – Safety limit on total executions (None = unlimited).

  • preemption_bound (int | None) – Limit on preemptions per execution. 2 catches most bugs. None = unbounded (full DPOR).

  • max_branches (int) – Maximum scheduling points per execution.

  • timeout_per_run (float) – Timeout per execution in seconds.

  • stop_on_first (bool) – If True (default), stop exploring as soon as the first invariant violation is found. Set to False to collect all failing interleavings.

  • detect_io (bool) – Automatically detect socket/file I/O operations and report them as resource accesses (default True). Two threads accessing the same endpoint or file will be treated as conflicting, enabling DPOR to explore their orderings.

  • deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).

  • reproduce_on_failure (int) – When a counterexample is found, replay the same schedule this many times to measure reproducibility (default 10). Set to 0 to skip reproduction testing.

  • total_timeout (float | None) – Maximum total time in seconds for the entire exploration (default None = unlimited). When exceeded, returns results gathered so far.

Returns:

InterleavingResult with exploration statistics and any counterexample found.

Return type:

InterleavingResult

Note

When running under pytest, this function requires the frontrun CLI wrapper (frontrun pytest ...) or the --frontrun-patch-locks flag. Without it, the test is automatically skipped.
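The lost update that explore_dpor reports for the Counter usage example can be replayed by hand, with no frontrun import, by interleaving the two increments the way a counterexample schedule would:

```python
# Illustration only: manually replaying the racy interleaving that DPOR
# discovers for the Counter in the usage example above.
class Counter:
    def __init__(self):
        self.value = 0

c = Counter()
temp_a = c.value        # thread A reads 0
temp_b = c.value        # thread B reads the same stale 0 (preemption here)
c.value = temp_a + 1    # A writes 1
c.value = temp_b + 1    # B writes 1, clobbering A's update
assert c.value == 1     # the invariant "c.value == 2" is violated
```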

Bytecode Instrumentation

Bytecode-level deterministic concurrency testing.

Uses sys.settrace with f_trace_opcodes to intercept execution at every bytecode instruction, enabling fine-grained control over thread interleaving.

This pairs naturally with property-based testing: rather than specifying exact schedules, generate random interleavings and check that invariants hold (or that bugs can be found).

The core insight: CPython context switches happen between bytecode instructions. By controlling which thread gets to execute each instruction, we can explore the full space of possible interleavings.
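This granularity is visible with the standard dis module: the two-line increment body compiles to multiple opcodes, and a preemption between the attribute read and the attribute write is exactly the lost-update window:

```python
import dis

# The increment body compiles to several opcodes; a context switch between
# the LOAD_ATTR (read of self.value) and the STORE_ATTR (write back) is
# where the lost update happens.
def increment(self):
    temp = self.value
    self.value = temp + 1

ops = [ins.opname for ins in dis.get_instructions(increment)]
assert "LOAD_ATTR" in ops and "STORE_ATTR" in ops
```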

Example — find a race condition with random schedule exploration:

>>> from frontrun.bytecode import explore_interleavings
>>>
>>> class Counter:
...     def __init__(self):
...         self.value = 0
...     def increment(self):
...         temp = self.value
...         self.value = temp + 1
>>>
>>> result = explore_interleavings(
...     setup=lambda: Counter(),
...     threads=[lambda c: c.increment(), lambda c: c.increment()],
...     invariant=lambda c: c.value == 2,
... )
>>> assert result.property_holds, result.explanation  # fails — lost update!
frontrun.bytecode.explore_interleavings(setup, threads, invariant, max_attempts=200, max_ops=300, timeout_per_run=5.0, seed=None, debug=False, detect_io=True, deadlock_timeout=5.0, reproduce_on_failure=10, total_timeout=None)[source]

Search for interleavings that violate an invariant.

Note

When running under pytest, this function requires the frontrun CLI wrapper (frontrun pytest ...) or the --frontrun-patch-locks flag. Without it, the test is automatically skipped.

Generates random opcode-level schedules and tests whether the invariant holds under each one. If a violation is found, returns immediately with the counterexample schedule.

This is the bytecode-level analogue of property-based testing: instead of generating random inputs, we generate random interleavings and check that the result satisfies an invariant.

Parameters:
  • setup (Callable[[], T]) – Returns fresh shared state for each attempt.

  • threads (list[Callable[[T], None]]) – Callables that each receive the state as their argument.

  • invariant (Callable[[T], bool]) – Predicate on the state. Returns True if the property holds.

  • max_attempts (int) – How many random interleavings to try.

  • max_ops (int) – Maximum schedule length per attempt.

  • timeout_per_run (float) – Timeout for each individual run.

  • seed (int | None) – Optional RNG seed for reproducibility.

  • detect_io (bool) – Automatically detect socket/file I/O and treat them as scheduling points (default True).

  • deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).

  • reproduce_on_failure (int) – When a counterexample is found, replay the same schedule this many times to measure reproducibility (default 10). Set to 0 to skip reproduction testing.

  • total_timeout (float | None) – Maximum total time in seconds for the entire exploration (default None = unlimited). When exceeded, returns results gathered so far.

  • debug (bool)

Returns:

InterleavingResult with the outcome. The unique_interleavings field reports how many distinct execution orderings were observed, providing a lower bound on exploration coverage.

Return type:

InterleavingResult
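The schedule format can be pictured without running the explorer. A schedule is just a list of thread indices, one per scheduling step, which is the same shape as the counterexample field; a conceptual sketch (not frontrun internals) of seeded random generation:

```python
import random

# Conceptual sketch: each schedule entry names which thread executes the
# next opcode. A fixed seed makes the generated schedules reproducible,
# which is what the seed parameter provides.
rng = random.Random(1234)
num_threads, max_ops = 2, 10
schedule = [rng.randrange(num_threads) for _ in range(max_ops)]
assert len(schedule) == max_ops and set(schedule) <= {0, 1}
```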

Trace Markers

Frontrun: Deterministic thread interleaving using sys.settrace and comment markers.

This module provides a mechanism to control thread execution order by marking synchronization points in code with special comments and enforcing a predefined execution schedule using Python’s tracing facilities.

Example usage:

from frontrun.common import Schedule, Step
from frontrun.trace_markers import TraceExecutor

def worker_function():
    x = read_data()  # frontrun: read
    write_data(x)  # frontrun: write

schedule = Schedule([
    Step("thread1", "read"),
    Step("thread2", "read"),
    Step("thread1", "write"),
    Step("thread2", "write"),
])

executor = TraceExecutor(schedule)
executor.run("thread1", worker_function)
executor.run("thread2", worker_function)
executor.wait()

class frontrun.trace_markers.MarkerRegistry[source]

Bases: object

Tracks marker locations in source code for efficient lookup.

This class scans source files to find lines with frontrun markers and maintains a mapping from (filename, line_number) to marker names.
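The scanning idea can be sketched as follows (the regex and function shape are assumptions for illustration, not the actual implementation):

```python
import re

# Hypothetical sketch: map line numbers to marker names for lines that
# contain a "# frontrun: <name>" comment.
MARKER_RE = re.compile(r"#\s*frontrun:\s*(\w+)")

def scan_source(source: str) -> dict[int, str]:
    markers = {}
    for lineno, line in enumerate(source.splitlines(), start=1):
        match = MARKER_RE.search(line)
        if match:
            markers[lineno] = match.group(1)
    return markers

src = "x = read_data()  # frontrun: read\nwrite_data(x)  # frontrun: write\n"
assert scan_source(src) == {1: "read", 2: "write"}
```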

scan_frame(frame)[source]

Scan the source file for the given frame to find all markers.

Parameters:

frame (Any) – A Python frame object from the trace function

Return type:

None

get_marker(filename, lineno)[source]

Get the marker name for a specific file location.

Parameters:
  • filename (str) – The source file path

  • lineno (int) – The line number

Returns:

The marker name if found, None otherwise

Return type:

str | None

class frontrun.trace_markers.ThreadCoordinator(schedule, *, deadlock_timeout=5.0)[source]

Bases: object

Coordinates thread execution according to a schedule.

This class manages the synchronization between threads, ensuring that each thread executes markers in the order specified by the schedule.

Parameters:
  • schedule (Schedule)

  • deadlock_timeout (float)

wait_for_turn(execution_name, marker_name, *, _reacquire_execution_lock=False)[source]

Block until it’s this execution unit’s turn to execute this marker.

When _reacquire_execution_lock is True (used by the trace executors), _execution_lock is acquired while the condition lock is still held, before returning. This prevents other threads from racing ahead between being notified and the caller resuming execution. The caller must have already released _execution_lock before calling this method.

Parameters:
  • execution_name (str) – The name of the calling execution unit

  • marker_name (str) – The marker that was hit

  • _reacquire_execution_lock (bool)

report_error(error)[source]

Report an error and wake up all waiting threads.

Parameters:

error (Exception) – The exception that occurred

is_finished()[source]

Check if the schedule has completed or encountered an error.

Return type:

bool

class frontrun.trace_markers.TraceExecutor(schedule, *, deadlock_timeout=5.0)[source]

Bases: object

Executes threads with interleaved execution according to a schedule.

This is the main interface for the frontrun library. It sets up tracing for each thread and coordinates their execution.

Initialize the executor with a schedule.

Parameters:
  • schedule (Schedule) – The Schedule defining the execution order

  • deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).

__init__(schedule, *, deadlock_timeout=5.0)[source]

Initialize the executor with a schedule.

Parameters:
  • schedule (Schedule) – The Schedule defining the execution order

  • deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).

run(execution_name, target, args=(), kwargs=None)[source]

Start a new thread with tracing enabled.

Parameters:
  • execution_name (str) – The name for this execution unit (must match schedule)

  • target (Callable[[...], None]) – The function to execute in the thread

  • args (tuple[Any, ...]) – Positional arguments for the target function

  • kwargs (dict[str, Any] | None) – Keyword arguments for the target function

wait(timeout=None)[source]

Wait for all threads to complete.

Parameters:

timeout (float | None) – Optional timeout in seconds

Raises:
  • TimeoutError – If threads don’t complete within the timeout

  • Any exception that occurred in a thread during execution

reset()[source]

Reset the executor for another run (for testing purposes).

frontrun.trace_markers.frontrun(schedule, threads, thread_args=None, thread_kwargs=None, timeout=None, deadlock_timeout=5.0)[source]

Convenience function to run multiple threads with a schedule.

Parameters:
  • schedule (Schedule) – The Schedule defining execution order

  • threads (dict[str, Callable[[...], None]]) – Dictionary mapping execution unit names to their target functions

  • thread_args (dict[str, tuple[Any, ...]] | None) – Optional dictionary mapping execution unit names to argument tuples

  • thread_kwargs (dict[str, dict[str, Any]] | None) – Optional dictionary mapping execution unit names to keyword argument dicts

  • timeout (float | None) – Optional timeout for waiting

  • deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).

Returns:

The TraceExecutor instance (useful for inspection)

Return type:

TraceExecutor

Example

frontrun(
    schedule=Schedule([Step("t1", "marker1"), Step("t2", "marker1")]),
    threads={"t1": worker_func, "t2": worker_func},
)

Async Trace Markers

Frontrun: Deterministic async task interleaving using comment-based markers.

This module provides a mechanism to control async task execution order by marking synchronization points in code with # frontrun: marker_name comments, matching the elegant syntax of the sync trace_markers module.

Key Insight: Thread-Based Execution with sys.settrace

This implementation mirrors the sync architecture exactly:

  1. Each async task runs in its own thread via asyncio.run(task_fn())

  2. sys.settrace fires on every line, including inside synchronous sub-coroutines

  3. When a marker is detected, the trace function blocks the thread via ThreadCoordinator.wait_for_turn()

  4. Execution resumes exactly where it paused

This solves the synchronous function body bug: when await self.get_balance() completes synchronously (no internal yields), the trace still fires on every line during that execution. After get_balance() returns, the trace fires for the next line (after the marker), detects the marker, and blocks before that line executes.

Marker Semantics

Marker Placement: Place markers to gate the operations you want to control.

Inline markers (marker on same line as operation):

current = self.balance # frontrun: read_balance

Separate-line markers (marker before operation):

# frontrun: read_balance current = self.balance

Both styles work identically: the marker gates execution of the line, ensuring it only executes after the scheduler approves this task at this marker.

Example usage:

async def worker_function():
    # frontrun: read_data
    x = await read_data()
    # frontrun: write_data
    await write_data(x)

schedule = Schedule([
    Step("task1", "read_data"),
    Step("task2", "read_data"),
    Step("task1", "write_data"),
    Step("task2", "write_data"),
])

executor = AsyncTraceExecutor(schedule)
executor.run({
    'task1': worker_function,
    'task2': worker_function,
})

Or using the convenience function:

async_frontrun(
    schedule=schedule,
    tasks={'task1': worker1, 'task2': worker2},
)
class frontrun.async_trace_markers.AsyncTraceExecutor(schedule, *, deadlock_timeout=5.0)[source]

Bases: object

Executes async tasks with interleaved execution according to a schedule.

This is the main interface for the async frontrun library. It uses comment-based markers (# frontrun: marker_name) to control task execution order.

Like the sync version, each task runs in its own thread, but here every async task additionally gets its own event loop via asyncio.run().

Initialize the executor with a schedule.

Parameters:
  • schedule (Schedule) – The Schedule defining the execution order

  • deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).

__init__(schedule, *, deadlock_timeout=5.0)[source]

Initialize the executor with a schedule.

Parameters:
  • schedule (Schedule) – The Schedule defining the execution order

  • deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).

run(tasks, timeout=10.0)[source]

Run all tasks with controlled interleaving based on comment markers.

This is a synchronous method that creates the task threads and waits for them to finish.

Parameters:
  • tasks (dict[str, Callable[[], Coroutine[Any, Any, None]]]) – Dictionary mapping task names to their async functions

  • timeout (float) – Timeout in seconds for all tasks to complete

Raises:
  • TimeoutError – If tasks don’t complete within the timeout

  • Any exception that occurred in a task during execution

Return type:

None

reset()[source]

Reset the executor for another run (for testing purposes).

frontrun.async_trace_markers.async_frontrun(schedule, tasks, task_args=None, task_kwargs=None, timeout=10.0, deadlock_timeout=5.0)[source]

Convenience function to run multiple async tasks with a schedule.

This is a synchronous function (not async) that creates an executor and runs the tasks.

Tasks use # frontrun: marker_name comments to mark synchronization points. There is no need to pass marker functions to tasks; the executor automatically detects markers via sys.settrace.

Parameters:
  • schedule (Schedule) – The Schedule defining execution order

  • tasks (dict[str, Callable[[...], Coroutine[Any, Any, None]]]) – Dictionary mapping execution unit names to their async target functions

  • task_args (dict[str, tuple[Any, ...]] | None) – Optional dictionary mapping execution unit names to argument tuples

  • task_kwargs (dict[str, dict[str, Any]] | None) – Optional dictionary mapping execution unit names to keyword argument dicts

  • timeout (float) – Timeout in seconds for the entire execution

  • deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).

Returns:

The AsyncTraceExecutor instance (useful for inspection)

Return type:

AsyncTraceExecutor

Example:

async def worker(account, amount):
    # frontrun: before_deposit
    await account.deposit(amount)

async_frontrun(
    schedule=Schedule([
        Step("t1", "before_deposit"),
        Step("t2", "before_deposit")
    ]),
    tasks={"t1": worker, "t2": worker},
    task_args={"t1": (account, 50), "t2": (account, 50)},
)

Async Bytecode Instrumentation

Await-point-level deterministic async concurrency testing.

Uses the shared InterleavedLoop abstraction to control which async task resumes at each await point, enabling fine-grained control over task interleaving.

This pairs naturally with property-based testing: rather than specifying exact schedules, generate random interleavings and check that invariants hold (or that bugs can be found).

The core insight: in async Python, context switches happen ONLY at await points. The event loop is single-threaded. By controlling which task resumes at each await point, we can explore the full space of possible interleavings — and there are far fewer of them than in threaded code.

Example — find a race condition with random schedule exploration:

>>> import asyncio
>>> from frontrun.async_bytecode import explore_interleavings, await_point
>>>
>>> class Counter:
...     def __init__(self):
...         self.value = 0
...     async def increment(self):
...         temp = self.value
...         await await_point()  # Yield control; race can happen here
...         self.value = temp + 1
>>>
>>> result = asyncio.run(explore_interleavings(
...     setup=lambda: Counter(),
...     tasks=[lambda c: c.increment(), lambda c: c.increment()],
...     invariant=lambda c: c.value == 2,
... ))
>>> assert result.property_holds, result.explanation  # fails — lost update!

The await_point() function marks explicit yield points where context switches can occur. This is analogous to bytecode.py’s opcode-level tracing, but for async code the number of interleaving points is much smaller and more explicit.

In production async code, every await is a potential context switch point. For testing, call await await_point() at each location where a race condition could manifest.

async frontrun.async_bytecode.await_point()[source]

Yield to the scheduler at an await point.

Call this at every point where a context switch could happen in your async code. This is the async equivalent of a bytecode opcode — the atomic unit of interleaving.

In typical async code, every await statement is a potential context switch point. For testing race conditions, replace strategic awaits with await await_point() to allow the scheduler to control ordering.

If no scheduler is active (i.e., not running under AsyncBytecodeShuffler), this function returns immediately without blocking.

class frontrun.async_bytecode.AwaitScheduler(schedule, num_tasks, *, deadlock_timeout=5.0)[source]

Bases: InterleavedLoop

Controls async task execution at await-point granularity.

The schedule is a list of task indices. Each entry means “let this task resume from its next await point.” When the schedule is exhausted, all tasks run freely to completion.

Built on the shared InterleavedLoop abstraction, using index-based scheduling as its policy.

Parameters:
  • schedule (list[int])

  • num_tasks (int)

  • deadlock_timeout (float)

should_proceed(task_id, marker=None)[source]

Return True if this task should resume now.

Called while holding the condition lock. Must not await.

Parameters:
  • task_id (Any) – Identity of the calling task (str, int, etc.)

  • marker (Any) – Optional context from the yield point (e.g. a marker name, an (operation, phase) tuple, or None).

Return type:

bool

on_proceed(task_id, marker=None)[source]

Update scheduling state after a task is allowed to proceed.

Called while holding the condition lock, immediately after should_proceed returned True. Must not await.

Parameters:
  • task_id (Any) – Identity of the task that is proceeding.

  • marker (Any) – Same marker value passed to should_proceed.

Return type:

None
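Taken together, these two hooks implement an index-based policy. A minimal stand-in (hypothetical class, not the real scheduler, which also handles locking and error propagation) showing the documented behavior, including free-running once the schedule is exhausted:

```python
# Hypothetical stand-in for the documented index-based scheduling policy.
class IndexPolicy:
    def __init__(self, schedule: list[int]):
        self.schedule = schedule
        self.pos = 0

    def should_proceed(self, task_id: int) -> bool:
        if self.pos >= len(self.schedule):
            return True                      # schedule exhausted: run freely
        return self.schedule[self.pos] == task_id

    def on_proceed(self, task_id: int) -> None:
        if self.pos < len(self.schedule):
            self.pos += 1                    # consume one schedule entry

p = IndexPolicy([0, 1, 0])
assert p.should_proceed(0) and not p.should_proceed(1)
p.on_proceed(0)                              # task 0 ran; task 1 is next
assert p.should_proceed(1)
```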

property had_error: bool

Check if an error occurred during execution.

class frontrun.async_bytecode.AsyncBytecodeShuffler(scheduler)[source]

Bases: object

Run concurrent async functions with await-point-level interleaving control.

Creates asyncio tasks for each function and delegates to the AwaitScheduler (an InterleavedLoop subclass) for execution and context setup.

Parameters:

scheduler (AwaitScheduler)

async run(funcs, args=None, kwargs=None, timeout=10.0)[source]

Run async functions concurrently with controlled interleaving.

Parameters:
  • funcs (list[Callable[[...], Coroutine[Any, Any, None]]]) – One async callable per task.

  • args (list[tuple[Any, ...]] | None) – Per-task positional args.

  • kwargs (list[dict[str, Any]] | None) – Per-task keyword args.

  • timeout (float) – Max wait time for all tasks.

frontrun.async_bytecode.controlled_interleaving(schedule, num_tasks=2)[source]

Context manager for running async code under a specific interleaving.

Parameters:
  • schedule (list[int]) – List of task indices controlling await-point execution order.

  • num_tasks (int) – Number of tasks.

Yields:

AsyncBytecodeShuffler runner.

Return type:

AsyncGenerator[AsyncBytecodeShuffler, None]

Example

>>> async with controlled_interleaving([0, 1, 0, 1], num_tasks=2) as runner:
...     await runner.run([coro1, coro2])
async frontrun.async_bytecode.run_with_schedule(schedule, setup, tasks, timeout=5.0, deadlock_timeout=5.0)[source]

Run one async interleaving and return the state object.

Parameters:
  • schedule (list[int]) – Await-point-level schedule (list of task indices).

  • setup (Callable[[], Any]) – Returns fresh shared state.

  • tasks (list[Callable[[Any], Coroutine[Any, Any, None]]]) – Async callables that each receive the state as their argument.

  • timeout (float) – Max seconds.

  • deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).

Returns:

The state object after execution.

Return type:

Any

async frontrun.async_bytecode.explore_interleavings(setup, tasks, invariant, max_attempts=200, max_ops=100, timeout_per_run=5.0, seed=None, deadlock_timeout=5.0)[source]

Search for async interleavings that violate an invariant.

Generates random await-point-level schedules and tests whether the invariant holds under each one. If a violation is found, returns immediately with the counterexample schedule.

This is the async analogue of property-based testing for concurrency: instead of generating random inputs, we generate random interleavings and check that the result satisfies an invariant.

Note: max_ops defaults to 100 (vs 300 for bytecode.py) because async code has far fewer interleaving points than threaded bytecode execution. Each await_point() call represents a much coarser-grained checkpoint.

Parameters:
  • setup (Callable[[], Any]) – Returns fresh shared state for each attempt.

  • tasks (list[Callable[[Any], Coroutine[Any, Any, None]]]) – Async callables that each receive the state as their argument.

  • invariant (Callable[[Any], bool]) – Predicate on the state. Returns True if the property holds.

  • max_attempts (int) – How many random interleavings to try.

  • max_ops (int) – Maximum schedule length per attempt.

  • timeout_per_run (float) – Timeout for each individual run.

  • seed (int | None) – Optional RNG seed for reproducibility.

  • deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).

Returns:

InterleavingResult with the outcome. The unique_interleavings field reports how many distinct schedule orderings were observed.

Return type:

InterleavingResult

frontrun.async_bytecode.schedule_strategy(num_tasks, max_ops=100)[source]

Hypothesis strategy for generating fair await-point schedules.

Generates schedules as a sequence of rounds, where each round is a random permutation of all task indices. This guarantees every task gets exactly the same number of scheduling slots, preventing starvation.
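The round-based construction described above can be sketched with a plain helper (a stand-in, not the Hypothesis strategy itself):

```python
import random

# Each round is a shuffled permutation of all task indices, so every task
# gets exactly one slot per round and starvation is impossible.
def fair_schedule(num_tasks: int, num_rounds: int,
                  rng: random.Random) -> list[int]:
    schedule: list[int] = []
    for _ in range(num_rounds):
        round_slots = list(range(num_tasks))
        rng.shuffle(round_slots)
        schedule.extend(round_slots)
    return schedule

s = fair_schedule(3, num_rounds=4, rng=random.Random(0))
assert all(s.count(t) == 4 for t in range(3))  # equal slots per task
```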

For use with hypothesis @given decorator in your own tests:

>>> from hypothesis import given
>>> from frontrun.async_bytecode import schedule_strategy, run_with_schedule
>>> import asyncio
>>>
>>> @given(schedule=schedule_strategy(2))
... def test_my_invariant(schedule):
...     state = asyncio.run(run_with_schedule(schedule, setup, tasks))
...     assert state.value == expected

Note: max_ops defaults to 100 (vs 300 for bytecode.py) because async code has far fewer interleaving points. Each schedule entry corresponds to one await_point() call, not one bytecode opcode.

Parameters:
  • num_tasks (int)

  • max_ops (int)

Return type:

Any

Trace Formatting

Trace recording, filtering, and formatting for comprehensible race condition errors.

When frontrun finds a race condition, the raw counterexample is a list of thread indices — one per bytecode instruction. This module transforms that into a human-readable “story” of which source lines executed in which order.

The pipeline:

  1. Record a TraceEvent at each opcode during the failing run.

  2. Filter to events that touch shared state (LOAD_ATTR/STORE_ATTR, etc.).

  3. Deduplicate consecutive events from the same thread on the same source line.

  4. Classify the conflict pattern (lost update, order violation, etc.).

  5. Format as an interleaved source-line trace.

class frontrun._trace_format.TraceEvent(step_index, thread_id, filename, lineno, function_name, opcode, access_type=None, attr_name=None, obj_type_name=None, call_chain=None)[source]

Bases: object

A single recorded event from the trace.

Parameters:
  • step_index (int)

  • thread_id (int)

  • filename (str)

  • lineno (int)

  • function_name (str)

  • opcode (str)

  • access_type (str | None)

  • attr_name (str | None)

  • obj_type_name (str | None)

  • call_chain (list[str] | None)

step_index: int
thread_id: int
filename: str
lineno: int
function_name: str
opcode: str
access_type: str | None
attr_name: str | None
obj_type_name: str | None
call_chain: list[str] | None
class frontrun._trace_format.SourceLineEvent(thread_id, filename, lineno, function_name, source_line, access_type=None, attr_name=None, obj_type_name=None, call_chain=None)[source]

Bases: object

A deduplicated, source-level event for display.

Parameters:
  • thread_id (int)

  • filename (str)

  • lineno (int)

  • function_name (str)

  • source_line (str)

  • access_type (str | None)

  • attr_name (str | None)

  • obj_type_name (str | None)

  • call_chain (list[str] | None)

thread_id: int
filename: str
lineno: int
function_name: str
source_line: str
access_type: str | None
attr_name: str | None
obj_type_name: str | None
call_chain: list[str] | None
frontrun._trace_format.qualified_name(frame)[source]

Get a qualified function name from a frame (e.g. DB.dict).

Parameters:

frame (Any)

Return type:

str

frontrun._trace_format.build_call_chain(frame, *, filter_fn, max_depth=3)[source]

Walk user-code frames from frame upward, returning qualified names.

filter_fn(filename) -> bool selects which frames are user code (typically frontrun._tracing.should_trace_file()). Returns None when the chain would be empty.
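A minimal stand-in showing the frame walk (it uses co_name for brevity; the real helper returns qualified names and expects a real filter_fn such as should_trace_file):

```python
import sys

# Stand-in: walk f_back links upward, keep frames whose filename passes
# filter_fn, stop at max_depth, and return None for an empty chain.
def build_call_chain(frame, *, filter_fn, max_depth=3):
    chain = []
    while frame is not None and len(chain) < max_depth:
        if filter_fn(frame.f_code.co_filename):
            chain.append(frame.f_code.co_name)
        frame = frame.f_back
    return chain or None

def inner():
    return build_call_chain(sys._getframe(), filter_fn=lambda name: True)

def outer():
    return inner()

assert outer()[:2] == ["inner", "outer"]
```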

Parameters:
  • frame (Any)

  • filter_fn (Any)

  • max_depth (int)

Return type:

list[str] | None
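
The frame walk can be sketched with plain stdlib frame objects. The helpers below are a stand-alone illustration, not frontrun's implementation: `co_qualname` (Python 3.11+) falls back to `co_name` on older versions, and the pass-all `filter_fn` stands in for `should_trace_file()`.

```python
import sys

def qualified_name(frame):
    # Prefer the qualified name (Python 3.11+), e.g. "DB.get";
    # fall back to the bare name on older interpreters.
    code = frame.f_code
    return getattr(code, "co_qualname", code.co_name)

def build_call_chain(frame, *, filter_fn, max_depth=3):
    # Walk from `frame` toward its callers via f_back, keeping only
    # frames whose filename passes filter_fn, up to max_depth entries.
    chain = []
    while frame is not None and len(chain) < max_depth:
        if filter_fn(frame.f_code.co_filename):
            chain.append(qualified_name(frame))
        frame = frame.f_back
    return chain or None  # None when no user-code frame was seen

def inner():
    # Capture the current frame and walk upward.
    return build_call_chain(sys._getframe(), filter_fn=lambda f: True)

def outer():
    return inner()
```

Calling `outer()` here returns a chain that starts `["inner", "outer", ...]`, innermost frame first.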

class frontrun._trace_format.TraceRecorder(*, enabled=True)[source]

Bases: object

Accumulates TraceEvent objects during a single execution.

Thread-safe: multiple threads may call record() concurrently; each holds the scheduler lock while doing so, so the event ordering is deterministic.

Parameters:

enabled (bool)

events: list[TraceEvent]
enabled
record(thread_id, frame, opcode=None, access_type=None, attr_name=None, obj=None, obj_type_name=None, call_chain=None)[source]

Record one trace event from a frame object.

Parameters:
  • thread_id (int)

  • frame (Any)

  • opcode (str | None)

  • access_type (str | None)

  • attr_name (str | None)

  • obj (Any)

  • obj_type_name (str | None)

  • call_chain (list[str] | None)

Return type:

None

record_io(thread_id, resource_id, kind)[source]

Record an I/O event that has no Python frame (e.g. C-level socket I/O).

Parameters:
  • thread_id (int)

  • resource_id (str)

  • kind (str)

Return type:

None

record_from_opcode(thread_id, frame)[source]

Record an event using the frame’s current instruction.

Used by the bytecode explorer, which doesn’t do shadow-stack analysis. We inspect the instruction to extract access info.

Parameters:
  • thread_id (int)

  • frame (Any)

Return type:

None

frontrun._trace_format.filter_to_shared_accesses(events)[source]

Keep only events that access shared mutable state.

Parameters:

events (list[TraceEvent])

Return type:

list[TraceEvent]

frontrun._trace_format.deduplicate_to_source_lines(events)[source]

Collapse consecutive events from the same thread+line into one SourceLineEvent.

When multiple opcodes on the same source line produce events (e.g., LOAD_ATTR then STORE_ATTR for self.value += 1), merge them into a single entry with a combined access_type — but only when they access the same (obj_type, attr_name) key. Events with different keys on the same line get separate entries so that filtering can distinguish them later (e.g. an attribute read vs an I/O event).

Parameters:

events (list[TraceEvent])

Return type:

list[SourceLineEvent]
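
The merging rule can be sketched with a stand-in event type. The `Ev` dataclass and the `"read"`/`"write"` access strings below are illustrative, not frontrun's actual types:

```python
from dataclasses import dataclass

@dataclass
class Ev:
    # Minimal stand-in for TraceEvent (illustration only).
    thread_id: int
    lineno: int
    access_type: str   # "read" or "write"
    attr_name: str

def dedup(events):
    # Merge consecutive events that share thread, line, and attribute,
    # combining their access types (e.g. read + write -> "read+write").
    # Events with a different attribute key keep separate entries.
    out = []
    for ev in events:
        if out:
            last = out[-1]
            same = (last.thread_id == ev.thread_id
                    and last.lineno == ev.lineno
                    and last.attr_name == ev.attr_name)
            if same:
                if ev.access_type not in last.access_type:
                    last.access_type += "+" + ev.access_type
                continue
        out.append(Ev(ev.thread_id, ev.lineno, ev.access_type, ev.attr_name))
    return out

# `self.value += 1` on one line emits a LOAD_ATTR then a STORE_ATTR:
trace = [Ev(0, 10, "read", "value"), Ev(0, 10, "write", "value"),
         Ev(1, 10, "read", "value")]
```

Deduplicating `trace` collapses thread 0's two opcode events into a single `"read+write"` entry while thread 1's read stays separate.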

class frontrun._trace_format.ConflictInfo(pattern, summary, attr_name=None)[source]

Bases: object

Description of the conflict pattern found in the trace.

Parameters:
  • pattern (str)

  • summary (str)

  • attr_name (str | None)

pattern: str
summary: str
attr_name: str | None = None
frontrun._trace_format.classify_conflict(events)[source]

Examine a filtered, deduplicated trace and classify the conflict type.

Looks for classic patterns:

  • Lost update: R_a R_b W_a W_b (or R_a R_b W_b W_a)

  • Write-write: W_a W_b on the same attribute without intervening sync

Parameters:

events (list[SourceLineEvent])

Return type:

ConflictInfo
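
A minimal sketch of the lost-update / write-write detection, over illustrative `(thread_id, access_type, attr_name)` tuples rather than real SourceLineEvent objects:

```python
def classify(events):
    # events: (thread_id, access_type, attr_name) tuples in schedule order.
    # A lost update on `attr` is R_a R_b W_a W_b: two threads read the
    # same stale value before either writes it back, so one write clobbers
    # the other. W_a W_b without prior reads is a plain write-write conflict.
    by_attr = {}
    for tid, access, attr in events:
        by_attr.setdefault(attr, []).append((tid, access))
    for attr, seq in by_attr.items():
        readers_before_write = set()
        writers = set()
        for tid, access in seq:
            if "write" in access:
                writers.add(tid)
            elif "read" in access and not writers:
                readers_before_write.add(tid)
        if len(readers_before_write) >= 2 and len(writers) >= 2:
            return ("lost_update", attr)
        if len(writers) >= 2:
            return ("write_write", attr)
    return ("unknown", None)
```

For example, the classic `counter += 1` race, `[(0, "read", "n"), (1, "read", "n"), (0, "write", "n"), (1, "write", "n")]`, classifies as a lost update on `n`.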

class frontrun._trace_format.CollapsedRun(count, thread_id)[source]

Bases: object

Placeholder for a collapsed sequence of events from one thread.

Parameters:
  • count (int)

  • thread_id (int)

count: int
thread_id: int
frontrun._trace_format.condense_trace(lines, *, max_lines=30)[source]

Condense a trace to show only the essential interleaving.

Strategy:

  1. Always filter to events involved in cross-thread data conflicts (same attribute accessed by 2+ threads with at least one write). After filtering, re-merge consecutive same-line events that were previously separated by now-removed entries.

  2. If still too long, collapse single-thread runs (keep first/last).

  3. Cap at max_lines.

Returns a mixed list of SourceLineEvent and CollapsedRun placeholders for the formatter to render.

Parameters:
  • lines (list[SourceLineEvent]) – Deduplicated source-line events to condense.

  • max_lines (int) – Maximum number of trace lines before collapsing (default 30).

Return type:

list[SourceLineEvent | CollapsedRun]
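
The run-collapsing step (keep the first and last event of a long single-thread run, replace the middle with a placeholder) can be sketched over plain `(thread_id, text)` tuples; the tuple shapes and the `keep` threshold below are illustrative, not frontrun's API:

```python
def collapse_runs(lines, *, keep=2):
    # Replace any same-thread run longer than keep + 1 events with its
    # first event, a ("collapsed", hidden_count, thread_id) placeholder,
    # and its last event. Short runs pass through unchanged.
    out, i = [], 0
    while i < len(lines):
        j = i
        while j < len(lines) and lines[j][0] == lines[i][0]:
            j += 1
        run = lines[i:j]
        if len(run) <= keep + 1:
            out.extend(run)
        else:
            out.append(run[0])
            out.append(("collapsed", len(run) - 2, run[0][0]))
            out.append(run[-1])
        i = j
    return out
```

A four-event run from one thread collapses to first / placeholder / last, while a single event from another thread is kept as-is.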

frontrun._trace_format.format_trace(events, *, num_threads, thread_names=None, num_explored=0, invariant_desc=None, show_opcodes=False, reproduction_attempts=0, reproduction_successes=0, max_lines=30)[source]

Format a trace as a human-readable interleaved source-line display.

Parameters:
  • events (list[TraceEvent]) – Raw trace events from a TraceRecorder.

  • num_threads (int) – Total number of threads.

  • thread_names (list[str] | None) – Optional display names for threads.

  • num_explored (int) – Number of interleavings explored before finding the bug.

  • invariant_desc (str | None) – Description of the violated invariant.

  • show_opcodes (bool) – If True, include opcode-level detail for each line.

  • reproduction_attempts (int) – How many times the schedule was replayed.

  • reproduction_successes (int) – How many replays reproduced the failure.

  • max_lines (int) – Maximum trace lines before condensation (default 30).

Returns:

Multi-line string suitable for printing or attaching to test output.

Return type:

str

Async Scheduler Utilities

Async event loop abstraction for deterministic task interleaving.

This module provides InterleavedLoop, the shared foundation for all async frontrun POCs. It wraps asyncio’s cooperative scheduling to give deterministic control over which task resumes at each yield point.

In async Python, the event loop decides which ready task to resume after each await point. InterleavedLoop intercepts this decision, using a pluggable scheduling policy to control the execution order.

Key insight: async code is single-threaded and cooperative. Context switches happen ONLY at await points. InterleavedLoop exploits this by gating each yield point through an asyncio.Condition — tasks wait until the scheduling policy says it’s their turn.
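
The gating idea can be shown with a self-contained sketch: a fixed schedule, an asyncio.Condition, and a pause() that blocks until the schedule names the calling task. This `Gate` class is an illustration of the technique, not InterleavedLoop itself:

```python
import asyncio

class Gate:
    """Minimal sketch of condition-gated yield points (illustration only)."""

    def __init__(self, order):
        self._order = order              # fixed schedule of task ids
        self._step = 0
        self._cond = asyncio.Condition()

    async def pause(self, task_id):
        # Block until the head of the schedule names this task,
        # then advance the schedule and wake the other waiters.
        async with self._cond:
            await self._cond.wait_for(
                lambda: self._step >= len(self._order)
                or self._order[self._step] == task_id)
            self._step += 1
            self._cond.notify_all()

async def demo():
    gate = Gate(["b", "a", "b", "a"])
    log = []

    async def worker(name):
        for _ in range(2):
            await gate.pause(name)   # deterministic yield point
            log.append(name)

    await asyncio.gather(worker("a"), worker("b"))
    return log
```

Running `asyncio.run(demo())` yields `["b", "a", "b", "a"]` regardless of which worker the event loop starts first: each `pause()` is a gated yield point, so the schedule, not readiness order, decides who runs next.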

Both async approaches build on this abstraction:

  • async_trace_markers (comment annotations): marker-based scheduling

  • async_bytecode (property-based): index-based scheduling

Each POC subclasses InterleavedLoop and implements two methods:

  • should_proceed(task_id, marker): return True when a task should resume

  • on_proceed(task_id, marker): update internal scheduling state

Example — a simple round-robin scheduler:

>>> class RoundRobinLoop(InterleavedLoop):
...     def __init__(self, order):
...         super().__init__()
...         self._order = order
...         self._step = 0
...
...     def should_proceed(self, task_id, marker=None):
...         if self._step >= len(self._order):
...             return True
...         return self._order[self._step] == task_id
...
...     def on_proceed(self, task_id, marker=None):
...         self._step += 1
class frontrun.async_scheduler.InterleavedLoop(*, deadlock_timeout=5.0)[source]

Bases: object

Wrapped event loop for deterministic async task interleaving.

This class controls which async task resumes at each yield point. Tasks call await loop.pause(task_id) at points where a context switch could happen, and the loop’s scheduling policy decides whether the task should proceed or wait.

Subclasses must implement:

  • should_proceed(task_id, marker): Is it this task’s turn?

  • on_proceed(task_id, marker): Update state after a task proceeds.

The base class provides:

  • pause(): Yield point that gates on the scheduling policy

  • run_all(): Run tasks with controlled interleaving

  • Error propagation, timeout handling, and done-task tracking

Parameters:

deadlock_timeout (float)

should_proceed(task_id, marker=None)[source]

Return True if this task should resume now.

Called while holding the condition lock. Must not await.

Parameters:
  • task_id (Any) – Identity of the calling task (str, int, etc.)

  • marker (Any) – Optional context from the yield point (e.g. a marker name, an (operation, phase) tuple, or None).

Return type:

bool

on_proceed(task_id, marker=None)[source]

Update scheduling state after a task is allowed to proceed.

Called while holding the condition lock, immediately after should_proceed returned True. Must not await.

Parameters:
  • task_id (Any) – Identity of the task that is proceeding.

  • marker (Any) – Same marker value passed to should_proceed.

Return type:

None

async pause(task_id, marker=None)[source]

Yield point: block until the scheduling policy says to proceed.

Tasks call this at every point where a context switch could happen. The call blocks (yields to the event loop) until should_proceed() returns True for this task, then calls on_proceed() and returns.

Uses all-tasks-waiting detection: if every non-done task is blocked in pause() and none can proceed, deadlock is detected instantly.

Parameters:
  • task_id (Any) – Identity of the calling task.

  • marker (Any) – Optional scheduling context.

Return type:

None

async run_all(task_funcs, timeout=10.0)[source]

Run tasks with controlled interleaving.

Parameters:
  • task_funcs (dict[Any, Callable[[...], Awaitable[None]]] | list[Callable[[...], Awaitable[None]]]) – Either a dict {task_id: async_callable} or a list of async callables (which get integer task_ids 0, 1, 2, …).

  • timeout (float) – Maximum total time to wait for all tasks.

Return type:

None

property had_error: bool

True if an error was reported during execution.