#!/usr/bin/env python3
"""
Distributed worker process for comprehensive parameter exploration.

Runs on remote EPYC servers - modified for the bd-host01 directory structure;
otherwise identical to the original distributed_worker.py.

- Works with the existing .venv Python environment
- Outputs the same CSV format as comprehensive_sweep.py
- Can run standalone or as part of a distributed cluster

Usage:
    python3 distributed_worker.py /path/to/chunk_spec.json
"""
import sys
import json
import itertools
import multiprocessing as mp
from pathlib import Path
from datetime import datetime
import csv

# Add the backtester package to the path for the bd-host01 structure
sys.path.insert(0, str(Path(__file__).parent / 'backtester'))

from backtester.simulator import simulate_money_line, MoneyLineInputs, TradeConfig
from backtester.data_loader import load_csv


def test_config(args):
    """Test a single parameter configuration (matches the comprehensive_sweep.py signature)."""
    config_id, params, data_slice = args

    # Unpack parameters (14-dimensional grid)
    flip_thresh, ma_gap, adx_min, long_pos, short_pos, cooldown, \
        pos_size, tp1_mult, tp2_mult, sl_mult, tp1_close, trail_mult, \
        vol_min, max_bars = params

    # Create MoneyLineInputs
    inputs = MoneyLineInputs(
        flip_threshold_percent=flip_thresh,
        ma_gap_threshold=ma_gap,
        momentum_min_adx=adx_min,
        momentum_long_max_pos=long_pos,
        momentum_short_min_pos=short_pos,
        cooldown_bars=cooldown,
        momentum_spacing=3,   # Fixed (not in grid)
        momentum_cooldown=2,  # Fixed (not in grid)
    )

    # Create TradeConfig
    config = TradeConfig(
        position_size=pos_size,
        atr_multiplier_tp1=tp1_mult,
        atr_multiplier_tp2=tp2_mult,
        atr_multiplier_sl=sl_mult,
        take_profit_1_size_percent=tp1_close,
        trailing_atr_multiplier=trail_mult,
        max_bars_per_trade=max_bars,
    )

    # Quality filter (matches comprehensive_sweep.py)
    quality_filter = {
        'min_adx': 15,
        'min_volume_ratio': vol_min,
    }

    # Run simulation
    try:
        results = simulate_money_line(
            data_slice.data, data_slice.symbol, inputs, config, quality_filter
        )

        # Extract metrics
        trades = len(results.trades)
        win_rate = results.win_rate if trades > 0 else 0
        total_pnl = results.total_pnl
        pnl_per_1k = (total_pnl / pos_size * 1000) if pos_size > 0 else 0
        profit_factor = results.profit_factor if hasattr(results, 'profit_factor') else 0
        max_drawdown = abs(results.max_drawdown) if hasattr(results, 'max_drawdown') else 0
        sharpe = results.sharpe_ratio if hasattr(results, 'sharpe_ratio') else 0

        return (config_id, trades, win_rate, total_pnl, pnl_per_1k,
                profit_factor, max_drawdown, sharpe, params)
    except Exception as e:
        print(f"Error testing config {config_id}: {e}")
        return (config_id, 0, 0, 0, 0, 0, 0, 0, params)
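
# A chunk spec is a small JSON file handed to this worker by the coordinator.
# A minimal sketch for reference (the field names match exactly what
# process_chunk() below reads; the chunk_id, bounds, and grid values shown
# here are hypothetical, not a recommended search space):
#
#     {
#         "chunk_id": "bd-host01_000",
#         "chunk_start": 0,
#         "chunk_end": 50000,
#         "num_workers": 64,
#         "grid": {
#             "flip_thresholds":     [0.5, 1.0],
#             "ma_gaps":             [0.1, 0.2],
#             "adx_mins":            [15, 20],
#             "long_pos_maxs":       [70],
#             "short_pos_mins":      [30],
#             "cooldowns":           [2, 4],
#             "position_sizes":      [1000],
#             "tp1_multipliers":     [1.0, 1.5],
#             "tp2_multipliers":     [2.0, 3.0],
#             "sl_multipliers":      [1.0],
#             "tp1_close_percents":  [50],
#             "trailing_multipliers":[1.5],
#             "vol_mins":            [1.0],
#             "max_bars_list":       [100]
#         }
#     }
#
# Every worker must receive the same grid so that the flattened combination
# ordering is identical across hosts; only chunk_start/chunk_end differ.
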
def process_chunk(chunk_spec_path: str):
    """Process the parameter chunk specified in a JSON file."""
    # Load chunk specification
    with open(chunk_spec_path, 'r') as f:
        spec = json.load(f)

    chunk_id = spec['chunk_id']
    chunk_start = spec['chunk_start']
    chunk_end = spec['chunk_end']
    grid = spec['grid']
    num_workers = spec['num_workers']

    print(f"🎯 Processing chunk: {chunk_id}")
    print(f"📊 Range: {chunk_start:,} to {chunk_end:,} ({chunk_end - chunk_start:,} combinations)")
    print(f"⚙️ Workers: {num_workers} cores")
    print()

    # Load data (same as comprehensive_sweep.py)
    data_path = Path(__file__).parent.parent / 'data' / 'solusdt_5m.csv'
    print(f"📈 Loading data from {data_path}...")
    data_slice = load_csv(str(data_path))
    print(f"✅ Loaded {len(data_slice.data):,} rows")
    print()

    # Generate ALL parameter combinations (same order as comprehensive_sweep.py)
    param_lists = [
        grid['flip_thresholds'],
        grid['ma_gaps'],
        grid['adx_mins'],
        grid['long_pos_maxs'],
        grid['short_pos_mins'],
        grid['cooldowns'],
        grid['position_sizes'],
        grid['tp1_multipliers'],
        grid['tp2_multipliers'],
        grid['sl_multipliers'],
        grid['tp1_close_percents'],
        grid['trailing_multipliers'],
        grid['vol_mins'],
        grid['max_bars_list'],
    ]

    print("🔢 Generating parameter combinations...")
    all_combos = list(itertools.product(*param_lists))
    total_combos = len(all_combos)
    print(f"✅ Generated {total_combos:,} total combinations")

    # Extract chunk slice
    chunk_combos = all_combos[chunk_start:chunk_end]
    print(f"✂️ Extracted chunk slice: {len(chunk_combos):,} combinations")
    print()

    # Prepare arguments for test_config
    args_list = [
        (chunk_start + i, combo, data_slice)
        for i, combo in enumerate(chunk_combos)
    ]

    # Run multiprocessing sweep (same as comprehensive_sweep.py)
    print(f"🚀 Starting sweep with {num_workers} workers...")
    print()

    results = []
    completed = 0
    best_pnl = float('-inf')
    best_config = None

    with mp.Pool(processes=num_workers) as pool:
        for result in pool.imap_unordered(test_config, args_list, chunksize=10):
            results.append(result)
            completed += 1

            # Track best
            if result[4] > best_pnl:  # pnl_per_1k
                best_pnl = result[4]
                best_config = result

            # Progress every 100 configs
            if completed % 100 == 0:
                pct = (completed / len(chunk_combos)) * 100
                print(f"⏳ Progress: {completed:,}/{len(chunk_combos):,} ({pct:.1f}%) - "
                      f"Best so far: ${best_pnl:.2f}/1k")

    print()
    print(f"✅ Chunk {chunk_id} complete!")
    print(f"📊 Tested {len(results):,} configurations")
    print(f"🏆 Best P&L: ${best_pnl:.2f} per $1k")
    print()

    # Sort by profitability
    results.sort(key=lambda x: x[4], reverse=True)

    # Save results to CSV (same format as comprehensive_sweep.py)
    output_file = Path(__file__).parent.parent / f'chunk_{chunk_id}_results.csv'
    print(f"💾 Saving results to {output_file}...")

    with open(output_file, 'w', newline='') as f:
        writer = csv.writer(f)

        # Header
        writer.writerow([
            'rank', 'trades', 'win_rate', 'total_pnl', 'pnl_per_1k',
            'profit_factor', 'max_drawdown', 'sharpe_ratio',
            'flip_threshold', 'ma_gap', 'adx_min', 'long_pos_max',
            'short_pos_min', 'cooldown', 'position_size', 'tp1_mult',
            'tp2_mult', 'sl_mult', 'tp1_close_pct', 'trailing_mult',
            'vol_min', 'max_bars'
        ])

        # Write all results
        for rank, result in enumerate(results, 1):
            config_id, trades, win_rate, total_pnl, pnl_per_1k, \
                profit_factor, max_drawdown, sharpe, params = result

            writer.writerow([
                rank, trades, f'{win_rate:.4f}', f'{total_pnl:.2f}',
                f'{pnl_per_1k:.2f}', f'{profit_factor:.3f}',
                f'{max_drawdown:.2f}', f'{sharpe:.3f}',
                *params
            ])

    print("✅ Results saved!")
    print()

    # Print top 10
    print("🏆 Top 10 configurations:")
    print()
    for i, result in enumerate(results[:10], 1):
        config_id, trades, win_rate, total_pnl, pnl_per_1k, \
            profit_factor, max_drawdown, sharpe, params = result

        print(f"{i:2d}. ${pnl_per_1k:7.2f}/1k | "
              f"{trades:4d} trades | {win_rate*100:5.1f}% WR | "
              f"PF {profit_factor:.2f} | DD {max_drawdown:.1f}%")

    print()
    print(f"✅ Chunk {chunk_id} processing complete!")

    return output_file
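
# Note on memory: process_chunk() above materializes the entire grid with
# list(itertools.product(...)) before slicing, which keeps the combination
# ordering byte-for-byte identical to comprehensive_sweep.py but can be
# RAM-heavy for very large grids. A lazier equivalent is sketched below;
# itertools.product yields combinations in the same deterministic order, so
# chunk boundaries are unchanged (total_combos would then need to be computed
# separately, e.g. as the product of the list lengths):
#
#     chunk_combos = list(itertools.islice(
#         itertools.product(*param_lists), chunk_start, chunk_end))
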
def main():
    """Worker entry point."""
    if len(sys.argv) < 2:
        print("Usage: python3 distributed_worker.py /path/to/chunk_spec.json")
        sys.exit(1)

    chunk_spec_path = sys.argv[1]

    if not Path(chunk_spec_path).exists():
        print(f"Error: Chunk spec file not found: {chunk_spec_path}")
        sys.exit(1)

    print("=" * 80)
    print("🔧 DISTRIBUTED WORKER")
    print("=" * 80)
    print()

    start_time = datetime.now()
    output_file = process_chunk(chunk_spec_path)
    end_time = datetime.now()

    duration = (end_time - start_time).total_seconds()

    print()
    print("=" * 80)
    print(f"⏱️ Total time: {duration:.1f} seconds ({duration/60:.1f} minutes)")
    print(f"📄 Results: {output_file}")
    print("=" * 80)


if __name__ == '__main__':
    main()
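
# Example standalone invocation (the spec path is illustrative):
#
#     python3 distributed_worker.py /path/to/chunk_spec.json
#
# The worker writes chunk_<chunk_id>_results.csv one directory above this
# script, sorted by pnl_per_1k descending, ready to be merged with the CSVs
# produced by the other hosts in the cluster.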