from __future__ import annotations from dataclasses import dataclass, field from typing import Callable, List, Optional import numpy as np import pandas as pd from backtester.indicators.money_line import ( Direction, MoneyLineInputs, MoneyLineSignal, money_line_signals, ) QualityFilter = Callable[[MoneyLineSignal], bool] @dataclass class TradeConfig: position_size: float = 1000.0 take_profit_1_size_percent: float = 60.0 atr_multiplier_tp1: float = 2.0 atr_multiplier_tp2: float = 4.0 atr_multiplier_sl: float = 3.0 min_tp1_percent: float = 0.5 max_tp1_percent: float = 1.5 min_tp2_percent: float = 1.0 max_tp2_percent: float = 3.0 min_sl_percent: float = 0.8 max_sl_percent: float = 2.0 fallback_tp1_percent: float = 0.8 fallback_tp2_percent: float = 1.7 fallback_sl_percent: float = 1.3 trailing_atr_multiplier: float = 1.5 trailing_min_percent: float = 0.25 trailing_max_percent: float = 0.9 max_bars_per_trade: Optional[int] = None @dataclass class SimulatedTrade: symbol: str direction: Direction signal_type: str entry_time: pd.Timestamp exit_time: pd.Timestamp entry_price: float exit_price: float realized_pnl: float profit_percent: float exit_reason: str bars_held: int tp1_hit: bool tp2_hit: bool trailing_active: bool mae_percent: float mfe_percent: float quality_score: Optional[float] = None adx_at_entry: Optional[float] = None atr_at_entry: Optional[float] = None runner_size: float = 0.0 tp1_size: float = 0.0 _exit_index: int = field(repr=False, default=-1) @dataclass class SimulationResult: trades: List[SimulatedTrade] @property def total_pnl(self) -> float: return sum(t.realized_pnl for t in self.trades) @property def win_rate(self) -> float: wins = sum(1 for t in self.trades if t.realized_pnl > 0) return 0.0 if not self.trades else wins / len(self.trades) @property def average_pnl(self) -> float: return 0.0 if not self.trades else self.total_pnl / len(self.trades) @property def max_drawdown(self) -> float: equity = 0.0 peak = 0.0 max_dd = 0.0 for trade in self.trades: equity += trade.realized_pnl peak = max(peak, equity) max_dd = min(max_dd, equity - peak) return max_dd def simulate_money_line( df: pd.DataFrame, symbol: str, inputs: Optional[MoneyLineInputs] = None, config: Optional[TradeConfig] = None, quality_filter: Optional[QualityFilter] = None, ) -> SimulationResult: if inputs is None: inputs = MoneyLineInputs() if config is None: config = TradeConfig() if quality_filter is None: quality_filter = lambda _: True # type: ignore data = df.sort_index().copy() index_positions = {ts: idx for idx, ts in enumerate(data.index)} signals = money_line_signals(data, inputs) trades: List[SimulatedTrade] = [] next_available_index = 0 for signal in signals: if signal.timestamp not in index_positions: continue start_idx = index_positions[signal.timestamp] if start_idx < next_available_index: continue if not quality_filter(signal): continue trade = _simulate_trade(data, start_idx, signal, symbol, config) if trade is None: continue trades.append(trade) next_available_index = trade._exit_index return SimulationResult(trades=trades) def _simulate_trade( data: pd.DataFrame, start_idx: int, signal: MoneyLineSignal, symbol: str, config: TradeConfig, ) -> Optional[SimulatedTrade]: if start_idx >= len(data) - 1: return None entry_price = float(signal.entry_price) if not np.isfinite(entry_price) or entry_price <= 0: return None tp1_percent = _percent_from_atr( signal.atr, entry_price, config.atr_multiplier_tp1, config.min_tp1_percent, config.max_tp1_percent, config.fallback_tp1_percent, ) tp2_percent = _percent_from_atr( signal.atr, entry_price, config.atr_multiplier_tp2, config.min_tp2_percent, config.max_tp2_percent, config.fallback_tp2_percent, ) sl_percent = _percent_from_atr( signal.atr, entry_price, config.atr_multiplier_sl, config.min_sl_percent, config.max_sl_percent, config.fallback_sl_percent, ) direction = signal.direction tp1_price = _target_price(entry_price, tp1_percent, direction) tp2_price = _target_price(entry_price, tp2_percent, direction) stop_price = _stop_price(entry_price, sl_percent, direction) tp1_fraction = config.take_profit_1_size_percent / 100.0 tp1_fraction = np.clip(tp1_fraction, 0.0, 1.0) tp1_size = config.position_size * tp1_fraction runner_size = config.position_size - tp1_size tp1_hit = False tp2_hit = False trailing_active = False remaining_size = config.position_size realized_pnl = 0.0 exit_reason = "TIME" exit_price = entry_price exit_idx = start_idx bars_held = 0 mae = 0.0 mfe = 0.0 runner_stop_percent = _runner_stop_offset(signal.adx) runner_stop_price = _stop_price(entry_price, runner_stop_percent, direction) trailing_stop_price = runner_stop_price favorable_price = entry_price max_bars = config.max_bars_per_trade or len(data) for idx in range(start_idx + 1, len(data)): bar = data.iloc[idx] bar_high = float(bar.high) bar_low = float(bar.low) bars_held += 1 mae = min(mae, _profit_percent(bar_low if direction == "long" else bar_high, entry_price, direction)) mfe = max(mfe, _profit_percent(bar_high if direction == "long" else bar_low, entry_price, direction)) if not tp1_hit: if _stop_hit(bar_low, bar_high, stop_price, direction): realized_pnl += config.position_size * _profit_percent(stop_price, entry_price, direction) / 100.0 exit_reason = "SL" exit_price = stop_price exit_idx = idx break if _target_hit(bar_low, bar_high, tp1_price, direction): tp1_hit = True if tp1_size > 0: realized_pnl += tp1_size * _profit_percent(tp1_price, entry_price, direction) / 100.0 remaining_size -= tp1_size exit_reason = "TP1" runner_stop_price = _stop_price(entry_price, runner_stop_percent, direction) trailing_stop_price = runner_stop_price favorable_price = entry_price # Continue evaluating same bar for runner logic else: if remaining_size <= 0: exit_reason = "TP1" exit_price = tp1_price exit_idx = idx break if _stop_hit(bar_low, bar_high, runner_stop_price, direction): realized_pnl += remaining_size * _profit_percent(runner_stop_price, entry_price, direction) / 100.0 exit_reason = "BREAKEVEN" if runner_stop_percent == 0 else "RUNNER_SL" exit_price = runner_stop_price exit_idx = idx break if (not tp2_hit) and _target_hit(bar_low, bar_high, tp2_price, direction): tp2_hit = True trailing_active = True exit_reason = "TP2" favorable_price = _update_favorable_price(favorable_price, bar_high, bar_low, direction) trailing_stop_price = _compute_trailing_stop( favorable_price, entry_price, signal.atr, config, direction, ) if trailing_active: favorable_price = _update_favorable_price(favorable_price, bar_high, bar_low, direction) trailing_stop_price = _compute_trailing_stop( favorable_price, entry_price, signal.atr, config, direction, ) if _stop_hit(bar_low, bar_high, trailing_stop_price, direction): realized_pnl += remaining_size * _profit_percent(trailing_stop_price, entry_price, direction) / 100.0 exit_reason = "TRAILING_SL" exit_price = trailing_stop_price exit_idx = idx break if bars_held >= max_bars: exit_price = float(bar.close) realized_pnl += remaining_size * _profit_percent(exit_price, entry_price, direction) / 100.0 exit_reason = "MAX_TIME" exit_idx = idx break else: final_bar = data.iloc[-1] exit_price = float(final_bar.close) realized_pnl += remaining_size * _profit_percent(exit_price, entry_price, direction) / 100.0 exit_reason = "END" exit_idx = len(data) - 1 exit_time = data.index[exit_idx] profit_percent = realized_pnl / config.position_size * 100 if config.position_size else 0.0 return SimulatedTrade( symbol=symbol, direction=direction, signal_type=signal.signal_type, entry_time=signal.timestamp, exit_time=exit_time, entry_price=entry_price, exit_price=exit_price, realized_pnl=realized_pnl, profit_percent=profit_percent, exit_reason=exit_reason, bars_held=bars_held, tp1_hit=tp1_hit, tp2_hit=tp2_hit, trailing_active=trailing_active, mae_percent=mae, mfe_percent=mfe, quality_score=None, adx_at_entry=signal.adx, atr_at_entry=signal.atr, runner_size=runner_size, tp1_size=tp1_size, _exit_index=exit_idx, ) def _percent_from_atr( atr_value: float, price: float, multiplier: float, min_percent: float, max_percent: float, fallback: float, ) -> float: if price <= 0: return fallback atr_percent = (atr_value / price) * 100 if price else 0.0 if atr_percent == 0: return fallback percent = atr_percent * multiplier return float(np.clip(percent, min_percent, max_percent)) def _target_price(entry: float, percent: float, direction: Direction) -> float: if direction == "long": return entry * (1 + percent / 100.0) return entry * (1 - percent / 100.0) def _stop_price(entry: float, percent: float, direction: Direction) -> float: if direction == "long": return entry * (1 - percent / 100.0) return entry * (1 + percent / 100.0) def _stop_hit(bar_low: float, bar_high: float, stop_price: float, direction: Direction) -> bool: if direction == "long": return bar_low <= stop_price return bar_high >= stop_price def _target_hit(bar_low: float, bar_high: float, target_price: float, direction: Direction) -> bool: if direction == "long": return bar_high >= target_price return bar_low <= target_price def _profit_percent(price: float, entry: float, direction: Direction) -> float: if entry == 0: return 0.0 if direction == "long": return (price - entry) / entry * 100.0 return (entry - price) / entry * 100.0 def _runner_stop_offset(adx: float) -> float: if adx < 20: return 0.0 if adx < 25: return 0.3 return 0.55 def _update_favorable_price(current: float, bar_high: float, bar_low: float, direction: Direction) -> float: if direction == "long": return max(current, bar_high) return min(current, bar_low) def _compute_trailing_stop( favorable_price: float, entry_price: float, atr_value: float, config: TradeConfig, direction: Direction, ) -> float: atr_percent = (atr_value / entry_price) * 100 if entry_price else 0.0 trail_percent = atr_percent * config.trailing_atr_multiplier trail_percent = float(np.clip(trail_percent, config.trailing_min_percent, config.trailing_max_percent)) if direction == "long": return favorable_price * (1 - trail_percent / 100.0) return favorable_price * (1 + trail_percent / 100.0)