THREE critical bugs in cluster/v11_test_worker.py:

1. Missing use_quality_filters parameter when creating MoneyLineV11Inputs
   - Parameter defaults to True but wasn't being passed explicitly
   - Fix: Added use_quality_filters=True to inputs creation

2. Missing fixed RSI parameters (rsi_long_max, rsi_short_min)
   - Worker only passed rsi_long_min and rsi_short_max (sweep params)
   - Missing rsi_long_max=70 and rsi_short_min=30 (fixed params)
   - Fix: Added both fixed parameters to inputs creation

3. Import path mismatch - worker imported the OLD version
   - Worker added cluster/ to sys.path, so it imported from the parent directory
   - Old v11_moneyline_all_filters.py (21:40) was missing use_quality_filters
   - Fixed v11_moneyline_all_filters.py was in the backtester/ subdirectory
   - Fix: Deployed the corrected file to /home/comprehensive_sweep/

Result: 0 signals → 1,096-1,186 signals per config

✓ Verified: Local test (314 signals), EPYC dataset test (1,186 signals), and the worker log now shows signal variety across 27 concurrent configs. The progressive sweep is now running successfully on the EPYC cluster.
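A quick way to guard against regressions of fix #3, as a minimal sketch (not part of the worker; it assumes MoneyLineV11Inputs exposes an ordinary __init__/dataclass signature, which inspect.signature reads either way):

    import inspect
    import v11_moneyline_all_filters as ml

    print(ml.__file__)  # should point at the deployed copy, not a stale parent-directory one
    assert 'use_quality_filters' in inspect.signature(ml.MoneyLineV11Inputs).parameters

Run it from /home/comprehensive_sweep/ so the module resolves the same way it does for the worker.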
332 lines · 12 KiB · Python · Executable File
#!/usr/bin/env python3
"""
V11 Test Parameter Sweep Worker

Processes chunks of v11 test parameter configurations (512 combinations total).
Uses 27 cores (85% CPU) for multiprocessing.

PROGRESSIVE SWEEP - Stage 1: Ultra-Permissive (start from 0 filters)
Goal: Find which parameter values allow signals through.

Test parameter grid (2×4×2×2×2×2×2×2 = 512 combinations):
- flip_threshold: 0.4, 0.5
- adx_min: 0, 5, 10, 15 (START FROM ZERO - filter disabled at 0)
- long_pos_max: 95, 100 (very loose)
- short_pos_min: 0, 5 (START FROM ZERO - filter disabled at 0)
- vol_min: 0.0, 0.5 (START FROM ZERO - filter disabled at 0)
- entry_buffer_atr: 0.0, 0.10 (START FROM ZERO - filter disabled at 0)
- rsi_long_min: 25, 30 (permissive)
- rsi_short_max: 75, 80 (permissive)

Expected outcomes:
- adx_min=0 configs: 150-300 signals (almost no filtering)
- adx_min=15 configs: 10-40 signals (strict filtering)
- If all still 0 → base indicator broken, not the filters
"""

import sys
import csv
import itertools
import pandas as pd
from pathlib import Path
from typing import Dict, List, Any
from multiprocessing import Pool

# Add current directory to path for v11_moneyline_all_filters import
sys.path.insert(0, str(Path(__file__).parent))

from v11_moneyline_all_filters import (
    money_line_v11_signals,
    MoneyLineV11Inputs
)

# CPU limit: 85% of 32 threads = 27 cores
MAX_WORKERS = 27

# Global data file path (set by init_worker) and per-process dataframe cache
# (filled by backtest_config on first use, so the CSV is parsed once per worker)
_DATA_FILE = None
_DF = None


def init_worker(data_file):
    """Initialize worker process with data file path"""
    global _DATA_FILE
    _DATA_FILE = data_file


# PROGRESSIVE Test parameter grid (512 combinations)
# Stage 1: Ultra-permissive - Start from 0 (filters disabled) to find baseline
# Strategy: "Go upwards from 0 until you find something"
PARAMETER_GRID = {
    'flip_threshold': [0.4, 0.5],     # 2 values - range: loose to normal
    'adx_min': [0, 5, 10, 15],        # 4 values - START FROM 0 (no filter)
    'long_pos_max': [95, 100],        # 2 values - very permissive
    'short_pos_min': [0, 5],          # 2 values - START FROM 0 (no filter)
    'vol_min': [0.0, 0.5],            # 2 values - START FROM 0 (no filter)
    'entry_buffer_atr': [0.0, 0.10],  # 2 values - START FROM 0 (no filter)
    'rsi_long_min': [25, 30],         # 2 values - permissive
    'rsi_short_max': [75, 80],        # 2 values - permissive
}
# Total: 2×4×2×2×2×2×2×2 = 512 combos
# Expected: adx_min=0 configs will generate 150-300 signals (proves v11 logic works)
# If all still 0 signals with adx_min=0 → base indicator broken, not the filters
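
# Sanity check (a sketch, not part of the original worker): the per-parameter
# value counts above must multiply out to the advertised 512 combinations.
assert len(list(itertools.product(*PARAMETER_GRID.values()))) == 512  # 2*4*2**6

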
def load_market_data(csv_file: str) -> pd.DataFrame:
    """Load OHLCV data from CSV"""
    df = pd.read_csv(csv_file)

    # Ensure required columns exist
    required = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
    for col in required:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

    # Convert timestamp if needed
    if df['timestamp'].dtype == 'object':
        df['timestamp'] = pd.to_datetime(df['timestamp'])

    df = df.set_index('timestamp')
    print(f"✓ Loaded {len(df):,} bars from {csv_file}")
    return df


def backtest_config(config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Run backtest for a single v11 test parameter configuration.

    Loads data from the global _DATA_FILE path on first call and caches it
    per worker process.

    Returns dict with:
    - params: original config dict
    - pnl: total P&L
    - trades: number of trades
    - win_rate: % winners
    - profit_factor: wins/losses ratio
    - max_drawdown: max drawdown $
    """
    # Load data once per worker process and cache it in _DF, so the CSV is
    # parsed a single time rather than once per config
    global _DF
    if _DF is None:
        _DF = pd.read_csv(_DATA_FILE)
        _DF['timestamp'] = pd.to_datetime(_DF['timestamp'])
        _DF = _DF.set_index('timestamp')
    df = _DF

    try:
        # Create v11 inputs
        inputs = MoneyLineV11Inputs(
            use_quality_filters=True,  # 🔧 FIX: Enable filters for progressive sweep
            flip_threshold=config['flip_threshold'],
            adx_min=config['adx_min'],
            long_pos_max=config['long_pos_max'],
            short_pos_min=config['short_pos_min'],
            vol_min=config['vol_min'],
            entry_buffer_atr=config['entry_buffer_atr'],
            rsi_long_min=config['rsi_long_min'],
            rsi_long_max=70,   # 🔧 FIX: Add missing fixed parameter
            rsi_short_min=30,  # 🔧 FIX: Add missing fixed parameter
            rsi_short_max=config['rsi_short_max'],
        )

print(f" Generating signals...", flush=True)
|
||
# Generate signals
|
||
signals = money_line_v11_signals(df, inputs)
|
||
print(f" Got {len(signals)} signals, simulating...", flush=True)
|
||
|
||
if not signals:
|
||
return {
|
||
'params': config,
|
||
'pnl': 0.0,
|
||
'trades': 0,
|
||
'win_rate': 0.0,
|
||
'profit_factor': 0.0,
|
||
'max_drawdown': 0.0,
|
||
}
|
||
|
||
        # Simple backtesting: track equity curve
        equity = 1000.0  # Starting capital
        peak_equity = equity
        max_drawdown = 0.0
        wins = 0
        losses = 0
        win_pnl = 0.0
        loss_pnl = 0.0

        for signal in signals:
            # Simple trade simulation:
            # TP1 at +0.86%, SL at -1.29% (ATR-based defaults)
            entry = signal.entry_price

            # Look ahead in data to see if TP or SL hit
            signal_idx = df.index.get_loc(signal.timestamp)

            # Look ahead up to 100 bars
            max_bars = min(100, len(df) - signal_idx - 1)
            if max_bars <= 0:
                continue

            future_data = df.iloc[signal_idx+1:signal_idx+1+max_bars]

            if signal.direction == "long":
                tp_price = entry * 1.0086  # +0.86%
                sl_price = entry * 0.9871  # -1.29%

                # Check if TP or SL hit anywhere in the window; if both are
                # touched, TP is credited first (an optimistic simplification,
                # since bar-level data cannot order intrabar touches)
                hit_tp = (future_data['high'] >= tp_price).any()
                hit_sl = (future_data['low'] <= sl_price).any()

                if hit_tp:
                    pnl = 1000.0 * 0.0086  # $8.60 on $1000 position
                    equity += pnl
                    wins += 1
                    win_pnl += pnl
                elif hit_sl:
                    pnl = -1000.0 * 0.0129  # -$12.90 on $1000 position
                    equity += pnl
                    losses += 1
                    loss_pnl += abs(pnl)
            else:  # short
                tp_price = entry * 0.9914  # -0.86%
                sl_price = entry * 1.0129  # +1.29%

                # Check if TP or SL hit (same optimistic TP-first rule)
                hit_tp = (future_data['low'] <= tp_price).any()
                hit_sl = (future_data['high'] >= sl_price).any()

                if hit_tp:
                    pnl = 1000.0 * 0.0086  # $8.60 on $1000 position
                    equity += pnl
                    wins += 1
                    win_pnl += pnl
                elif hit_sl:
                    pnl = -1000.0 * 0.0129  # -$12.90 on $1000 position
                    equity += pnl
                    losses += 1
                    loss_pnl += abs(pnl)

            # Track drawdown
            peak_equity = max(peak_equity, equity)
            current_drawdown = peak_equity - equity
            max_drawdown = max(max_drawdown, current_drawdown)

        total_trades = wins + losses
        win_rate = wins / total_trades if total_trades > 0 else 0.0
        profit_factor = win_pnl / loss_pnl if loss_pnl > 0 else (float('inf') if win_pnl > 0 else 0.0)
        total_pnl = equity - 1000.0
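
        # Worked example for these metrics (illustrative numbers, not from a
        # real run): 10 wins at +$8.60 and 5 losses at -$12.90 give
        # win_pnl=86.00 and loss_pnl=64.50, so profit_factor = 86.00/64.50
        # ≈ 1.333 and total_pnl = 86.00 - 64.50 = 21.50.
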
        return {
            'params': config,
            'pnl': round(total_pnl, 2),
            'trades': total_trades,
            'win_rate': round(win_rate * 100, 1),
            'profit_factor': round(profit_factor, 3) if profit_factor != float('inf') else 999.0,
            'max_drawdown': round(max_drawdown, 2),
        }

    except Exception as e:
        print(f"✗ Error backtesting config: {e}")
        return {
            'params': config,
            'pnl': 0.0,
            'trades': 0,
            'win_rate': 0.0,
            'profit_factor': 0.0,
            'max_drawdown': 0.0,
        }


def generate_parameter_combinations() -> List[Dict[str, Any]]:
    """Generate all 512 parameter combinations from PARAMETER_GRID"""
    keys = PARAMETER_GRID.keys()
    values = PARAMETER_GRID.values()

    combinations = []
    for combo in itertools.product(*values):
        config = dict(zip(keys, combo))
        combinations.append(config)

    return combinations


def process_chunk(data_file: str, chunk_id: str, start_idx: int, end_idx: int):
    """Process a chunk of parameter combinations"""
    print(f"\n{'='*60}")
    print(f"V11 Test Worker - {chunk_id}")
    print(f"Processing combinations {start_idx} to {end_idx-1}")
    print(f"{'='*60}\n")

    # Load market data up front to validate the file and report the bar count
    # (each worker process loads and caches its own copy via init_worker)
    df = load_market_data(data_file)

    # Generate all combinations
    all_combos = generate_parameter_combinations()
    print(f"✓ Generated {len(all_combos)} total combinations")

    # Get this chunk's combinations
    chunk_combos = all_combos[start_idx:end_idx]
    print(f"✓ Processing {len(chunk_combos)} combinations in this chunk\n")

    # Backtest with multiprocessing (pass data file path instead of dataframe)
    print(f"⚡ Starting {MAX_WORKERS}-core backtest...\n")

    with Pool(processes=MAX_WORKERS, initializer=init_worker, initargs=(data_file,)) as pool:
        results = pool.map(backtest_config, chunk_combos)
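        # Assumption, not in the original: pool.map's default chunksize batches
        # the configs into slices per worker; if per-config runtimes vary
        # widely, an explicit chunksize spreads load more evenly at a little
        # extra inter-process overhead, e.g.:
        #   results = pool.map(backtest_config, chunk_combos, chunksize=1)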

    print(f"\n✓ Completed {len(results)} backtests")

    # Write results to CSV
    output_dir = Path('v11_test_results')
    output_dir.mkdir(exist_ok=True)

    csv_file = output_dir / f"{chunk_id}_results.csv"

    with open(csv_file, 'w', newline='') as f:
        writer = csv.writer(f)

        # Header
        writer.writerow([
            'flip_threshold', 'adx_min', 'long_pos_max', 'short_pos_min',
            'vol_min', 'entry_buffer_atr', 'rsi_long_min', 'rsi_short_max',
            'pnl', 'win_rate', 'profit_factor', 'max_drawdown', 'total_trades'
        ])

        # Data rows
        for result in results:
            params = result['params']
            writer.writerow([
                params['flip_threshold'],
                params['adx_min'],
                params['long_pos_max'],
                params['short_pos_min'],
                params['vol_min'],
                params['entry_buffer_atr'],
                params['rsi_long_min'],
                params['rsi_short_max'],
                result['pnl'],
                result['win_rate'],
                result['profit_factor'],
                result['max_drawdown'],
                result['trades'],
            ])

    print(f"✓ Results saved to {csv_file}")

    # Show top 5 results
    sorted_results = sorted(results, key=lambda x: x['pnl'], reverse=True)
    print(f"\n🏆 Top 5 Results:")
    for i, r in enumerate(sorted_results[:5], 1):
        print(f"  {i}. PnL: ${r['pnl']:,.2f} | Trades: {r['trades']} | WR: {r['win_rate']}%")


if __name__ == '__main__':
    if len(sys.argv) != 4:
        print("Usage: python v11_test_worker.py <data_file> <chunk_id> <start_idx>")
        sys.exit(1)

    data_file = sys.argv[1]
    chunk_id = sys.argv[2]
    start_idx = int(sys.argv[3])

    # Calculate end index (256 combos per chunk, so the 512-combination grid
    # splits into two chunks starting at indices 0 and 256)
    end_idx = start_idx + 256

    process_chunk(data_file, chunk_id, start_idx, end_idx)
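
# Example invocation (a sketch; the CSV filename is illustrative):
#   python v11_test_worker.py epyc_dataset.csv chunk_00 0     -> combos 0-255
#   python v11_test_worker.py epyc_dataset.csv chunk_01 256   -> combos 256-511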