"""
Bytecode-tracing DPOR (Dynamic Partial Order Reduction) for frontrun.
This module implements systematic interleaving exploration using DPOR,
completely separate from the existing bytecode.py random exploration.
The approach:
1. A Rust DPOR engine (frontrun._dpor) manages the exploration tree,
vector clocks, and backtrack set computation.
2. Python drives execution: runs threads under sys.settrace opcode
tracing, uses a shadow stack to detect shared-memory accesses,
and feeds access/sync events to the Rust engine.
3. Cooperative threading primitives (lock, event, etc.) are monkey-patched
to yield control back to the DPOR scheduler and report synchronization
events for happens-before tracking.
Usage::
from frontrun.dpor import explore_dpor
class Counter:
def __init__(self):
self.value = 0
def increment(self):
temp = self.value
self.value = temp + 1
result = explore_dpor(
setup=lambda: Counter(),
threads=[lambda c: c.increment(), lambda c: c.increment()],
invariant=lambda c: c.value == 2,
)
assert result.property_holds, result.explanation # fails — lost update!
"""
from __future__ import annotations
import dis
import sys
import threading
import time
import types
from collections.abc import Callable
from typing import Any, TypeVar
from frontrun._cooperative import (
clear_context,
patch_locks,
real_condition,
real_lock,
set_context,
set_sync_reporter,
unpatch_locks,
)
from frontrun._deadlock import SchedulerAbort, install_wait_for_graph, uninstall_wait_for_graph
from frontrun._io_detection import (
patch_io,
set_io_reporter,
uninstall_io_profile,
unpatch_io,
)
from frontrun._trace_format import TraceRecorder, build_call_chain, format_trace
from frontrun._tracing import is_dynamic_code as _is_dynamic_code
from frontrun._tracing import should_trace_file as _should_trace_file
from frontrun.cli import require_active as _require_frontrun_env
from frontrun.common import InterleavingResult
try:
from frontrun._dpor import PyDporEngine, PyExecution # type: ignore[reportAttributeAccessIssue]
except ModuleNotFoundError as _err:
raise ModuleNotFoundError(
"explore_dpor requires the frontrun._dpor Rust extension.\n"
"Build it with: make build-dpor-3.14t (or build-dpor-3.10 / build-dpor-3.14)\n"
"Or install from source: pip install -e ."
) from _err
T = TypeVar("T")
_PY_VERSION = sys.version_info[:2]
# sys.monitoring (PEP 669) is available since 3.12 and is required for
# free-threaded builds (3.13t/3.14t) where sys.settrace + f_trace_opcodes
# has a known crash bug (CPython #118415).
_USE_SYS_MONITORING = _PY_VERSION >= (3, 12)
# ---------------------------------------------------------------------------
# Result type
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Shadow Stack for shared-access detection
# ---------------------------------------------------------------------------
class ShadowStack:
"""Mirrors CPython's evaluation stack to track object identity.
When LOAD_ATTR/STORE_ATTR execute, we peek at our shadow stack
to identify which object is being accessed.
"""
__slots__ = ("stack",)
def __init__(self) -> None:
self.stack: list[Any] = []
def push(self, val: Any) -> None:
self.stack.append(val)
def pop(self) -> Any:
return self.stack.pop() if self.stack else None
def peek(self, n: int = 0) -> Any:
idx = -(n + 1)
return self.stack[idx] if abs(idx) <= len(self.stack) else None
def clear(self) -> None:
self.stack.clear()
# Pre-analyzed instruction cache: code object -> {offset -> instruction}.
#
# Keyed by the code object itself (not ``id(code)``). Using the code
# object as the dict key keeps a strong reference, which prevents the
# object from being garbage-collected while cached. This eliminates the
# stale-cache bug where a GC'd code object's id was reused by a new one
# within a single DPOR execution.
_INSTR_CACHE: dict[Any, dict[int, dis.Instruction]] = {}
# Must use real_lock() — not threading.Lock() — to avoid cooperative re-entry
# when the pytest plugin patches threading.Lock globally.
_INSTR_CACHE_LOCK = real_lock()
def _get_instructions(code: Any) -> dict[int, dis.Instruction]:
"""Get a mapping from byte offset to Instruction for a code object."""
# Fast path: already cached (safe to read without lock on GIL builds;
# on free-threaded builds dict reads are internally locked)
cached = _INSTR_CACHE.get(code)
if cached is not None:
return cached
with _INSTR_CACHE_LOCK:
# Double-check after acquiring lock
if code in _INSTR_CACHE:
return _INSTR_CACHE[code]
mapping = {}
# show_caches parameter was added in Python 3.11
if _PY_VERSION >= (3, 11):
instructions = dis.get_instructions(code, show_caches=False)
else:
instructions = dis.get_instructions(code)
for instr in instructions:
mapping[instr.offset] = instr
_INSTR_CACHE[code] = mapping
return mapping
# ---------------------------------------------------------------------------
# Thread-local state for the DPOR scheduler
# ---------------------------------------------------------------------------
_dpor_tls = threading.local()
# ---------------------------------------------------------------------------
# LD_PRELOAD I/O event bridge
# ---------------------------------------------------------------------------
class _PreloadBridge:
"""Routes I/O events from the LD_PRELOAD library to DPOR threads.
The LD_PRELOAD library intercepts C-level ``send()``/``recv()`` calls
(e.g. from psycopg2's libpq) and writes events to a pipe. An
:class:`~frontrun._preload_io.IOEventDispatcher` reads the pipe in a
background thread and invokes :meth:`listener` for each event.
This bridge maps OS thread IDs to DPOR logical thread IDs and buffers
events in per-thread lists. The DPOR scheduler drains these buffers
at each scheduling point via :meth:`drain`.
"""
def __init__(self, dispatcher: Any = None) -> None:
self._lock = real_lock()
self._tid_to_dpor: dict[int, int] = {}
self._pending: dict[int, list[tuple[int, str, str]]] = {}
self._active = False
self._dispatcher = dispatcher # IOEventDispatcher (for poll())
def register_thread(self, os_tid: int, dpor_id: int) -> None:
"""Map an OS thread ID to a DPOR logical thread ID."""
with self._lock:
self._tid_to_dpor[os_tid] = dpor_id
self._pending.setdefault(dpor_id, [])
self._active = True
def unregister_thread(self, os_tid: int) -> None:
"""Remove an OS thread ID mapping."""
with self._lock:
self._tid_to_dpor.pop(os_tid, None)
if not self._tid_to_dpor:
self._active = False
def clear(self) -> None:
"""Clear all mappings and pending events (between executions)."""
with self._lock:
self._tid_to_dpor.clear()
self._pending.clear()
self._active = False
def listener(self, event: Any) -> None:
"""IOEventDispatcher callback — buffer the event for the right thread."""
if not self._active:
return
# Skip close events — closing a file descriptor doesn't mutate the
# external resource and creates many spurious conflict points that
# force DPOR to explore uninteresting interleavings first.
if event.kind == "close":
return
with self._lock:
dpor_id = self._tid_to_dpor.get(event.tid)
if dpor_id is None:
return
# Map libc I/O operations to DPOR access kinds. Using the
# actual send/recv distinction (write/read) is critical: the
# DPOR engine's ObjectState tracks per-thread latest-read and
# latest-write separately. If we treated all socket I/O as
# "write", only the LAST write per thread would be tracked,
# and early access positions (e.g. a SELECT recv) would be
# overwritten by later ones (e.g. a COMMIT recv). With
# read/write distinction, DPOR iteratively backtracks through
# the send/recv pairs to reach the critical interleaving.
kind = "write" if event.kind == "write" else "read"
obj_key = _make_object_key(hash(event.resource_id), event.resource_id)
self._pending.setdefault(dpor_id, []).append((obj_key, kind, event.resource_id))
def drain(self, dpor_id: int) -> list[tuple[int, str, str]]:
"""Return and clear buffered events for a DPOR thread.
Each item is ``(object_key, kind, resource_id)``.
On free-threaded Python the background reader may not have
processed pipe data yet, so we poll the dispatcher first to
flush any pending bytes into listener callbacks.
"""
if self._dispatcher is not None:
self._dispatcher.poll()
with self._lock:
events = self._pending.get(dpor_id)
if events:
self._pending[dpor_id] = []
return events
return []
# ---------------------------------------------------------------------------
# DPOR Opcode Scheduler
# ---------------------------------------------------------------------------
class DporScheduler:
"""Controls thread execution at opcode granularity, driven by the DPOR engine.
Unlike the random OpcodeScheduler in bytecode.py, this scheduler gets
its scheduling decisions from the Rust DPOR engine.
Deadlock detection uses a fallback timeout plus instant lock-ordering
cycle detection via the :class:`~frontrun._deadlock.WaitForGraph`.
"""
def __init__(
self,
engine: PyDporEngine,
execution: PyExecution,
num_threads: int,
engine_lock: threading.Lock | None = None,
deadlock_timeout: float = 5.0,
trace_recorder: TraceRecorder | None = None,
preload_bridge: _PreloadBridge | None = None,
detect_io: bool = False,
) -> None:
self.engine = engine
self.execution = execution
self.num_threads = num_threads
self.deadlock_timeout = deadlock_timeout
self.trace_recorder = trace_recorder
self._preload_bridge = preload_bridge
self._detect_io = detect_io
# On free-threaded Python, PyO3 &mut self borrows are non-blocking
# (try-or-panic). A single engine_lock serialises ALL calls to the
# engine and execution objects across worker threads, the sync
# reporter, and the main explore_dpor loop.
self._engine_lock: threading.Lock = engine_lock if engine_lock is not None else real_lock()
self._lock = real_lock()
self._condition = real_condition(self._lock)
self._finished = False
self._error: Exception | None = None
self._threads_done: set[int] = set()
self._current_thread: int | None = None
# Shadow stacks are per-thread (each thread only accesses its own),
# stored in thread-local storage. This avoids cross-thread access
# entirely, which is critical for free-threaded builds.
# Format: _dpor_tls._shadow_stacks = {frame_id: ShadowStack}
# Tracks which threads are waiting for which locks (lock_id → {thread_ids}).
# Used to block threads in the DPOR execution when they're spinning
# on a cooperative lock, and unblock them when the lock is released.
self._lock_waiters: dict[int, set[int]] = {}
# Maps iterator id → original container object. When GET_ITER creates
# an iterator from a mutable container, we record the mapping so that
# FOR_ITER can report reads on the underlying container.
self._iter_to_container: dict[int, Any] = {}
# Request the first scheduling decision
self._current_thread = self._schedule_next()
def _schedule_next(self) -> int | None:
"""Ask the DPOR engine which thread to run next."""
with self._engine_lock:
runnable = self.execution.runnable_threads()
if not runnable:
return None
return self.engine.schedule(self.execution)
def wait_for_turn(self, thread_id: int) -> bool:
"""Block until it's this thread's turn. Returns False when done."""
return self._report_and_wait(None, thread_id)
def report_and_wait(self, frame: Any, thread_id: int) -> bool:
"""Report accesses for an opcode and wait for this thread's turn.
Combines ``_process_opcode`` and the wait-for-turn logic under a
single lock acquisition so that ``engine.report_access()`` and
``engine.schedule()`` can never be called concurrently. This is
critical on free-threaded Python (3.13t/3.14t) where there is no
GIL to serialise PyO3 ``&mut self`` borrows.
"""
return self._report_and_wait(frame, thread_id)
def _report_and_wait(self, frame: Any | None, thread_id: int) -> bool:
with self._condition:
# Merge LD_PRELOAD I/O events (C-level send/recv from e.g.
# psycopg2) into the thread's pending_io list. The preload
# bridge buffers events from the pipe reader thread, keyed by
# DPOR thread ID.
_pending_io: list[tuple[int, str]] | None = getattr(_dpor_tls, "pending_io", None)
if self._preload_bridge is not None:
_preload_events = self._preload_bridge.drain(thread_id)
if _preload_events:
# Record into trace for human-readable output. These events
# come from C extensions (e.g. libpq) with no Python frame.
_recorder = self.trace_recorder
if _recorder is not None:
for _, _kind, _resource_id in _preload_events:
_recorder.record_io(thread_id, _resource_id, _kind)
# Convert 3-tuples to 2-tuples for the pending list
_io_pairs = [(_key, _kind) for _key, _kind, _ in _preload_events]
if _pending_io is not None:
_pending_io.extend(_io_pairs)
else:
_dpor_tls.pending_io = _io_pairs
_pending_io = _io_pairs
# Flush deferred I/O reports when the thread is outside all locks.
# This must happen at report_and_wait level (not only inside
# _process_opcode) to guarantee it fires even when _process_opcode
# returns early due to unresolved instructions.
#
# All events flushed in one report_and_wait share the same
# path_id (from the most recent schedule() call). The Rust
# engine's process_io_access uses record_io_access (keeps the
# FIRST access per thread, not the latest) so early accesses
# like a SELECT recv aren't overwritten by later ones like a
# COMMIT ack. This ensures DPOR backtracks to the earliest
# (most useful) position for each thread.
if _pending_io and getattr(_dpor_tls, "lock_depth", 0) == 0:
_elock = self._engine_lock
_engine = self.engine
_execution = self.execution
for _obj_key, _io_kind in _pending_io:
with _elock:
_engine.report_io_access(_execution, thread_id, _obj_key, _io_kind)
_pending_io.clear()
while True:
if self._finished or self._error:
return False
if self._current_thread == thread_id:
# Process opcode accesses only when it's our turn.
# Deferring this until the thread is scheduled ensures
# that accesses are recorded at the correct path_id
# (after any intervening operations by other threads).
# Without this, a preempted thread's accesses land at the
# preemption branch where the other thread is Active,
# making backtracks at that position impossible.
if frame is not None:
_process_opcode(frame, self, thread_id)
frame = None # only process once
# It's our turn. After executing one opcode, schedule next.
next_thread = self._schedule_next()
self._current_thread = next_thread
if next_thread is None:
self._finished = True
self._condition.notify_all()
return True
# Wait for our turn (fallback timeout for C-blocked threads)
if not self._condition.wait(timeout=self.deadlock_timeout):
if self._current_thread in self._threads_done:
# Current thread is done, try scheduling again
next_thread = self._schedule_next()
self._current_thread = next_thread
if next_thread is None:
self._finished = True
self._condition.notify_all()
continue
self._error = TimeoutError(
f"DPOR deadlock: waiting for thread {thread_id}, current is {self._current_thread}"
)
self._condition.notify_all()
return False
def mark_done(self, thread_id: int) -> None:
with self._condition:
self._threads_done.add(thread_id)
with self._engine_lock:
self.execution.finish_thread(thread_id)
# If the done thread was the current one, schedule next
if self._current_thread == thread_id:
next_thread = self._schedule_next()
self._current_thread = next_thread
if next_thread is None and len(self._threads_done) >= self.num_threads:
self._finished = True
self._condition.notify_all()
def report_error(self, error: Exception) -> None:
with self._condition:
if self._error is None:
self._error = error
self._condition.notify_all()
@staticmethod
def get_shadow_stack(frame_id: int) -> ShadowStack:
stacks = getattr(_dpor_tls, "_shadow_stacks", None)
if stacks is None:
stacks = {}
_dpor_tls._shadow_stacks = stacks
if frame_id not in stacks:
stacks[frame_id] = ShadowStack()
return stacks[frame_id]
@staticmethod
def remove_shadow_stack(frame_id: int) -> None:
stacks = getattr(_dpor_tls, "_shadow_stacks", None)
if stacks is not None:
stacks.pop(frame_id, None)
# ---------------------------------------------------------------------------
# Opcode trace callback with shadow stack access detection
# ---------------------------------------------------------------------------
# C-level method access classification.
#
# Python bytecode tracing can't see inside C method calls — list.append(),
# set.add(), dict.update(), etc. all execute opaquely. We detect these in the
# CALL handler by checking if the callable is a builtin_function_or_method
# (i.e. bound to a C-implemented object) and classifying the call as a read or
# write based on the method name.
#
# Design: immutable types are excluded entirely (calling str.upper() can't
# cause a data race). For mutable types, known read-only methods report READ;
# everything else defaults to WRITE.
_BUILTIN_METHOD_TYPE = type(len) # builtin_function_or_method
_WRAPPER_DESCRIPTOR_TYPE = type(object.__setattr__) # wrapper_descriptor
_METHOD_WRAPPER_TYPE = type("".__str__) # method-wrapper
_IMMUTABLE_TYPES = (str, bytes, int, float, bool, complex, tuple, frozenset, type(None), types.ModuleType)
# C-level methods that are read-only (don't mutate the object).
_C_METHOD_READ_ONLY = frozenset(
{
# Lookup / iteration (common to multiple container types)
"__contains__",
"__getitem__",
"__getattribute__",
"__len__",
"__iter__",
"__reversed__",
# list / tuple
"count",
"index",
# dict
"get",
"keys",
"values",
"items",
# set
"issubset",
"issuperset",
"isdisjoint",
"union",
"intersection",
"difference",
"symmetric_difference",
# copy
"copy",
"__copy__",
}
)
# C-level methods on immutable types that iterate their FIRST ARGUMENT.
# The method's __self__ is immutable (e.g. str for str.join), so the standard
# C-method handler skips them. We detect these by name and report a READ on
# the first argument instead.
_IMMUTABLE_SELF_ARG_READERS = frozenset({"join"})
# Type constructors that iterate their first argument (read it).
# These are `type` objects (list, dict, bytes, etc.) called as constructors.
# The CALL handler needs to report a READ on the first argument when one of
# these types is called.
_CONTAINER_CONSTRUCTORS: frozenset[type] = frozenset(
{list, dict, set, frozenset, tuple, bytes, bytearray, enumerate, zip, map, filter, reversed}
)
# ---------------------------------------------------------------------------
# Passthrough builtins: functions that operate on their ARGUMENTS rather than
# __self__. Keyed by id(function) for O(1) lookup.
# Format: {id(fn): (access_kind, obj_arg_index, name_arg_index_or_None)}
# ---------------------------------------------------------------------------
import builtins as _builtins_mod
import operator as _operator_mod
_PASSTHROUGH_BUILTINS: dict[int, tuple[str, int, int | None]] = {}
def _register_passthrough(fn: Any, kind: str, obj_idx: int, name_idx: int | None) -> None:
_PASSTHROUGH_BUILTINS[id(fn)] = (kind, obj_idx, name_idx)
# Attribute writers: setattr(obj, name, val), delattr(obj, name)
_register_passthrough(_builtins_mod.setattr, "write", 0, 1)
_register_passthrough(_builtins_mod.getattr, "read", 0, 1)
_register_passthrough(_builtins_mod.delattr, "write", 0, 1)
_register_passthrough(_builtins_mod.hasattr, "read", 0, 1)
# operator module item access: operator.setitem(d, k, v), etc.
_register_passthrough(_operator_mod.setitem, "write", 0, 1)
_register_passthrough(_operator_mod.getitem, "read", 0, 1)
_register_passthrough(_operator_mod.delitem, "write", 0, 1)
# len() reads the container (needed to detect check-then-act races)
_register_passthrough(_builtins_mod.len, "read", 0, None)
# Container-iterating builtins: these read their first argument by iterating it.
# Without explicit registration, DPOR doesn't see the read because __self__ is a
# module (builtins / _functools) which is immutable.
_register_passthrough(_builtins_mod.sorted, "read", 0, None)
_register_passthrough(_builtins_mod.min, "read", 0, None)
_register_passthrough(_builtins_mod.max, "read", 0, None)
_register_passthrough(_builtins_mod.sum, "read", 0, None)
_register_passthrough(_builtins_mod.any, "read", 0, None)
_register_passthrough(_builtins_mod.all, "read", 0, None)
_register_passthrough(_builtins_mod.next, "read", 0, None)
import functools as _functools_mod
_register_passthrough(_functools_mod.reduce, "read", 1, None)
def _make_object_key(obj_id: int, name: Any) -> int:
"""Create a non-negative u64 object key for the Rust engine."""
return hash((obj_id, name)) & 0xFFFFFFFFFFFFFFFF
def _report_read(
engine: PyDporEngine, execution: PyExecution, thread_id: int, obj: Any, name: Any, lock: threading.Lock
) -> None:
if obj is not None:
with lock:
engine.report_access(execution, thread_id, _make_object_key(id(obj), name), "read")
def _report_write(
engine: PyDporEngine, execution: PyExecution, thread_id: int, obj: Any, name: Any, lock: threading.Lock
) -> None:
if obj is not None:
with lock:
engine.report_access(execution, thread_id, _make_object_key(id(obj), name), "write")
def _subscript_key_name(key: Any) -> Any:
"""Normalize a subscript key for object key computation.
For string keys, return the string directly so it matches LOAD_ATTR/STORE_ATTR
argval (e.g. 'value' instead of "'value'"). For non-string keys, use repr()
as a fallback to distinguish types (e.g. int 0 vs string '0').
"""
if isinstance(key, str):
return key
return repr(key)
def _expand_slice_reads(
engine: PyDporEngine,
execution: PyExecution,
thread_id: int,
container: Any,
key: Any,
lock: threading.Lock,
) -> None:
"""For slice subscript reads, report individual element reads.
When a slice like ``buf[0:4]`` is read, DPOR only sees a single read on
key ``'slice(0, 4, None)'``. Individual element writes like ``buf[0] = x``
use key ``'0'``. These keys don't match, so DPOR doesn't see the conflict.
This function expands a slice read into per-element reads so that each
write position gets its own conflict point, enabling DPOR to explore
interleavings where the slice read happens between individual writes.
"""
if not isinstance(key, slice):
return
try:
length = len(container)
except (TypeError, AttributeError):
return
indices = range(*key.indices(length))
for idx in indices:
_report_read(engine, execution, thread_id, container, repr(idx), lock)
def _process_opcode(
frame: Any,
scheduler: DporScheduler,
thread_id: int,
) -> None:
"""Process a single opcode, updating the shadow stack and reporting accesses.
Handles opcodes across Python 3.10-3.14, including:
- 3.13: LOAD_FAST_LOAD_FAST, STORE_FAST_STORE_FAST
- 3.14: LOAD_FAST_BORROW, LOAD_FAST_BORROW_LOAD_FAST_BORROW,
STORE_FAST_LOAD_FAST, STORE_FAST_MAYBE_NULL, LOAD_FAST_AND_CLEAR,
LOAD_SMALL_INT, BINARY_SUBSCR removal
"""
code = frame.f_code
instrs = _get_instructions(code)
instr = instrs.get(frame.f_lasti)
if instr is None:
return
shadow = scheduler.get_shadow_stack(id(frame))
op = instr.opname
engine = scheduler.engine
execution = scheduler.execution
elock = scheduler._engine_lock
recorder = scheduler.trace_recorder
# === LOAD instructions: push values onto the shadow stack ===
if op in ("LOAD_FAST", "LOAD_FAST_CHECK", "LOAD_FAST_BORROW"):
# LOAD_FAST_BORROW is new in 3.14: same semantics as LOAD_FAST
# but uses a borrowed reference internally.
val = frame.f_locals.get(instr.argval)
shadow.push(val)
elif op in ("LOAD_FAST_LOAD_FAST", "LOAD_FAST_BORROW_LOAD_FAST_BORROW"):
# New in 3.13: pushes two locals in one instruction.
# LOAD_FAST_BORROW_LOAD_FAST_BORROW is the 3.14 variant using
# borrowed references internally (same observable semantics).
# argval is a tuple of two variable names.
argval = instr.argval
if isinstance(argval, tuple) and len(argval) == 2:
shadow.push(frame.f_locals.get(argval[0]))
shadow.push(frame.f_locals.get(argval[1]))
else:
shadow.push(None)
shadow.push(None)
elif op == "LOAD_GLOBAL":
val = frame.f_globals.get(instr.argval)
if val is None:
# Fall back to builtins (setattr, getattr, type, dict, object, etc.)
_fb = getattr(frame, "f_builtins", None)
if isinstance(_fb, dict):
val = _fb.get(instr.argval)
shadow.push(val)
# On 3.11+, LOAD_GLOBAL with NULL flag (bit 0 of arg) pushes an
# extra NULL slot after the value, matching the stack layout
# expected by CALL: [callable, NULL, args...].
if _PY_VERSION >= (3, 11) and instr.arg is not None and instr.arg & 1:
shadow.push(None)
# Report a READ on the module's globals dict for this variable name.
# Without this, LOAD_GLOBAL/STORE_GLOBAL races are invisible to DPOR.
_report_read(engine, execution, thread_id, frame.f_globals, instr.argval, elock)
elif op == "LOAD_NAME":
# Used in exec/eval code (module-level scope). Like LOAD_GLOBAL
# but checks locals first, then globals, then builtins.
val = frame.f_locals.get(instr.argval)
if val is None:
val = frame.f_globals.get(instr.argval)
if val is None:
_fb = getattr(frame, "f_builtins", None)
if isinstance(_fb, dict):
val = _fb.get(instr.argval)
shadow.push(val)
elif op == "LOAD_DEREF":
val = frame.f_locals.get(instr.argval)
shadow.push(val)
# Report a READ on closure cell/free variables so DPOR sees
# cross-thread conflicts. Using code as the identity works because
# threads sharing a closure function share the same code object.
varname = instr.argval
if varname in code.co_freevars or varname in code.co_cellvars:
_report_read(engine, execution, thread_id, code, varname, elock)
elif op in ("LOAD_CONST", "LOAD_CONST_IMMORTAL", "LOAD_CONST_MORTAL"):
shadow.push(instr.argval)
elif op == "LOAD_SMALL_INT":
# New in 3.14: pushes a small integer (the oparg itself).
shadow.push(instr.arg)
# === Stack manipulation ===
elif op == "COPY":
n = instr.arg
if n is not None and len(shadow.stack) >= n:
shadow.push(shadow.stack[-n])
else:
shadow.push(None)
elif op == "SWAP":
n = instr.arg
if n is not None and len(shadow.stack) >= n:
shadow.stack[-1], shadow.stack[-n] = shadow.stack[-n], shadow.stack[-1]
# --- Python 3.10 stack manipulation (replaced by COPY/SWAP in 3.11) ---
elif op == "DUP_TOP":
shadow.push(shadow.peek())
elif op == "DUP_TOP_TWO":
b = shadow.peek(0)
a = shadow.peek(1)
shadow.push(a)
shadow.push(b)
elif op == "ROT_TWO":
if len(shadow.stack) >= 2:
shadow.stack[-1], shadow.stack[-2] = shadow.stack[-2], shadow.stack[-1]
elif op == "ROT_THREE":
if len(shadow.stack) >= 3:
shadow.stack[-1], shadow.stack[-2], shadow.stack[-3] = (
shadow.stack[-2],
shadow.stack[-3],
shadow.stack[-1],
)
elif op == "ROT_FOUR":
if len(shadow.stack) >= 4:
shadow.stack[-1], shadow.stack[-2], shadow.stack[-3], shadow.stack[-4] = (
shadow.stack[-2],
shadow.stack[-3],
shadow.stack[-4],
shadow.stack[-1],
)
# === Attribute access: the instructions we care about most ===
elif op == "LOAD_ATTR":
obj = shadow.pop()
attr = instr.argval
_report_read(engine, execution, thread_id, obj, attr, elock)
# Also report on obj.__dict__ so LOAD_ATTR conflicts with
# STORE_SUBSCR on the same __dict__ (cross-path detection).
if obj is not None:
try:
_obj_dict = object.__getattribute__(obj, "__dict__")
_report_read(engine, execution, thread_id, _obj_dict, attr, elock)
except AttributeError:
pass
if recorder is not None and obj is not None:
recorder.record(thread_id, frame, opcode=op, access_type="read", attr_name=attr, obj=obj)
if obj is not None:
try:
val = getattr(obj, attr)
shadow.push(val)
# When the loaded value is a mutable object (but NOT a bound
# method), report a READ on the object itself. This detects
# cases where a container is read indirectly — e.g. passed
# to len() or iterated — creating a conflict with C-level
# method WRITEs (append, add, etc.) reported by the CALL
# handler below.
#
# We skip bound methods (loading .append is not a container
# read) and immutable types (no mutation possible).
if val is not None and type(val) is not _BUILTIN_METHOD_TYPE and not isinstance(val, _IMMUTABLE_TYPES):
_report_read(engine, execution, thread_id, val, "__cmethods__", elock)
except Exception:
shadow.push(None)
else:
shadow.push(None)
# On 3.11+, LOAD_ATTR with method flag (bit 0 of arg) pushes an
# extra self/NULL slot after the callable, matching LOAD_METHOD's
# stack layout: [value, NULL].
if _PY_VERSION >= (3, 11) and instr.arg is not None and instr.arg & 1:
shadow.push(None)
elif op == "STORE_ATTR":
obj = shadow.pop() # TOS = object
_val = shadow.pop() # TOS1 = value
_report_write(engine, execution, thread_id, obj, instr.argval, elock)
# Also report on obj.__dict__ so STORE_ATTR conflicts with
# STORE_SUBSCR on the same __dict__ (cross-path detection).
if obj is not None:
try:
_obj_dict = object.__getattribute__(obj, "__dict__")
_report_write(engine, execution, thread_id, _obj_dict, instr.argval, elock)
except AttributeError:
pass
if recorder is not None and obj is not None:
recorder.record(thread_id, frame, opcode=op, access_type="write", attr_name=instr.argval, obj=obj)
elif op == "LOAD_METHOD":
# Python 3.10 only (replaced by LOAD_ATTR with method flag in 3.11+).
# Pops owner, pushes (method, self/NULL) — net stack effect +1.
obj = shadow.pop()
attr = instr.argval
_report_read(engine, execution, thread_id, obj, attr, elock)
if recorder is not None and obj is not None:
recorder.record(thread_id, frame, opcode=op, access_type="read", attr_name=attr, obj=obj)
if obj is not None:
try:
shadow.push(getattr(obj, attr))
except Exception:
shadow.push(None)
else:
shadow.push(None)
# Extra push for the self/NULL slot (LOAD_METHOD pushes 2 values).
shadow.push(None)
elif op == "DELETE_ATTR":
obj = shadow.pop()
_report_write(engine, execution, thread_id, obj, instr.argval, elock)
if recorder is not None and obj is not None:
recorder.record(thread_id, frame, opcode=op, access_type="write", attr_name=instr.argval, obj=obj)
# === Subscript access (dict/list operations) ===
elif op == "BINARY_SUBSCR":
# Present on 3.10-3.13. Removed in 3.14 (replaced by BINARY_OP
# with subscript oparg).
key = shadow.pop()
container = shadow.pop()
_kname = _subscript_key_name(key)
_report_read(engine, execution, thread_id, container, _kname, elock)
# Container-level read for conflict with C-methods and different subscript keys.
if container is not None and not isinstance(container, _IMMUTABLE_TYPES):
_report_read(engine, execution, thread_id, container, "__cmethods__", elock)
# For slice accesses, also report reads on individual element keys
# so DPOR sees per-element conflicts with STORE_SUBSCR writes.
_expand_slice_reads(engine, execution, thread_id, container, key, elock)
if recorder is not None and container is not None:
recorder.record(thread_id, frame, opcode=op, access_type="read", attr_name=_kname, obj=container)
shadow.push(None)
elif op == "STORE_SUBSCR":
key = shadow.pop()
container = shadow.pop()
_val = shadow.pop()
_kname = _subscript_key_name(key)
_report_write(engine, execution, thread_id, container, _kname, elock)
# Also report a container-level write so subscript writes conflict
# with C-method reads (e.g. len(), iteration) and with subscript
# reads using different keys (e.g. slice vs element).
# Uses regular (last-access) semantics: keeping the LAST write
# position enables cascading backtracks that progressively interleave
# reads between individual writes across multiple DPOR executions.
if container is not None and not isinstance(container, _IMMUTABLE_TYPES):
_report_write(engine, execution, thread_id, container, "__cmethods__", elock)
if recorder is not None and container is not None:
recorder.record(thread_id, frame, opcode=op, access_type="write", attr_name=_kname, obj=container)
elif op == "DELETE_SUBSCR":
key = shadow.pop()
container = shadow.pop()
_kname = _subscript_key_name(key)
_report_write(engine, execution, thread_id, container, _kname, elock)
# Container-level write for delete too (regular last-access semantics).
if container is not None and not isinstance(container, _IMMUTABLE_TYPES):
_report_write(engine, execution, thread_id, container, "__cmethods__", elock)
if recorder is not None and container is not None:
recorder.record(thread_id, frame, opcode=op, access_type="write", attr_name=_kname, obj=container)
# === Arithmetic and binary operations ===
elif op == "BINARY_OP":
# On 3.14, BINARY_OP also handles subscript operations (replacing
# the removed BINARY_SUBSCR). Check the argrepr for "[]" / subscript.
argrepr = instr.argrepr
if argrepr and ("[" in argrepr or "NB_SUBSCR" in argrepr.upper()):
key = shadow.pop()
container = shadow.pop()
_kname = _subscript_key_name(key)
_report_read(engine, execution, thread_id, container, _kname, elock)
# Container-level read for subscript access (same as BINARY_SUBSCR).
if container is not None and not isinstance(container, _IMMUTABLE_TYPES):
_report_read(engine, execution, thread_id, container, "__cmethods__", elock)
# For slice accesses, also report reads on individual element keys
_expand_slice_reads(engine, execution, thread_id, container, key, elock)
if recorder is not None and container is not None:
recorder.record(thread_id, frame, opcode=op, access_type="read", attr_name=_kname, obj=container)
shadow.push(None)
else:
shadow.pop()
shadow.pop()
shadow.push(None)
elif op.startswith(("INPLACE_", "BINARY_")):
# Python 3.10 INPLACE_ADD, INPLACE_SUBTRACT, etc. and
# BINARY_ADD, BINARY_MULTIPLY, etc. All pop 2, push 1.
# (BINARY_OP and BINARY_SUBSCR are already handled above.)
shadow.pop()
shadow.pop()
shadow.push(None)
# === Store instructions ===
elif op == "STORE_GLOBAL":
shadow.pop()
# Report a WRITE on the module's globals dict for this variable name.
_report_write(engine, execution, thread_id, frame.f_globals, instr.argval, elock)
elif op == "STORE_DEREF":
shadow.pop()
# Report a WRITE on closure cell/free variables.
varname = instr.argval
if varname in code.co_freevars or varname in code.co_cellvars:
_report_write(engine, execution, thread_id, code, varname, elock)
elif op == "STORE_FAST":
shadow.pop()
elif op == "STORE_FAST_STORE_FAST":
# New in 3.13: pops two values.
shadow.pop()
shadow.pop()
elif op == "STORE_FAST_LOAD_FAST":
# New in 3.14: stores TOS into one local, then loads another local.
# Net stack effect is 0 (pop one, push one) but the fallback handler
# sees effect=0 and does nothing, losing the pushed value.
argval = instr.argval
shadow.pop()
if isinstance(argval, tuple) and len(argval) == 2:
shadow.push(frame.f_locals.get(argval[1]))
else:
shadow.push(None)
elif op == "STORE_FAST_MAYBE_NULL":
# New in 3.14: like STORE_FAST but tolerates NULL on TOS.
shadow.pop()
elif op == "LOAD_FAST_AND_CLEAR":
# Used in comprehensions: loads a local then clears it.
val = frame.f_locals.get(instr.argval)
shadow.push(val)
# === Return/pop ===
elif op in ("RETURN_VALUE", "RETURN_CONST"):
shadow.pop()
elif op == "POP_TOP":
shadow.pop()
# === Build instructions (slice, list, etc.) ===
elif op == "BUILD_SLICE":
# BUILD_SLICE pops 2 or 3 items (start, stop, [step]) and pushes a slice object.
# The fallback stack_effect handler adjusts the size correctly, but we need
# the ACTUAL slice object on the shadow stack so that BINARY_SUBSCR can
# detect slice accesses and expand them into per-element reads.
argc = instr.arg or 2
items: list[Any] = [shadow.pop() for _ in range(argc)]
items.reverse()
try:
if argc == 2:
shadow.push(slice(items[0], items[1]))
else:
shadow.push(slice(items[0], items[1], items[2]))
except (TypeError, ValueError):
shadow.push(None)
# === Iterator operations ===
# GET_ITER creates an iterator from a container. We record the mapping
# so that FOR_ITER can report reads on the original container.
elif op == "GET_ITER":
# stack: [iterable] → [iterator]
# GET_ITER pops the iterable and pushes an iterator (stack effect = 0).
# We record the iterable→iterator mapping via a mutable marker on the
# shadow stack so that FOR_ITER can report per-element reads.
iterable = shadow.peek()
if iterable is not None and not isinstance(iterable, _IMMUTABLE_TYPES):
shadow.pop()
# Mutable list marker: [tag, container, iteration_counter]
# The counter tracks which element index FOR_ITER is reading,
# enabling per-element conflict detection with STORE_SUBSCR writes.
shadow.push(["__iter_source__", iterable, 0])
else:
shadow.pop()
shadow.push(None)
elif op == "FOR_ITER":
# stack: [iterator] → [iterator, next_value] or [−iterator] (exhausted)
# FOR_ITER calls __next__ on the iterator. If the iterator was created
# from a mutable container (tracked via GET_ITER), report reads on it.
# stack effect = +1 (pushes the yielded value; TOS is the iterator).
# We peek at the iterator marker to find the underlying container.
top = shadow.peek()
if isinstance(top, list) and len(top) == 3 and top[0] == "__iter_source__":
_iter_container = top[1]
_iter_counter = top[2]
if _iter_container is not None and not isinstance(_iter_container, _IMMUTABLE_TYPES):
# Per-element read using the iteration counter as the key.
# For lists, counter 0, 1, 2... matches STORE_SUBSCR keys "0", "1", "2"...
# This creates per-element conflicts enabling fine-grained interleaving.
_report_read(engine, execution, thread_id, _iter_container, repr(_iter_counter), elock)
# Coarse-grained read for conflict with C-method writes (append,
# insert, etc.) and other container-level operations. Uses regular
# (last-access) semantics: each iteration overwrites the previous read
# position. This means backtracks target the LAST iteration, which
# allows the other thread to interleave after some elements have
# already been read — catching mid-iteration mutation races.
_report_read(engine, execution, thread_id, _iter_container, "__cmethods__", elock)
# Increment counter for next iteration (mutable list, in-place update).
top[2] = _iter_counter + 1
shadow.push(None) # push the yielded value
elif op == "END_FOR":
# End of for loop — pops the exhausted iterator value.
shadow.pop()
elif op == "POP_ITER":
# Python 3.14: pops the iterator itself at end of for loop.
shadow.pop()
# === Function/method calls ===
elif op in ("CALL", "CALL_FUNCTION", "CALL_METHOD", "CALL_KW", "CALL_FUNCTION_KW", "CALL_FUNCTION_EX"):
# Detect C-level method calls and classify as read or write.
#
# Three detection strategies, tried in order:
# 1. Passthrough builtins (setattr, getattr, operator.setitem, etc.)
# — operate on their arguments rather than __self__
# 2. Bound C methods (list.append, dict.update, etc.)
# — __self__ is the mutable target object
# 3. Wrapper descriptors (object.__setattr__, dict.__setitem__, etc.)
# — unbound C type methods, first argument is the target
argc = instr.arg or 0
scan_depth = min(argc + 3, len(shadow.stack))
_call_handled = False
# When a container constructor (enumerate, zip, etc.) wraps a mutable
# iterable, we save the source container so that GET_ITER → FOR_ITER
# can report per-element reads on the underlying container.
_constructor_source: Any = None
for i in range(scan_depth):
item = shadow.stack[-(i + 1)]
if item is None:
continue
item_type = type(item)
# --- Strategy 1: Passthrough builtins ---
# These are builtins whose __self__ is a module (builtins, operator)
# but that access their ARGUMENTS. Identified by id(function).
_pt = _PASSTHROUGH_BUILTINS.get(id(item))
if _pt is not None:
_pt_kind, _pt_obj_idx, _pt_name_idx = _pt
# Arguments are always in the top `argc` positions on the stack
# regardless of Python version (3.10: [func, args], 3.11-3.13:
# [NULL, func, args], 3.14: [func, NULL, args]).
if argc >= _pt_obj_idx + 1:
_pt_target = shadow.stack[-(argc - _pt_obj_idx)]
_pt_attr: Any = "__cmethods__"
if _pt_name_idx is not None and argc >= _pt_name_idx + 1:
_raw = shadow.stack[-(argc - _pt_name_idx)]
if isinstance(_raw, str):
_pt_attr = _raw
if _pt_target is not None and not isinstance(_pt_target, _IMMUTABLE_TYPES):
if _pt_kind == "read":
_report_read(engine, execution, thread_id, _pt_target, _pt_attr, elock)
else:
_report_write(engine, execution, thread_id, _pt_target, _pt_attr, elock)
_call_handled = True
break
# --- Strategy 2: Bound C methods (existing behavior) ---
if item_type is _BUILTIN_METHOD_TYPE or item_type is _METHOD_WRAPPER_TYPE:
self_obj = getattr(item, "__self__", None)
if self_obj is not None and not isinstance(self_obj, _IMMUTABLE_TYPES):
method_name = getattr(item, "__name__", None)
if method_name in _C_METHOD_READ_ONLY:
_report_read(engine, execution, thread_id, self_obj, "__cmethods__", elock)
else:
_report_write(engine, execution, thread_id, self_obj, "__cmethods__", elock)
_call_handled = True
break
# __self__ is immutable (e.g. str, module) — check if the method
# iterates its first argument (e.g. str.join reads the iterable).
if self_obj is not None:
method_name = getattr(item, "__name__", None)
if method_name in _IMMUTABLE_SELF_ARG_READERS and argc >= 1:
_arg_target = shadow.stack[-argc]
if _arg_target is not None and not isinstance(_arg_target, _IMMUTABLE_TYPES):
_report_read(engine, execution, thread_id, _arg_target, "__cmethods__", elock)
_call_handled = True
break
# Otherwise fall through to continue scan
# --- Strategy 2b: Type constructors that iterate arguments ---
# list(iterable), dict(iterable), bytes(iterable), enumerate(iterable),
# zip(iter1, iter2), map(func, iterable), filter(func, iterable), etc.
if item_type is type and item in _CONTAINER_CONSTRUCTORS:
# Report a READ on each mutable argument (they get iterated).
# Also save the first mutable arg as the "source container" so that
# if this constructor result is iterated via FOR_ITER, the reads
# are attributed to the underlying container (not the wrapper).
for _ci in range(argc):
_c_arg = shadow.stack[-(argc - _ci)]
if _c_arg is not None and not isinstance(_c_arg, _IMMUTABLE_TYPES):
_report_read(engine, execution, thread_id, _c_arg, "__cmethods__", elock)
if _constructor_source is None:
_constructor_source = _c_arg
_call_handled = True
break
# --- Strategy 3: Wrapper descriptors (unbound C type methods) ---
if item_type is _WRAPPER_DESCRIPTOR_TYPE:
objclass = getattr(item, "__objclass__", None)
if objclass is not None and not issubclass(objclass, _IMMUTABLE_TYPES):
# First argument (self) is always at the bottom of the argc args
if argc >= 1:
_wd_target = shadow.stack[-argc]
if _wd_target is not None and not isinstance(_wd_target, _IMMUTABLE_TYPES):
method_name = getattr(item, "__name__", None)
if method_name in _C_METHOD_READ_ONLY:
_report_read(engine, execution, thread_id, _wd_target, "__cmethods__", elock)
else:
_report_write(engine, execution, thread_id, _wd_target, "__cmethods__", elock)
_call_handled = True
break
# Standard stack effect handling.
try:
effect = dis.stack_effect(instr.opcode, instr.arg or 0)
for _ in range(max(0, -effect)):
shadow.pop()
for _ in range(max(0, effect)):
shadow.push(None)
except (ValueError, TypeError):
shadow.clear()
# Fixup: when a container constructor (enumerate, zip, map, etc.) wraps
# a mutable iterable, replace the None result on TOS with the source
# container. This way GET_ITER picks it up and FOR_ITER can report
# per-element reads on the underlying container during iteration.
if _constructor_source is not None and shadow.stack:
shadow.stack[-1] = _constructor_source
else:
# Fallback: use dis.stack_effect for unknown opcodes.
# This handles PUSH_NULL, RESUME, PRECALL, and any
# version-specific opcodes we don't explicitly handle.
try:
effect = dis.stack_effect(instr.opcode, instr.arg or 0)
for _ in range(max(0, -effect)):
shadow.pop()
for _ in range(max(0, effect)):
shadow.push(None)
except (ValueError, TypeError):
shadow.clear()
# ---------------------------------------------------------------------------
# DPOR Bytecode Runner
# ---------------------------------------------------------------------------
class DporBytecodeRunner:
"""Runs threads under DPOR-controlled bytecode-level interleaving.
Uses sys.monitoring (PEP 669) on Python 3.12+ for thread-safe opcode
instrumentation. Falls back to sys.settrace on 3.10-3.11.
"""
# sys.monitoring tool ID for DPOR (use PROFILER to avoid conflict with debuggers)
_TOOL_ID: int | None = None
def __init__(
self,
scheduler: DporScheduler,
detect_io: bool = True,
preload_bridge: _PreloadBridge | None = None,
) -> None:
self.scheduler = scheduler
self.detect_io = detect_io
self._preload_bridge = preload_bridge
self.threads: list[threading.Thread] = []
self.errors: dict[int, Exception] = {}
self._lock_patched = False
self._io_patched = False
self._monitoring_active = False
def _patch_locks(self) -> None:
install_wait_for_graph()
patch_locks()
self._lock_patched = True
def _unpatch_locks(self) -> None:
if self._lock_patched:
unpatch_locks()
uninstall_wait_for_graph()
self._lock_patched = False
def _patch_io(self) -> None:
if not self.detect_io:
return
patch_io()
self._io_patched = True
def _unpatch_io(self) -> None:
if self._io_patched:
unpatch_io()
self._io_patched = False
# --- sys.settrace backend (3.10-3.11) ---
def _make_trace(self, thread_id: int) -> Callable[..., Any]:
scheduler = self.scheduler
_detect_io = scheduler._detect_io
def trace(frame: Any, event: str, arg: Any) -> Any:
if scheduler._finished or scheduler._error:
return None
if event == "call":
filename = frame.f_code.co_filename
if _should_trace_file(filename):
# Skip dynamically generated code (<string>, etc.)
# unless its caller is user code. In I/O mode, skip
# all dynamic code unconditionally.
if _is_dynamic_code(filename):
if _detect_io:
return None
caller = frame.f_back
if caller is None or not _should_trace_file(caller.f_code.co_filename):
return None
frame.f_trace_opcodes = True
return trace
return None
if event == "opcode":
scheduler.report_and_wait(frame, thread_id)
return trace
if event == "return":
scheduler.remove_shadow_stack(id(frame))
return trace
return trace
return trace
# --- sys.monitoring backend (3.12+) ---
def _setup_monitoring(self) -> None:
"""Set up sys.monitoring INSTRUCTION events for all code objects."""
if not _USE_SYS_MONITORING:
return
mon = sys.monitoring
tool_id = mon.PROFILER_ID # type: ignore[attr-defined]
DporBytecodeRunner._TOOL_ID = tool_id
mon.use_tool_id(tool_id, "frontrun._dpor") # type: ignore[attr-defined]
mon.set_events(tool_id, mon.events.PY_START | mon.events.PY_RETURN | mon.events.INSTRUCTION) # type: ignore[attr-defined]
scheduler = self.scheduler
_detect_io = scheduler._detect_io
def handle_py_start(code: Any, instruction_offset: int) -> Any:
# Only use mon.DISABLE for code that should *never* be traced
# (stdlib, site-packages, frontrun internals). Do NOT disable
# for transient conditions like scheduler._finished — DISABLE
# permanently removes INSTRUCTION events from the code object,
# corrupting monitoring state for subsequent DPOR iterations
# and tests that share the same tool ID.
if not _should_trace_file(code.co_filename):
return mon.DISABLE # type: ignore[attr-defined]
# In I/O-detection mode, skip dynamically generated code
# (e.g. dataclass __init__ from exec/compile in libraries).
# These create thousands of extra scheduling points that
# drown out I/O-based backtrack points. Safe to DISABLE
# because each exec() creates a fresh code object.
if _detect_io and code.co_filename.startswith("<"):
return mon.DISABLE # type: ignore[attr-defined]
return None
def handle_py_return(code: Any, instruction_offset: int, retval: Any) -> Any:
if not _should_trace_file(code.co_filename):
return None
thread_id = getattr(_dpor_tls, "thread_id", None)
if thread_id is not None and getattr(_dpor_tls, "scheduler", None) is scheduler:
frame = sys._getframe(1)
scheduler.remove_shadow_stack(id(frame))
return None
def handle_instruction(code: Any, instruction_offset: int) -> Any:
if scheduler._finished or scheduler._error:
return None
if not _should_trace_file(code.co_filename):
return None
# Skip dynamically generated code (<string>, etc.) unless its
# caller is user code. Libraries use exec/compile internally
# (dataclass __init__, SQLAlchemy methods) creating thousands
# of scheduling points in non-user code. In I/O mode, skip
# all dynamic code unconditionally.
if _is_dynamic_code(code.co_filename):
if _detect_io:
return None
frame = sys._getframe(1)
caller = frame.f_back
if caller is None or not _should_trace_file(caller.f_code.co_filename):
return None
thread_id = getattr(_dpor_tls, "thread_id", None)
if thread_id is None:
return None
# Guard against zombie threads from a previous DporBytecodeRunner
# whose monitoring was torn down and replaced by ours. The zombie
# still has TLS from the old scheduler, but this closure captures
# the *new* scheduler. Letting it through would call engine
# methods on the wrong execution, causing PyO3 borrow conflicts.
if getattr(_dpor_tls, "scheduler", None) is not scheduler:
return None
# Use sys._getframe() to get the actual frame for _process_opcode.
# report_and_wait runs _process_opcode and wait_for_turn under a
# single lock so that engine.report_access() and engine.schedule()
# cannot overlap on free-threaded builds.
frame = sys._getframe(1)
scheduler.report_and_wait(frame, thread_id)
return None
mon.register_callback(tool_id, mon.events.PY_START, handle_py_start) # type: ignore[attr-defined]
mon.register_callback(tool_id, mon.events.PY_RETURN, handle_py_return) # type: ignore[attr-defined]
mon.register_callback(tool_id, mon.events.INSTRUCTION, handle_instruction) # type: ignore[attr-defined]
self._monitoring_active = True
def _teardown_monitoring(self) -> None:
if not self._monitoring_active:
return
mon = sys.monitoring
tool_id = DporBytecodeRunner._TOOL_ID
if tool_id is not None:
mon.set_events(tool_id, 0) # type: ignore[attr-defined]
mon.register_callback(tool_id, mon.events.PY_START, None) # type: ignore[attr-defined]
mon.register_callback(tool_id, mon.events.PY_RETURN, None) # type: ignore[attr-defined]
mon.register_callback(tool_id, mon.events.INSTRUCTION, None) # type: ignore[attr-defined]
mon.free_tool_id(tool_id) # type: ignore[attr-defined]
self._monitoring_active = False
# --- Thread entry points ---
def _setup_dpor_tls(self, thread_id: int) -> None:
"""Set up both shared cooperative TLS and DPOR-specific TLS."""
scheduler = self.scheduler
engine = scheduler.engine
execution = scheduler.execution
engine_lock = scheduler._engine_lock
# Shared context for cooperative primitives
set_context(self.scheduler, thread_id)
# Sync reporter so cooperative Lock/RLock report to the DPOR engine.
# Must hold the engine_lock to serialise PyO3 &mut self borrows.
# Also handles block/unblock for cooperative lock spinning:
# "lock_wait" → execution.block_thread (DPOR skips this thread)
# "lock_acquire" → execution.unblock_thread (DPOR can schedule again)
# "lock_release" → unblock all waiters for this lock
def _sync_reporter(event: str, obj_id: int) -> None:
if event == "lock_wait":
with engine_lock:
scheduler._lock_waiters.setdefault(obj_id, set()).add(thread_id)
execution.block_thread(thread_id)
return
if event == "lock_acquire":
with engine_lock:
waiter_set = scheduler._lock_waiters.get(obj_id)
if waiter_set is not None and thread_id in waiter_set:
waiter_set.discard(thread_id)
execution.unblock_thread(thread_id)
engine.report_sync(execution, thread_id, "lock_acquire", obj_id)
_dpor_tls.lock_depth = getattr(_dpor_tls, "lock_depth", 0) + 1
return
if event == "lock_release":
with engine_lock:
waiters = scheduler._lock_waiters.pop(obj_id, set())
for waiter in waiters:
execution.unblock_thread(waiter)
engine.report_sync(execution, thread_id, "lock_release", obj_id)
_dpor_tls.lock_depth = max(0, getattr(_dpor_tls, "lock_depth", 1) - 1)
# Wake threads that may now be schedulable
with scheduler._condition:
scheduler._condition.notify_all()
return
with engine_lock:
engine.report_sync(execution, thread_id, event, obj_id)
set_sync_reporter(_sync_reporter)
# DPOR-specific TLS for _process_opcode (shadow stacks, etc.)
_dpor_tls.scheduler = self.scheduler
_dpor_tls.thread_id = thread_id
_dpor_tls.engine = engine
_dpor_tls.execution = execution
# IO detection: defer I/O reports until the thread exits all locks
# so that the DPOR backtrack point lands at a scheduling point where
# the other thread can actually run (not blocked on the lock).
_dpor_tls.lock_depth = 0
_dpor_tls.pending_io = []
# Register OS TID → DPOR thread ID mapping for LD_PRELOAD events.
if self._preload_bridge is not None:
self._preload_bridge.register_thread(threading.get_native_id(), thread_id)
if self.detect_io:
_recorder = scheduler.trace_recorder
def _io_reporter(resource_id: str, kind: str) -> None:
object_key = _make_object_key(hash(resource_id), resource_id)
pending: list[tuple[int, str]] = _dpor_tls.pending_io
pending.append((object_key, kind))
# Record I/O event in the trace for human-readable output
if _recorder is not None:
_frame = sys._getframe(1)
# Walk up to find user code (skip frontrun internals)
while _frame is not None and not _should_trace_file(_frame.f_code.co_filename):
_frame = _frame.f_back
if _frame is not None:
chain = build_call_chain(_frame, filter_fn=_should_trace_file)
_recorder.record(
thread_id=thread_id,
frame=_frame,
opcode="IO",
access_type=kind,
attr_name=resource_id,
call_chain=chain,
)
set_io_reporter(_io_reporter)
def _teardown_dpor_tls(self) -> None:
"""Clean up both shared and DPOR-specific TLS."""
if self._preload_bridge is not None:
self._preload_bridge.unregister_thread(threading.get_native_id())
if self.detect_io:
set_io_reporter(None)
uninstall_io_profile()
clear_context()
set_sync_reporter(None)
_dpor_tls.scheduler = None
_dpor_tls.thread_id = None
_dpor_tls.engine = None
_dpor_tls.execution = None
_dpor_tls.lock_depth = 0
_dpor_tls.pending_io = []
def _run_thread_settrace(
self,
thread_id: int,
func: Callable[..., None],
args: tuple[Any, ...],
) -> None:
"""Thread entry using sys.settrace (3.10-3.11)."""
try:
self._setup_dpor_tls(thread_id)
trace_fn = self._make_trace(thread_id)
sys.settrace(trace_fn)
func(*args)
except SchedulerAbort:
pass # scheduler already has the error; just exit cleanly
except Exception as e:
self.errors[thread_id] = e
self.scheduler.report_error(e)
finally:
sys.settrace(None)
self._teardown_dpor_tls()
self.scheduler.mark_done(thread_id)
def _run_thread_monitoring(
self,
thread_id: int,
func: Callable[..., None],
args: tuple[Any, ...],
) -> None:
"""Thread entry using sys.monitoring (3.12+)."""
try:
self._setup_dpor_tls(thread_id)
func(*args)
except SchedulerAbort:
pass # scheduler already has the error; just exit cleanly
except Exception as e:
self.errors[thread_id] = e
self.scheduler.report_error(e)
finally:
self._teardown_dpor_tls()
self.scheduler.mark_done(thread_id)
def run(
self,
funcs: list[Callable[..., None]],
args: list[tuple[Any, ...]] | None = None,
timeout: float = 10.0,
) -> None:
if args is None:
args = [() for _ in funcs]
use_monitoring = _USE_SYS_MONITORING
if use_monitoring:
self._setup_monitoring()
run_thread = self._run_thread_monitoring
else:
run_thread = self._run_thread_settrace
try:
for i, (func, a) in enumerate(zip(funcs, args)):
t = threading.Thread(
target=run_thread,
args=(i, func, a),
name=f"dpor-{i}",
daemon=True,
)
self.threads.append(t)
for t in self.threads:
t.start()
deadline = time.monotonic() + timeout
for t in self.threads:
remaining = max(0, deadline - time.monotonic())
t.join(timeout=remaining)
# Signal scheduler to abort if any threads are still alive.
# On free-threaded Python, condition notifications can be lost
# and threads may still be blocked in wait_for_turn.
alive = [t for t in self.threads if t.is_alive()]
if alive:
self.scheduler._error = TimeoutError(f"Timed out waiting for {len(alive)} thread(s) to complete")
with self.scheduler._condition:
self.scheduler._condition.notify_all()
# Give threads a brief grace period to notice the error
for t in alive:
t.join(timeout=0.5)
finally:
if use_monitoring:
self._teardown_monitoring()
if self.errors:
first_error = next(iter(self.errors.values()))
if not isinstance(first_error, TimeoutError):
raise first_error
# ---------------------------------------------------------------------------
# High-level API
# ---------------------------------------------------------------------------
[docs]
def explore_dpor(
setup: Callable[[], T],
threads: list[Callable[[T], None]],
invariant: Callable[[T], bool],
max_executions: int | None = None,
preemption_bound: int | None = 2,
max_branches: int = 100_000,
timeout_per_run: float = 5.0,
stop_on_first: bool = True,
detect_io: bool = True,
deadlock_timeout: float = 5.0,
reproduce_on_failure: int = 10,
total_timeout: float | None = None,
) -> InterleavingResult:
"""Systematically explore interleavings using DPOR.
This is the DPOR replacement for ``explore_interleavings()``. Instead of
random sampling, it uses the DPOR algorithm to explore only distinct
interleavings (modulo independent operation reordering).
Args:
setup: Creates fresh shared state for each execution.
threads: List of callables, each receiving the shared state.
invariant: Predicate over shared state; must be True after all
threads complete.
max_executions: Safety limit on total executions (None = unlimited).
preemption_bound: Limit on preemptions per execution. 2 catches most
bugs. None = unbounded (full DPOR).
max_branches: Maximum scheduling points per execution.
timeout_per_run: Timeout per execution in seconds.
stop_on_first: If True (default), stop exploring as soon as the
first invariant violation is found. Set to False to collect
all failing interleavings.
detect_io: Automatically detect socket/file I/O operations and
report them as resource accesses (default True). Two threads
accessing the same endpoint or file will be treated as
conflicting, enabling DPOR to explore their orderings.
deadlock_timeout: Seconds to wait before declaring a deadlock
(default 5.0). Increase for code that legitimately blocks
in C extensions (NumPy, database queries, network I/O).
reproduce_on_failure: When a counterexample is found, replay the
same schedule this many times to measure reproducibility
(default 10). Set to 0 to skip reproduction testing.
total_timeout: Maximum total time in seconds for the entire
exploration (default None = unlimited). When exceeded, returns
results gathered so far.
Returns:
InterleavingResult with exploration statistics and any counterexample found.
.. note::
When running under **pytest**, this function requires the
``frontrun`` CLI wrapper (``frontrun pytest ...``) or the
``--frontrun-patch-locks`` flag. Without it, the test is
automatically skipped.
"""
_require_frontrun_env("explore_dpor")
num_threads = len(threads)
pb = None if preemption_bound is None else preemption_bound
me = None if max_executions is None else max_executions
engine = PyDporEngine(
num_threads=num_threads,
preemption_bound=pb,
max_branches=max_branches,
max_executions=me,
)
result = InterleavingResult(property_holds=True)
# Shared lock serialising ALL PyO3 calls to engine/execution objects.
# On free-threaded Python, PyO3 &mut self borrows panic rather than
# block when contested, so we need a Python-level lock shared across
# worker threads, the sync reporter, and the main loop.
engine_lock = real_lock()
total_deadline = time.monotonic() + total_timeout if total_timeout is not None else None
# Set up the LD_PRELOAD → DPOR bridge for C-level I/O detection.
# When code under test uses C extensions that call libc send()/recv()
# directly (e.g. psycopg2/libpq), the Python-level monkey-patches in
# _io_detection can't see those calls. The LD_PRELOAD library
# intercepts them at the C level and writes events to a pipe. The
# IOEventDispatcher reads the pipe in a background thread and the
# _PreloadBridge routes events to the correct DPOR thread for
# conflict analysis.
preload_dispatcher = None
preload_bridge: _PreloadBridge | None = None
if detect_io:
from frontrun._preload_io import IOEventDispatcher
preload_dispatcher = IOEventDispatcher()
preload_bridge = _PreloadBridge(dispatcher=preload_dispatcher)
preload_dispatcher.add_listener(preload_bridge.listener)
preload_dispatcher.start()
try:
while True:
if total_deadline is not None and time.monotonic() > total_deadline:
break
with engine_lock:
execution = engine.begin_execution()
recorder = TraceRecorder()
# Clear bridge state for this new execution.
if preload_bridge is not None:
preload_bridge.clear()
scheduler = DporScheduler(
engine,
execution,
num_threads,
engine_lock=engine_lock,
deadlock_timeout=deadlock_timeout,
trace_recorder=recorder,
preload_bridge=preload_bridge,
detect_io=detect_io,
)
runner = DporBytecodeRunner(scheduler, detect_io=detect_io, preload_bridge=preload_bridge)
runner._patch_locks()
runner._patch_io()
try:
state = setup()
def make_thread_func(thread_func: Callable[[T], None], s: T) -> Callable[[], None]:
def wrapper() -> None:
thread_func(s)
return wrapper
funcs = [make_thread_func(t, state) for t in threads]
try:
runner.run(funcs, timeout=timeout_per_run)
except TimeoutError:
pass
finally:
runner._unpatch_io()
runner._unpatch_locks()
result.num_explored += 1
if not invariant(state):
result.property_holds = False
with engine_lock:
schedule = execution.schedule_trace
schedule_list = list(schedule)
result.failures.append((result.num_explored, schedule_list))
if result.counterexample is None:
result.counterexample = schedule_list
# Replay the counterexample to measure reproducibility
if reproduce_on_failure > 0 and result.reproduction_attempts == 0:
from frontrun._preload_io import _set_preload_pipe_fd
from frontrun.bytecode import run_with_schedule
# Pause LD_PRELOAD pipe writes during replay. The replay
# threads do file I/O that the LD_PRELOAD library intercepts,
# generating pipe events. If the pipe buffer fills up (64 KB),
# the LD_PRELOAD write() blocks the worker thread while the
# reader thread may block on FD_MAP held by that same worker
# — a deadlock. Disabling the pipe fd avoids this entirely;
# detect_io is already False for replays anyway.
_set_preload_pipe_fd(-1)
successes = 0
for _ in range(reproduce_on_failure):
try:
# DPOR processes I/O events within the same scheduling
# step as the triggering opcode (via pending_io), so
# its schedule is pure opcode-level. The bytecode
# shuffler's _io_reporter adds extra scheduling steps
# for each I/O event, which would desync the replay.
# Disable detect_io during replay so the step counts
# match; the actual I/O still happens as a side effect
# of opcode execution.
replay_state = run_with_schedule(
schedule_list,
setup,
threads,
timeout=timeout_per_run,
detect_io=False,
deadlock_timeout=deadlock_timeout,
)
if not invariant(replay_state):
successes += 1
except Exception:
pass # timeout / crash during replay — not a reproduction
result.reproduction_attempts = reproduce_on_failure
result.reproduction_successes = successes
# Re-enable pipe writes for subsequent DPOR executions.
if preload_dispatcher is not None and preload_dispatcher._write_fd is not None:
_set_preload_pipe_fd(preload_dispatcher._write_fd)
if result.explanation is None:
result.explanation = format_trace(
recorder.events,
num_threads=num_threads,
num_explored=result.num_explored,
reproduction_attempts=result.reproduction_attempts,
reproduction_successes=result.reproduction_successes,
)
if stop_on_first:
# Clear cache before returning
with _INSTR_CACHE_LOCK:
_INSTR_CACHE.clear()
return result
# Clear instruction cache between executions to avoid stale code ids
with _INSTR_CACHE_LOCK:
_INSTR_CACHE.clear()
with engine_lock:
if not engine.next_execution():
break
finally:
if preload_dispatcher is not None:
preload_dispatcher.stop()
return result