Files
trading_bot_v4/cluster/distributed_worker_bd.py
mindesbunister b77282b560 feat: Add EPYC cluster distributed sweep with web UI
New Features:
- Distributed coordinator orchestrates 2x AMD EPYC 16-core servers
- 64 total cores processing 12M parameter combinations (70% CPU limit)
- Worker1 (pve-nu-monitor01): Direct SSH access at 10.10.254.106
- Worker2 (bd-host01): 2-hop SSH through worker1 (10.20.254.100)
- Web UI at /cluster shows real-time status and AI recommendations
- API endpoint /api/cluster/status serves cluster metrics (a quick request example follows this list)
- Auto-refresh every 30s with top strategies and actionable insights
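
As a quick smoke test of the status endpoint, a minimal Python sketch (an assumption, not part of the commit: it presumes the server is listening on port 3001 as noted under Access Documentation, and the response shape is whatever app/api/cluster/status/route.ts returns):

import json
import urllib.request

# Hypothetical check that the cluster status endpoint responds;
# port 3001 matches the Web UI address given below
with urllib.request.urlopen("http://localhost:3001/api/cluster/status") as resp:
    status = json.load(resp)
print(json.dumps(status, indent=2))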

Files Added:
- cluster/distributed_coordinator.py (510 lines) - Main orchestrator
- cluster/distributed_worker.py (271 lines) - Worker1 script
- cluster/distributed_worker_bd_clean.py (275 lines) - Worker2 script
- cluster/monitor_bd_host01.sh - Monitoring script
- app/api/cluster/status/route.ts (274 lines) - API endpoint
- app/cluster/page.tsx (258 lines) - Web UI
- cluster/CLUSTER_SETUP.md - Complete setup and access documentation

Technical Details:
- SQLite database tracks chunk assignments
- 10,000 combinations per chunk (1,195 chunks in total; a chunk-spec sketch follows this list)
- multiprocessing.Pool capped at the 70% CPU limit (22 worker processes per EPYC server)
- SSH/SCP for deployment and result collection
- Handles 2-hop SSH for bd-host01 access
- Results in CSV format with top strategies ranked
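
Each worker consumes a JSON chunk spec whose required keys can be read directly off process_chunk() in the source below. A minimal sketch of producing one (the grid value lists here are illustrative placeholders, not the real sweep grid; the authoritative grid and chunk assignments live in distributed_coordinator.py):

import json
import multiprocessing as mp

# Illustrative grid: the key names match what process_chunk() expects,
# but these value lists are placeholders
grid = {
    'flip_thresholds': [0.3, 0.5],
    'ma_gaps': [0.1, 0.2],
    'adx_mins': [15, 20],
    'long_pos_maxs': [70, 80],
    'short_pos_mins': [20, 30],
    'cooldowns': [2, 4],
    'position_sizes': [1000],
    'tp1_multipliers': [1.0, 1.5],
    'tp2_multipliers': [2.0, 3.0],
    'sl_multipliers': [1.0, 1.5],
    'tp1_close_percents': [50],
    'trailing_multipliers': [1.0],
    'vol_mins': [1.0, 1.2],
    'max_bars_list': [100, 200],
}

spec = {
    'chunk_id': '0001',
    'chunk_start': 0,       # global combination index, inclusive
    'chunk_end': 10_000,    # exclusive: 10,000 combinations per chunk
    'grid': grid,
    'num_workers': max(1, int(mp.cpu_count() * 0.70)),  # the 70% CPU limit
}

with open('chunk_spec.json', 'w') as f:
    json.dump(spec, f, indent=2)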

Access Documentation:
- Worker1: ssh root@10.10.254.106
- Worker2: ssh root@10.10.254.106 "ssh root@10.20.254.100" (2-hop; see the ProxyJump sketch below)
- Web UI: http://localhost:3001/cluster
- See CLUSTER_SETUP.md for complete guide
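
The 2-hop path can also be expressed as a ProxyJump entry so that worker2 is reachable in one command. A sketch only, assuming OpenSSH 7.3+ on the client; the host aliases are illustrative:

# ~/.ssh/config
Host worker1
    HostName 10.10.254.106
    User root

Host worker2
    HostName 10.20.254.100
    User root
    ProxyJump worker1

With this in place, "ssh worker2" tunnels through worker1 automatically.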

Status: Deployed and operational
2025-11-30 13:02:18 +01:00


#!/usr/bin/env python3
"""
Distributed worker process for comprehensive parameter exploration.

Runs on remote EPYC servers - modified for the bd-host01 directory
structure; apart from the import paths below, the logic is identical to
distributed_worker.py.

- Works with the existing .venv Python environment
- Outputs the same CSV format as comprehensive_sweep.py
- Can run standalone or as part of the distributed cluster

Usage:
    python3 distributed_worker_bd.py /path/to/chunk_spec.json
"""
import csv
import itertools
import json
import multiprocessing as mp
import sys
from datetime import datetime
from pathlib import Path

# Add the backtester package to the path for the bd-host01 layout
sys.path.insert(0, str(Path(__file__).parent / 'backtester'))

from backtester.simulator import simulate_money_line, MoneyLineInputs, TradeConfig
from backtester.data_loader import load_csv
def test_config(args):
    """Test a single parameter configuration (matches comprehensive_sweep.py signature)"""
    config_id, params, data_slice = args

    # Unpack parameters (14-dimensional grid); the order must match the
    # itertools.product() order built in process_chunk()
    flip_thresh, ma_gap, adx_min, long_pos, short_pos, cooldown, \
        pos_size, tp1_mult, tp2_mult, sl_mult, tp1_close, trail_mult, \
        vol_min, max_bars = params

    # Create MoneyLineInputs
    inputs = MoneyLineInputs(
        flip_threshold_percent=flip_thresh,
        ma_gap_threshold=ma_gap,
        momentum_min_adx=adx_min,
        momentum_long_max_pos=long_pos,
        momentum_short_min_pos=short_pos,
        cooldown_bars=cooldown,
        momentum_spacing=3,   # Fixed (not in grid)
        momentum_cooldown=2,  # Fixed (not in grid)
    )

    # Create TradeConfig
    config = TradeConfig(
        position_size=pos_size,
        atr_multiplier_tp1=tp1_mult,
        atr_multiplier_tp2=tp2_mult,
        atr_multiplier_sl=sl_mult,
        take_profit_1_size_percent=tp1_close,
        trailing_atr_multiplier=trail_mult,
        max_bars_per_trade=max_bars,
    )

    # Quality filter (matches comprehensive_sweep.py)
    quality_filter = {
        'min_adx': 15,
        'min_volume_ratio': vol_min,
    }

    # Run simulation
    try:
        results = simulate_money_line(
            data_slice.data,
            data_slice.symbol,
            inputs,
            config,
            quality_filter
        )

        # Extract metrics; getattr() defaults guard against simulator
        # versions that do not expose the optional risk metrics
        trades = len(results.trades)
        win_rate = results.win_rate if trades > 0 else 0
        total_pnl = results.total_pnl
        pnl_per_1k = (total_pnl / pos_size * 1000) if pos_size > 0 else 0
        profit_factor = getattr(results, 'profit_factor', 0)
        max_drawdown = abs(getattr(results, 'max_drawdown', 0))
        sharpe = getattr(results, 'sharpe_ratio', 0)

        return (config_id, trades, win_rate, total_pnl, pnl_per_1k,
                profit_factor, max_drawdown, sharpe, params)
    except Exception as e:
        # A failed simulation is recorded as a zero-metric row so the
        # chunk still completes
        print(f"Error testing config {config_id}: {e}")
        return (config_id, 0, 0, 0, 0, 0, 0, 0, params)
def process_chunk(chunk_spec_path: str):
    """Process the parameter chunk specified in a JSON spec file"""
    # Load chunk specification
    with open(chunk_spec_path, 'r') as f:
        spec = json.load(f)

    chunk_id = spec['chunk_id']
    chunk_start = spec['chunk_start']
    chunk_end = spec['chunk_end']
    grid = spec['grid']
    num_workers = spec['num_workers']

    print(f"🎯 Processing chunk: {chunk_id}")
    print(f"📊 Range: {chunk_start:,} to {chunk_end:,} ({chunk_end - chunk_start:,} combinations)")
    print(f"⚙️ Workers: {num_workers} cores")
    print()

    # Load data (same as comprehensive_sweep.py)
    data_path = Path(__file__).parent.parent / 'data' / 'solusdt_5m.csv'
    print(f"📈 Loading data from {data_path}...")
    data_slice = load_csv(str(data_path))
    print(f"✅ Loaded {len(data_slice.data):,} rows")
    print()

    # Generate ALL parameter combinations (same order as comprehensive_sweep.py).
    # The chunk is selected purely by index range, so every worker must build
    # the full grid identically before slicing.
    param_lists = [
        grid['flip_thresholds'],
        grid['ma_gaps'],
        grid['adx_mins'],
        grid['long_pos_maxs'],
        grid['short_pos_mins'],
        grid['cooldowns'],
        grid['position_sizes'],
        grid['tp1_multipliers'],
        grid['tp2_multipliers'],
        grid['sl_multipliers'],
        grid['tp1_close_percents'],
        grid['trailing_multipliers'],
        grid['vol_mins'],
        grid['max_bars_list'],
    ]

    print("🔢 Generating parameter combinations...")
    all_combos = list(itertools.product(*param_lists))
    total_combos = len(all_combos)
    print(f"✅ Generated {total_combos:,} total combinations")

    # Extract this worker's slice of the global combination list
    chunk_combos = all_combos[chunk_start:chunk_end]
    print(f"✂️ Extracted chunk slice: {len(chunk_combos):,} combinations")
    print()
    # Prepare arguments for test_config; config IDs are global across the
    # whole sweep, hence the chunk_start offset
    args_list = [
        (chunk_start + i, combo, data_slice)
        for i, combo in enumerate(chunk_combos)
    ]

    # Run multiprocessing sweep (same as comprehensive_sweep.py)
    print(f"🚀 Starting sweep with {num_workers} workers...")
    print()

    results = []
    completed = 0
    best_pnl = float('-inf')
    best_config = None  # retained for reference; not written to disk

    with mp.Pool(processes=num_workers) as pool:
        # imap_unordered yields results as they finish; chunksize=10
        # amortizes IPC overhead across small batches of configs
        for result in pool.imap_unordered(test_config, args_list, chunksize=10):
            results.append(result)
            completed += 1

            # Track best (result[4] is pnl_per_1k)
            if result[4] > best_pnl:
                best_pnl = result[4]
                best_config = result

            # Progress every 100 configs
            if completed % 100 == 0:
                pct = (completed / len(chunk_combos)) * 100
                print(f"⏳ Progress: {completed:,}/{len(chunk_combos):,} ({pct:.1f}%) - "
                      f"Best so far: ${best_pnl:.2f}/1k")

    print()
    print(f"✅ Chunk {chunk_id} complete!")
    print(f"📊 Tested {len(results):,} configurations")
    print(f"🏆 Best P&L: ${best_pnl:.2f} per $1k")
    print()

    # Sort by pnl_per_1k, most profitable first
    results.sort(key=lambda x: x[4], reverse=True)
    # Save results to CSV (same format as comprehensive_sweep.py)
    output_file = Path(__file__).parent.parent / f'chunk_{chunk_id}_results.csv'
    print(f"💾 Saving results to {output_file}...")

    with open(output_file, 'w', newline='') as f:
        writer = csv.writer(f)
        # Header: 8 metric columns followed by the 14 grid parameters
        writer.writerow([
            'rank', 'trades', 'win_rate', 'total_pnl', 'pnl_per_1k',
            'profit_factor', 'max_drawdown', 'sharpe_ratio',
            'flip_threshold', 'ma_gap', 'adx_min', 'long_pos_max', 'short_pos_min',
            'cooldown', 'position_size', 'tp1_mult', 'tp2_mult', 'sl_mult',
            'tp1_close_pct', 'trailing_mult', 'vol_min', 'max_bars'
        ])
        # Write all results, ranked by profitability
        for rank, result in enumerate(results, 1):
            config_id, trades, win_rate, total_pnl, pnl_per_1k, \
                profit_factor, max_drawdown, sharpe, params = result
            writer.writerow([
                rank, trades, f'{win_rate:.4f}', f'{total_pnl:.2f}', f'{pnl_per_1k:.2f}',
                f'{profit_factor:.3f}', f'{max_drawdown:.2f}', f'{sharpe:.3f}',
                *params
            ])

    print("✅ Results saved!")
    print()

    # Print top 10
    print("🏆 Top 10 configurations:")
    print()
    for i, result in enumerate(results[:10], 1):
        config_id, trades, win_rate, total_pnl, pnl_per_1k, \
            profit_factor, max_drawdown, sharpe, params = result
        print(f"{i:2d}. ${pnl_per_1k:7.2f}/1k | "
              f"{trades:4d} trades | {win_rate*100:5.1f}% WR | "
              f"PF {profit_factor:.2f} | DD {max_drawdown:.1f}%")

    print()
    print(f"✅ Chunk {chunk_id} processing complete!")
    return output_file
def main():
    """Worker entry point"""
    if len(sys.argv) < 2:
        print("Usage: python3 distributed_worker_bd.py <chunk_spec.json>")
        sys.exit(1)

    chunk_spec_path = sys.argv[1]
    if not Path(chunk_spec_path).exists():
        print(f"Error: Chunk spec file not found: {chunk_spec_path}")
        sys.exit(1)

    print("=" * 80)
    print("🔧 DISTRIBUTED WORKER")
    print("=" * 80)
    print()

    start_time = datetime.now()
    output_file = process_chunk(chunk_spec_path)
    end_time = datetime.now()
    duration = (end_time - start_time).total_seconds()

    print()
    print("=" * 80)
    print(f"⏱️ Total time: {duration:.1f} seconds ({duration/60:.1f} minutes)")
    print(f"📄 Results: {output_file}")
    print("=" * 80)


if __name__ == '__main__':
    main()
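
For reference, the chunk arithmetic implied by the commit message works out as follows: taking the reported 1,195 chunks of 10,000 at face value implies roughly 11.95M combinations ("12M" rounded). A sketch only, since the authoritative chunk assignments live in the coordinator's SQLite database:

CHUNK_SIZE = 10_000

def chunk_bounds(chunk_index: int, total_combos: int) -> tuple[int, int]:
    """Global [start, end) combination range covered by one chunk."""
    start = chunk_index * CHUNK_SIZE
    end = min(start + CHUNK_SIZE, total_combos)
    return start, end

# 11,950,000 combinations -> exactly 1,195 chunks of 10,000
total = 11_950_000
assert (total + CHUNK_SIZE - 1) // CHUNK_SIZE == 1195
assert chunk_bounds(1194, total) == (11_940_000, 11_950_000)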