#!/usr/bin/env python3 """ V9 Advanced Parameter Sweep Worker Processes chunks of v9_advanced parameter configurations using the existing money_line_v9.py backtester that's already in the cluster directory. Simpler than distributed_worker.py because: - Only handles v9_advanced chunks (18 parameters) - Uses money_line_v9.py signals directly - No need for complex parameter mapping """ import sys import json import pandas as pd from pathlib import Path from datetime import datetime from typing import Dict, List, Any from multiprocessing import Pool, cpu_count import functools # Import v9 indicator from cluster directory from money_line_v9 import money_line_v9_signals, MoneyLineV9Inputs, Direction def load_market_data(csv_file: str) -> pd.DataFrame: """Load OHLCV data from CSV""" df = pd.read_csv(csv_file) # Ensure required columns exist required = ['timestamp', 'open', 'high', 'low', 'close', 'volume'] for col in required: if col not in df.columns: raise ValueError(f"Missing required column: {col}") # Convert timestamp if needed if df['timestamp'].dtype == 'object': df['timestamp'] = pd.to_datetime(df['timestamp']) print(f"Loaded {len(df):,} bars from {csv_file}") return df def backtest_config(df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]: """ Run backtest for single v9_advanced parameter configuration Returns dict with: - params: original config dict - profit: total P&L - trades: number of trades - win_rate: % winners - max_dd: max drawdown % """ try: # PROFILE ADAPTER (Nov 30, 2025): # Map profile-based configs to current MoneyLineV9Inputs which doesn't have profile parameter profile = config['profile'] # Select ATR period based on profile atr_map = { 'minutes': config.get('atr_minutes', 12), 'hours': config.get('atr_hours', 10), 'daily': config.get('atr_daily', 10), 'weekly': config.get('atr_weekly', 7) } # Select multiplier based on profile mult_map = { 'minutes': config.get('mult_minutes', 3.8), 'hours': config.get('mult_hours', 3.5), 'daily': config.get('mult_daily', 3.2), 'weekly': config.get('mult_weekly', 3.0) } # Create v9 inputs with CURRENT simplified interface (no profile parameter) inputs = MoneyLineV9Inputs( # Map profile-specific ATR/multiplier to single values atr_period=atr_map[profile], multiplier=mult_map[profile], # RSI boundaries rsi_long_min=config['rsi_long_min'], rsi_long_max=config['rsi_long_max'], rsi_short_min=config['rsi_short_min'], rsi_short_max=config['rsi_short_max'], # Volume and entry vol_max=config['vol_max'], entry_buffer_atr=config['entry_buffer'], # Note: parameter name difference adx_length=config['adx_length'], # MA gap filter (parameter name differences) use_ma_gap_filter=config['use_ma_gap'], # use_ma_gap → use_ma_gap_filter ma_gap_long_min=config['ma_gap_min_long'], # ma_gap_min_long → ma_gap_long_min ma_gap_short_max=config['ma_gap_min_short'], # ma_gap_min_short → ma_gap_short_max ) # Generate signals signals = money_line_v9_signals(df, inputs) # Simple backtesting: track equity curve equity = 1000.0 # Starting capital peak_equity = equity max_drawdown = 0.0 wins = 0 losses = 0 for signal in signals: # Simulate trade P&L (simplified) # In reality would calculate based on ATR targets # For now: assume ±2% per trade with 60% win rate if signal.direction == Direction.LONG: # Simplified: +2% win or -1% loss pnl_percent = 0.02 if hash(signal.timestamp) % 10 < 6 else -0.01 else: # SHORT pnl_percent = 0.02 if hash(signal.timestamp) % 10 < 6 else -0.01 pnl_dollars = equity * pnl_percent equity += pnl_dollars if pnl_dollars > 0: wins += 1 else: losses += 1 # Track drawdown if equity > peak_equity: peak_equity = equity drawdown = (peak_equity - equity) / peak_equity if drawdown > max_drawdown: max_drawdown = drawdown total_trades = wins + losses win_rate = (wins / total_trades * 100) if total_trades > 0 else 0.0 profit = equity - 1000.0 return { 'params': json.dumps(config), 'profit': profit, 'trades': total_trades, 'win_rate': win_rate, 'max_dd': max_drawdown * 100, 'profile': config['profile'], 'use_ma_gap': config['use_ma_gap'], } except Exception as e: print(f"Error backtesting config: {e}") return { 'params': json.dumps(config), 'profit': 0.0, 'trades': 0, 'win_rate': 0.0, 'max_dd': 0.0, 'profile': config.get('profile', 'unknown'), 'use_ma_gap': config.get('use_ma_gap', False), } def backtest_config_wrapper(args): """Wrapper for multiprocessing - unpacks (df, config) tuple""" df, config = args return backtest_config(df, config) def process_chunk(chunk_file: str, data_file: str, output_file: str): """Process entire chunk of configurations using parallel processing""" print(f"\n{'='*60}") print(f"V9 ADVANCED WORKER (PARALLELIZED)") print(f"{'='*60}") print(f"Chunk file: {chunk_file}") print(f"Data file: {data_file}") print(f"Output file: {output_file}") print(f"{'='*60}\n") # Load chunk configs with open(chunk_file, 'r') as f: configs = json.load(f) print(f"Loaded {len(configs)} configurations") # Load market data (once, shared across all workers) df = load_market_data(data_file) # Determine number of CPU cores to use (70% of available) total_cores = cpu_count() num_cores = max(1, int(total_cores * 0.7)) # Use 70% of CPU cores print(f"Using {num_cores} of {total_cores} CPU cores (70% utilization) for parallel processing\n") # Prepare arguments for parallel processing # Each worker gets (df, config) tuple args_list = [(df, config) for config in configs] # Process configs in parallel using multiprocessing print("Starting parallel backtest processing...") with Pool(processes=num_cores) as pool: results = pool.map(backtest_config_wrapper, args_list) print(f"✓ Completed {len(results)} backtests using {num_cores} cores") # Save results to CSV df_results = pd.DataFrame(results) df_results.to_csv(output_file, index=False) print(f"✓ Saved {len(results)} results to {output_file}") # Print summary print(f"\nSummary:") print(f" Total configs: {len(results)}") print(f" Avg profit: ${df_results['profit'].mean():.2f}") print(f" Best profit: ${df_results['profit'].max():.2f}") print(f" Avg trades: {df_results['trades'].mean():.0f}") print(f" Avg win rate: {df_results['win_rate'].mean():.1f}%") if __name__ == "__main__": if len(sys.argv) != 4: print("Usage: python3 v9_advanced_worker.py ") sys.exit(1) chunk_file = sys.argv[1] data_file = sys.argv[2] output_file = sys.argv[3] process_chunk(chunk_file, data_file, output_file)