#!/usr/bin/env python3
"""
V9 Advanced Parameter Sweep Worker

Processes chunks of v9_advanced parameter configurations using the existing
money_line_v9.py backtester that's already in the cluster directory.

Simpler than distributed_worker.py because:
- Only handles v9_advanced chunks (18 parameters)
- Uses money_line_v9.py signals directly
- No need for complex parameter mapping
"""

import sys
import json
import pandas as pd
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Tuple

# Import v9 indicator from cluster directory
from money_line_v9 import money_line_v9_signals, MoneyLineV9Inputs, Direction

# Columns every market-data CSV must provide.
REQUIRED_COLUMNS = ['timestamp', 'open', 'high', 'low', 'close', 'volume']

# Column order of the per-chunk results CSV (matches the result dict keys).
RESULT_COLUMNS = [
    'params', 'profit', 'trades', 'win_rate', 'max_dd', 'profile', 'use_ma_gap',
]

# Parameters of the placeholder trade simulation.
STARTING_EQUITY = 1000.0
WIN_PNL_PERCENT = 0.02
LOSS_PNL_PERCENT = -0.01


def load_market_data(csv_file: str) -> pd.DataFrame:
    """Load OHLCV data from CSV.

    Args:
        csv_file: Path of a CSV file containing at least the columns listed
            in REQUIRED_COLUMNS.

    Returns:
        The loaded DataFrame. A text-valued 'timestamp' column is parsed
        with ``pd.to_datetime``; any other dtype is left untouched.

    Raises:
        ValueError: If a required column is missing.
    """
    df = pd.read_csv(csv_file)

    # Ensure required columns exist
    for col in REQUIRED_COLUMNS:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

    # Convert timestamp if needed. Checking only `dtype == 'object'` misses
    # pandas' dedicated string dtypes, which would leave timestamps as text.
    timestamps = df['timestamp']
    if (pd.api.types.is_object_dtype(timestamps)
            or pd.api.types.is_string_dtype(timestamps)):
        df['timestamp'] = pd.to_datetime(timestamps)

    print(f"Loaded {len(df):,} bars from {csv_file}")
    return df


def _simulate_trades(signals) -> Tuple[float, int, int, float]:
    """Run the placeholder P&L simulation over an iterable of signals.

    Returns:
        Tuple of (final equity, wins, losses, max drawdown as a fraction).
    """
    equity = STARTING_EQUITY
    peak_equity = equity
    max_drawdown = 0.0
    wins = 0
    losses = 0

    for signal in signals:
        # Simulate trade P&L (simplified).
        # In reality would calculate based on ATR targets. For now every
        # trade is +2% or -1%, chosen pseudo-randomly from the timestamp hash
        # (about 60% winners). Direction does not affect the outcome yet:
        # the former LONG and SHORT branches were identical.
        # NOTE(review): hash() of a str is salted per process unless
        # PYTHONHASHSEED is set; if signal.timestamp is a string the results
        # are not reproducible across workers - TODO confirm timestamp type.
        is_win = hash(signal.timestamp) % 10 < 6
        pnl_percent = WIN_PNL_PERCENT if is_win else LOSS_PNL_PERCENT

        pnl_dollars = equity * pnl_percent
        equity += pnl_dollars

        if pnl_dollars > 0:
            wins += 1
        else:
            losses += 1

        # Track drawdown relative to the running equity peak
        if equity > peak_equity:
            peak_equity = equity
        drawdown = (peak_equity - equity) / peak_equity
        if drawdown > max_drawdown:
            max_drawdown = drawdown

    return equity, wins, losses, max_drawdown


def backtest_config(df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
    """Run backtest for a single v9_advanced parameter configuration.

    The trade outcomes are a simplified placeholder (see _simulate_trades),
    not a real ATR-target backtest.

    Args:
        df: Market data passed straight to ``money_line_v9_signals``.
        config: Parameter dict. ATR lengths and multipliers are optional
            (defaults below); all other keys are required.

    Returns:
        Dict with:
        - params: original config dict, JSON-encoded
        - profit: total P&L relative to the starting capital
        - trades: number of trades
        - win_rate: % winners
        - max_dd: max drawdown %
        - profile: profile name from the config
        - use_ma_gap: MA gap filter flag from the config

        Any exception is reported on stdout and yields a zeroed result so a
        single bad configuration does not abort the whole chunk.
    """
    try:
        # Create v9 inputs from config
        inputs = MoneyLineV9Inputs(
            profile=config['profile'],
            # ATR parameters (profile-specific)
            atr_minutes=config.get('atr_minutes', 12),
            atr_hours=config.get('atr_hours', 10),
            atr_daily=config.get('atr_daily', 10),
            atr_weekly=config.get('atr_weekly', 7),
            # Multipliers (profile-specific)
            mult_minutes=config.get('mult_minutes', 3.8),
            mult_hours=config.get('mult_hours', 3.5),
            mult_daily=config.get('mult_daily', 3.2),
            mult_weekly=config.get('mult_weekly', 3.0),
            # RSI boundaries
            rsi_long_min=config['rsi_long_min'],
            rsi_long_max=config['rsi_long_max'],
            rsi_short_min=config['rsi_short_min'],
            rsi_short_max=config['rsi_short_max'],
            # Volume and entry
            vol_max=config['vol_max'],
            entry_buffer=config['entry_buffer'],
            adx_length=config['adx_length'],
            # NEW: MA gap filter
            use_ma_gap=config['use_ma_gap'],
            ma_gap_min_long=config['ma_gap_min_long'],
            ma_gap_min_short=config['ma_gap_min_short'],
        )

        # Generate signals
        signals = money_line_v9_signals(df, inputs)

        # Simple backtesting: track equity curve
        equity, wins, losses, max_drawdown = _simulate_trades(signals)

        total_trades = wins + losses
        win_rate = (wins / total_trades * 100) if total_trades > 0 else 0.0
        profit = equity - STARTING_EQUITY

        return {
            'params': json.dumps(config),
            'profit': profit,
            'trades': total_trades,
            'win_rate': win_rate,
            'max_dd': max_drawdown * 100,
            'profile': config['profile'],
            'use_ma_gap': config['use_ma_gap'],
        }

    except Exception as e:
        # Deliberate best-effort boundary: log and emit a zeroed row.
        print(f"Error backtesting config: {e}")
        return {
            'params': json.dumps(config),
            'profit': 0.0,
            'trades': 0,
            'win_rate': 0.0,
            'max_dd': 0.0,
            'profile': config.get('profile', 'unknown'),
            'use_ma_gap': config.get('use_ma_gap', False),
        }


def process_chunk(chunk_file: str, data_file: str, output_file: str) -> None:
    """Process entire chunk of configurations.

    Args:
        chunk_file: JSON file holding a list of configuration dicts.
        data_file: OHLCV CSV file, loaded with ``load_market_data``.
        output_file: Destination CSV; one row per configuration with the
            columns in RESULT_COLUMNS (header only for an empty chunk).

    Raises:
        ValueError: If the chunk file's top-level JSON value is not a list,
            or the market data lacks a required column.
    """
    separator = '=' * 60
    print(f"\n{separator}")
    print("V9 ADVANCED WORKER")
    print(f"{separator}")
    print(f"Chunk file: {chunk_file}")
    print(f"Data file: {data_file}")
    print(f"Output file: {output_file}")
    print(f"{separator}\n")

    # Load chunk configs
    with open(chunk_file, 'r', encoding='utf-8') as f:
        configs = json.load(f)
    if not isinstance(configs, list):
        raise ValueError(
            f"Expected a JSON list of configurations in {chunk_file}, "
            f"got {type(configs).__name__}"
        )
    print(f"Loaded {len(configs)} configurations")

    # Load market data
    df = load_market_data(data_file)

    # Process each config
    total = len(configs)
    results = []
    for i, config in enumerate(configs):
        if i % 100 == 0:
            print(f"Progress: {i}/{total} ({i/total*100:.1f}%)")
        results.append(backtest_config(df, config))

    # Save results to CSV. Passing the columns explicitly keeps the header
    # present even when the chunk is empty.
    df_results = pd.DataFrame(results, columns=RESULT_COLUMNS)
    df_results.to_csv(output_file, index=False)
    print(f"\n✓ Saved {len(results)} results to {output_file}")

    # Print summary
    print("\nSummary:")
    print(f" Total configs: {len(results)}")
    if df_results.empty:
        # No rows: averages are undefined, so skip them.
        return
    print(f" Avg profit: ${df_results['profit'].mean():.2f}")
    print(f" Best profit: ${df_results['profit'].max():.2f}")
    print(f" Avg trades: {df_results['trades'].mean():.0f}")
    print(f" Avg win rate: {df_results['win_rate'].mean():.1f}%")


def main() -> None:
    """Command-line entry point: validate arguments and process one chunk."""
    if len(sys.argv) != 4:
        print("Usage: python3 v9_advanced_worker.py CHUNK_FILE DATA_FILE OUTPUT_FILE")
        sys.exit(1)

    chunk_file, data_file, output_file = sys.argv[1:4]
    process_chunk(chunk_file, data_file, output_file)


if __name__ == "__main__":
    main()