- Created comprehensive exit strategy analysis from 30-day backtest - Key finding: Average loss -1.84 vs average win $0.76 (42 asymmetry) - Root cause: Position management not working, not entry quality - Dynamic thresholds tested: only +.21/month improvement (rejected) - Backtesting infrastructure: 487-line Python script with regime analysis - Database: PostgreSQL integration for 78 real trades Nov 23 - Dec 23 - Next steps: Fix exit strategy, not thresholds (exits are the problem)
487 lines
19 KiB
Python
487 lines
19 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Dynamic Threshold Backtesting - Test adaptive thresholds on historical data
|
|
WITHOUT implementing in production.
|
|
|
|
This script:
|
|
1. Loads historical signals from database (BlockedSignal + Trade tables)
|
|
2. Simulates what market regime would have been at each signal
|
|
3. Calculates what dynamic threshold would have been
|
|
4. Compares outcomes: static vs dynamic thresholds
|
|
5. Outputs: Win rate, P&L, signal efficiency for both scenarios
|
|
"""
|
|
|
|
import os
from dataclasses import dataclass, fields
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import psycopg2
from psycopg2.extras import RealDictCursor
|
|
|
|
|
|
@dataclass
class MarketRegime:
    """Regime snapshot computed for a single signal.

    Every ``*_score`` field is expressed on a 0-100 scale. ``regime_type`` is
    a coarse label derived from those scores.
    """

    # When the signal was generated.
    timestamp: datetime

    # ATR% percentile relative to recent signals.
    volatility_score: float

    # ADX mapped onto 0-100.
    trend_score: float

    # RSI deviation mapped onto 0-100.
    momentum_score: float

    # Volume ratio mapped onto 0-100.
    volume_score: float

    # Price position within its range mapped onto 0-100.
    position_score: float

    # Weighted average of the five scores above.
    composite_score: float

    # One of TRENDING, CHOPPY, REVERSAL, BREAKOUT (the detector in this file
    # also emits MIXED when none of those apply).
    regime_type: str
|
|
|
|
|
|
@dataclass
class SignalEvaluation:
    """One historical signal evaluated under both threshold schemes.

    Holds the raw signal, the indicator values it was generated with, the
    regime computed for it, the static and dynamic thresholds, whether each
    threshold would have let the signal execute, and - when a matching
    executed trade exists - the realised outcome.
    """

    # --- Signal identity ---
    timestamp: datetime
    symbol: str
    direction: str
    quality_score: float
    entry_price: float

    # --- Market context at signal time ---
    atr_percent: float
    adx: float
    rsi: float
    volume_ratio: float
    price_position: float

    # --- Regime calculation ---
    regime: "MarketRegime"

    # --- Threshold comparison ---
    static_threshold: float   # Current system (90 LONG, 80 SHORT)
    dynamic_threshold: float  # Calculated adaptive threshold

    # --- Execution decision ---
    executes_static: bool   # Would execute with static threshold
    executes_dynamic: bool  # Would execute with dynamic threshold

    # --- Outcome (None when no matching row exists in the Trade table) ---
    actual_pnl: Optional[float] = None
    actual_exit_reason: Optional[str] = None
|
|
|
|
|
|
class RegimeDetector:
    """Calculate market regime from signal metrics.

    Each indicator is mapped onto a 0-100 score with piecewise-linear rules,
    the scores are blended into a weighted composite, and a coarse regime
    label is derived from the individual scores.

    Any indicator that is missing (``None`` or NaN) is scored as
    ``NEUTRAL_SCORE`` so that one absent value neither raises nor turns the
    composite score into NaN.
    """

    # Score used for a missing input or an empty reference distribution.
    NEUTRAL_SCORE = 50.0

    def __init__(self, lookback_window: int = 50):
        """Store the lookback size.

        Args:
            lookback_window: Number of prior signals callers should pass as
                history. This class only stores the value; it does not slice
                the history itself.
        """
        self.lookback_window = lookback_window

    def calculate_regime(self, signal_data: Dict, recent_signals: pd.DataFrame) -> "MarketRegime":
        """Calculate market regime at signal time using recent signal history.

        Args:
            signal_data: Current signal metrics. Reads the keys 'timestamp',
                'atr', 'adx', 'rsi', 'volumeRatio' and 'pricePosition'.
            recent_signals: Prior signals; only the 'atr' column is used, as
                the distribution for the volatility percentile.

        Returns:
            A MarketRegime carrying the five component scores, the weighted
            composite and the regime label.
        """
        # Volatility: where the current ATR ranks among recent ATR values.
        atr_history = recent_signals['atr'].dropna()
        volatility_score = self._percentile_score(signal_data['atr'], atr_history)

        trend_score = self._trend_score(signal_data['adx'])
        momentum_score = self._momentum_score(signal_data['rsi'])
        volume_score = self._volume_score(signal_data['volumeRatio'])
        position_score = self._position_score(signal_data['pricePosition'])

        # Composite score: weighted average (weights sum to 1.0).
        composite_score = (
            trend_score * 0.35 +
            volatility_score * 0.30 +
            momentum_score * 0.15 +
            volume_score * 0.10 +
            position_score * 0.10
        )

        regime_type = self._classify(
            trend_score, volatility_score, momentum_score, volume_score
        )

        return MarketRegime(
            timestamp=signal_data['timestamp'],
            volatility_score=volatility_score,
            trend_score=trend_score,
            momentum_score=momentum_score,
            volume_score=volume_score,
            position_score=position_score,
            composite_score=composite_score,
            regime_type=regime_type,
        )

    def _trend_score(self, adx) -> float:
        """Map ADX to 0-100: <15 -> 0-30, 15-25 -> 30-60, >=25 -> 60-100 (capped)."""
        if pd.isna(adx):
            return self.NEUTRAL_SCORE
        if adx < 15:
            return (adx / 15) * 30
        if adx < 25:
            return 30 + ((adx - 15) / 10) * 30
        return 60 + min(((adx - 25) / 25) * 40, 40)

    def _momentum_score(self, rsi) -> float:
        """Map RSI to a momentum score.

        Outside 40-60 the score equals the RSI itself (clamped to 0-100).
        Inside 40-60 it is 40 at RSI 50, rising to 60 at either edge.
        """
        if pd.isna(rsi):
            return self.NEUTRAL_SCORE
        if 40 <= rsi <= 60:
            # NOTE(review): this yields 60 at RSI 40 while RSI just below 40
            # yields ~40, so the curve is discontinuous at 40 - confirm intent.
            return 40 + (abs(rsi - 50) / 10) * 20
        if rsi < 40:
            return max((rsi / 40) * 40, 0)
        return min(60 + ((rsi - 60) / 40) * 40, 100)

    def _volume_score(self, vol_ratio) -> float:
        """Map volume ratio to 0-100: <0.8x -> 0-30, 0.8-1.5x -> 30-70, >=1.5x -> 70-100 (100 at 2.0x)."""
        if pd.isna(vol_ratio):
            return self.NEUTRAL_SCORE
        if vol_ratio < 0.8:
            return (vol_ratio / 0.8) * 30
        if vol_ratio < 1.5:
            return 30 + ((vol_ratio - 0.8) / 0.7) * 40
        return min(70 + ((vol_ratio - 1.5) / 0.5) * 30, 100)

    def _position_score(self, pos) -> float:
        """Map price position (0-100) to a score.

        Below 10 or above 90 the score is always 100; between 30 and 70 it is
        30-40; in the remaining bands it is 20-50.
        """
        if pd.isna(pos):
            return self.NEUTRAL_SCORE
        if pos < 10 or pos > 90:
            return 80 + min(abs(50 - pos), 20)
        if 30 <= pos <= 70:
            return 40 - abs(pos - 50) / 20 * 10
        # NOTE(review): in the 10-30 / 70-90 bands this term is negative, so
        # the score falls toward 20 as price nears the extreme and then jumps
        # to 100 past it. Kept as-is to preserve results - confirm intent.
        return 50 + (20 - abs(pos - 50)) / 20 * 30

    @staticmethod
    def _classify(trend_score, volatility_score, momentum_score, volume_score) -> str:
        """Return the first matching regime label; rule order matters."""
        if trend_score > 60 and volatility_score < 60:
            return "TRENDING"
        if trend_score < 40 and volatility_score < 50:
            return "CHOPPY"
        if momentum_score > 70 or momentum_score < 30:
            return "REVERSAL"
        if volatility_score > 70 and volume_score > 70:
            return "BREAKOUT"
        return "MIXED"

    def _percentile_score(self, value: float, distribution: pd.Series) -> float:
        """Calculate percentile score (0-100) for value in distribution.

        The score is the share of entries strictly below ``value``. An empty
        distribution or a missing ``value`` yields ``NEUTRAL_SCORE``; without
        that guard a NaN value would compare below nothing and score 0.
        """
        if len(distribution) == 0 or pd.isna(value):
            return self.NEUTRAL_SCORE
        percentile = (distribution < value).sum() / len(distribution)
        return float(percentile * 100)
|
|
|
|
|
|
class DynamicThresholdCalculator:
    """Turn a market regime into an adjusted quality-score threshold."""

    def __init__(self):
        # Baselines mirror the static thresholds of the current system.
        self.baseline_long, self.baseline_short = 90, 80

        # Adjustment limits and the hard floor/ceiling for any threshold.
        self.max_regime_adjustment = 15
        self.min_threshold, self.max_threshold = 70, 95

    def calculate_threshold(self, direction: str, regime: "MarketRegime") -> Tuple[float, str]:
        """Compute the adaptive threshold for one signal.

        The baseline for the signal's direction ('long' uses the long
        baseline, anything else the short one) is shifted by a fixed amount
        per regime type, then clamped to [min_threshold, max_threshold].

        Returns:
            (threshold, reasoning) where reasoning is a human-readable
            explanation of the adjustment.
        """
        base = self.baseline_short
        if direction == 'long':
            base = self.baseline_long

        # regime_type -> (shift, label, tag shown in the reason, score attribute)
        # Negative shift = easier to enter, positive shift = harder to enter.
        rules = {
            "TRENDING": (-10, "Trending market", "trend", "trend_score"),
            "CHOPPY": (10, "Choppy market", "trend", "trend_score"),
            "REVERSAL": (5, "Reversal conditions", "momentum", "momentum_score"),
            "BREAKOUT": (-5, "Breakout conditions", "vol", "volatility_score"),
        }

        rule = rules.get(regime.regime_type)
        if rule is None:
            shift, why = 0, "Mixed market conditions"
        else:
            shift, label, tag, score_attr = rule
            why = f"{label} ({tag}={getattr(regime, score_attr):.0f})"

        # Shift the baseline, then keep the result inside the allowed band.
        bounded = max(self.min_threshold, min(self.max_threshold, base + shift))

        explanation = f"{why}, threshold={bounded:.0f} (baseline={base}, adj={shift:+.0f})"
        return bounded, explanation
|
|
|
|
|
|
class DynamicThresholdBacktester:
    """Replay historical signals under static and regime-adaptive thresholds.

    Loads signals and executed trades from PostgreSQL, computes the market
    regime and dynamic threshold for each signal, and reports how the two
    threshold schemes compare.
    """

    def __init__(self, db_config: Dict):
        """Keep the connection settings and build the scoring helpers.

        Args:
            db_config: Keyword arguments passed verbatim to
                ``psycopg2.connect``.
        """
        self.db_config = db_config
        self.regime_detector = RegimeDetector(lookback_window=50)
        self.threshold_calculator = DynamicThresholdCalculator()

    def _read_frame(self, query: str, days: int) -> pd.DataFrame:
        """Run ``query`` with ``days`` as its single parameter and return the rows.

        The connection is closed even when the query raises.
        """
        conn = psycopg2.connect(**self.db_config)
        try:
            return pd.read_sql_query(query, conn, params=(days,))
        finally:
            conn.close()

    def load_historical_signals(self, days: int = 30) -> pd.DataFrame:
        """Load signals from BlockedSignal table.

        Args:
            days: How far back from now to load, in days.

        Returns:
            SOL-PERP rows ordered by creation time, oldest first.
        """
        # The placeholder is multiplied with a one-day interval instead of
        # being embedded in a quoted literal, so the driver binds it as a
        # real value rather than splicing text into an SQL string.
        query = """
            SELECT
                "createdAt" as timestamp,
                symbol,
                direction,
                "signalQualityScore" as quality_score,
                "blockReason" as block_reason,
                "entryPrice" as entry_price,
                atr,
                adx,
                rsi,
                "volumeRatio" as volume_ratio,
                "pricePosition" as price_position
            FROM "BlockedSignal"
            WHERE symbol = 'SOL-PERP'
              AND "createdAt" >= NOW() - (%s * INTERVAL '1 day')
            ORDER BY "createdAt" ASC
        """
        return self._read_frame(query, days)

    def load_executed_trades(self, days: int = 30) -> pd.DataFrame:
        """Load executed trades from Trade table for outcome comparison.

        Only trades with a non-NULL exit reason are returned.

        Args:
            days: How far back from now to load, in days.

        Returns:
            SOL-PERP rows ordered by creation time, oldest first.
        """
        query = """
            SELECT
                "createdAt" as timestamp,
                symbol,
                direction,
                "signalQualityScore" as quality_score,
                "entryPrice" as entry_price,
                "exitPrice" as exit_price,
                "realizedPnL" as realized_pnl,
                "exitReason" as exit_reason
            FROM "Trade"
            WHERE symbol = 'SOL-PERP'
              AND "createdAt" >= NOW() - (%s * INTERVAL '1 day')
              AND "exitReason" IS NOT NULL
            ORDER BY "createdAt" ASC
        """
        return self._read_frame(query, days)

    def run_backtest(self, days: int = 30) -> pd.DataFrame:
        """Run complete backtest comparing static vs dynamic thresholds.

        Args:
            days: How far back from now to load signals and trades.

        Returns:
            DataFrame with one row per signal and one column per
            SignalEvaluation field. The ``regime`` column holds MarketRegime
            objects. The columns are present even when no signals were found.
        """
        print(f"\n📊 Loading {days} days of historical signals...")
        signals_df = self.load_historical_signals(days)
        trades_df = self.load_executed_trades(days)

        print(f"✅ Loaded {len(signals_df)} signals, {len(trades_df)} executed trades")

        window = self.regime_detector.lookback_window
        # Static thresholds come from the calculator's baselines so the two
        # schemes cannot drift apart.
        static_long = self.threshold_calculator.baseline_long
        static_short = self.threshold_calculator.baseline_short
        total = len(signals_df)
        match_tolerance = pd.Timedelta(minutes=1)

        evaluations = []

        # Enumerate positions explicitly: iloc slicing below is positional
        # and must not depend on what the frame's index labels happen to be.
        for pos, (_, row) in enumerate(signals_df.iterrows()):
            # History strictly before this signal (no look-ahead).
            recent_signals = signals_df.iloc[max(0, pos - window):pos]

            signal_data = {
                'timestamp': row['timestamp'],
                'atr': row['atr'],
                'adx': row['adx'],
                'rsi': row['rsi'],
                'volumeRatio': row['volume_ratio'],
                'pricePosition': row['price_position'],
            }
            regime = self.regime_detector.calculate_regime(signal_data, recent_signals)

            dynamic_threshold, _reasoning = self.threshold_calculator.calculate_threshold(
                row['direction'], regime
            )
            static_threshold = static_long if row['direction'] == 'long' else static_short

            executes_static = row['quality_score'] >= static_threshold
            executes_dynamic = row['quality_score'] >= dynamic_threshold

            # Outcome: first executed trade created within one minute of the
            # signal, if any.
            actual_pnl = None
            actual_exit_reason = None
            if not trades_df.empty:
                gap = (trades_df['timestamp'] - row['timestamp']).abs()
                matching_trades = trades_df[gap < match_tolerance]
                if len(matching_trades) > 0:
                    trade = matching_trades.iloc[0]
                    actual_pnl = trade['realized_pnl']
                    actual_exit_reason = trade['exit_reason']

            evaluations.append(SignalEvaluation(
                timestamp=row['timestamp'],
                symbol=row['symbol'],
                direction=row['direction'],
                quality_score=row['quality_score'],
                entry_price=row['entry_price'],
                atr_percent=row['atr'],
                adx=row['adx'],
                rsi=row['rsi'],
                volume_ratio=row['volume_ratio'],
                price_position=row['price_position'],
                regime=regime,
                static_threshold=static_threshold,
                dynamic_threshold=dynamic_threshold,
                executes_static=executes_static,
                executes_dynamic=executes_dynamic,
                actual_pnl=actual_pnl,
                actual_exit_reason=actual_exit_reason,
            ))

            # Progress indicator
            if (pos + 1) % 1000 == 0:
                print(f" Processed {pos + 1}/{total} signals...")

        print(f"✅ Completed regime analysis for all signals\n")

        # vars() keeps `regime` as an object (not a nested dict). Naming the
        # columns explicitly keeps the schema stable for an empty result.
        return pd.DataFrame(
            [vars(e) for e in evaluations],
            columns=[f.name for f in fields(SignalEvaluation)],
        )

    @staticmethod
    def _print_performance(title: str, executed: pd.DataFrame) -> dict:
        """Print one scheme's summary and return its headline numbers.

        Returns:
            Dict with 'executed' (row count) and, when at least one row has a
            known P&L, 'win_rate' and 'total_pnl'; otherwise those are None.
        """
        with_outcome = executed[executed['actual_pnl'].notna()]

        print(title)
        print(f" Total Signals Executed: {len(executed)}")
        print(f" With Known Outcomes: {len(with_outcome)}")

        summary = {'executed': len(executed), 'win_rate': None, 'total_pnl': None}
        if len(with_outcome) > 0:
            win_rate = (with_outcome['actual_pnl'] > 0).sum() / len(with_outcome) * 100
            total_pnl = with_outcome['actual_pnl'].sum()
            avg_pnl = with_outcome['actual_pnl'].mean()

            print(f" Win Rate: {win_rate:.1f}%")
            print(f" Total P&L: ${total_pnl:.2f}")
            print(f" Average P&L: ${avg_pnl:.2f}")

            summary['win_rate'] = win_rate
            summary['total_pnl'] = total_pnl
        return summary

    def analyze_results(self, evaluations_df: pd.DataFrame):
        """Analyze and compare static vs dynamic threshold performance.

        Prints a report to stdout and returns nothing. An empty frame
        produces a short notice instead of a report.

        Args:
            evaluations_df: Output of ``run_backtest``.
        """
        print("=" * 80)
        print("DYNAMIC THRESHOLD BACKTEST RESULTS")
        print("=" * 80)

        if evaluations_df.empty:
            print("\nNo signals to analyze.")
            print("\n" + "=" * 80)
            return

        # Force plain booleans so `~` is a logical NOT even if the column
        # arrived with object dtype.
        static_mask = evaluations_df['executes_static'].astype(bool)
        dynamic_mask = evaluations_df['executes_dynamic'].astype(bool)

        static = self._print_performance(
            "\n📈 STATIC THRESHOLDS (Current System: LONG≥90, SHORT≥80)",
            evaluations_df[static_mask],
        )
        dynamic = self._print_performance(
            "\n🎯 DYNAMIC THRESHOLDS (Proposed: Regime-Adaptive)",
            evaluations_df[dynamic_mask],
        )

        # The comparison needs known outcomes on BOTH sides.
        if static['win_rate'] is not None and dynamic['win_rate'] is not None:
            win_rate_delta = dynamic['win_rate'] - static['win_rate']
            pnl_delta = dynamic['total_pnl'] - static['total_pnl']

            print("\n💡 IMPROVEMENT vs STATIC")
            print(f" Win Rate: {win_rate_delta:+.1f}% ({dynamic['win_rate']:.1f}% vs {static['win_rate']:.1f}%)")
            print(f" Total P&L: ${pnl_delta:+.2f} (${dynamic['total_pnl']:.2f} vs ${static['total_pnl']:.2f})")
            print(f" Signal Efficiency: {dynamic['executed'] - static['executed']:+d} signals")

        # Regime distribution
        print("\n📊 REGIME DISTRIBUTION")
        regime_counts = evaluations_df['regime'].apply(lambda r: r.regime_type).value_counts()
        for regime, count in regime_counts.items():
            pct = count / len(evaluations_df) * 100
            print(f" {regime}: {count} signals ({pct:.1f}%)")

        # Threshold adjustment analysis
        print("\n⚙️ THRESHOLD ADJUSTMENTS")
        is_long = evaluations_df['direction'] == 'long'
        is_short = evaluations_df['direction'] == 'short'
        avg_long_threshold = evaluations_df[is_long]['dynamic_threshold'].mean()
        avg_short_threshold = evaluations_df[is_short]['dynamic_threshold'].mean()

        print(f" LONG: {avg_long_threshold:.1f} average (baseline 90)")
        print(f" SHORT: {avg_short_threshold:.1f} average (baseline 80)")

        # Signals that would execute differently
        only_dynamic = int((~static_mask & dynamic_mask).sum())
        only_static = int((static_mask & ~dynamic_mask).sum())

        print(f"\n🔄 SIGNALS WITH DIFFERENT EXECUTION DECISION: {only_dynamic + only_static}")
        print(f" Dynamic would execute but Static wouldn't: {only_dynamic}")
        print(f" Static would execute but Dynamic wouldn't: {only_static}")

        print("\n" + "=" * 80)
|
|
|
|
|
|
def main():
    """Run the dynamic threshold backtest.

    Connects to PostgreSQL, replays 30 days of signals, prints the
    comparison report and writes the per-signal results to a timestamped
    CSV file in the current directory.

    Connection settings default to the local Docker instance and can be
    overridden with the standard libpq environment variables PGHOST, PGPORT,
    PGDATABASE, PGUSER and PGPASSWORD, so credentials need not be edited
    into this file.
    """
    # Database connection config
    db_config = {
        'host': os.environ.get('PGHOST', 'localhost'),
        'port': int(os.environ.get('PGPORT', '55432')),  # Docker PostgreSQL runs on port 55432
        'database': os.environ.get('PGDATABASE', 'trading_bot_v4'),
        'user': os.environ.get('PGUSER', 'postgres'),
        'password': os.environ.get('PGPASSWORD', 'postgres'),
    }

    backtester = DynamicThresholdBacktester(db_config)

    print("\n" + "=" * 80)
    print("DYNAMIC THRESHOLD BACKTESTING")
    print("Testing adaptive thresholds on 30 days of historical signals")
    print("=" * 80)

    # Run backtest on 30 days of data
    evaluations_df = backtester.run_backtest(days=30)

    # With no signals there is nothing to analyze or flatten; stop here
    # rather than failing on missing columns further down.
    if evaluations_df.empty:
        print("\n⚠️ No signals found for the requested window; nothing to analyze or export.\n")
        return

    # Analyze and display results
    backtester.analyze_results(evaluations_df)

    # Save detailed results
    output_file = f"dynamic_threshold_backtest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

    # Flatten regime data for CSV: copy selected attributes of each regime
    # object into plain columns, then drop the object column.
    evaluations_csv = evaluations_df.copy()
    flattened = (
        ('regime_type', 'regime_type'),
        ('regime_composite_score', 'composite_score'),
        ('regime_trend_score', 'trend_score'),
        ('regime_volatility_score', 'volatility_score'),
    )
    for column, attr in flattened:
        # attr is bound as a default to avoid late-binding in the lambda.
        evaluations_csv[column] = evaluations_csv['regime'].apply(
            lambda r, attr=attr: getattr(r, attr)
        )
    evaluations_csv = evaluations_csv.drop('regime', axis=1)

    evaluations_csv.to_csv(output_file, index=False)
    print(f"\n💾 Detailed results saved to: {output_file}")
    print(f" Total signals analyzed: {len(evaluations_csv)}")

    print("\n✅ Backtest complete!\n")


if __name__ == "__main__":
    main()
|