#!/usr/bin/env python3
"""
V11 Test Parameter Sweep Worker

Processes chunks of v11 test parameter configurations (256 combinations total).
Uses 27 cores (85% CPU) for multiprocessing.

Test parameter grid (2 values each = 2^8 = 256 combinations):
- flip_threshold: 0.5, 0.6
- adx_min: 18, 21
- long_pos_max: 75, 80
- short_pos_min: 20, 25
- vol_min: 0.8, 1.0
- entry_buffer_atr: 0.15, 0.20
- rsi_long_min: 35, 40
- rsi_short_max: 65, 70
"""

import sys
import csv
import pandas as pd
from pathlib import Path
from typing import Dict, List, Any, Optional
from multiprocessing import Pool
import functools
import itertools

# Add backtester to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from backtester.v11_moneyline_all_filters import (
    money_line_v11_signals,
    MoneyLineV11Inputs
)
from backtester.simulator import simulate_money_line

# CPU limit: 85% of 32 threads = 27 cores
MAX_WORKERS = 27

# Number of parameter combinations handled by one worker invocation
COMBOS_PER_CHUNK = 128

# Test parameter grid (256 combinations)
PARAMETER_GRID = {
    'flip_threshold': [0.5, 0.6],
    'adx_min': [18, 21],
    'long_pos_max': [75, 80],
    'short_pos_min': [20, 25],
    'vol_min': [0.8, 1.0],
    'entry_buffer_atr': [0.15, 0.20],
    'rsi_long_min': [35, 40],
    'rsi_short_max': [65, 70],
}


def load_market_data(csv_file: str) -> pd.DataFrame:
    """Load OHLCV data from CSV.

    Args:
        csv_file: Path to a CSV containing at least the columns
            timestamp, open, high, low, close and volume.

    Returns:
        DataFrame indexed by ``timestamp``.

    Raises:
        ValueError: If any required column is missing.
    """
    df = pd.read_csv(csv_file)

    # Ensure required columns exist
    required = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
    for col in required:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

    # Convert timestamp if needed (string columns are read as dtype object)
    if df['timestamp'].dtype == 'object':
        df['timestamp'] = pd.to_datetime(df['timestamp'])

    df = df.set_index('timestamp')
    print(f"✓ Loaded {len(df):,} bars from {csv_file}")
    return df


def _first_hit(mask: pd.Series) -> Optional[int]:
    """Return the position of the first True in *mask*, or None if there is none."""
    hits = mask.to_numpy()
    if not hits.any():
        return None
    # argmax on a boolean array yields the index of the first True
    return int(hits.argmax())


def _resolve_trade(direction: str, entry: float, future_data: pd.DataFrame) -> Optional[float]:
    """Decide the P&L of one trade from the bars following its entry.

    Returns the dollar P&L on a $1000 position (+8.60 for take-profit,
    -12.90 for stop-loss), or None when neither level is touched inside
    *future_data*.

    Whichever level is touched FIRST decides the outcome. When both are
    touched in the same bar the intrabar order cannot be known from OHLC
    data, so the stop is assumed to have been hit first (conservative).
    """
    if direction == "long":
        tp_price = entry * 1.0086  # +0.86%
        sl_price = entry * 0.9871  # -1.29%
        tp_mask = future_data['high'] >= tp_price
        sl_mask = future_data['low'] <= sl_price
    else:  # short
        tp_price = entry * 0.9914  # -0.86%
        sl_price = entry * 1.0129  # +1.29%
        tp_mask = future_data['low'] <= tp_price
        sl_mask = future_data['high'] >= sl_price

    tp_bar = _first_hit(tp_mask)
    sl_bar = _first_hit(sl_mask)

    if tp_bar is None and sl_bar is None:
        return None
    if sl_bar is None or (tp_bar is not None and tp_bar < sl_bar):
        return 1000.0 * 0.0086  # $8.60 on $1000 position
    return -1000.0 * 0.0129  # -$12.90 on $1000 position


def backtest_config(df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Run backtest for single v11 test parameter configuration

    Each signal is simulated with a fixed take-profit of 0.86% and a fixed
    stop-loss of 1.29% on a $1000 position, looking ahead at most 100 bars.
    Signals for which neither level is reached in that window are not
    counted as trades.

    Any exception raised while backtesting is printed and converted into an
    all-zero result so that one bad configuration does not abort the sweep.

    Returns dict with:
    - params: original config dict
    - pnl: total P&L
    - trades: number of trades
    - win_rate: % winners
    - profit_factor: wins/losses ratio (999.0 when there are no losses)
    - max_drawdown: max drawdown $
    """
    try:
        # Create v11 inputs
        inputs = MoneyLineV11Inputs(
            flip_threshold=config['flip_threshold'],
            adx_min=config['adx_min'],
            long_pos_max=config['long_pos_max'],
            short_pos_min=config['short_pos_min'],
            vol_min=config['vol_min'],
            entry_buffer_atr=config['entry_buffer_atr'],
            rsi_long_min=config['rsi_long_min'],
            rsi_short_max=config['rsi_short_max'],
        )

        # Generate signals
        signals = money_line_v11_signals(df, inputs)

        if not signals:
            return {
                'params': config,
                'pnl': 0.0,
                'trades': 0,
                'win_rate': 0.0,
                'profit_factor': 0.0,
                'max_drawdown': 0.0,
            }

        # Simple backtesting: track equity curve
        equity = 1000.0  # Starting capital
        peak_equity = equity
        max_drawdown = 0.0
        wins = 0
        losses = 0
        win_pnl = 0.0
        loss_pnl = 0.0

        for signal in signals:
            # Locate the signal bar, then look ahead up to 100 bars
            signal_idx = df.index.get_loc(signal.timestamp)
            max_bars = min(100, len(df) - signal_idx - 1)
            if max_bars <= 0:
                continue

            future_data = df.iloc[signal_idx + 1:signal_idx + 1 + max_bars]

            pnl = _resolve_trade(signal.direction, signal.entry_price, future_data)
            if pnl is None:
                # Neither TP nor SL reached: equity unchanged, no trade counted
                continue

            equity += pnl
            if pnl > 0:
                wins += 1
                win_pnl += pnl
            else:
                losses += 1
                loss_pnl += abs(pnl)

            # Track drawdown
            peak_equity = max(peak_equity, equity)
            current_drawdown = peak_equity - equity
            max_drawdown = max(max_drawdown, current_drawdown)

        total_trades = wins + losses
        win_rate = wins / total_trades if total_trades > 0 else 0.0
        profit_factor = win_pnl / loss_pnl if loss_pnl > 0 else (float('inf') if win_pnl > 0 else 0.0)
        total_pnl = equity - 1000.0

        return {
            'params': config,
            'pnl': round(total_pnl, 2),
            'trades': total_trades,
            'win_rate': round(win_rate * 100, 1),
            # inf cannot be ranked sensibly in the CSV, so cap it
            'profit_factor': round(profit_factor, 3) if profit_factor != float('inf') else 999.0,
            'max_drawdown': round(max_drawdown, 2),
        }

    except Exception as e:
        # Deliberate best-effort: report and keep the sweep running
        print(f"✗ Error backtesting config: {e}")
        return {
            'params': config,
            'pnl': 0.0,
            'trades': 0,
            'win_rate': 0.0,
            'profit_factor': 0.0,
            'max_drawdown': 0.0,
        }


def generate_parameter_combinations() -> List[Dict[str, Any]]:
    """Generate all 256 parameter combinations"""
    keys = list(PARAMETER_GRID)
    return [
        dict(zip(keys, combo))
        for combo in itertools.product(*PARAMETER_GRID.values())
    ]


def process_chunk(data_file: str, chunk_id: str, start_idx: int, end_idx: int):
    """Process a chunk of parameter combinations.

    Backtests combinations ``[start_idx, end_idx)`` of the full grid in a
    process pool and writes the results to
    ``v11_test_results/<chunk_id>_results.csv``.

    Args:
        data_file: Path to the OHLCV CSV file.
        chunk_id: Label used in log output and in the result file name.
        start_idx: Index of the first combination to process (inclusive).
        end_idx: Index one past the last combination to process.
    """
    print(f"\n{'='*60}")
    print(f"V11 Test Worker - {chunk_id}")
    print(f"Processing combinations {start_idx} to {end_idx-1}")
    print(f"{'='*60}\n")

    # Load market data
    df = load_market_data(data_file)

    # Generate all combinations
    all_combos = generate_parameter_combinations()
    print(f"✓ Generated {len(all_combos)} total combinations")

    # Get this chunk's combinations
    chunk_combos = all_combos[start_idx:end_idx]
    print(f"✓ Processing {len(chunk_combos)} combinations in this chunk\n")

    # Backtest with multiprocessing
    print(f"⚡ Starting {MAX_WORKERS}-core backtest...\n")

    with Pool(processes=MAX_WORKERS) as pool:
        backtest_func = functools.partial(backtest_config, df)
        results = pool.map(backtest_func, chunk_combos)

    print(f"\n✓ Completed {len(results)} backtests")

    # Write results to CSV
    output_dir = Path('v11_test_results')
    output_dir.mkdir(exist_ok=True)

    csv_file = output_dir / f"{chunk_id}_results.csv"

    # Parameter columns follow the grid's declaration order
    param_names = list(PARAMETER_GRID)

    with open(csv_file, 'w', newline='') as f:
        writer = csv.writer(f)

        # Header
        writer.writerow(param_names + [
            'pnl', 'win_rate', 'profit_factor', 'max_drawdown', 'total_trades'
        ])

        # Data rows
        for result in results:
            params = result['params']
            writer.writerow([params[name] for name in param_names] + [
                result['pnl'],
                result['win_rate'],
                result['profit_factor'],
                result['max_drawdown'],
                result['trades'],
            ])

    print(f"✓ Results saved to {csv_file}")

    # Show top 5 results
    sorted_results = sorted(results, key=lambda x: x['pnl'], reverse=True)
    print(f"\n🏆 Top 5 Results:")
    for i, r in enumerate(sorted_results[:5], 1):
        print(f" {i}. PnL: ${r['pnl']:,.2f} | Trades: {r['trades']} | WR: {r['win_rate']}%")


def main() -> None:
    """Parse command-line arguments and process one chunk of the grid."""
    if len(sys.argv) != 4:
        print("Usage: python v11_test_worker.py <data_file> <chunk_id> <start_idx>")
        sys.exit(1)

    data_file = sys.argv[1]
    chunk_id = sys.argv[2]
    start_idx = int(sys.argv[3])

    # Calculate end index (128 combos per chunk)
    end_idx = start_idx + COMBOS_PER_CHUNK

    process_chunk(data_file, chunk_id, start_idx, end_idx)


if __name__ == '__main__':
    main()