#!/usr/bin/env python3 """ Distributed v9 advanced parameter sweep for cluster execution. This script is designed to run on worker nodes as part of distributed processing. The coordinator will split the 800K+ configurations into chunks and distribute them across the cluster. Expected per-worker throughput: ~300-500 configs/hour Total runtime: 40-80 hours on 2-worker cluster """ import argparse import sys from pathlib import Path # Add project root to path project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) import pandas as pd from tqdm import tqdm from backtester.data_loader import load_data from backtester.indicators.money_line_v9 import MoneyLineV9Inputs, money_line_v9_signals from backtester.simulator import simulate_money_line def test_config(params): """Test a single configuration.""" # Load data once (cached) df = load_data("solusdt_5m.csv") # Create inputs with parameters inputs = MoneyLineV9Inputs( # Basic optimized params (FIXED from previous sweep) confirm_bars=0, flip_threshold_percent=0.5, cooldown_bars=3, adx_min=21, long_pos_max=75, short_pos_min=20, vol_min=1.0, # ADVANCED OPTIMIZATION PARAMETERS: atr_period=params['atr_period'], multiplier=params['multiplier'], adx_length=params['adx_length'], rsi_length=params['rsi_length'], rsi_long_min=params['rsi_long_min'], rsi_long_max=params['rsi_long_max'], rsi_short_min=params['rsi_short_min'], rsi_short_max=params['rsi_short_max'], vol_max=params['vol_max'], entry_buffer_atr=params['entry_buffer_atr'], use_heikin_ashi=params['use_heikin_ashi'], use_ma_gap_filter=params['use_ma_gap_filter'], ma_gap_long_min=params['ma_gap_long_min'], ma_gap_short_max=params['ma_gap_short_max'], ) try: # Generate signals signals = money_line_v9_signals(df, inputs) # Simulate trades results = simulate_money_line(df, signals) return { 'atr_period': params['atr_period'], 'multiplier': params['multiplier'], 'adx_length': params['adx_length'], 'rsi_length': params['rsi_length'], 'rsi_long_min': params['rsi_long_min'], 'rsi_long_max': params['rsi_long_max'], 'rsi_short_min': params['rsi_short_min'], 'rsi_short_max': params['rsi_short_max'], 'vol_max': params['vol_max'], 'entry_buffer_atr': params['entry_buffer_atr'], 'use_heikin_ashi': params['use_heikin_ashi'], 'use_ma_gap_filter': params['use_ma_gap_filter'], 'ma_gap_long_min': params['ma_gap_long_min'], 'ma_gap_short_max': params['ma_gap_short_max'], 'pnl': results['total_pnl'], 'win_rate': results['win_rate'], 'profit_factor': results['profit_factor'], 'max_drawdown': results['max_drawdown'], 'total_trades': results['total_trades'], } except Exception as e: print(f"Error testing config: {e}") return { 'atr_period': params['atr_period'], 'multiplier': params['multiplier'], 'pnl': 0, 'win_rate': 0, 'profit_factor': 0, 'max_drawdown': 0, 'total_trades': 0, } def process_chunk(chunk_file: str, output_file: str): """ Process a chunk of parameter configurations. Args: chunk_file: CSV file with parameter combinations to test output_file: CSV file to save results """ print(f"Loading chunk: {chunk_file}") chunk_df = pd.read_csv(chunk_file) print(f"Chunk size: {len(chunk_df)} configurations") print() results = [] for idx, row in tqdm(chunk_df.iterrows(), total=len(chunk_df), desc="Testing configs"): params = row.to_dict() result = test_config(params) results.append(result) # Save results results_df = pd.DataFrame(results) results_df.to_csv(output_file, index=False) print(f"\nResults saved to: {output_file}") # Print summary print() print("=" * 80) print("CHUNK COMPLETE") print("=" * 80) print() print(f"Configurations tested: {len(results_df)}") print(f"Best PnL: ${results_df['pnl'].max():.2f}") print(f"Mean PnL: ${results_df['pnl'].mean():.2f}") print(f"Configurations with trades: {(results_df['total_trades'] > 0).sum()}") print() def main(): parser = argparse.ArgumentParser(description="Process v9 advanced sweep chunk") parser.add_argument("chunk_file", help="Input CSV file with parameter combinations") parser.add_argument("output_file", help="Output CSV file for results") args = parser.parse_args() print("=" * 80) print("v9 ADVANCED SWEEP - CHUNK PROCESSOR") print("=" * 80) print() process_chunk(args.chunk_file, args.output_file) if __name__ == "__main__": main()