API Reference¶
Trace Markers¶
Frontrun: Deterministic thread interleaving using sys.settrace and comment markers.
This module provides a mechanism to control thread execution order by marking synchronization points in code with special comments and enforcing a predefined execution schedule using Python’s tracing facilities.
Example usage:

```python
def worker_function():
    x = read_data()   # frontrun: read
    write_data(x)     # frontrun: write

schedule = Schedule([
    Step("thread1", "read"),
    Step("thread2", "read"),
    Step("thread1", "write"),
    Step("thread2", "write"),
])

executor = TraceExecutor(schedule)
executor.run("thread1", worker_function)
executor.run("thread2", worker_function)
executor.wait()
```
- class frontrun.trace_markers.MarkerRegistry[source]¶
Bases: object
Tracks marker locations in source code for efficient lookup.
This class scans source files to find lines with frontrun markers and maintains a mapping from (filename, line_number) to marker names.
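The scanning idea can be sketched in a few lines. This is an illustrative re-implementation, not the real MarkerRegistry API; the regex and function name are assumptions:

```python
import re

# Assumed pattern for frontrun marker comments (illustrative only)
MARKER_RE = re.compile(r"#\s*frontrun:\s*(\w+)")

def scan_markers(source: str, filename: str) -> dict[tuple[str, int], str]:
    """Build a (filename, line_number) -> marker name mapping,
    mirroring the lookup table MarkerRegistry maintains."""
    markers = {}
    for lineno, line in enumerate(source.splitlines(), start=1):
        match = MARKER_RE.search(line)
        if match:
            markers[(filename, lineno)] = match.group(1)
    return markers

source = "x = read_data()  # frontrun: read\nwrite_data(x)  # frontrun: write\n"
markers = scan_markers(source, "worker.py")
```

Here `markers` maps `("worker.py", 1)` to `"read"` and `("worker.py", 2)` to `"write"`.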
- class frontrun.trace_markers.ThreadCoordinator(schedule, *, deadlock_timeout=5.0)[source]¶
Bases: object
Coordinates thread execution according to a schedule.
This class manages the synchronization between threads, ensuring that each thread executes markers in the order specified by the schedule.
- Parameters:
schedule (Schedule)
deadlock_timeout (float)
- wait_for_turn(execution_name, marker_name, *, _reacquire_execution_lock=False)[source]¶
Block until it’s this execution unit’s turn to execute this marker.
When _reacquire_execution_lock is True (used by the trace executors), _execution_lock is acquired while the condition lock is still held, before returning. This prevents other threads from racing ahead between being notified and the caller resuming execution. The caller must have already released _execution_lock before calling this method.
- Parameters:
execution_name (str) – The name of the calling execution unit
marker_name (str) – The marker that was hit
_reacquire_execution_lock (bool)
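The blocking behaviour of wait_for_turn can be illustrated with a toy coordinator built on threading.Condition. This is a sketch of the semantics, not the real ThreadCoordinator implementation:

```python
import threading

class MiniCoordinator:
    """Toy coordinator: each wait_for_turn call blocks until its
    (execution_name, marker_name) pair is at the head of the schedule.
    Illustrative only -- frontrun's ThreadCoordinator is more involved."""

    def __init__(self, schedule):
        self._schedule = schedule  # list of (execution_name, marker_name)
        self._index = 0
        self._cond = threading.Condition()

    def wait_for_turn(self, execution_name, marker_name):
        with self._cond:
            self._cond.wait_for(
                lambda: self._index >= len(self._schedule)
                or self._schedule[self._index] == (execution_name, marker_name)
            )
            if self._index < len(self._schedule):
                self._index += 1
            self._cond.notify_all()

coord = MiniCoordinator([("t1", "read"), ("t2", "read")])
done = []

def t2_worker():
    coord.wait_for_turn("t2", "read")  # blocks until t1 has taken its turn
    done.append("t2")

t2 = threading.Thread(target=t2_worker)
t2.start()
coord.wait_for_turn("t1", "read")  # head of the schedule: proceeds at once
t2.join(timeout=5)
done.append("main")
```

The second thread cannot pass its marker until the first has taken its turn, which is the ordering guarantee the schedule encodes.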
- class frontrun.trace_markers.TraceExecutor(schedule, *, deadlock_timeout=5.0)[source]¶
Bases: object
Executes threads with interleaved execution according to a schedule.
This is the main interface for the frontrun library. It sets up tracing for each thread and coordinates their execution.
Initialize the executor with a schedule.
- Parameters:
schedule (Schedule) – The Schedule defining the execution order
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- __init__(schedule, *, deadlock_timeout=5.0)[source]¶
Initialize the executor with a schedule.
- Parameters:
schedule (Schedule) – The Schedule defining the execution order
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- run(execution_name, target, args=(), kwargs=None)[source]¶
Start a new thread with tracing enabled.
- Parameters:
execution_name (str) – The name for this execution unit (must match schedule)
target (Callable[[...], None]) – The function to execute in the thread
args (tuple[Any, ...]) – Positional arguments for the target function
kwargs (dict[str, Any] | None) – Keyword arguments for the target function
- frontrun.trace_markers.frontrun(schedule, threads, thread_args=None, thread_kwargs=None, timeout=None, deadlock_timeout=5.0)[source]¶
Convenience function to run multiple threads with a schedule.
- Parameters:
schedule (Schedule) – The Schedule defining execution order
threads (dict[str, Callable[[...], None]]) – Dictionary mapping execution unit names to their target functions
thread_args (dict[str, tuple[Any, ...]] | None) – Optional dictionary mapping execution unit names to argument tuples
thread_kwargs (dict[str, dict[str, Any]] | None) – Optional dictionary mapping execution unit names to keyword argument dicts
timeout (float | None) – Optional timeout for waiting
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- Returns:
The TraceExecutor instance (useful for inspection)
- Return type:
TraceExecutor

Example

```python
frontrun(
    schedule=Schedule([Step("t1", "marker1"), Step("t2", "marker1")]),
    threads={"t1": worker_func, "t2": worker_func},
)
```
Async Trace Markers¶
Frontrun: Deterministic async task interleaving using comment-based markers.
This module provides a mechanism to control async task execution order by marking
synchronization points in code with # frontrun: marker_name comments,
matching the elegant syntax of the sync trace_markers module.
Key Insight: Thread-Based Execution with sys.settrace¶
This implementation mirrors the sync architecture exactly:
- Each async task runs in its own thread via asyncio.run(task_fn())
- sys.settrace fires on every line, including inside synchronous sub-coroutines
- When a marker is detected, the trace function blocks the thread via ThreadCoordinator.wait_for_turn()
- Execution resumes exactly where it paused
This solves the synchronous function body bug: when await self.get_balance()
completes synchronously (no internal yields), the trace still fires on every
line during that execution. After get_balance() returns, the trace fires for
the next line (after the marker), detects the marker, and blocks before that
line executes.
Marker Semantics¶
Marker Placement: Place markers to gate the operations you want to control.
- Inline markers (marker on same line as operation):
current = self.balance # frontrun: read_balance
- Separate-line markers (marker before operation):
# frontrun: read_balance
current = self.balance
Both styles work identically: the marker gates execution of the line, ensuring it only executes after the scheduler approves this task at this marker.
Example usage:
async def worker_function():
# frontrun: read_data
x = await read_data()
# frontrun: write_data
await write_data(x)
schedule = Schedule([
Step("task1", "read_data"),
Step("task2", "read_data"),
Step("task1", "write_data"),
Step("task2", "write_data"),
])
executor = AsyncTraceExecutor(schedule)
executor.run({
'task1': worker_function,
'task2': worker_function,
})
Or using the convenience function:
async_frontrun(
schedule=schedule,
tasks={'task1': worker1, 'task2': worker2},
)
- class frontrun.async_trace_markers.AsyncTraceExecutor(schedule, *, deadlock_timeout=5.0)[source]¶
Bases: object
Executes async tasks with interleaved execution according to a schedule.
This is the main interface for the async frontrun library. It uses comment-based markers (# frontrun: marker_name) to control task execution order.
Unlike the sync version which runs tasks in actual threads, this runs each async task in its own thread with its own event loop via asyncio.run().
Initialize the executor with a schedule.
- Parameters:
schedule (Schedule) – The Schedule defining the execution order
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- __init__(schedule, *, deadlock_timeout=5.0)[source]¶
Initialize the executor with a schedule.
- Parameters:
schedule (Schedule) – The Schedule defining the execution order
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- run(tasks, timeout=10.0)[source]¶
Run all tasks with controlled interleaving based on comment markers.
This is a synchronous method: it creates the task threads and waits for them to finish.
- Parameters:
tasks (dict[str, Callable[[], Coroutine[Any, Any, None]]]) – Dictionary mapping task names to their async functions
timeout (float) – Timeout in seconds for all tasks to complete
- Raises:
TimeoutError – If tasks don’t complete within the timeout
Any exception that occurred in a task during execution
- Return type:
None
- frontrun.async_trace_markers.async_frontrun(schedule, tasks, task_args=None, task_kwargs=None, timeout=10.0, deadlock_timeout=5.0)[source]¶
Convenience function to run multiple async tasks with a schedule.
This is a synchronous function (not async) that creates an executor and runs the tasks.
Tasks use # frontrun: marker_name comments to mark synchronization points. No need to pass marker functions to tasks - the executor automatically detects markers via sys.settrace.
- Parameters:
schedule (Schedule) – The Schedule defining execution order
tasks (dict[str, Callable[[...], Coroutine[Any, Any, None]]]) – Dictionary mapping execution unit names to their async target functions
task_args (dict[str, tuple[Any, ...]] | None) – Optional dictionary mapping execution unit names to argument tuples
task_kwargs (dict[str, dict[str, Any]] | None) – Optional dictionary mapping execution unit names to keyword argument dicts
timeout (float) – Timeout in seconds for the entire execution
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- Returns:
The AsyncTraceExecutor instance (useful for inspection)
- Return type:
AsyncTraceExecutor

Example:

```python
async def worker(account, amount):
    # frontrun: before_deposit
    await account.deposit(amount)

async_frontrun(
    schedule=Schedule([
        Step("t1", "before_deposit"),
        Step("t2", "before_deposit"),
    ]),
    tasks={"t1": worker, "t2": worker},
    task_args={"t1": (account, 50), "t2": (account, 50)},
)
```
Bytecode Instrumentation¶
Bytecode-level deterministic concurrency testing.
Uses sys.settrace with f_trace_opcodes to intercept execution at every bytecode instruction, enabling fine-grained control over thread interleaving.
This pairs naturally with property-based testing: rather than specifying exact schedules, generate random interleavings and check that invariants hold (or that bugs can be found).
The core insight: CPython context switches happen between bytecode instructions. By controlling which thread gets to execute each instruction, we can explore the full space of possible interleavings.
Example — find a race condition with random schedule exploration:
>>> from frontrun.bytecode import explore_interleavings
>>>
>>> class Counter:
... def __init__(self):
... self.value = 0
... def increment(self):
... temp = self.value
... self.value = temp + 1
>>>
>>> result = explore_interleavings(
... setup=lambda: Counter(),
... threads=[lambda c: c.increment(), lambda c: c.increment()],
... invariant=lambda c: c.value == 2,
... )
>>> assert not result.property_holds # race condition found!
- class frontrun.bytecode.OpcodeScheduler(schedule, num_threads, *, deadlock_timeout=5.0, max_ops=0, trace_recorder=None)[source]¶
Bases: object
Controls thread execution at bytecode instruction granularity.
The schedule is a list of thread indices. Each entry means “let this thread execute one bytecode instruction.”
When the explicit schedule is exhausted, the scheduler dynamically extends it with round-robin entries so that threads remain under deterministic scheduler control instead of falling back to real (non-deterministic) concurrency. A hard cap (max_ops) limits the total number of scheduler steps to prevent infinite runs.
Deadlock detection uses a configurable fallback condition.wait timeout (default 5 s) for threads stuck in C extensions or other unmanaged blocking calls. When cooperative locks are enabled, the WaitForGraph provides instant lock-ordering cycle detection.
- Parameters:
schedule (list[int])
num_threads (int)
deadlock_timeout (float)
max_ops (int)
trace_recorder (TraceRecorder | None)
- wait_for_turn(thread_id)[source]¶
Block until it’s this thread’s turn. Returns False when done.
- Parameters:
thread_id (int)
- Return type:
bool
- property had_error: bool¶
- class frontrun.bytecode.BytecodeShuffler(scheduler, detect_io=True)[source]¶
Bases: object
Run concurrent functions with bytecode-level interleaving control.
Sets up per-thread trace functions that intercept every bytecode instruction in user code and defer to the OpcodeScheduler.
Replaces threading and queue primitives (Lock, RLock, Semaphore, BoundedSemaphore, Event, Condition, Queue, LifoQueue, PriorityQueue) with cooperative versions that yield scheduler turns instead of blocking in C. This prevents the deadlock that otherwise occurs when one thread holds a primitive and the scheduler gives a turn to another thread that tries to acquire it.
- Parameters:
scheduler (OpcodeScheduler)
detect_io (bool)
- run(funcs, args=None, kwargs=None, timeout=10.0)[source]¶
Run functions concurrently with controlled interleaving.
- Parameters:
funcs (list[Callable[[...], None]]) – One callable per thread.
args (list[tuple[Any, ...]] | None) – Per-thread positional args.
kwargs (list[dict[str, Any]] | None) – Per-thread keyword args.
timeout (float) – Max total wait time for all threads (global deadline).
- Return type:
None
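The cooperative-primitive idea described above can be sketched with a minimal lock that spins and hands its turn back to the scheduler rather than blocking inside C. Names and structure here are illustrative assumptions, not frontrun's actual replacement classes:

```python
class CooperativeLock:
    """Sketch of a cooperative lock: while the lock is held, yield the
    scheduler turn back instead of blocking in a C-level primitive the
    scheduler cannot see. Illustrative only."""

    def __init__(self, yield_turn):
        self._held_by = None
        self._yield_turn = yield_turn  # callback: give the scheduler a turn

    def acquire(self, thread_id):
        while self._held_by is not None:
            self._yield_turn()  # let the scheduler run the lock holder
        self._held_by = thread_id

    def release(self):
        self._held_by = None

turns = []
lock = CooperativeLock(yield_turn=lambda: turns.append("yield"))
lock.acquire(0)
lock.release()
lock.acquire(1)  # uncontended: acquires without yielding a single turn
```

Because waiting is expressed as scheduler turns, the scheduler can always hand control to the thread that currently holds the lock, avoiding the deadlock described above.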
- frontrun.bytecode.controlled_interleaving(schedule, num_threads=2)[source]¶
Context manager for running code under a specific interleaving.
- Parameters:
schedule (list[int]) – List of thread indices controlling opcode execution order.
num_threads (int) – Number of threads.
- Yields:
BytecodeShuffler runner.
Example
>>> with controlled_interleaving([0, 1, 0, 1], num_threads=2) as runner:
...     runner.run([func1, func2])
- frontrun.bytecode.run_with_schedule(schedule, setup, threads, timeout=5.0, detect_io=True, debug=False, deadlock_timeout=5.0, trace_recorder=None)[source]¶
Run one interleaving and return the state object.
- Parameters:
schedule (list[int]) – Opcode-level schedule (list of thread indices).
setup (Callable[[], T]) – Returns fresh shared state.
threads (list[Callable[[T], None]]) – Callables that each receive the state as their argument.
timeout (float) – Max seconds.
detect_io (bool) – Automatically detect socket/file I/O and treat them as scheduling points (default True).
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
trace_recorder (TraceRecorder | None) – Optional recorder for capturing trace events. When provided, records shared-state accesses for later formatting into human-readable explanations.
debug (bool)
- Returns:
The state object after execution.
- Return type:
T
- frontrun.bytecode.explore_interleavings(setup, threads, invariant, max_attempts=200, max_ops=300, timeout_per_run=5.0, seed=None, debug=False, detect_io=True, deadlock_timeout=5.0, reproduce_on_failure=10)[source]¶
Search for interleavings that violate an invariant.
Note
When running under pytest, this function requires the frontrun CLI wrapper (frontrun pytest ...) or the --frontrun-patch-locks flag. Without it, the test is automatically skipped.
Generates random opcode-level schedules and tests whether the invariant holds under each one. If a violation is found, returns immediately with the counterexample schedule.
This is the bytecode-level analogue of property-based testing: instead of generating random inputs, we generate random interleavings and check that the result satisfies an invariant.
- Parameters:
setup (Callable[[], T]) – Returns fresh shared state for each attempt.
threads (list[Callable[[T], None]]) – Callables that each receive the state as their argument.
invariant (Callable[[T], bool]) – Predicate on the state. Returns True if the property holds.
max_attempts (int) – How many random interleavings to try.
max_ops (int) – Maximum schedule length per attempt.
timeout_per_run (float) – Timeout for each individual run.
seed (int | None) – Optional RNG seed for reproducibility.
detect_io (bool) – Automatically detect socket/file I/O and treat them as scheduling points (default True).
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
reproduce_on_failure (int) – When a counterexample is found, replay the same schedule this many times to measure reproducibility (default 10). Set to 0 to skip reproduction testing.
debug (bool)
- Returns:
InterleavingResult with the outcome. The unique_interleavings field reports how many distinct execution orderings were observed, providing a lower bound on exploration coverage.
- Return type:
InterleavingResult
- frontrun.bytecode.schedule_strategy(num_threads, max_ops=300)[source]¶
Hypothesis strategy for generating fair opcode schedules.
Generates schedules as a sequence of rounds, where each round is a random permutation of all thread indices. This guarantees every thread gets exactly the same number of scheduling slots, preventing starvation (e.g. a schedule that gives 99 % of steps to one thread).
For use with hypothesis @given decorator in your own tests:
>>> from hypothesis import given
>>> from frontrun.bytecode import schedule_strategy, run_with_schedule
>>>
>>> @given(schedule=schedule_strategy(2))
... def test_my_invariant(schedule):
...     state = run_with_schedule(schedule, setup, threads)
...     assert state.value == expected
Note: hypothesis expects deterministic tests. Bytecode-level interleaving is deterministic for a given schedule, but hypothesis’s shrinking may still interact oddly with threading. Consider using settings(phases=[Phase.generate]) to skip shrinking if needed.
- Parameters:
num_threads (int)
max_ops (int)
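The round-based fairness property can be sketched without hypothesis. This helper is an assumption-labeled illustration of the strategy's output shape, not the strategy itself:

```python
import random

def fair_schedule(num_threads: int, rounds: int, seed: int = 0) -> list[int]:
    """Build a schedule as shuffled rounds: every thread appears exactly
    once per round, so all threads receive the same number of scheduling
    slots. Sketch of the fairness idea behind schedule_strategy."""
    rng = random.Random(seed)
    schedule: list[int] = []
    for _ in range(rounds):
        round_ = list(range(num_threads))
        rng.shuffle(round_)
        schedule.extend(round_)
    return schedule

schedule = fair_schedule(num_threads=3, rounds=4)
```

For 3 threads and 4 rounds, the schedule has 12 entries and each thread index appears exactly 4 times, so no thread can be starved.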
Async Bytecode Instrumentation¶
Await-point-level deterministic async concurrency testing.
Uses the shared InterleavedLoop abstraction to control which async task resumes at each await point, enabling fine-grained control over task interleaving.
This pairs naturally with property-based testing: rather than specifying exact schedules, generate random interleavings and check that invariants hold (or that bugs can be found).
The core insight: in async Python, context switches happen ONLY at await points. The event loop is single-threaded. By controlling which task resumes at each await point, we can explore the full space of possible interleavings — and there are far fewer of them than in threaded code.
Example — find a race condition with random schedule exploration:
>>> import asyncio
>>> from frontrun.async_bytecode import explore_interleavings, await_point
>>>
>>> class Counter:
... def __init__(self):
... self.value = 0
... async def increment(self):
... temp = self.value
... await await_point() # Yield control; race can happen here
... self.value = temp + 1
>>>
>>> result = asyncio.run(explore_interleavings(
... setup=lambda: Counter(),
... tasks=[lambda c: c.increment(), lambda c: c.increment()],
... invariant=lambda c: c.value == 2,
... ))
>>> assert not result.property_holds # race condition found!
The await_point() function marks explicit yield points where context switches can occur. This is analogous to bytecode.py’s opcode-level tracing, but for async code the number of interleaving points is much smaller and more explicit.
In production async code, every await is a potential context switch point. For testing, call await await_point() at each location where a race condition could manifest.
- async frontrun.async_bytecode.await_point()[source]¶
Yield to the scheduler at an await point.
Call this at every point where a context switch could happen in your async code. This is the async equivalent of a bytecode opcode — the atomic unit of interleaving.
In typical async code, every await statement is a potential context switch point. For testing race conditions, replace strategic awaits with await await_point() to allow the scheduler to control ordering.
If no scheduler is active (i.e., not running under AsyncBytecodeShuffler), this function returns immediately without blocking.
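The no-scheduler fallback can be sketched as follows. The context-variable lookup and names are hypothetical; only the documented contract (no-op when no scheduler is active) comes from the source:

```python
import asyncio
import contextvars

# Hypothetical scheduler handle; frontrun's internals differ.
_active_scheduler = contextvars.ContextVar("active_scheduler", default=None)

async def await_point_sketch() -> None:
    """Sketch of await_point's contract: yield to an active scheduler,
    or return immediately when none is installed."""
    scheduler = _active_scheduler.get()
    if scheduler is None:
        return  # no scheduler active: no-op, as documented
    await scheduler.pause()

# With no scheduler installed, the call completes immediately:
asyncio.run(await_point_sketch())
```

This is why production code can safely contain await_point() calls: outside a shuffler run they cost almost nothing.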
- class frontrun.async_bytecode.AwaitScheduler(schedule, num_tasks, *, deadlock_timeout=5.0)[source]¶
Bases: InterleavedLoop
Controls async task execution at await-point granularity.
The schedule is a list of task indices. Each entry means “let this task resume from its next await point.” When the schedule is exhausted, all tasks run freely to completion.
Built on the shared InterleavedLoop abstraction, using index-based scheduling as its policy.
- Parameters:
schedule (list[int])
num_tasks (int)
deadlock_timeout (float)
- should_proceed(task_id, marker=None)[source]¶
Return True if this task should resume now.
Called while holding the condition lock. Must not await.
- Parameters:
task_id (Any) – Identity of the calling task (str, int, etc.)
marker (Any) – Optional context from the yield point (e.g. a marker name, an (operation, phase) tuple, or None).
- Return type:
bool
- on_proceed(task_id, marker=None)[source]¶
Update scheduling state after a task is allowed to proceed.
Called while holding the condition lock, immediately after should_proceed returned True. Must not await.
- Parameters:
task_id (Any) – Identity of the task that is proceeding.
marker (Any) – Same marker value passed to should_proceed.
- Return type:
None
- property had_error: bool¶
Check if an error occurred during execution.
- class frontrun.async_bytecode.AsyncBytecodeShuffler(scheduler)[source]¶
Bases: object
Run concurrent async functions with await-point-level interleaving control.
Creates asyncio tasks for each function and delegates to the AwaitScheduler (an InterleavedLoop subclass) for execution and context setup.
- Parameters:
scheduler (AwaitScheduler)
- async run(funcs, args=None, kwargs=None, timeout=10.0)[source]¶
Run async functions concurrently with controlled interleaving.
- Parameters:
funcs (list[Callable[[...], Coroutine[Any, Any, None]]]) – One async callable per task.
args (list[tuple[Any, ...]] | None) – Per-task positional args.
kwargs (list[dict[str, Any]] | None) – Per-task keyword args.
timeout (float) – Max wait time for all tasks.
- frontrun.async_bytecode.controlled_interleaving(schedule, num_tasks=2)[source]¶
Context manager for running async code under a specific interleaving.
- Parameters:
schedule (list[int]) – List of task indices controlling await-point execution order.
num_tasks (int) – Number of tasks.
- Yields:
AsyncBytecodeShuffler runner.
- Return type:
AsyncGenerator[AsyncBytecodeShuffler, None]
Example
>>> async with controlled_interleaving([0, 1, 0, 1], num_tasks=2) as runner:
...     await runner.run([coro1, coro2])
- async frontrun.async_bytecode.run_with_schedule(schedule, setup, tasks, timeout=5.0, deadlock_timeout=5.0)[source]¶
Run one async interleaving and return the state object.
- Parameters:
schedule (list[int]) – Await-point-level schedule (list of task indices).
setup (Callable[[], Any]) – Returns fresh shared state.
tasks (list[Callable[[Any], Coroutine[Any, Any, None]]]) – Async callables that each receive the state as their argument.
timeout (float) – Max seconds.
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- Returns:
The state object after execution.
- Return type:
Any
- async frontrun.async_bytecode.explore_interleavings(setup, tasks, invariant, max_attempts=200, max_ops=100, timeout_per_run=5.0, seed=None, deadlock_timeout=5.0)[source]¶
Search for async interleavings that violate an invariant.
Generates random await-point-level schedules and tests whether the invariant holds under each one. If a violation is found, returns immediately with the counterexample schedule.
This is the async analogue of property-based testing for concurrency: instead of generating random inputs, we generate random interleavings and check that the result satisfies an invariant.
Note: max_ops defaults to 100 (vs 300 for bytecode.py) because async code has far fewer interleaving points than threaded bytecode execution. Each await_point() call represents a much coarser-grained checkpoint.
- Parameters:
setup (Callable[[], Any]) – Returns fresh shared state for each attempt.
tasks (list[Callable[[Any], Coroutine[Any, Any, None]]]) – Async callables that each receive the state as their argument.
invariant (Callable[[Any], bool]) – Predicate on the state. Returns True if the property holds.
max_attempts (int) – How many random interleavings to try.
max_ops (int) – Maximum schedule length per attempt.
timeout_per_run (float) – Timeout for each individual run.
seed (int | None) – Optional RNG seed for reproducibility.
deadlock_timeout (float) – Seconds to wait before declaring a deadlock (default 5.0). Increase for code that legitimately blocks in C extensions (NumPy, database queries, network I/O).
- Returns:
InterleavingResult with the outcome. The unique_interleavings field reports how many distinct schedule orderings were observed.
- Return type:
InterleavingResult
- frontrun.async_bytecode.schedule_strategy(num_tasks, max_ops=100)[source]¶
Hypothesis strategy for generating fair await-point schedules.
Generates schedules as a sequence of rounds, where each round is a random permutation of all task indices. This guarantees every task gets exactly the same number of scheduling slots, preventing starvation.
For use with hypothesis @given decorator in your own tests:
>>> from hypothesis import given
>>> from frontrun.async_bytecode import schedule_strategy, run_with_schedule
>>> import asyncio
>>>
>>> @given(schedule=schedule_strategy(2))
... def test_my_invariant(schedule):
...     state = asyncio.run(run_with_schedule(schedule, setup, tasks))
...     assert state.value == expected
Note: max_ops defaults to 100 (vs 300 for bytecode.py) because async code has far fewer interleaving points. Each schedule entry corresponds to one await_point() call, not one bytecode opcode.
- Parameters:
num_tasks (int)
max_ops (int)
- Return type:
Any
Trace Formatting¶
Trace recording, filtering, and formatting for comprehensible race condition errors.
When frontrun finds a race condition, the raw counterexample is a list of thread indices — one per bytecode instruction. This module transforms that into a human-readable “story” of which source lines executed in which order.
The pipeline:
1. Record a TraceEvent at each opcode during the failing run.
2. Filter to events that touch shared state (LOAD_ATTR/STORE_ATTR, etc.).
3. Deduplicate consecutive events from the same thread on the same source line.
4. Classify the conflict pattern (lost update, order violation, etc.).
5. Format as an interleaved source-line trace.
- class frontrun._trace_format.TraceEvent(step_index, thread_id, filename, lineno, function_name, opcode, access_type=None, attr_name=None, obj_type_name=None)[source]¶
Bases: object
A single recorded event from the trace.
- Parameters:
step_index (int)
thread_id (int)
filename (str)
lineno (int)
function_name (str)
opcode (str)
access_type (str | None)
attr_name (str | None)
obj_type_name (str | None)
- step_index: int¶
- thread_id: int¶
- filename: str¶
- lineno: int¶
- function_name: str¶
- opcode: str¶
- access_type: str | None¶
- attr_name: str | None¶
- obj_type_name: str | None¶
- class frontrun._trace_format.SourceLineEvent(thread_id, filename, lineno, function_name, source_line, access_type=None, attr_name=None, obj_type_name=None)[source]¶
Bases: object
A deduplicated, source-level event for display.
- Parameters:
thread_id (int)
filename (str)
lineno (int)
function_name (str)
source_line (str)
access_type (str | None)
attr_name (str | None)
obj_type_name (str | None)
- thread_id: int¶
- filename: str¶
- lineno: int¶
- function_name: str¶
- source_line: str¶
- access_type: str | None¶
- attr_name: str | None¶
- obj_type_name: str | None¶
- class frontrun._trace_format.TraceRecorder(*, enabled=True)[source]¶
Bases: object
Accumulates TraceEvent objects during a single execution.
Thread-safe: multiple threads call record() concurrently, each holding the scheduler lock (so ordering is deterministic).
- Parameters:
enabled (bool)
- events: list[TraceEvent]¶
- enabled¶
Keep only events that access shared mutable state.
- Parameters:
events (list[TraceEvent])
- Return type:
list[TraceEvent]
- frontrun._trace_format.deduplicate_to_source_lines(events)[source]¶
Collapse consecutive events from the same thread+line into one SourceLineEvent.
When multiple opcodes on the same source line produce events (e.g., LOAD_ATTR then STORE_ATTR for self.value += 1), merge them into a single entry with a combined access_type.
- Parameters:
events (list[TraceEvent])
- Return type:
list[SourceLineEvent]
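The collapsing step can be sketched with a simplified event type. Field names here are pared down for illustration and do not match the real TraceEvent exactly:

```python
from dataclasses import dataclass

@dataclass
class LineEvent:
    """Simplified stand-in for TraceEvent (the real fields differ)."""
    thread_id: int
    lineno: int
    access_type: str

def dedup_source_lines(events: list[LineEvent]) -> list[LineEvent]:
    """Collapse consecutive events from the same thread and line,
    combining access types -- a toy deduplicate_to_source_lines."""
    out: list[LineEvent] = []
    for ev in events:
        if out and out[-1].thread_id == ev.thread_id and out[-1].lineno == ev.lineno:
            if ev.access_type not in out[-1].access_type:
                out[-1].access_type += "+" + ev.access_type
            continue
        out.append(LineEvent(ev.thread_id, ev.lineno, ev.access_type))
    return out

merged = dedup_source_lines([
    LineEvent(0, 10, "read"),   # LOAD_ATTR half of self.value += 1
    LineEvent(0, 10, "write"),  # STORE_ATTR on the same line
    LineEvent(1, 10, "read"),   # different thread: kept separate
])
```

The two same-thread events on line 10 merge into one entry with access type "read+write"; the other thread's event stays separate.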
- class frontrun._trace_format.ConflictInfo(pattern, summary, attr_name=None)[source]¶
Bases: object
Description of the conflict pattern found in the trace.
- Parameters:
pattern (str)
summary (str)
attr_name (str | None)
- pattern: str¶
- summary: str¶
- attr_name: str | None = None¶
- frontrun._trace_format.classify_conflict(events)[source]¶
Examine a filtered, deduplicated trace and classify the conflict type.
Looks for classic patterns:
- Lost update: R_a R_b W_a W_b (or R_a R_b W_b W_a)
- Write-write: W_a W_b on the same attribute without intervening sync
- Parameters:
events (list[SourceLineEvent])
- Return type:
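The patterns above can be sketched as a toy classifier over (thread, op) pairs. This is an illustration of the lost-update and write-write shapes, not classify_conflict's real logic:

```python
def classify_sketch(events: list[tuple[int, str]]) -> str:
    """Toy classifier: events are (thread_id, op) pairs with op in
    {"R", "W"}. Illustrative only."""
    reads = [t for t, op in events if op == "R"]
    writes = [t for t, op in events if op == "W"]
    # Lost update: two distinct threads both read before writing back
    if len(set(reads[:2])) == 2 and len(set(writes)) >= 2:
        return "lost update"
    # Write-write: two distinct threads write with no reads at all
    if not reads and len(set(writes[:2])) == 2:
        return "write-write"
    return "unknown"

verdict = classify_sketch([(0, "R"), (1, "R"), (0, "W"), (1, "W")])
```

The R_a R_b W_a W_b sequence above is the canonical lost-update trace: both threads read the stale value before either writes.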
- frontrun._trace_format.format_trace(events, *, num_threads, thread_names=None, num_explored=0, invariant_desc=None, show_opcodes=False, reproduction_attempts=0, reproduction_successes=0)[source]¶
Format a trace as a human-readable interleaved source-line display.
- Parameters:
events (list[TraceEvent]) – Raw trace events from a TraceRecorder.
num_threads (int) – Total number of threads.
thread_names (list[str] | None) – Optional display names for threads.
num_explored (int) – Number of interleavings explored before finding the bug.
invariant_desc (str | None) – Description of the violated invariant.
show_opcodes (bool) – If True, include opcode-level detail for each line.
reproduction_attempts (int) – How many times the schedule was replayed.
reproduction_successes (int) – How many replays reproduced the failure.
- Returns:
Multi-line string suitable for printing or attaching to test output.
- Return type:
str
Async Scheduler Utilities¶
Async event loop abstraction for deterministic task interleaving.
This module provides InterleavedLoop, the shared foundation for all async frontrun POCs. It wraps asyncio’s cooperative scheduling to give deterministic control over which task resumes at each yield point.
In async Python, the event loop decides which ready task to resume after each await point. InterleavedLoop intercepts this decision, using a pluggable scheduling policy to control the execution order.
Key insight: async code is single-threaded and cooperative. Context switches happen ONLY at await points. InterleavedLoop exploits this by gating each yield point through an asyncio.Condition — tasks wait until the scheduling policy says it’s their turn.
Both async approaches build on this abstraction: - async_trace_markers (comment annotations): marker-based scheduling - async_bytecode (property-based): index-based scheduling
Each POC subclasses InterleavedLoop and implements two methods: - should_proceed(task_id, marker): return True when a task should resume - on_proceed(task_id, marker): update internal scheduling state
Example — a simple round-robin scheduler:
>>> class RoundRobinLoop(InterleavedLoop):
... def __init__(self, order):
... super().__init__()
... self._order = order
... self._step = 0
...
... def should_proceed(self, task_id, marker=None):
... if self._step >= len(self._order):
... return True
... return self._order[self._step] == task_id
...
... def on_proceed(self, task_id, marker=None):
... self._step += 1
- class frontrun.async_scheduler.InterleavedLoop(*, deadlock_timeout=5.0)[source]¶
Bases: object
Wrapped event loop for deterministic async task interleaving.
This class controls which async task resumes at each yield point. Tasks call await loop.pause(task_id) at points where a context switch could happen, and the loop's scheduling policy decides whether the task should proceed or wait.
- Subclasses must implement:
should_proceed(task_id, marker): Is it this task's turn?
on_proceed(task_id, marker): Update state after a task proceeds.
- The base class provides:
pause(): Yield point that gates on the scheduling policy
run_all(): Run tasks with controlled interleaving
Error propagation, timeout handling, and done-task tracking
- Parameters:
deadlock_timeout (float)
- should_proceed(task_id, marker=None)[source]¶
Return True if this task should resume now.
Called while holding the condition lock. Must not await.
- Parameters:
task_id (Any) – Identity of the calling task (str, int, etc.)
marker (Any) – Optional context from the yield point (e.g. a marker name, an (operation, phase) tuple, or None).
- Return type:
bool
- on_proceed(task_id, marker=None)[source]¶
Update scheduling state after a task is allowed to proceed.
Called while holding the condition lock, immediately after should_proceed returned True. Must not await.
- Parameters:
task_id (Any) – Identity of the task that is proceeding.
marker (Any) – Same marker value passed to should_proceed.
- Return type:
None
- async pause(task_id, marker=None)[source]¶
Yield point: block until the scheduling policy says to proceed.
Tasks call this at every point where a context switch could happen. The call blocks (yields to the event loop) until should_proceed() returns True for this task, then calls on_proceed() and returns.
Uses all-tasks-waiting detection: if every non-done task is blocked in pause() and none can proceed, deadlock is detected instantly.
- Parameters:
task_id (Any) – Identity of the calling task.
marker (Any) – Optional scheduling context.
- Return type:
None
- async run_all(task_funcs, timeout=10.0)[source]¶
Run tasks with controlled interleaving.
- Parameters:
task_funcs (dict[Any, Callable[[...], Awaitable[None]]] | list[Callable[[...], Awaitable[None]]]) – Either a dict {task_id: async_callable} or a list of async callables (which get integer task_ids 0, 1, 2, …).
timeout (float) – Maximum total time to wait for all tasks.
- Return type:
None
- property had_error: bool¶
True if an error was reported during execution.