API Reference¶
Common Data Structures¶
Shared data structures for frontrun.
- class frontrun.common.Step(execution_name, marker_name)[source]¶
Bases: object
Represents a single step in the execution schedule.
- Parameters:
execution_name (str)
marker_name (str)
- execution_name¶
The name of the execution unit (thread/task) that should execute this step.
- Type:
str
- marker_name¶
The marker name that identifies this synchronization point.
- Type:
str
- execution_name: str¶
- marker_name: str¶
- class frontrun.common.Schedule(steps)[source]¶
Bases: object
Defines the execution order for tasks at synchronization points.
A schedule is a linear sequence of steps that specify which task should execute which marker in order.
Initialize a schedule with a list of steps.
- Parameters:
steps (list[Step]) – Ordered list of Step objects defining the execution sequence
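A schedule is plain data. The sketch below builds the classic two-thread lost-update ordering; the Step and Schedule dataclasses here are minimal stand-ins mirroring the documented fields so the snippet runs without frontrun installed. In real tests, import them from frontrun.common.

```python
from dataclasses import dataclass

# Minimal stand-ins mirroring the documented fields; in real tests,
# import Step and Schedule from frontrun.common instead.
@dataclass
class Step:
    execution_name: str
    marker_name: str

@dataclass
class Schedule:
    steps: list

# Classic lost-update ordering: both threads read before either writes.
schedule = Schedule(steps=[
    Step("thread1", "read"),
    Step("thread2", "read"),
    Step("thread1", "write"),
    Step("thread2", "write"),
])

assert [s.marker_name for s in schedule.steps] == ["read", "read", "write", "write"]
```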
- class frontrun.common.InterleavingResult(property_holds, counterexample=None, num_explored=0, unique_interleavings=0, failures=<factory>, explanation=None, reproduction_attempts=0, reproduction_successes=0)[source]¶
Bases: object
Result of exploring interleavings.
Returned by the bytecode and async variants of explore_interleavings(), and by explore_dpor().
- Parameters:
property_holds (bool)
counterexample (list[int] | None)
num_explored (int)
unique_interleavings (int)
failures (list[tuple[int, list[int]]])
explanation (str | None)
reproduction_attempts (int)
reproduction_successes (int)
- property_holds¶
True if the invariant held under all tested interleavings.
- Type:
bool
- counterexample¶
First schedule that violated the invariant (if any).
- Type:
list[int] | None
- num_explored¶
How many interleavings were tested.
- Type:
int
- unique_interleavings¶
Number of distinct schedule orderings observed. Provides a lower bound on interleaving-space coverage. Relevant for random bytecode exploration; DPOR always explores distinct interleavings, so this equals num_explored.
- Type:
int
- failures¶
All failing (execution_number, schedule) pairs. Only populated by DPOR when stop_on_first=False.
- Type:
list[tuple[int, list[int]]]
- explanation¶
Human-readable explanation of the race condition, showing interleaved source lines and the conflict pattern. None if no race was found.
- Type:
str | None
- reproduction_attempts¶
Number of times the counterexample schedule was re-run to test reproducibility. 0 if no counterexample.
- Type:
int
- reproduction_successes¶
How many of those re-runs reproduced the invariant violation.
- Type:
int
- property_holds: bool¶
- counterexample: list[int] | None = None¶
- num_explored: int = 0¶
- unique_interleavings: int = 0¶
- failures: list[tuple[int, list[int]]]¶
- explanation: str | None = None¶
- reproduction_attempts: int = 0¶
- reproduction_successes: int = 0¶
DPOR (Systematic Exploration)¶
Bytecode-tracing DPOR (Dynamic Partial Order Reduction) for frontrun.
This module implements systematic interleaving exploration using DPOR, completely separate from the existing bytecode.py random exploration.
The approach:
1. A Rust DPOR engine (frontrun._dpor) manages the exploration tree, vector clocks, and backtrack set computation.
2. Python drives execution: it runs threads under sys.settrace opcode tracing, uses a shadow stack to detect shared-memory accesses, and feeds access/sync events to the Rust engine.
3. Cooperative threading primitives (lock, event, etc.) are monkey-patched to yield control back to the DPOR scheduler and report synchronization events for happens-before tracking.
Usage:

    from frontrun.dpor import explore_dpor

    class Counter:
        def __init__(self):
            self.value = 0

        def increment(self):
            temp = self.value
            self.value = temp + 1

    result = explore_dpor(
        setup=lambda: Counter(),
        threads=[lambda c: c.increment(), lambda c: c.increment()],
        invariant=lambda c: c.value == 2,
    )
    assert result.property_holds, result.explanation  # fails — lost update!
- frontrun.dpor.explore_dpor(setup, threads, invariant, max_executions=None, preemption_bound=2, max_branches=100000, timeout_per_run=5.0, stop_on_first=True, detect_io=True, deadlock_timeout=5.0, reproduce_on_failure=10, total_timeout=None)[source]¶
Systematically explore interleavings using DPOR.
This is the DPOR replacement for explore_interleavings(). Instead of random sampling, it uses the DPOR algorithm to explore only distinct interleavings (modulo independent operation reordering).
- Parameters:
setup (Callable[[], T]) – Creates fresh shared state for each execution.
threads (list[Callable[[T], None]]) – List of callables, each receiving the shared state.
invariant (Callable[[T], bool]) – Predicate over shared state; must be True after all threads complete.
max_executions (int | None) – Safety limit on total executions (None = unlimited).
preemption_bound (int | None) – Limit on preemptions per execution. 2 catches most bugs. None = unbounded (full DPOR).
max_branches (int) – Maximum scheduling points per execution.
timeout_per_run (float) – Timeout per execution in seconds.
stop_on_first (bool) – If True (default), stop exploring as soon as the first invariant violation is found. Set to False to collect all failing interleavings.
detect_io (bool) – Automatically detect socket/file I/O operations and report them as resource accesses (default True). Two threads accessing the same endpoint or file will be treated as conflicting, enabling DPOR to explore their orderings.
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
reproduce_on_failure (int) – When a counterexample is found, replay the same schedule this many times to measure reproducibility (default 10). Set to 0 to skip reproduction testing.
total_timeout (float | None) – Maximum total time in seconds for the entire exploration (default None = unlimited). When exceeded, returns results gathered so far.
- Returns:
InterleavingResult with exploration statistics and any counterexample found.
- Return type:
InterleavingResult
Note
When running under pytest, this function requires the frontrun CLI wrapper (frontrun pytest ...) or the --frontrun-patch-locks flag. Without it, the test is automatically skipped.
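To see why systematic exploration needs pruning, count raw interleavings: t threads of n atomic steps each have (t·n)!/(n!)^t distinct orderings, which explodes quickly. DPOR avoids re-running orderings that only reorder independent operations. A back-of-envelope calculation (stdlib only, not frontrun API):

```python
from math import comb

def interleavings(num_threads, steps_per_thread):
    # Multinomial coefficient (t*n)! / (n!)^t: the number of distinct
    # orderings of the threads' atomic steps.
    total, result = 0, 1
    for _ in range(num_threads):
        total += steps_per_thread
        result *= comb(total, steps_per_thread)
    return result

assert interleavings(2, 1) == 2        # two one-step threads: AB or BA
assert interleavings(2, 10) == 184756  # 20 bytecode steps already yield ~185k orderings
```

Random sampling (max_attempts=200 by default) covers a vanishing fraction of this space; DPOR explores one representative per equivalence class instead.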
Bytecode Instrumentation¶
Bytecode-level deterministic concurrency testing.
Uses sys.settrace with f_trace_opcodes to intercept execution at every bytecode instruction, enabling fine-grained control over thread interleaving.
This pairs naturally with property-based testing: rather than specifying exact schedules, generate random interleavings and check that invariants hold (or that bugs can be found).
The core insight: CPython context switches happen between bytecode instructions. By controlling which thread gets to execute each instruction, we can explore the full space of possible interleavings.
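The claim above can be checked with the stdlib dis module: the body of increment compiles to separate read and write instructions on self.value, and a thread switch between them is exactly the lost-update window.

```python
import dis

class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        temp = self.value      # LOAD_ATTR: read the shared attribute
        self.value = temp + 1  # STORE_ATTR: write it back

# The read and the write are separate bytecode instructions; CPython may
# switch threads between any two of them.
ops = [ins.opname for ins in dis.Bytecode(Counter.increment)]
assert "LOAD_ATTR" in ops and "STORE_ATTR" in ops
assert ops.index("LOAD_ATTR") < ops.index("STORE_ATTR")
```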
Example — find a race condition with random schedule exploration:
>>> from frontrun.bytecode import explore_interleavings
>>>
>>> class Counter:
... def __init__(self):
... self.value = 0
... def increment(self):
... temp = self.value
... self.value = temp + 1
>>>
>>> result = explore_interleavings(
... setup=lambda: Counter(),
... threads=[lambda c: c.increment(), lambda c: c.increment()],
... invariant=lambda c: c.value == 2,
... )
>>> assert result.property_holds, result.explanation # fails — lost update!
- frontrun.bytecode.explore_interleavings(setup, threads, invariant, max_attempts=200, max_ops=300, timeout_per_run=5.0, seed=None, debug=False, detect_io=True, deadlock_timeout=5.0, reproduce_on_failure=10, total_timeout=None)[source]¶
Search for interleavings that violate an invariant.
Note
When running under pytest, this function requires the frontrun CLI wrapper (frontrun pytest ...) or the --frontrun-patch-locks flag. Without it, the test is automatically skipped.
Generates random opcode-level schedules and tests whether the invariant holds under each one. If a violation is found, returns immediately with the counterexample schedule.
This is the bytecode-level analogue of property-based testing: instead of generating random inputs, we generate random interleavings and check that the result satisfies an invariant.
- Parameters:
setup (Callable[[], T]) – Returns fresh shared state for each attempt.
threads (list[Callable[[T], None]]) – Callables that each receive the state as their argument.
invariant (Callable[[T], bool]) – Predicate on the state. Returns True if the property holds.
max_attempts (int) – How many random interleavings to try.
max_ops (int) – Maximum schedule length per attempt.
timeout_per_run (float) – Timeout for each individual run.
seed (int | None) – Optional RNG seed for reproducibility.
detect_io (bool) – Automatically detect socket/file I/O and treat them as scheduling points (default True).
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
reproduce_on_failure (int) – When a counterexample is found, replay the same schedule this many times to measure reproducibility (default 10). Set to 0 to skip reproduction testing.
total_timeout (float | None) – Maximum total time in seconds for the entire exploration (default None = unlimited). When exceeded, returns results gathered so far.
debug (bool)
- Returns:
InterleavingResult with the outcome. The unique_interleavings field reports how many distinct execution orderings were observed, providing a lower bound on exploration coverage.
- Return type:
InterleavingResult
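As an illustration of what the seed parameter buys (a hypothetical sketch, not frontrun's internal generator): a schedule is just a list of thread indices, and seeding the RNG makes a failing schedule replayable.

```python
import random

def random_schedule(num_threads, max_ops, seed=None):
    # Hypothetical sketch: one thread index per scheduling decision,
    # drawn from a seeded RNG.
    rng = random.Random(seed)
    return [rng.randrange(num_threads) for _ in range(max_ops)]

# Same seed, same schedule: a failure found with seed=42 can be replayed later.
assert random_schedule(2, 20, seed=42) == random_schedule(2, 20, seed=42)
assert set(random_schedule(3, 50, seed=1)) <= {0, 1, 2}
```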
Trace Markers¶
Frontrun: Deterministic thread interleaving using sys.settrace and comment markers.
This module provides a mechanism to control thread execution order by marking synchronization points in code with special comments and enforcing a predefined execution schedule using Python’s tracing facilities.
- Example usage:
```python
from frontrun.common import Schedule, Step
from frontrun.trace_markers import TraceExecutor

def worker_function():
    x = read_data()  # frontrun: read
    write_data(x)  # frontrun: write

schedule = Schedule([
    Step("thread1", "read"),
    Step("thread2", "read"),
    Step("thread1", "write"),
    Step("thread2", "write"),
])

executor = TraceExecutor(schedule)
executor.run("thread1", worker_function)
executor.run("thread2", worker_function)
executor.wait()
```
- class frontrun.trace_markers.MarkerRegistry[source]¶
Bases: object
Tracks marker locations in source code for efficient lookup.
This class scans source files to find lines with frontrun markers and maintains a mapping from (filename, line_number) to marker names.
- class frontrun.trace_markers.ThreadCoordinator(schedule, *, deadlock_timeout=5.0)[source]¶
Bases: object
Coordinates thread execution according to a schedule.
This class manages the synchronization between threads, ensuring that each thread executes markers in the order specified by the schedule.
- Parameters:
schedule (Schedule)
deadlock_timeout (float)
- wait_for_turn(execution_name, marker_name, *, _reacquire_execution_lock=False)[source]¶
Block until it’s this execution unit’s turn to execute this marker.
When _reacquire_execution_lock is True (used by the trace executors), _execution_lock is acquired while the condition lock is still held, before returning. This prevents other threads from racing ahead between being notified and the caller resuming execution. The caller must have already released _execution_lock before calling this method.
- Parameters:
execution_name (str) – The name of the calling execution unit
marker_name (str) – The marker that was hit
_reacquire_execution_lock (bool)
- class frontrun.trace_markers.TraceExecutor(schedule, *, deadlock_timeout=5.0)[source]¶
Bases: object
Executes threads with interleaved execution according to a schedule.
This is the main interface for the frontrun library. It sets up tracing for each thread and coordinates their execution.
Initialize the executor with a schedule.
- Parameters:
schedule (Schedule) – The Schedule defining the execution order
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- __init__(schedule, *, deadlock_timeout=5.0)[source]¶
Initialize the executor with a schedule.
- Parameters:
schedule (Schedule) – The Schedule defining the execution order
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- run(execution_name, target, args=(), kwargs=None)[source]¶
Start a new thread with tracing enabled.
- Parameters:
execution_name (str) – The name for this execution unit (must match schedule)
target (Callable[[...], None]) – The function to execute in the thread
args (tuple[Any, ...]) – Positional arguments for the target function
kwargs (dict[str, Any] | None) – Keyword arguments for the target function
- frontrun.trace_markers.frontrun(schedule, threads, thread_args=None, thread_kwargs=None, timeout=None, deadlock_timeout=5.0)[source]¶
Convenience function to run multiple threads with a schedule.
- Parameters:
schedule (Schedule) – The Schedule defining execution order
threads (dict[str, Callable[[...], None]]) – Dictionary mapping execution unit names to their target functions
thread_args (dict[str, tuple[Any, ...]] | None) – Optional dictionary mapping execution unit names to argument tuples
thread_kwargs (dict[str, dict[str, Any]] | None) – Optional dictionary mapping execution unit names to keyword argument dicts
timeout (float | None) – Optional timeout for waiting
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- Returns:
The TraceExecutor instance (useful for inspection)
- Return type:
TraceExecutor
Example

    executor = frontrun(
        schedule=Schedule([Step("t1", "marker1"), Step("t2", "marker1")]),
        threads={"t1": worker_func, "t2": worker_func},
    )
Async Trace Markers¶
Frontrun: Deterministic async task interleaving using comment-based markers.
This module provides a mechanism to control async task execution order by marking
synchronization points in code with # frontrun: marker_name comments,
matching the elegant syntax of the sync trace_markers module.
Key Insight: Thread-Based Execution with sys.settrace¶
This implementation mirrors the sync architecture exactly:
- Each async task runs in its own thread via asyncio.run(task_fn())
- sys.settrace fires on every line, including inside synchronous sub-coroutines
- When a marker is detected, the trace function blocks the thread via ThreadCoordinator.wait_for_turn()
- Execution resumes exactly where it paused
This solves the synchronous function body bug: when await self.get_balance()
completes synchronously (no internal yields), the trace still fires on every
line during that execution. After get_balance() returns, the trace fires for
the next line (after the marker), detects the marker, and blocks before that
line executes.
Marker Semantics¶
Marker Placement: Place markers to gate the operations you want to control.
- Inline markers (marker on same line as operation):
current = self.balance # frontrun: read_balance
- Separate-line markers (marker before operation):
# frontrun: read_balance
current = self.balance
Both styles work identically: the marker gates execution of the line, ensuring it only executes after the scheduler approves this task at this marker.
Example usage:

    async def worker_function():
        # frontrun: read_data
        x = await read_data()
        # frontrun: write_data
        await write_data(x)

    schedule = Schedule([
        Step("task1", "read_data"),
        Step("task2", "read_data"),
        Step("task1", "write_data"),
        Step("task2", "write_data"),
    ])

    executor = AsyncTraceExecutor(schedule)
    executor.run({
        'task1': worker_function,
        'task2': worker_function,
    })
Or using the convenience function:
async_frontrun(
schedule=schedule,
tasks={'task1': worker1, 'task2': worker2},
)
- class frontrun.async_trace_markers.AsyncTraceExecutor(schedule, *, deadlock_timeout=5.0)[source]¶
Bases: object
Executes async tasks with interleaved execution according to a schedule.
This is the main interface for the async frontrun library. It uses comment-based markers (# frontrun: marker_name) to control task execution order.
Unlike the sync version which runs tasks in actual threads, this runs each async task in its own thread with its own event loop via asyncio.run().
Initialize the executor with a schedule.
- Parameters:
schedule (Schedule) – The Schedule defining the execution order
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- __init__(schedule, *, deadlock_timeout=5.0)[source]¶
Initialize the executor with a schedule.
- Parameters:
schedule (Schedule) – The Schedule defining the execution order
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- run(tasks, timeout=10.0)[source]¶
Run all tasks with controlled interleaving based on comment markers.
This is a synchronous method: it creates threads and waits for them to finish.
- Parameters:
tasks (dict[str, Callable[[], Coroutine[Any, Any, None]]]) – Dictionary mapping task names to their async functions
timeout (float) – Timeout in seconds for all tasks to complete
- Raises:
TimeoutError – If tasks don’t complete within the timeout
Any exception that occurred in a task during execution –
- Return type:
None
- frontrun.async_trace_markers.async_frontrun(schedule, tasks, task_args=None, task_kwargs=None, timeout=10.0, deadlock_timeout=5.0)[source]¶
Convenience function to run multiple async tasks with a schedule.
This is a synchronous function (not async) that creates an executor and runs the tasks.
Tasks use # frontrun: marker_name comments to mark synchronization points. There is no need to pass marker functions to tasks; the executor automatically detects markers via sys.settrace.
- Parameters:
schedule (Schedule) – The Schedule defining execution order
tasks (dict[str, Callable[[...], Coroutine[Any, Any, None]]]) – Dictionary mapping execution unit names to their async target functions
task_args (dict[str, tuple[Any, ...]] | None) – Optional dictionary mapping execution unit names to argument tuples
task_kwargs (dict[str, dict[str, Any]] | None) – Optional dictionary mapping execution unit names to keyword argument dicts
timeout (float) – Timeout in seconds for the entire execution
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- Returns:
The AsyncTraceExecutor instance (useful for inspection)
- Return type:
AsyncTraceExecutor
Example:

    async def worker(account, amount):
        # frontrun: before_deposit
        await account.deposit(amount)

    async_frontrun(
        schedule=Schedule([
            Step("t1", "before_deposit"),
            Step("t2", "before_deposit"),
        ]),
        tasks={"t1": worker, "t2": worker},
        task_args={"t1": (account, 50), "t2": (account, 50)},
    )
Async Bytecode Instrumentation¶
Await-point-level deterministic async concurrency testing.
Uses the shared InterleavedLoop abstraction to control which async task resumes at each await point, enabling fine-grained control over task interleaving.
This pairs naturally with property-based testing: rather than specifying exact schedules, generate random interleavings and check that invariants hold (or that bugs can be found).
The core insight: in async Python, context switches happen ONLY at await points. The event loop is single-threaded. By controlling which task resumes at each await point, we can explore the full space of possible interleavings — and there are far fewer of them than in threaded code.
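This insight can be demonstrated with plain asyncio, using asyncio.sleep(0) as the explicit yield point in place of frontrun's await_point(); under this schedule the lost update is deterministic, because the single-threaded event loop switches exactly at the sleep:

```python
import asyncio

class Counter:
    def __init__(self):
        self.value = 0

    async def increment(self):
        temp = self.value
        await asyncio.sleep(0)  # the only possible switch point in this coroutine
        self.value = temp + 1

async def main():
    c = Counter()
    # Both tasks read value=0 before either writes: each writes temp + 1 = 1.
    await asyncio.gather(c.increment(), c.increment())
    return c.value

assert asyncio.run(main()) == 1  # not 2: one increment was lost
```

Threaded code can switch between any two bytecode instructions; this coroutine can only switch at the one await, so the whole interleaving space here has just a handful of members.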
Example — find a race condition with random schedule exploration:
>>> import asyncio
>>> from frontrun.async_bytecode import explore_interleavings, await_point
>>>
>>> class Counter:
... def __init__(self):
... self.value = 0
... async def increment(self):
... temp = self.value
... await await_point() # Yield control; race can happen here
... self.value = temp + 1
>>>
>>> result = asyncio.run(explore_interleavings(
... setup=lambda: Counter(),
... tasks=[lambda c: c.increment(), lambda c: c.increment()],
... invariant=lambda c: c.value == 2,
... ))
>>> assert result.property_holds, result.explanation # fails — lost update!
The await_point() function marks explicit yield points where context switches can occur. This is analogous to bytecode.py’s opcode-level tracing, but for async code the number of interleaving points is much smaller and more explicit.
In production async code, every await is a potential context switch point. For testing, call await await_point() at each location where a race condition could manifest.
- async frontrun.async_bytecode.await_point()[source]¶
Yield to the scheduler at an await point.
Call this at every point where a context switch could happen in your async code. This is the async equivalent of a bytecode opcode — the atomic unit of interleaving.
In typical async code, every await statement is a potential context switch point. For testing race conditions, replace strategic awaits with await await_point() to allow the scheduler to control ordering.
If no scheduler is active (i.e., not running under AsyncBytecodeShuffler), this function returns immediately without blocking.
- class frontrun.async_bytecode.AwaitScheduler(schedule, num_tasks, *, deadlock_timeout=5.0)[source]¶
Bases: InterleavedLoop
Controls async task execution at await-point granularity.
The schedule is a list of task indices. Each entry means “let this task resume from its next await point.” When the schedule is exhausted, all tasks run freely to completion.
Built on the shared InterleavedLoop abstraction, using index-based scheduling as its policy.
- Parameters:
schedule (list[int])
num_tasks (int)
deadlock_timeout (float)
- should_proceed(task_id, marker=None)[source]¶
Return True if this task should resume now.
Called while holding the condition lock. Must not await.
- Parameters:
task_id (Any) – Identity of the calling task (str, int, etc.)
marker (Any) – Optional context from the yield point (e.g. a marker name, an (operation, phase) tuple, or None).
- Return type:
bool
- on_proceed(task_id, marker=None)[source]¶
Update scheduling state after a task is allowed to proceed.
Called while holding the condition lock, immediately after should_proceed returned True. Must not await.
- Parameters:
task_id (Any) – Identity of the task that is proceeding.
marker (Any) – Same marker value passed to should_proceed.
- Return type:
None
- property had_error: bool¶
Check if an error occurred during execution.
- class frontrun.async_bytecode.AsyncBytecodeShuffler(scheduler)[source]¶
Bases: object
Run concurrent async functions with await-point-level interleaving control.
Creates asyncio tasks for each function and delegates to the AwaitScheduler (an InterleavedLoop subclass) for execution and context setup.
- Parameters:
scheduler (AwaitScheduler)
- async run(funcs, args=None, kwargs=None, timeout=10.0)[source]¶
Run async functions concurrently with controlled interleaving.
- Parameters:
funcs (list[Callable[[...], Coroutine[Any, Any, None]]]) – One async callable per task.
args (list[tuple[Any, ...]] | None) – Per-task positional args.
kwargs (list[dict[str, Any]] | None) – Per-task keyword args.
timeout (float) – Max wait time for all tasks.
- frontrun.async_bytecode.controlled_interleaving(schedule, num_tasks=2)[source]¶
Context manager for running async code under a specific interleaving.
- Parameters:
schedule (list[int]) – List of task indices controlling await-point execution order.
num_tasks (int) – Number of tasks.
- Yields:
AsyncBytecodeShuffler runner.
- Return type:
AsyncGenerator[AsyncBytecodeShuffler, None]
Example
>>> async with controlled_interleaving([0, 1, 0, 1], num_tasks=2) as runner:
...     await runner.run([coro1, coro2])
- async frontrun.async_bytecode.run_with_schedule(schedule, setup, tasks, timeout=5.0, deadlock_timeout=5.0)[source]¶
Run one async interleaving and return the state object.
- Parameters:
schedule (list[int]) – Await-point-level schedule (list of task indices).
setup (Callable[[], Any]) – Returns fresh shared state.
tasks (list[Callable[[Any], Coroutine[Any, Any, None]]]) – Async callables that each receive the state as their argument.
timeout (float) – Max seconds.
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- Returns:
The state object after execution.
- Return type:
Any
- async frontrun.async_bytecode.explore_interleavings(setup, tasks, invariant, max_attempts=200, max_ops=100, timeout_per_run=5.0, seed=None, deadlock_timeout=5.0)[source]¶
Search for async interleavings that violate an invariant.
Generates random await-point-level schedules and tests whether the invariant holds under each one. If a violation is found, returns immediately with the counterexample schedule.
This is the async analogue of property-based testing for concurrency: instead of generating random inputs, we generate random interleavings and check that the result satisfies an invariant.
Note: max_ops defaults to 100 (vs 300 for bytecode.py) because async code has far fewer interleaving points than threaded bytecode execution. Each await_point() call represents a much coarser-grained checkpoint.
- Parameters:
setup (Callable[[], Any]) – Returns fresh shared state for each attempt.
tasks (list[Callable[[Any], Coroutine[Any, Any, None]]]) – Async callables that each receive the state as their argument.
invariant (Callable[[Any], bool]) – Predicate on the state. Returns True if the property holds.
max_attempts (int) – How many random interleavings to try.
max_ops (int) – Maximum schedule length per attempt.
timeout_per_run (float) – Timeout for each individual run.
seed (int | None) – Optional RNG seed for reproducibility.
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- Returns:
InterleavingResult with the outcome. The unique_interleavings field reports how many distinct schedule orderings were observed.
- Return type:
InterleavingResult
- frontrun.async_bytecode.schedule_strategy(num_tasks, max_ops=100)[source]¶
Hypothesis strategy for generating fair await-point schedules.
Generates schedules as a sequence of rounds, where each round is a random permutation of all task indices. This guarantees every task gets exactly the same number of scheduling slots, preventing starvation.
For use with the hypothesis @given decorator in your own tests:
>>> from hypothesis import given
>>> from frontrun.async_bytecode import schedule_strategy, run_with_schedule
>>> import asyncio
>>>
>>> @given(schedule=schedule_strategy(2))
... def test_my_invariant(schedule):
...     state = asyncio.run(run_with_schedule(schedule, setup, tasks))
...     assert state.value == expected
Note: max_ops defaults to 100 (vs 300 for bytecode.py) because async code has far fewer interleaving points. Each schedule entry corresponds to one await_point() call, not one bytecode opcode.
- Parameters:
num_tasks (int)
max_ops (int)
- Return type:
Any
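The round structure described above can be sketched without Hypothesis (a simplified stand-in, not the library's strategy): each round is a random permutation of all task indices, so no task is ever starved of slots.

```python
import random

def fair_schedule(num_tasks, max_ops=100, seed=None):
    # Sketch of the documented round structure: append whole shuffled
    # rounds until another full round would exceed max_ops.
    rng = random.Random(seed)
    schedule = []
    while len(schedule) + num_tasks <= max_ops:
        rnd = list(range(num_tasks))
        rng.shuffle(rnd)
        schedule.extend(rnd)
    return schedule

s = fair_schedule(3, max_ops=10, seed=0)
assert len(s) == 9                              # three full rounds of three tasks
assert all(s.count(t) == 3 for t in range(3))   # every task gets equal slots
```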
Trace Formatting¶
Trace recording, filtering, and formatting for comprehensible race condition errors.
When frontrun finds a race condition, the raw counterexample is a list of thread indices — one per bytecode instruction. This module transforms that into a human-readable “story” of which source lines executed in which order.
The pipeline:
1. Record a TraceEvent at each opcode during the failing run.
2. Filter to events that touch shared state (LOAD_ATTR/STORE_ATTR, etc.).
3. Deduplicate consecutive events from the same thread on the same source line.
4. Classify the conflict pattern (lost update, order violation, etc.).
5. Format as an interleaved source-line trace.
- class frontrun._trace_format.TraceEvent(step_index, thread_id, filename, lineno, function_name, opcode, access_type=None, attr_name=None, obj_type_name=None, call_chain=None)[source]¶
Bases: object
A single recorded event from the trace.
- Parameters:
step_index (int)
thread_id (int)
filename (str)
lineno (int)
function_name (str)
opcode (str)
access_type (str | None)
attr_name (str | None)
obj_type_name (str | None)
call_chain (list[str] | None)
- step_index: int¶
- thread_id: int¶
- filename: str¶
- lineno: int¶
- function_name: str¶
- opcode: str¶
- access_type: str | None¶
- attr_name: str | None¶
- obj_type_name: str | None¶
- call_chain: list[str] | None¶
- class frontrun._trace_format.SourceLineEvent(thread_id, filename, lineno, function_name, source_line, access_type=None, attr_name=None, obj_type_name=None, call_chain=None)[source]¶
Bases: object
A deduplicated, source-level event for display.
- Parameters:
thread_id (int)
filename (str)
lineno (int)
function_name (str)
source_line (str)
access_type (str | None)
attr_name (str | None)
obj_type_name (str | None)
call_chain (list[str] | None)
- thread_id: int¶
- filename: str¶
- lineno: int¶
- function_name: str¶
- source_line: str¶
- access_type: str | None¶
- attr_name: str | None¶
- obj_type_name: str | None¶
- call_chain: list[str] | None¶
- frontrun._trace_format.qualified_name(frame)[source]¶
Get a qualified function name from a frame (e.g. DB.dict).
- Parameters:
frame (Any)
- Return type:
str
- frontrun._trace_format.build_call_chain(frame, *, filter_fn, max_depth=3)[source]¶
Walk user-code frames from frame upward, returning qualified names.
filter_fn(filename) -> bool selects which frames are user code (typically frontrun._tracing.should_trace_file()). Returns None when the chain would be empty.
- Parameters:
frame (Any)
filter_fn (Any)
max_depth (int)
- Return type:
list[str] | None
- class frontrun._trace_format.TraceRecorder(*, enabled=True)[source]¶
Bases: object
Accumulates TraceEvent objects during a single execution.
Thread-safe: multiple threads call record() concurrently, each holding the scheduler lock (so ordering is deterministic).
- Parameters:
enabled (bool)
- events: list[TraceEvent]¶
- enabled¶
- record(thread_id, frame, opcode=None, access_type=None, attr_name=None, obj=None, obj_type_name=None, call_chain=None)[source]¶
Record one trace event from a frame object.
- Parameters:
thread_id (int)
frame (Any)
opcode (str | None)
access_type (str | None)
attr_name (str | None)
obj (Any)
obj_type_name (str | None)
call_chain (list[str] | None)
- Return type:
None
Keep only events that access shared mutable state.
- Parameters:
events (list[TraceEvent])
- Return type:
list[TraceEvent]
- frontrun._trace_format.deduplicate_to_source_lines(events)[source]¶
Collapse consecutive events from the same thread+line into one SourceLineEvent.
When multiple opcodes on the same source line produce events (e.g., LOAD_ATTR then STORE_ATTR for self.value += 1), merge them into a single entry with a combined access_type, but only when they access the same (obj_type, attr_name) key. Events with different keys on the same line get separate entries so that filtering can distinguish them later (e.g. an attribute read vs an I/O event).
- Parameters:
events (list[TraceEvent])
- Return type:
list[SourceLineEvent]
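The collapsing rule can be sketched with itertools.groupby over simplified event tuples (a hypothetical shape for illustration; real TraceEvent objects carry filenames, opcodes, and more):

```python
from itertools import groupby

# Simplified event tuples: (thread_id, lineno, access_type, (obj_type, attr)).
events = [
    (1, 10, "read",  ("Counter", "value")),   # LOAD_ATTR for self.value += 1
    (1, 10, "write", ("Counter", "value")),   # STORE_ATTR on the same line
    (2, 10, "read",  ("Counter", "value")),   # different thread: kept separate
]

def dedup(events):
    out = []
    # Group only *consecutive* events sharing thread, line, and access key.
    for (tid, line, key), grp in groupby(events, key=lambda e: (e[0], e[1], e[3])):
        kinds = [e[2] for e in grp]
        access = "read+write" if set(kinds) == {"read", "write"} else kinds[0]
        out.append((tid, line, access, key))
    return out

assert dedup(events) == [
    (1, 10, "read+write", ("Counter", "value")),
    (2, 10, "read", ("Counter", "value")),
]
```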
- class frontrun._trace_format.ConflictInfo(pattern, summary, attr_name=None)[source]¶
Bases: object
Description of the conflict pattern found in the trace.
- Parameters:
pattern (str)
summary (str)
attr_name (str | None)
- pattern: str¶
- summary: str¶
- attr_name: str | None = None¶
- frontrun._trace_format.classify_conflict(events)[source]¶
Examine a filtered, deduplicated trace and classify the conflict type.
Looks for classic patterns:
- Lost update: R_a R_b W_a W_b (or R_a R_b W_b W_a)
- Write-write: W_a W_b on the same attribute without intervening sync
- Parameters:
events (list[SourceLineEvent])
- Return type:
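The lost-update pattern can be made concrete without any concurrency machinery. Here is a deterministic plain-Python illustration (not frontrun's API) of the R_a R_b W_a W_b interleaving: both logical threads read the counter before either writes it back.

```python
counter = 0
a_read = counter      # R_a: thread A reads 0
b_read = counter      # R_b: thread B reads 0
counter = a_read + 1  # W_a: thread A writes 1
counter = b_read + 1  # W_b: thread B writes 1, clobbering A's update
print(counter)        # 1: one increment was lost
```

Two increments ran, but the counter ends at 1, which is exactly the anomaly the lost-update classification names.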
- class frontrun._trace_format.CollapsedRun(count, thread_id)[source]¶
Bases:
objectPlaceholder for a collapsed sequence of events from one thread.
- Parameters:
count (int)
thread_id (int)
- count: int¶
- thread_id: int¶
- frontrun._trace_format.condense_trace(lines, *, max_lines=30)[source]¶
Condense a trace to show only the essential interleaving.
Strategy:

1. Always filter to events involved in cross-thread data conflicts (same attribute accessed by 2+ threads with at least one write). After filtering, re-merge consecutive same-line events that were previously separated by now-removed entries.
2. If still too long, collapse single-thread runs (keep first/last).
3. Cap at max_lines.
Returns a mixed list of
SourceLineEvent and CollapsedRun placeholders for the formatter to render.- Parameters:
lines (list[SourceLineEvent])
max_lines (int)
- Return type:
list[SourceLineEvent | CollapsedRun]
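The single-thread run collapsing can be sketched as follows; plain strings stand in for events here (this is an illustration, not frontrun's API), with a placeholder string playing the role of CollapsedRun.

```python
from itertools import groupby

# A trace where one thread produces a long uninterrupted run of events.
trace = ["t1", "t1", "t1", "t1", "t2", "t1"]

out = []
for tid, run in groupby(trace):
    run = list(run)
    if len(run) > 2:
        # Keep first/last; summarize the middle like a CollapsedRun would.
        out.append(run[0])
        out.append(f"... {len(run) - 2} more from {tid} ...")
        out.append(run[-1])
    else:
        out.extend(run)

print(out)
```

Short runs pass through untouched; only runs longer than two events are summarized, which preserves the cross-thread switch points that make the interleaving readable.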
- frontrun._trace_format.format_trace(events, *, num_threads, thread_names=None, num_explored=0, invariant_desc=None, show_opcodes=False, reproduction_attempts=0, reproduction_successes=0, max_lines=30)[source]¶
Format a trace as a human-readable interleaved source-line display.
- Parameters:
events (list[TraceEvent]) – Raw trace events from a TraceRecorder.
num_threads (int) – Total number of threads.
thread_names (list[str] | None) – Optional display names for threads.
num_explored (int) – Number of interleavings explored before finding the bug.
invariant_desc (str | None) – Description of the violated invariant.
show_opcodes (bool) – If True, include opcode-level detail for each line.
reproduction_attempts (int) – How many times the schedule was replayed.
reproduction_successes (int) – How many replays reproduced the failure.
max_lines (int) – Maximum trace lines before condensation (default 30).
- Returns:
Multi-line string suitable for printing or attaching to test output.
- Return type:
str
Async Scheduler Utilities¶
Async event loop abstraction for deterministic task interleaving.
This module provides InterleavedLoop, the shared foundation for all async frontrun POCs. It wraps asyncio’s cooperative scheduling to give deterministic control over which task resumes at each yield point.
In async Python, the event loop decides which ready task to resume after each await point. InterleavedLoop intercepts this decision, using a pluggable scheduling policy to control the execution order.
Key insight: async code is single-threaded and cooperative. Context switches happen ONLY at await points. InterleavedLoop exploits this by gating each yield point through an asyncio.Condition — tasks wait until the scheduling policy says it’s their turn.
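This insight can be demonstrated with nothing but asyncio: a read-modify-write torn in two by an await point loses an update deterministically, because the event loop's scheduling (not preemption) creates the interleaving. A minimal sketch, independent of frontrun:

```python
import asyncio

counter = 0

async def racy_increment():
    global counter
    read = counter
    await asyncio.sleep(0)  # yield point: the other task runs here
    counter = read + 1

async def main():
    await asyncio.gather(racy_increment(), racy_increment())

asyncio.run(main())
print(counter)  # 1, not 2: both tasks read 0 before either wrote
```

Remove the `await asyncio.sleep(0)` and the counter reliably reaches 2, because without an await point the read-modify-write cannot be interrupted.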
Both async approaches build on this abstraction:

- async_trace_markers (comment annotations): marker-based scheduling
- async_bytecode (property-based): index-based scheduling

Each POC subclasses InterleavedLoop and implements two methods:

- should_proceed(task_id, marker): return True when a task should resume
- on_proceed(task_id, marker): update internal scheduling state
Example — a simple round-robin scheduler:
>>> class RoundRobinLoop(InterleavedLoop):
... def __init__(self, order):
... super().__init__()
... self._order = order
... self._step = 0
...
... def should_proceed(self, task_id, marker=None):
... if self._step >= len(self._order):
... return True
... return self._order[self._step] == task_id
...
... def on_proceed(self, task_id, marker=None):
... self._step += 1
- class frontrun.async_scheduler.InterleavedLoop(*, deadlock_timeout=5.0)[source]¶
Bases:
objectWrapped event loop for deterministic async task interleaving.
This class controls which async task resumes at each yield point. Tasks call
await loop.pause(task_id) at points where a context switch could happen, and the loop’s scheduling policy decides whether the task should proceed or wait.- Subclasses must implement:
should_proceed(task_id, marker): Is it this task’s turn?
on_proceed(task_id, marker): Update state after a task proceeds.
- The base class provides:
pause(): Yield point that gates on the scheduling policy
run_all(): Run tasks with controlled interleaving
Error propagation, timeout handling, and done-task tracking
- Parameters:
deadlock_timeout (float)
- should_proceed(task_id, marker=None)[source]¶
Return True if this task should resume now.
Called while holding the condition lock. Must not await.
- Parameters:
task_id (Any) – Identity of the calling task (str, int, etc.)
marker (Any) – Optional context from the yield point (e.g. a marker name, an (operation, phase) tuple, or None).
- Return type:
bool
- on_proceed(task_id, marker=None)[source]¶
Update scheduling state after a task is allowed to proceed.
Called while holding the condition lock, immediately after should_proceed returned True. Must not await.
- Parameters:
task_id (Any) – Identity of the task that is proceeding.
marker (Any) – Same marker value passed to should_proceed.
- Return type:
None
- async pause(task_id, marker=None)[source]¶
Yield point: block until the scheduling policy says to proceed.
Tasks call this at every point where a context switch could happen. The call blocks (yields to the event loop) until should_proceed() returns True for this task, then calls on_proceed() and returns.
Uses all-tasks-waiting detection: if every non-done task is blocked in
pause() and none can proceed, deadlock is detected instantly.- Parameters:
task_id (Any) – Identity of the calling task.
marker (Any) – Optional scheduling context.
- Return type:
None
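The gating idea behind pause() can be sketched with a bare asyncio.Condition. This is a hypothetical minimal version enforcing strict alternation between two tasks, not frontrun's implementation (which adds deadlock detection, error propagation, and done-task tracking):

```python
import asyncio

async def main():
    cond = asyncio.Condition()
    turn = {"id": 0}   # which task may proceed next
    log = []

    async def pause(task_id):
        async with cond:
            # Block until the policy (here: alternation) names this task.
            await cond.wait_for(lambda: turn["id"] == task_id)
            turn["id"] = 1 - task_id  # on_proceed: hand the turn over
            cond.notify_all()

    async def worker(task_id):
        for _ in range(2):
            await pause(task_id)   # yield point gated by the policy
            log.append(task_id)

    await asyncio.gather(worker(0), worker(1))
    return log

print(asyncio.run(main()))  # strict alternation: [0, 1, 0, 1]
```

Each task blocks inside the condition until the shared turn variable names it, then flips the turn and wakes the waiters, which is the same gate-at-every-yield-point shape pause() describes.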
- async run_all(task_funcs, timeout=10.0)[source]¶
Run tasks with controlled interleaving.
- Parameters:
task_funcs (dict[Any, Callable[[...], Awaitable[None]]] | list[Callable[[...], Awaitable[None]]]) – Either a dict
{task_id: async_callable} or a list of async callables (which get integer task_ids 0, 1, 2, …).
timeout (float) – Maximum total time to wait for all tasks.
- Return type:
None
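The list form's integer task-id convention can be sketched as a normalization step. This is a hypothetical illustration of the documented behavior, not frontrun's actual code:

```python
async def writer(): pass
async def reader(): pass

task_funcs = [writer, reader]            # list form accepted by run_all
if not isinstance(task_funcs, dict):
    # A list of async callables behaves like a dict keyed by 0, 1, 2, ...
    task_funcs = dict(enumerate(task_funcs))

print(sorted(task_funcs))                # [0, 1]
```

The dict form is useful when task identity matters in should_proceed (e.g. names like "writer"/"reader"); the list form is convenient when positional integer ids are enough.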
- property had_error: bool¶
True if an error was reported during execution.