#!/usr/bin/env python3
"""
Dynamic Threshold Backtesting - Test adaptive thresholds on historical data
WITHOUT implementing in production.

This script:
1. Loads historical signals from database (BlockedSignal + Trade tables)
2. Simulates what market regime would have been at each signal
3. Calculates what dynamic threshold would have been
4. Compares outcomes: static vs dynamic thresholds
5. Outputs: Win rate, P&L, signal efficiency for both scenarios
"""

import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

try:
    import psycopg2
    from psycopg2.extras import RealDictCursor
except ImportError:  # pragma: no cover - only hit when the driver is absent
    # Keep the pure scoring/threshold logic importable without a DB driver;
    # the loaders raise a clear ImportError when a connection is needed.
    psycopg2 = None
    RealDictCursor = None


# Score used whenever a metric is missing (NULL/NaN) or has no history.
NEUTRAL_SCORE = 50.0


@dataclass
class MarketRegime:
    """Market regime at signal time"""
    timestamp: datetime
    volatility_score: float    # 0-100 (ATR% percentile)
    trend_score: float         # 0-100 (ADX normalized)
    momentum_score: float      # 0-100 (RSI deviation)
    volume_score: float        # 0-100 (volume ratio normalized)
    position_score: float      # 0-100 (price position)
    composite_score: float     # 0-100 (weighted average)
    regime_type: str           # TRENDING, CHOPPY, REVERSAL, BREAKOUT, MIXED


@dataclass
class SignalEvaluation:
    """Signal with both static and dynamic threshold evaluation"""
    timestamp: datetime
    symbol: str
    direction: str
    quality_score: float
    entry_price: float

    # Market context
    atr_percent: float
    adx: float
    rsi: float
    volume_ratio: float
    price_position: float

    # Regime calculation
    regime: MarketRegime

    # Threshold comparison
    static_threshold: float    # Current system (90 LONG, 80 SHORT)
    dynamic_threshold: float   # Calculated adaptive threshold

    # Execution decision
    executes_static: bool      # Would execute with static threshold
    executes_dynamic: bool     # Would execute with dynamic threshold

    # Outcome (if known from Trade table)
    actual_pnl: Optional[float] = None
    actual_exit_reason: Optional[str] = None


class RegimeDetector:
    """Calculate market regime from signal metrics"""

    def __init__(self, lookback_window: int = 50):
        self.lookback_window = lookback_window

    def calculate_regime(self, signal_data: Dict, recent_signals: pd.DataFrame) -> MarketRegime:
        """
        Calculate market regime at signal time using recent signal history

        Args:
            signal_data: Current signal metrics; reads the keys 'timestamp',
                'atr', 'adx', 'rsi', 'volumeRatio' and 'pricePosition'
            recent_signals: Last N signals for percentile calculations
                (only the 'atr' column is read)

        Returns:
            MarketRegime holding the five component scores, their weighted
            composite and the regime label. A metric that is None/NaN gets
            the neutral score (50) so it cannot poison the composite or be
            mistaken for an extreme reading.
        """
        atr = self._to_float(signal_data.get('atr'))
        adx = self._to_float(signal_data.get('adx'))
        rsi = self._to_float(signal_data.get('rsi'))
        vol_ratio = self._to_float(signal_data.get('volumeRatio'))
        pos = self._to_float(signal_data.get('pricePosition'))

        # Volatility score: ATR% percentile over recent signals
        atr_values = pd.to_numeric(recent_signals['atr'], errors='coerce').dropna()
        if atr is not None and len(atr_values) > 0:
            volatility_score = self._percentile_score(atr, atr_values)
        else:
            volatility_score = NEUTRAL_SCORE

        trend_score = NEUTRAL_SCORE if adx is None else self._trend_score(adx)
        momentum_score = NEUTRAL_SCORE if rsi is None else self._momentum_score(rsi)
        volume_score = NEUTRAL_SCORE if vol_ratio is None else self._volume_score(vol_ratio)
        position_score = NEUTRAL_SCORE if pos is None else self._position_score(pos)

        # Composite score: Weighted average (matches Layer 1 design)
        composite_score = (
            trend_score * 0.35
            + volatility_score * 0.30
            + momentum_score * 0.15
            + volume_score * 0.10
            + position_score * 0.10
        )

        regime_type = self._classify(
            trend_score, volatility_score, momentum_score, volume_score
        )

        return MarketRegime(
            timestamp=signal_data['timestamp'],
            volatility_score=volatility_score,
            trend_score=trend_score,
            momentum_score=momentum_score,
            volume_score=volume_score,
            position_score=position_score,
            composite_score=composite_score,
            regime_type=regime_type,
        )

    @staticmethod
    def _to_float(value) -> Optional[float]:
        """Return value as a float, or None when it is missing (None/NaN)."""
        if value is None or pd.isna(value):
            return None
        return float(value)

    @staticmethod
    def _trend_score(adx: float) -> float:
        """Trend score: ADX <15 -> 0-30, ADX 15-25 -> 30-60, ADX >25 -> 60-100."""
        if adx < 15:
            return (adx / 15) * 30
        if adx < 25:
            return 30 + ((adx - 15) / 10) * 30
        return 60 + min(((adx - 25) / 25) * 40, 40)

    @staticmethod
    def _momentum_score(rsi: float) -> float:
        """Momentum score from RSI: 40-60 inside the neutral band, scaled outside it."""
        if 40 <= rsi <= 60:
            return 40 + (abs(rsi - 50) / 10) * 20  # 40-60 range
        if rsi < 40:
            return max((rsi / 40) * 40, 0)
        return min(60 + ((rsi - 60) / 40) * 40, 100)

    @staticmethod
    def _volume_score(vol_ratio: float) -> float:
        """Volume score: <0.8x -> 0-30, 0.8-1.5x -> 30-70, above -> 70-100 (capped)."""
        if vol_ratio < 0.8:
            return (vol_ratio / 0.8) * 30
        if vol_ratio < 1.5:
            return 30 + ((vol_ratio - 0.8) / 0.7) * 40
        return min(70 + ((vol_ratio - 1.5) / 0.5) * 30, 100)

    @staticmethod
    def _position_score(pos: float) -> float:
        """Position score: extremes (<10% or >90%) score high, mid-range lower."""
        distance = abs(pos - 50)  # 0 at mid-range, 50 at either extreme
        if pos < 10 or pos > 90:
            return 80 + min(distance, 20)
        if 30 <= pos <= 70:
            return 40 - distance / 20 * 10
        # Intermediate band (10-30 / 70-90): rise from 50 toward 80 as price
        # nears an extreme. The sign was previously inverted (20 - distance),
        # which made the score *fall* toward the extremes.
        return 50 + (distance - 20) / 20 * 30

    @staticmethod
    def _classify(trend_score: float, volatility_score: float,
                  momentum_score: float, volume_score: float) -> str:
        """Map component scores to a regime label; the first matching rule wins."""
        if trend_score > 60 and volatility_score < 60:
            return "TRENDING"
        if trend_score < 40 and volatility_score < 50:
            return "CHOPPY"
        if momentum_score > 70 or momentum_score < 30:
            return "REVERSAL"
        if volatility_score > 70 and volume_score > 70:
            return "BREAKOUT"
        return "MIXED"

    def _percentile_score(self, value: float, distribution: pd.Series) -> float:
        """Calculate percentile score (0-100) for value in distribution"""
        if len(distribution) == 0:
            return NEUTRAL_SCORE
        percentile = (distribution < value).sum() / len(distribution)
        return float(percentile * 100)


class DynamicThresholdCalculator:
    """Calculate adaptive threshold based on market regime"""

    # regime_type -> (adjustment, reason template, MarketRegime attribute shown)
    _REGIME_RULES = {
        "TRENDING": (-10, "Trending market (trend={:.0f})", "trend_score"),        # Easier to enter trending markets
        "CHOPPY": (+10, "Choppy market (trend={:.0f})", "trend_score"),            # Harder to enter choppy markets
        "REVERSAL": (+5, "Reversal conditions (momentum={:.0f})", "momentum_score"),  # Slightly harder (reversals risky)
        "BREAKOUT": (-5, "Breakout conditions (vol={:.0f})", "volatility_score"),  # Slightly easier (breakouts can run)
    }

    def __init__(self):
        # Static baselines (current system)
        self.baseline_long = 90
        self.baseline_short = 80

        # Adjustment ranges
        self.max_regime_adjustment = 15
        self.min_threshold = 70
        self.max_threshold = 95

    def baseline_for(self, direction: str) -> float:
        """Return the static baseline for a direction ('long', any case; else short)."""
        is_long = str(direction).lower() == 'long'
        return self.baseline_long if is_long else self.baseline_short

    def calculate_threshold(self, direction: str, regime: MarketRegime) -> Tuple[float, str]:
        """
        Calculate dynamic threshold for signal

        Args:
            direction: 'long' (case-insensitive) selects the long baseline;
                anything else selects the short baseline
            regime: Regime computed for the signal; its regime_type picks
                the adjustment

        Returns:
            (threshold, reasoning)
        """
        baseline = self.baseline_for(direction)

        rule = self._REGIME_RULES.get(regime.regime_type)
        if rule is None:
            regime_adj = 0
            reason = "Mixed market conditions"
        else:
            regime_adj, template, attr = rule
            reason = template.format(getattr(regime, attr))

        # Keep the adjustment inside the configured range, then apply bounds
        regime_adj = max(-self.max_regime_adjustment,
                         min(self.max_regime_adjustment, regime_adj))
        threshold = baseline + regime_adj
        threshold = max(self.min_threshold, min(self.max_threshold, threshold))

        reasoning = f"{reason}, threshold={threshold:.0f} (baseline={baseline}, adj={regime_adj:+.0f})"

        return threshold, reasoning


class DynamicThresholdBacktester:
    """Main backtester class"""

    # A trade counts as the outcome of a signal when it was created within
    # this window of the signal and has the same direction.
    MATCH_WINDOW = pd.Timedelta(minutes=1)

    # NOTE: placeholders must never sit inside a quoted SQL literal; the
    # lookback is expressed as "<days> * INTERVAL '1 day'" instead.
    _SIGNALS_QUERY = """
        SELECT
            "createdAt" as timestamp,
            symbol,
            direction,
            "signalQualityScore" as quality_score,
            "blockReason" as block_reason,
            "entryPrice" as entry_price,
            atr,
            adx,
            rsi,
            "volumeRatio" as volume_ratio,
            "pricePosition" as price_position
        FROM "BlockedSignal"
        WHERE symbol = %s
          AND "createdAt" >= NOW() - (%s * INTERVAL '1 day')
        ORDER BY "createdAt" ASC
    """

    _TRADES_QUERY = """
        SELECT
            "createdAt" as timestamp,
            symbol,
            direction,
            "signalQualityScore" as quality_score,
            "entryPrice" as entry_price,
            "exitPrice" as exit_price,
            "realizedPnL" as realized_pnl,
            "exitReason" as exit_reason
        FROM "Trade"
        WHERE symbol = %s
          AND "createdAt" >= NOW() - (%s * INTERVAL '1 day')
          AND "exitReason" IS NOT NULL
        ORDER BY "createdAt" ASC
    """

    def __init__(self, db_config: Dict):
        self.db_config = db_config
        self.regime_detector = RegimeDetector(lookback_window=50)
        self.threshold_calculator = DynamicThresholdCalculator()

    def _read_query(self, query: str, params: tuple) -> pd.DataFrame:
        """Run a parameterized query and always close the connection."""
        if psycopg2 is None:
            raise ImportError("psycopg2 is required to load data from the database")
        conn = psycopg2.connect(**self.db_config)
        try:
            return pd.read_sql_query(query, conn, params=params)
        finally:
            conn.close()

    def load_historical_signals(self, days: int = 30, symbol: str = 'SOL-PERP') -> pd.DataFrame:
        """Load signals from BlockedSignal table

        Args:
            days: Lookback window in days, counted back from NOW()
            symbol: Market symbol to load

        Returns:
            DataFrame ordered by timestamp ascending
        """
        return self._read_query(self._SIGNALS_QUERY, (symbol, int(days)))

    def load_executed_trades(self, days: int = 30, symbol: str = 'SOL-PERP') -> pd.DataFrame:
        """Load executed trades from Trade table for outcome comparison

        Only trades with a non-NULL exit reason are returned.

        Args:
            days: Lookback window in days, counted back from NOW()
            symbol: Market symbol to load

        Returns:
            DataFrame ordered by timestamp ascending
        """
        return self._read_query(self._TRADES_QUERY, (symbol, int(days)))

    def _match_trade(self, trades_df: pd.DataFrame, row: pd.Series) -> Tuple[Optional[float], Optional[str]]:
        """Return (pnl, exit_reason) of the nearest same-direction trade, or (None, None)."""
        if trades_df.empty:
            return None, None

        time_gap = (trades_df['timestamp'] - row['timestamp']).abs()
        mask = (time_gap < self.MATCH_WINDOW) & (trades_df['direction'] == row['direction'])
        if not mask.any():
            return None, None

        # Several trades may fall inside the window; take the closest in time.
        nearest = time_gap[mask].to_numpy().argmin()
        trade = trades_df[mask].iloc[nearest]
        pnl = trade['realized_pnl']
        # float() also normalizes Decimal values a NUMERIC column would yield
        return (None if pd.isna(pnl) else float(pnl)), trade['exit_reason']

    def run_backtest(self, days: int = 30, symbol: str = 'SOL-PERP') -> pd.DataFrame:
        """
        Run complete backtest comparing static vs dynamic thresholds

        Args:
            days: Lookback window in days
            symbol: Market symbol to backtest

        Returns:
            DataFrame with signal evaluations (one row per signal, one column
            per SignalEvaluation field; the columns are present even when no
            signals were loaded)
        """
        print(f"\n📊 Loading {days} days of historical signals...")
        signals_df = self.load_historical_signals(days, symbol)
        trades_df = self.load_executed_trades(days, symbol)

        print(f"✅ Loaded {len(signals_df)} signals, {len(trades_df)} executed trades")

        evaluations = []
        lookback = self.regime_detector.lookback_window

        # Enumerate positions explicitly: the window below is positional
        # (iloc), so it must not depend on the DataFrame's index labels.
        for pos, (_, row) in enumerate(signals_df.iterrows()):
            # Get recent signals for regime calculation (lookback window);
            # only signals strictly before this one are used (no look-ahead)
            recent_signals = signals_df.iloc[max(0, pos - lookback):pos]

            # Calculate regime at this signal time
            signal_data = {
                'timestamp': row['timestamp'],
                'atr': row['atr'],
                'adx': row['adx'],
                'rsi': row['rsi'],
                'volumeRatio': row['volume_ratio'],
                'pricePosition': row['price_position'],
            }
            regime = self.regime_detector.calculate_regime(signal_data, recent_signals)

            # Calculate dynamic threshold
            dynamic_threshold, _reasoning = self.threshold_calculator.calculate_threshold(
                row['direction'], regime
            )

            # Static threshold (current system) - same baselines the dynamic
            # calculator starts from, so the two cannot drift apart
            static_threshold = self.threshold_calculator.baseline_for(row['direction'])

            # Would it execute? A signal without a quality score never does.
            score = row['quality_score']
            has_score = not pd.isna(score)
            executes_static = bool(has_score and score >= static_threshold)
            executes_dynamic = bool(has_score and score >= dynamic_threshold)

            # Try to match with executed trade for actual P&L
            actual_pnl, actual_exit_reason = self._match_trade(trades_df, row)

            evaluations.append(SignalEvaluation(
                timestamp=row['timestamp'],
                symbol=row['symbol'],
                direction=row['direction'],
                quality_score=row['quality_score'],
                entry_price=row['entry_price'],
                atr_percent=row['atr'],
                adx=row['adx'],
                rsi=row['rsi'],
                volume_ratio=row['volume_ratio'],
                price_position=row['price_position'],
                regime=regime,
                static_threshold=static_threshold,
                dynamic_threshold=dynamic_threshold,
                executes_static=executes_static,
                executes_dynamic=executes_dynamic,
                actual_pnl=actual_pnl,
                actual_exit_reason=actual_exit_reason,
            ))

            # Progress indicator
            if (pos + 1) % 1000 == 0:
                print(f"   Processed {pos + 1}/{len(signals_df)} signals...")

        print("✅ Completed regime analysis for all signals\n")

        # Passing the column list keeps the schema stable for empty results.
        columns = list(SignalEvaluation.__dataclass_fields__)
        return pd.DataFrame([vars(e) for e in evaluations], columns=columns)

    @staticmethod
    def _report_scenario(heading: str, executed: pd.DataFrame) -> Optional[Dict[str, float]]:
        """Print one scenario's performance; return its stats, or None without outcomes."""
        with_outcome = executed[executed['actual_pnl'].notna()]

        print(heading)
        print(f"   Total Signals Executed: {len(executed)}")
        print(f"   With Known Outcomes: {len(with_outcome)}")

        if len(with_outcome) == 0:
            return None

        pnl = pd.to_numeric(with_outcome['actual_pnl'])
        stats = {
            'win_rate': float((pnl > 0).sum() / len(pnl) * 100),
            'total_pnl': float(pnl.sum()),
            'avg_pnl': float(pnl.mean()),
        }

        print(f"   Win Rate: {stats['win_rate']:.1f}%")
        print(f"   Total P&L: ${stats['total_pnl']:.2f}")
        print(f"   Average P&L: ${stats['avg_pnl']:.2f}")
        return stats

    def analyze_results(self, evaluations_df: pd.DataFrame):
        """Analyze and compare static vs dynamic threshold performance

        Prints a report to stdout; returns nothing.

        Args:
            evaluations_df: Output of run_backtest()
        """
        print("=" * 80)
        print("DYNAMIC THRESHOLD BACKTEST RESULTS")
        print("=" * 80)

        if evaluations_df.empty:
            print("\n⚠️  No signals to analyze")
            print("\n" + "=" * 80)
            return

        static_mask = evaluations_df['executes_static'].astype(bool)
        dynamic_mask = evaluations_df['executes_dynamic'].astype(bool)
        static_signals = evaluations_df[static_mask]
        dynamic_signals = evaluations_df[dynamic_mask]

        # Static threshold performance
        static_stats = self._report_scenario(
            "\n📈 STATIC THRESHOLDS (Current System: LONG≥90, SHORT≥80)", static_signals
        )

        # Dynamic threshold performance
        dynamic_stats = self._report_scenario(
            "\n🎯 DYNAMIC THRESHOLDS (Proposed: Regime-Adaptive)", dynamic_signals
        )

        # Improvement calculations - only meaningful when both scenarios
        # have at least one trade with a known outcome
        if static_stats is not None and dynamic_stats is not None:
            win_rate_delta = dynamic_stats['win_rate'] - static_stats['win_rate']
            pnl_delta = dynamic_stats['total_pnl'] - static_stats['total_pnl']

            print("\n💡 IMPROVEMENT vs STATIC")
            print(f"   Win Rate: {win_rate_delta:+.1f}% ({dynamic_stats['win_rate']:.1f}% vs {static_stats['win_rate']:.1f}%)")
            print(f"   Total P&L: ${pnl_delta:+.2f} (${dynamic_stats['total_pnl']:.2f} vs ${static_stats['total_pnl']:.2f})")
            print(f"   Signal Efficiency: {len(dynamic_signals) - len(static_signals):+d} signals")

        # Regime distribution
        print("\n📊 REGIME DISTRIBUTION")
        regime_counts = evaluations_df['regime'].apply(lambda r: r.regime_type).value_counts()
        for regime, count in regime_counts.items():
            pct = count / len(evaluations_df) * 100
            print(f"   {regime}: {count} signals ({pct:.1f}%)")

        # Threshold adjustment analysis
        print("\n⚙️  THRESHOLD ADJUSTMENTS")
        direction = evaluations_df['direction'].astype(str).str.lower()
        avg_long_threshold = evaluations_df.loc[direction == 'long', 'dynamic_threshold'].mean()
        avg_short_threshold = evaluations_df.loc[direction == 'short', 'dynamic_threshold'].mean()

        print(f"   LONG: {avg_long_threshold:.1f} average (baseline 90)")
        print(f"   SHORT: {avg_short_threshold:.1f} average (baseline 80)")

        # Signals that would execute differently
        only_dynamic = int((~static_mask & dynamic_mask).sum())
        only_static = int((static_mask & ~dynamic_mask).sum())

        print(f"\n🔄 SIGNALS WITH DIFFERENT EXECUTION DECISION: {only_dynamic + only_static}")
        print(f"   Dynamic would execute but Static wouldn't: {only_dynamic}")
        print(f"   Static would execute but Dynamic wouldn't: {only_static}")

        print("\n" + "=" * 80)


def main():
    """Run the dynamic threshold backtest"""

    # Database connection config. The defaults match the local Docker
    # PostgreSQL; override them with the standard PG* environment variables
    # instead of editing credentials into the source.
    db_config = {
        'host': os.environ.get('PGHOST', 'localhost'),
        'port': int(os.environ.get('PGPORT', '55432')),  # Docker PostgreSQL runs on port 55432
        'database': os.environ.get('PGDATABASE', 'trading_bot_v4'),
        'user': os.environ.get('PGUSER', 'postgres'),
        'password': os.environ.get('PGPASSWORD', 'postgres'),
    }

    # Run backtest
    backtester = DynamicThresholdBacktester(db_config)

    print("\n" + "=" * 80)
    print("DYNAMIC THRESHOLD BACKTESTING")
    print("Testing adaptive thresholds on 30 days of historical signals")
    print("=" * 80)

    # Run backtest on 30 days of data
    evaluations_df = backtester.run_backtest(days=30)

    # Analyze and display results
    backtester.analyze_results(evaluations_df)

    # Save detailed results
    output_file = f"dynamic_threshold_backtest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

    # Flatten regime data for CSV (the MarketRegime object itself is dropped)
    evaluations_csv = evaluations_df.copy()
    regimes = evaluations_csv['regime']
    evaluations_csv['regime_type'] = regimes.apply(lambda r: r.regime_type)
    evaluations_csv['regime_composite_score'] = regimes.apply(lambda r: r.composite_score)
    evaluations_csv['regime_trend_score'] = regimes.apply(lambda r: r.trend_score)
    evaluations_csv['regime_volatility_score'] = regimes.apply(lambda r: r.volatility_score)
    evaluations_csv = evaluations_csv.drop('regime', axis=1)

    evaluations_csv.to_csv(output_file, index=False)
    print(f"\n💾 Detailed results saved to: {output_file}")
    print(f"   Total signals analyzed: {len(evaluations_csv)}")

    print("\n✅ Backtest complete!\n")


if __name__ == "__main__":
    main()