feat: Add v11 test sweep system (256 combinations) with office hours scheduling

Co-authored-by: mindesbunister <32161838+mindesbunister@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2025-12-06 19:15:54 +00:00
parent 67cc7598f2
commit eb0d41aed5
7 changed files with 1113 additions and 0 deletions

298
cluster/v11_test_worker.py Executable file
View File

@@ -0,0 +1,298 @@
#!/usr/bin/env python3
"""
V11 Test Parameter Sweep Worker
Processes chunks of v11 test parameter configurations (256 combinations total).
Uses 27 cores (85% CPU) for multiprocessing.
Test parameter grid (2 values each = 2^8 = 256 combinations):
- flip_threshold: 0.5, 0.6
- adx_min: 18, 21
- long_pos_max: 75, 80
- short_pos_min: 20, 25
- vol_min: 0.8, 1.0
- entry_buffer_atr: 0.15, 0.20
- rsi_long_min: 35, 40
- rsi_short_max: 65, 70
"""
import sys
import csv
import pandas as pd
from pathlib import Path
from typing import Dict, List, Any
from multiprocessing import Pool
import functools
import itertools
# Add backtester to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from backtester.v11_moneyline_all_filters import (
money_line_v11_signals,
MoneyLineV11Inputs
)
from backtester.simulator import simulate_money_line
# CPU limit: 85% of 32 threads = 27 cores
MAX_WORKERS = 27
# Test parameter grid (256 combinations)
PARAMETER_GRID = {
'flip_threshold': [0.5, 0.6],
'adx_min': [18, 21],
'long_pos_max': [75, 80],
'short_pos_min': [20, 25],
'vol_min': [0.8, 1.0],
'entry_buffer_atr': [0.15, 0.20],
'rsi_long_min': [35, 40],
'rsi_short_max': [65, 70],
}
def load_market_data(csv_file: str) -> pd.DataFrame:
"""Load OHLCV data from CSV"""
df = pd.read_csv(csv_file)
# Ensure required columns exist
required = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
for col in required:
if col not in df.columns:
raise ValueError(f"Missing required column: {col}")
# Convert timestamp if needed
if df['timestamp'].dtype == 'object':
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.set_index('timestamp')
print(f"✓ Loaded {len(df):,} bars from {csv_file}")
return df
def backtest_config(df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
"""
Run backtest for single v11 test parameter configuration
Returns dict with:
- params: original config dict
- pnl: total P&L
- trades: number of trades
- win_rate: % winners
- profit_factor: wins/losses ratio
- max_drawdown: max drawdown $
"""
try:
# Create v11 inputs
inputs = MoneyLineV11Inputs(
flip_threshold=config['flip_threshold'],
adx_min=config['adx_min'],
long_pos_max=config['long_pos_max'],
short_pos_min=config['short_pos_min'],
vol_min=config['vol_min'],
entry_buffer_atr=config['entry_buffer_atr'],
rsi_long_min=config['rsi_long_min'],
rsi_short_max=config['rsi_short_max'],
)
# Generate signals
signals = money_line_v11_signals(df, inputs)
if not signals:
return {
'params': config,
'pnl': 0.0,
'trades': 0,
'win_rate': 0.0,
'profit_factor': 0.0,
'max_drawdown': 0.0,
}
# Simple backtesting: track equity curve
equity = 1000.0 # Starting capital
peak_equity = equity
max_drawdown = 0.0
wins = 0
losses = 0
win_pnl = 0.0
loss_pnl = 0.0
for signal in signals:
# Simple trade simulation
# TP1 at +0.86%, SL at -1.29% (ATR-based defaults)
entry = signal.entry_price
# Look ahead in data to see if TP or SL hit
signal_idx = df.index.get_loc(signal.timestamp)
# Look ahead up to 100 bars
max_bars = min(100, len(df) - signal_idx - 1)
if max_bars <= 0:
continue
future_data = df.iloc[signal_idx+1:signal_idx+1+max_bars]
if signal.direction == "long":
tp_price = entry * 1.0086 # +0.86%
sl_price = entry * 0.9871 # -1.29%
# Check if TP or SL hit
hit_tp = (future_data['high'] >= tp_price).any()
hit_sl = (future_data['low'] <= sl_price).any()
if hit_tp:
pnl = 1000.0 * 0.0086 # $8.60 on $1000 position
equity += pnl
wins += 1
win_pnl += pnl
elif hit_sl:
pnl = -1000.0 * 0.0129 # -$12.90 on $1000 position
equity += pnl
losses += 1
loss_pnl += abs(pnl)
else: # short
tp_price = entry * 0.9914 # -0.86%
sl_price = entry * 1.0129 # +1.29%
# Check if TP or SL hit
hit_tp = (future_data['low'] <= tp_price).any()
hit_sl = (future_data['high'] >= sl_price).any()
if hit_tp:
pnl = 1000.0 * 0.0086 # $8.60 on $1000 position
equity += pnl
wins += 1
win_pnl += pnl
elif hit_sl:
pnl = -1000.0 * 0.0129 # -$12.90 on $1000 position
equity += pnl
losses += 1
loss_pnl += abs(pnl)
# Track drawdown
peak_equity = max(peak_equity, equity)
current_drawdown = peak_equity - equity
max_drawdown = max(max_drawdown, current_drawdown)
total_trades = wins + losses
win_rate = wins / total_trades if total_trades > 0 else 0.0
profit_factor = win_pnl / loss_pnl if loss_pnl > 0 else (float('inf') if win_pnl > 0 else 0.0)
total_pnl = equity - 1000.0
return {
'params': config,
'pnl': round(total_pnl, 2),
'trades': total_trades,
'win_rate': round(win_rate * 100, 1),
'profit_factor': round(profit_factor, 3) if profit_factor != float('inf') else 999.0,
'max_drawdown': round(max_drawdown, 2),
}
except Exception as e:
print(f"✗ Error backtesting config: {e}")
return {
'params': config,
'pnl': 0.0,
'trades': 0,
'win_rate': 0.0,
'profit_factor': 0.0,
'max_drawdown': 0.0,
}
def generate_parameter_combinations() -> List[Dict[str, Any]]:
"""Generate all 256 parameter combinations"""
keys = PARAMETER_GRID.keys()
values = PARAMETER_GRID.values()
combinations = []
for combo in itertools.product(*values):
config = dict(zip(keys, combo))
combinations.append(config)
return combinations
def process_chunk(data_file: str, chunk_id: str, start_idx: int, end_idx: int):
"""Process a chunk of parameter combinations"""
print(f"\n{'='*60}")
print(f"V11 Test Worker - {chunk_id}")
print(f"Processing combinations {start_idx} to {end_idx-1}")
print(f"{'='*60}\n")
# Load market data
df = load_market_data(data_file)
# Generate all combinations
all_combos = generate_parameter_combinations()
print(f"✓ Generated {len(all_combos)} total combinations")
# Get this chunk's combinations
chunk_combos = all_combos[start_idx:end_idx]
print(f"✓ Processing {len(chunk_combos)} combinations in this chunk\n")
# Backtest with multiprocessing
print(f"⚡ Starting {MAX_WORKERS}-core backtest...\n")
with Pool(processes=MAX_WORKERS) as pool:
backtest_func = functools.partial(backtest_config, df)
results = pool.map(backtest_func, chunk_combos)
print(f"\n✓ Completed {len(results)} backtests")
# Write results to CSV
output_dir = Path('v11_test_results')
output_dir.mkdir(exist_ok=True)
csv_file = output_dir / f"{chunk_id}_results.csv"
with open(csv_file, 'w', newline='') as f:
writer = csv.writer(f)
# Header
writer.writerow([
'flip_threshold', 'adx_min', 'long_pos_max', 'short_pos_min',
'vol_min', 'entry_buffer_atr', 'rsi_long_min', 'rsi_short_max',
'pnl', 'win_rate', 'profit_factor', 'max_drawdown', 'total_trades'
])
# Data rows
for result in results:
params = result['params']
writer.writerow([
params['flip_threshold'],
params['adx_min'],
params['long_pos_max'],
params['short_pos_min'],
params['vol_min'],
params['entry_buffer_atr'],
params['rsi_long_min'],
params['rsi_short_max'],
result['pnl'],
result['win_rate'],
result['profit_factor'],
result['max_drawdown'],
result['trades'],
])
print(f"✓ Results saved to {csv_file}")
# Show top 5 results
sorted_results = sorted(results, key=lambda x: x['pnl'], reverse=True)
print(f"\n🏆 Top 5 Results:")
for i, r in enumerate(sorted_results[:5], 1):
print(f" {i}. PnL: ${r['pnl']:,.2f} | Trades: {r['trades']} | WR: {r['win_rate']}%")
if __name__ == '__main__':
if len(sys.argv) != 4:
print("Usage: python v11_test_worker.py <data_file> <chunk_id> <start_idx>")
sys.exit(1)
data_file = sys.argv[1]
chunk_id = sys.argv[2]
start_idx = int(sys.argv[3])
# Calculate end index (128 combos per chunk)
end_idx = start_idx + 128
process_chunk(data_file, chunk_id, start_idx, end_idx)