New Features:
- Distributed coordinator orchestrates 2x AMD EPYC 16-core servers
- 64 total cores processing 12M parameter combinations (70% CPU limit)
- Worker1 (pve-nu-monitor01): Direct SSH access at 10.10.254.106
- Worker2 (bd-host01): 2-hop SSH through worker1 (10.20.254.100)
- Web UI at /cluster shows real-time status and AI recommendations
- API endpoint /api/cluster/status serves cluster metrics
- Auto-refresh every 30s with top strategies and actionable insights

Files Added:
- cluster/distributed_coordinator.py (510 lines) - Main orchestrator
- cluster/distributed_worker.py (271 lines) - Worker1 script
- cluster/distributed_worker_bd_clean.py (275 lines) - Worker2 script
- cluster/monitor_bd_host01.sh - Monitoring script
- app/api/cluster/status/route.ts (274 lines) - API endpoint
- app/cluster/page.tsx (258 lines) - Web UI
- cluster/CLUSTER_SETUP.md - Complete setup and access documentation

Technical Details:
- SQLite database tracks chunk assignments
- 10,000 combinations per chunk (1,195 total chunks; spec format sketched below)
- multiprocessing.Pool with 70% CPU limit (22 cores per EPYC; see the worker-count sketch below)
- SSH/SCP for deployment and result collection
- Handles 2-hop SSH for bd-host01 access (ProxyJump example below)
- Results in CSV format with top strategies ranked

Access Documentation:
- Worker1: ssh root@10.10.254.106
- Worker2: ssh root@10.10.254.106 "ssh root@10.20.254.100"
- Web UI: http://localhost:3001/cluster
- See CLUSTER_SETUP.md for the complete guide

Status: Deployed and operational
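
For context, each worker consumes a JSON chunk spec written by the coordinator. The sketch below is illustrative, not the production spec: the key names match what process_chunk() in the worker reads, but the grid values are placeholders and the chunk id is hypothetical.

import json

chunk_spec = {
    "chunk_id": "0042",
    "chunk_start": 420_000,  # global combination index (inclusive)
    "chunk_end": 430_000,    # exclusive: 10,000 combinations per chunk
    "num_workers": 22,       # 70% CPU limit on a 32-core EPYC
    "grid": {
        "flip_thresholds": [0.5, 1.0],
        "ma_gaps": [0.1, 0.2],
        "adx_mins": [15, 20],
        "long_pos_maxs": [70],
        "short_pos_mins": [30],
        "cooldowns": [2, 4],
        "position_sizes": [1000],
        "tp1_multipliers": [1.0, 1.5],
        "tp2_multipliers": [2.0, 3.0],
        "sl_multipliers": [1.0, 1.5],
        "tp1_close_percents": [50],
        "trailing_multipliers": [1.0],
        "vol_mins": [1.0, 1.2],
        "max_bars_list": [96],
    },
}

with open("chunk_0042_spec.json", "w") as f:
    json.dump(chunk_spec, f, indent=2)

Every worker regenerates the full Cartesian product from the same grid and slices [chunk_start:chunk_end], so the grid lists must be identical in every chunk spec.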
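The 70% CPU cap works out to 22 workers per box if each EPYC server exposes 32 logical cores, as the figures above imply. A minimal sketch of how the worker count can be derived (the 0.70 factor comes from the notes above; nothing here is specific to the actual coordinator):

import multiprocessing as mp

CPU_LIMIT = 0.70  # leave ~30% headroom for the host OS and SSH sessions
num_workers = max(1, int(mp.cpu_count() * CPU_LIMIT))
print(f"Using {num_workers} of {mp.cpu_count()} logical cores")
# On a 32-core EPYC: int(32 * 0.70) == 22, matching the notes above.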
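The 2-hop path to bd-host01 can also be captured in ~/.ssh/config so deployment commands stay one-liners. A sketch using the IPs from the access notes (the worker1/worker2 aliases are made up):

Host worker1
    HostName 10.10.254.106
    User root

Host worker2
    HostName 10.20.254.100
    User root
    ProxyJump worker1

With this in place, "ssh worker2" and "scp <file> worker2:..." route through worker1 automatically.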
#!/usr/bin/env python3
"""
Distributed worker process for comprehensive parameter exploration.

Runs on remote EPYC servers - modified for the bd-host01 directory structure.

- Works with the existing .venv Python environment
- Outputs the same CSV format as comprehensive_sweep.py
- Can run standalone or as part of the distributed cluster

Usage:
    python3 distributed_worker.py /path/to/chunk_spec.json
"""

import csv
import itertools
import json
import multiprocessing as mp
import sys
from datetime import datetime
from pathlib import Path

# Add the backtester repo to the import path (bd-host01 keeps it next to this script)
sys.path.insert(0, str(Path(__file__).parent / 'backtester'))

from backtester.simulator import simulate_money_line, MoneyLineInputs, TradeConfig
from backtester.data_loader import load_csv


def test_config(args):
    """Test single parameter configuration (matches comprehensive_sweep.py signature)"""
    config_id, params, data_slice = args

    # Unpack parameters (14-dimensional grid)
    flip_thresh, ma_gap, adx_min, long_pos, short_pos, cooldown, \
        pos_size, tp1_mult, tp2_mult, sl_mult, tp1_close, trail_mult, \
        vol_min, max_bars = params

    # Create MoneyLineInputs
    inputs = MoneyLineInputs(
        flip_threshold_percent=flip_thresh,
        ma_gap_threshold=ma_gap,
        momentum_min_adx=adx_min,
        momentum_long_max_pos=long_pos,
        momentum_short_min_pos=short_pos,
        cooldown_bars=cooldown,
        momentum_spacing=3,   # Fixed (not in grid)
        momentum_cooldown=2,  # Fixed (not in grid)
    )

    # Create TradeConfig
    config = TradeConfig(
        position_size=pos_size,
        atr_multiplier_tp1=tp1_mult,
        atr_multiplier_tp2=tp2_mult,
        atr_multiplier_sl=sl_mult,
        take_profit_1_size_percent=tp1_close,
        trailing_atr_multiplier=trail_mult,
        max_bars_per_trade=max_bars,
    )

    # Quality filter (matches comprehensive_sweep.py)
    quality_filter = {
        'min_adx': 15,
        'min_volume_ratio': vol_min,
    }

    # Run simulation
    try:
        results = simulate_money_line(
            data_slice.data,
            data_slice.symbol,
            inputs,
            config,
            quality_filter,
        )

        # Extract metrics
        trades = len(results.trades)
        win_rate = results.win_rate if trades > 0 else 0
        total_pnl = results.total_pnl
        pnl_per_1k = (total_pnl / pos_size * 1000) if pos_size > 0 else 0
        profit_factor = results.profit_factor if hasattr(results, 'profit_factor') else 0
        max_drawdown = abs(results.max_drawdown) if hasattr(results, 'max_drawdown') else 0
        sharpe = results.sharpe_ratio if hasattr(results, 'sharpe_ratio') else 0

        return (config_id, trades, win_rate, total_pnl, pnl_per_1k,
                profit_factor, max_drawdown, sharpe, params)

    except Exception as e:
        print(f"Error testing config {config_id}: {e}")
        return (config_id, 0, 0, 0, 0, 0, 0, 0, params)


def process_chunk(chunk_spec_path: str):
    """Process the parameter chunk specified in a JSON file"""

    # Load chunk specification
    with open(chunk_spec_path, 'r') as f:
        spec = json.load(f)

    chunk_id = spec['chunk_id']
    chunk_start = spec['chunk_start']
    chunk_end = spec['chunk_end']
    grid = spec['grid']
    num_workers = spec['num_workers']

    print(f"🎯 Processing chunk: {chunk_id}")
    print(f"📊 Range: {chunk_start:,} to {chunk_end:,} ({chunk_end - chunk_start:,} combinations)")
    print(f"⚙️ Workers: {num_workers} cores")
    print()

    # Load data (same as comprehensive_sweep.py)
    data_path = Path(__file__).parent.parent / 'data' / 'solusdt_5m.csv'
    print(f"📈 Loading data from {data_path}...")
    data_slice = load_csv(str(data_path))
    print(f"✅ Loaded {len(data_slice.data):,} rows")
    print()

    # Generate ALL parameter combinations (same order as comprehensive_sweep.py,
    # so global combination indices line up across all workers)
    param_lists = [
        grid['flip_thresholds'],
        grid['ma_gaps'],
        grid['adx_mins'],
        grid['long_pos_maxs'],
        grid['short_pos_mins'],
        grid['cooldowns'],
        grid['position_sizes'],
        grid['tp1_multipliers'],
        grid['tp2_multipliers'],
        grid['sl_multipliers'],
        grid['tp1_close_percents'],
        grid['trailing_multipliers'],
        grid['vol_mins'],
        grid['max_bars_list'],
    ]

    print("🔢 Generating parameter combinations...")
    all_combos = list(itertools.product(*param_lists))
    total_combos = len(all_combos)
    print(f"✅ Generated {total_combos:,} total combinations")

    # Extract chunk slice
    chunk_combos = all_combos[chunk_start:chunk_end]
    print(f"✂️ Extracted chunk slice: {len(chunk_combos):,} combinations")
    print()

    # Prepare arguments for test_config
    args_list = [
        (chunk_start + i, combo, data_slice)
        for i, combo in enumerate(chunk_combos)
    ]

    # Run multiprocessing sweep (same as comprehensive_sweep.py)
    print(f"🚀 Starting sweep with {num_workers} workers...")
    print()

    results = []
    completed = 0
    best_pnl = float('-inf')
    best_config = None

    with mp.Pool(processes=num_workers) as pool:
        # chunksize=10 hands each worker small batches so imap_unordered
        # stays responsive for progress reporting
        for result in pool.imap_unordered(test_config, args_list, chunksize=10):
            results.append(result)
            completed += 1

            # Track best
            if result[4] > best_pnl:  # pnl_per_1k
                best_pnl = result[4]
                best_config = result

            # Progress every 100 configs
            if completed % 100 == 0:
                pct = (completed / len(chunk_combos)) * 100
                print(f"⏳ Progress: {completed:,}/{len(chunk_combos):,} ({pct:.1f}%) - "
                      f"Best so far: ${best_pnl:.2f}/1k")

    print()
    print(f"✅ Chunk {chunk_id} complete!")
    print(f"📊 Tested {len(results):,} configurations")
    print(f"🏆 Best P&L: ${best_pnl:.2f} per $1k")
    print()

    # Sort by profitability
    results.sort(key=lambda x: x[4], reverse=True)

    # Save results to CSV (same format as comprehensive_sweep.py)
    output_file = Path(__file__).parent.parent / f'chunk_{chunk_id}_results.csv'

    print(f"💾 Saving results to {output_file}...")

    with open(output_file, 'w', newline='') as f:
        writer = csv.writer(f)

        # Header
        writer.writerow([
            'rank', 'trades', 'win_rate', 'total_pnl', 'pnl_per_1k',
            'profit_factor', 'max_drawdown', 'sharpe_ratio',
            'flip_threshold', 'ma_gap', 'adx_min', 'long_pos_max', 'short_pos_min',
            'cooldown', 'position_size', 'tp1_mult', 'tp2_mult', 'sl_mult',
            'tp1_close_pct', 'trailing_mult', 'vol_min', 'max_bars'
        ])

        # Write all results
        for rank, result in enumerate(results, 1):
            config_id, trades, win_rate, total_pnl, pnl_per_1k, \
                profit_factor, max_drawdown, sharpe, params = result

            writer.writerow([
                rank, trades, f'{win_rate:.4f}', f'{total_pnl:.2f}', f'{pnl_per_1k:.2f}',
                f'{profit_factor:.3f}', f'{max_drawdown:.2f}', f'{sharpe:.3f}',
                *params
            ])

    print("✅ Results saved!")
    print()

    # Print top 10
    print("🏆 Top 10 configurations:")
    print()
    for i, result in enumerate(results[:10], 1):
        config_id, trades, win_rate, total_pnl, pnl_per_1k, \
            profit_factor, max_drawdown, sharpe, params = result

        print(f"{i:2d}. ${pnl_per_1k:7.2f}/1k | "
              f"{trades:4d} trades | {win_rate*100:5.1f}% WR | "
              f"PF {profit_factor:.2f} | DD {max_drawdown:.1f}%")

    print()
    print(f"✅ Chunk {chunk_id} processing complete!")

    return output_file


def main():
    """Worker entry point"""
    if len(sys.argv) < 2:
        print("Usage: python3 distributed_worker.py <chunk_spec.json>")
        sys.exit(1)

    chunk_spec_path = sys.argv[1]

    if not Path(chunk_spec_path).exists():
        print(f"Error: Chunk spec file not found: {chunk_spec_path}")
        sys.exit(1)

    print("=" * 80)
    print("🔧 DISTRIBUTED WORKER")
    print("=" * 80)
    print()

    start_time = datetime.now()

    output_file = process_chunk(chunk_spec_path)

    end_time = datetime.now()
    duration = (end_time - start_time).total_seconds()

    print()
    print("=" * 80)
    print(f"⏱️ Total time: {duration:.1f} seconds ({duration/60:.1f} minutes)")
    print(f"📄 Results: {output_file}")
    print("=" * 80)


if __name__ == '__main__':
    main()
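
For completeness, the per-chunk SSH/SCP deploy-and-collect cycle from the feature notes could be driven along these lines. This is a sketch only: the worker2 alias comes from the ssh config example above, the remote paths are assumptions, and the real orchestration lives in cluster/distributed_coordinator.py.

import subprocess

HOST = "worker2"                                         # ssh config alias (see above)
REMOTE = "/home/comprehensive_sweep/backtester/scripts"  # assumed worker location
RESULTS = "/home/comprehensive_sweep/backtester"         # process_chunk() writes one level up

def run_chunk(chunk_id: str) -> None:
    """Ship a chunk spec, run the worker remotely, and pull the ranked CSV back."""
    spec = f"chunk_{chunk_id}_spec.json"
    # Deploy the spec next to the worker script
    subprocess.run(["scp", spec, f"{HOST}:{REMOTE}/"], check=True)
    # Run the worker (the real setup uses the existing .venv on the host)
    subprocess.run(
        ["ssh", HOST, f"cd {REMOTE} && python3 distributed_worker.py {spec}"],
        check=True,
    )
    # Collect the results CSV written by process_chunk()
    subprocess.run(
        ["scp", f"{HOST}:{RESULTS}/chunk_{chunk_id}_results.csv", "."],
        check=True,
    )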