Quick Start
This guide will get you up and running with Frontrun using Trace Markers.
Basic Example: Triggering a Race Condition
from frontrun.trace_markers import Schedule, Step, TraceExecutor


class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        temp = self.value  # frontrun: read_value
        temp += 1
        self.value = temp  # frontrun: write_value


def test_counter_lost_update():
    counter = Counter()

    # Both threads read before either writes, causing a lost update
    schedule = Schedule([
        Step("thread1", "read_value"),   # T1 reads 0
        Step("thread2", "read_value"),   # T2 reads 0 (both see same value!)
        Step("thread1", "write_value"),  # T1 writes 1
        Step("thread2", "write_value"),  # T2 writes 1 (overwrites T1's update!)
    ])

    executor = TraceExecutor(schedule)
    executor.run("thread1", counter.increment)
    executor.run("thread2", counter.increment)
    executor.wait(timeout=5.0)

    assert counter.value == 1  # One increment lost
Understanding Trace Markers
Trace markers are comments of the form # frontrun: <name> that tell
Frontrun where synchronization points are. A marker gates the code that
follows it: when a thread reaches a marker, it pauses until the scheduler grants
it a turn. Only then does the gated code execute.
Two placement styles are supported:
Inline with code (marker on the same line as the operation it gates):

def increment(self):
    temp = self.value  # frontrun: read_value
    temp += 1
    self.value = temp  # frontrun: write_value

Here read_value gates the read of self.value, and write_value gates the write.

On a separate line before the operation:

def increment(self):
    # frontrun: read_value
    temp = self.value
    temp += 1
    # frontrun: write_value
    self.value = temp

The semantics are the same: the marker gates the next executable line.
Name markers after the operation they gate (read_value, write_balance,
acquire_lock, etc.) rather than using temporal prefixes like before_ or
after_.
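
The "pause until the scheduler grants a turn" behaviour described above can be modelled with a small toy. The sketch below is not how Frontrun is implemented: it replaces comment markers with an explicit context manager, and ToyScheduler, gate, increment, and run_lost_update are hypothetical names introduced only for illustration. It assumes a schedule is an ordered list of (thread, marker) pairs.

```python
import threading
from contextlib import contextmanager


class ToyScheduler:
    """Toy turn-based gate; a hypothetical model, not Frontrun's implementation."""

    def __init__(self, steps):
        self._steps = list(steps)  # ordered (thread_name, marker_name) pairs
        self._index = 0            # position of the next step to grant
        self._cond = threading.Condition()

    @contextmanager
    def gate(self, thread_name, marker_name, timeout=5.0):
        """Pause until this (thread, marker) pair is next, then run the gated code."""
        def my_turn():
            return (self._index < len(self._steps)
                    and self._steps[self._index] == (thread_name, marker_name))

        with self._cond:
            if not self._cond.wait_for(my_turn, timeout=timeout):
                raise TimeoutError(f"{thread_name} never got a turn at {marker_name}")
            try:
                yield  # gated code runs here; threads waiting at a gate stay paused
            finally:
                self._index += 1  # hand the turn to the next scheduled step
                self._cond.notify_all()


class Counter:
    def __init__(self):
        self.value = 0


def increment(counter, scheduler, name):
    with scheduler.gate(name, "read_value"):
        temp = counter.value
    temp += 1
    with scheduler.gate(name, "write_value"):
        counter.value = temp


def run_lost_update():
    counter = Counter()
    scheduler = ToyScheduler([
        ("thread1", "read_value"),
        ("thread2", "read_value"),
        ("thread1", "write_value"),
        ("thread2", "write_value"),
    ])
    threads = [
        threading.Thread(target=increment, args=(counter, scheduler, name))
        for name in ("thread1", "thread2")
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join(timeout=5.0)
    return counter.value


final_value = run_lost_update()
```

With this schedule both threads read 0 before either writes, so final_value is 1. The gated statement finishes before the next step is granted, which is what makes the outcome deterministic in this toy.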
Creating Schedules
A schedule defines the execution order of marked synchronization points:
from frontrun.trace_markers import Schedule, Step

schedule = Schedule([
    Step("thread1", "marker_name_1"),
    Step("thread1", "marker_name_2"),
    Step("thread2", "marker_name_1"),
    Step("thread2", "marker_name_2"),
])
Each Step specifies the thread or task name, followed by the name of the marker to execute at that step.
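
Because a schedule is an ordered list of (thread, marker) pairs, the candidate schedules for a small example can be enumerated and checked by hand. The sketch below is plain Python and does not use Frontrun; all_interleavings and simulate are hypothetical helpers, and it assumes each thread reaches read_value before write_value, as in the Counter example.

```python
from itertools import combinations

# Order in which each thread reaches its markers (assumed from the Counter example)
MARKERS = ["read_value", "write_value"]


def all_interleavings(first="thread1", second="thread2"):
    """Yield every ordering of the four steps that keeps each thread's own order."""
    total = 2 * len(MARKERS)
    for slots in combinations(range(total), len(MARKERS)):
        remaining = {first: list(MARKERS), second: list(MARKERS)}
        steps = []
        for position in range(total):
            thread = first if position in slots else second
            steps.append((thread, remaining[thread].pop(0)))
        yield steps


def simulate(steps):
    """Replay read/write steps against a counter that starts at 0."""
    value = 0
    temps = {}
    for thread, marker in steps:
        if marker == "read_value":
            temps[thread] = value          # thread takes its private copy
        else:
            value = temps[thread] + 1      # thread writes back copy + 1
    return value


results = [(steps, simulate(steps)) for steps in all_interleavings()]
lost = [steps for steps, final in results if final == 1]
correct = [steps for steps, final in results if final == 2]
```

Of the six possible orderings, only the two fully serialized ones leave the counter at 2; the other four lose an update. Each (thread, marker) pair maps directly onto one Step(thread, marker) entry, so any ordering in lost could be written out as a schedule to reproduce that particular failure.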
Running with Controlled Interleaving
Execute your code with a specific schedule:
from frontrun.trace_markers import TraceExecutor
executor = TraceExecutor(schedule)
# Register tasks to run
executor.run("thread1", task_function_1)
executor.run("thread2", task_function_2)
# Wait for completion
executor.wait(timeout=5.0)
Async Support
Async trace markers use the same comment-based syntax. Each async task runs in
its own thread (via asyncio.run), with sys.settrace controlling
interleaving between tasks.
A marker gates the next await expression. When a task reaches a marker, it
pauses until the scheduler grants it a turn; only then does the gated await
execute. Between two markers the task runs without interruption from other
scheduled tasks.
Here is a complete async example. It reproduces the same lost-update race as
the sync version, but with await boundaries:
from frontrun.async_trace_markers import AsyncTraceExecutor
from frontrun.common import Schedule, Step


class AsyncCounter:
    def __init__(self):
        self.value = 0

    async def get_value(self):
        return self.value

    async def set_value(self, new_value):
        self.value = new_value

    async def increment(self):
        # frontrun: read_value
        temp = await self.get_value()
        # frontrun: write_value
        await self.set_value(temp + 1)


def test_async_counter_lost_update():
    counter = AsyncCounter()

    # Both tasks read before either writes, which triggers the lost update
    schedule = Schedule([
        Step("task1", "read_value"),
        Step("task2", "read_value"),
        Step("task1", "write_value"),
        Step("task2", "write_value"),
    ])

    executor = AsyncTraceExecutor(schedule)
    executor.run({
        "task1": counter.increment,
        "task2": counter.increment,
    })

    # Both tasks read 0, then both write 1, so one increment is lost
    assert counter.value == 1


def test_async_counter_serialized():
    counter = AsyncCounter()

    # Serialized: task1 completes before task2 starts
    schedule = Schedule([
        Step("task1", "read_value"),
        Step("task1", "write_value"),
        Step("task2", "read_value"),
        Step("task2", "write_value"),
    ])

    executor = AsyncTraceExecutor(schedule)
    executor.run({
        "task1": counter.increment,
        "task2": counter.increment,
    })

    assert counter.value == 2  # No lost update
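
For comparison, the same race can be reproduced in plain asyncio without Frontrun, which shows why await boundaries are the points where interleaving matters. This is a minimal sketch under different assumptions from the executor above: both tasks share a single event loop rather than running in separate threads, and asyncio.sleep(0) stands in for whatever real await sits between the read and the write. The names run_two_increments and final_value are illustrative only.

```python
import asyncio


class AsyncCounter:
    def __init__(self):
        self.value = 0

    async def increment(self):
        temp = self.value        # read
        await asyncio.sleep(0)   # yield to the event loop; the other task runs here
        self.value = temp + 1    # write, based on a possibly stale read


async def run_two_increments():
    counter = AsyncCounter()
    # gather schedules both coroutines as tasks on the same event loop
    await asyncio.gather(counter.increment(), counter.increment())
    return counter.value


final_value = asyncio.run(run_two_increments())
```

Here both tasks read 0 before either resumes to write, so final_value is 1. Removing the await between the read and the write makes each increment run to completion before the other starts, and the result becomes 2.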