- Master controller with job queue and result aggregation
- Worker scripts for parallel backtesting (22 workers per server)
- SQLite database for strategy ranking and performance tracking
- File-based job queue (simple, robust, survives crashes; example job file below)
- Auto-setup script for both EPYC servers
- Status dashboard for monitoring progress
- Comprehensive deployment guide

Architecture:
- Master: job generation, worker coordination, result collection
- Worker 1 (pve-nu-monitor01): AMD EPYC 7282, 22 parallel jobs
- Worker 2 (srv-bd-host01): AMD EPYC 7302, 22 parallel jobs
- Total capacity: ~49,000 backtests/day (44 cores @ 70%)

Initial focus: v9 parameter refinement (27 configurations)
Target: Find strategies >00/1k P&L (current baseline 92/1k)

Files:
- cluster/master.py: Main controller (570 lines)
- cluster/worker.py: Worker execution script (220 lines)
- cluster/setup_cluster.sh: Automated deployment
- cluster/status.py: Real-time status dashboard
- cluster/README.md: Operational documentation
- cluster/DEPLOYMENT.md: Step-by-step deployment guide
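Each job in the file-based queue is a small JSON file dropped into the worker's jobs/ directory. A minimal sketch of writing one such file, with field names taken from what cluster/worker.py reads; the job id and parameter values are hypothetical, and the master's actual schema may carry more fields:

# Illustrative job-file writer; field names mirror what worker.py consumes.
import json
from pathlib import Path

job = {
    'id': 'v9_ft050_gap030_adx25',   # hypothetical job id
    'indicator': 'v9_moneyline',     # routes to BacktestWorker.run_v9_backtest()
    'params': {
        'flip_threshold': 0.50,      # hypothetical values -- the 27 v9
        'ma_gap': 0.30,              # configurations are not listed here
        'momentum_adx': 25,
        'long_pos': 70,              # worker.py defaults
        'short_pos': 25,
        'cooldown_bars': 2,
    },
}

jobs_dir = Path('/root/optimization-cluster/jobs')
jobs_dir.mkdir(parents=True, exist_ok=True)
(jobs_dir / f"{job['id']}.json").write_text(json.dumps(job, indent=2))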
165 lines · 5.0 KiB · Python
#!/usr/bin/env python3
"""
Continuous Optimization Cluster - WORKER
Executes backtesting jobs assigned by master
"""

import json
import os
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path

# Configuration
WORKSPACE = Path('/root/optimization-cluster')
JOBS_DIR = WORKSPACE / 'jobs'
RESULTS_DIR = WORKSPACE / 'results'
DATA_DIR = WORKSPACE / 'data'
BACKTESTER_DIR = WORKSPACE / 'backtester'

# Worker settings
MAX_PARALLEL = 22      # ~70% of 32 cores
MEMORY_PER_JOB = '4G'  # 4GB per worker (declared here, not enforced by this script)

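# Capacity arithmetic behind the commit's ~49,000 backtests/day claim
# (an inference, not stated in this file): 2 servers x 22 jobs = 44
# concurrent backtests; 49,000/day over 44 slots is ~1,114 runs per slot,
# i.e. roughly 78 s per backtest (86,400 s / 1,114), comfortably inside
# the 300 s subprocess timeout used below.
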
class BacktestWorker:
    """Executes individual backtest jobs"""

    def __init__(self, job_file):
        self.job_file = Path(job_file)
        self.job = self.load_job()
        self.result = None

    def load_job(self):
        """Load job specification"""
        with open(JOBS_DIR / self.job_file, 'r') as f:
            return json.load(f)

    def execute(self):
        """Run the backtest"""
        print(f"🔬 Starting backtest: {self.job['id']}")
        print(f"   Indicator: {self.job['indicator']}")
        print(f"   Params: {json.dumps(self.job['params'], indent=2)}")

        start_time = time.time()

        try:
            # Determine which backtester to use
            if self.job['indicator'] == 'v9_moneyline':
                result = self.run_v9_backtest()
            else:
                raise ValueError(f"Unknown indicator: {self.job['indicator']}")

            duration = time.time() - start_time

            # Prepare result
            self.result = {
                'job_id': self.job['id'],
                'indicator': self.job['indicator'],
                'params': self.job['params'],
                'pnl': result['pnl'],
                'trades': result['trades'],
                'win_rate': result['win_rate'],
                'profit_factor': result['profit_factor'],
                'max_drawdown': result['max_drawdown'],
                'sharpe_ratio': result.get('sharpe_ratio', 0),
                'duration_seconds': duration,
                'completed_at': datetime.now().isoformat()
            }

            # Save result
            self.save_result()

            print(f"✅ Backtest complete: ${result['pnl']:.2f} PnL, {result['trades']} trades")
            print(f"   Duration: {duration:.1f}s")

            return True

        except Exception as e:
            print(f"❌ Backtest failed: {e}")

            # Save error result
            self.result = {
                'job_id': self.job['id'],
                'error': str(e),
                'completed_at': datetime.now().isoformat()
            }
            self.save_result()

            return False

    def run_v9_backtest(self):
        """Execute v9 moneyline backtest"""
        params = self.job['params']

        # Build command
        cmd = [
            'python3',
            str(BACKTESTER_DIR / 'backtester_core.py'),
            '--data', str(DATA_DIR / 'solusdt_5m.csv'),
            '--indicator', 'v9',
            '--flip-threshold', str(params['flip_threshold']),
            '--ma-gap', str(params['ma_gap']),
            '--momentum-adx', str(params['momentum_adx']),
            '--long-pos', str(params.get('long_pos', 70)),
            '--short-pos', str(params.get('short_pos', 25)),
            '--cooldown', str(params.get('cooldown_bars', 2)),
            '--output', 'json'
        ]

        # Execute
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)

        if result.returncode != 0:
            raise RuntimeError(f"Backtest failed: {result.stderr}")

        # Parse result
        return json.loads(result.stdout)

    def save_result(self):
        """Save result for master to collect"""
        result_file = RESULTS_DIR / f"{self.job['id']}_result.json"

        with open(result_file, 'w') as f:
            json.dump(self.result, f, indent=2)

        # Copy to master (via rsync); keep the local copy if the push fails
        master_host = os.environ.get('MASTER_HOST', 'icke@95.216.52.28')
        master_dir = f"{master_host}:/home/icke/traderv4/cluster/results/"

        rsync = subprocess.run(['rsync', '-a', str(result_file), master_dir])
        if rsync.returncode != 0:
            print(f"⚠️ rsync to master failed (exit {rsync.returncode}); result kept at {result_file}")

        # Clean up job file
        (JOBS_DIR / self.job_file).unlink()

def setup_workspace():
    """Initialize worker workspace"""
    WORKSPACE.mkdir(exist_ok=True)
    JOBS_DIR.mkdir(exist_ok=True)
    RESULTS_DIR.mkdir(exist_ok=True)
    DATA_DIR.mkdir(exist_ok=True)
    (WORKSPACE / 'logs').mkdir(exist_ok=True)

    print(f"✅ Workspace ready: {WORKSPACE}")

def main():
    """Main worker entry point"""
    if len(sys.argv) < 2:
        print("Usage: worker.py <job_file>")
        sys.exit(1)

    job_file = sys.argv[1]

    # Setup
    setup_workspace()

    # Execute job
    worker = BacktestWorker(job_file)
    success = worker.execute()

    sys.exit(0 if success else 1)


if __name__ == '__main__':
    main()
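How master.py actually fans jobs out is not shown in this commit. A minimal dispatch sketch under that caveat, reusing the rsync/ssh transport the worker already relies on and capping concurrency at the same 22-job limit as MAX_PARALLEL; WORKER_HOST and the job filenames are hypothetical:

# Illustrative dispatcher, not master.py's real logic.
import subprocess
from concurrent.futures import ThreadPoolExecutor

WORKER_HOST = 'root@pve-nu-monitor01'   # hypothetical SSH target
REMOTE = '/root/optimization-cluster'

def dispatch(job_filename):
    """Push one job file to the worker host and run worker.py on it."""
    subprocess.run(
        ['rsync', '-a', f'jobs/{job_filename}', f'{WORKER_HOST}:{REMOTE}/jobs/'],
        check=True)
    done = subprocess.run(
        ['ssh', WORKER_HOST, 'python3', f'{REMOTE}/worker.py', job_filename])
    return job_filename, done.returncode

# Cap concurrency at 22, matching MAX_PARALLEL in worker.py
with ThreadPoolExecutor(max_workers=22) as pool:
    for name, rc in pool.map(dispatch, ['job_001.json', 'job_002.json']):
        print(f"{name}: {'ok' if rc == 0 else 'failed'}")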