#!/usr/bin/env python3 """ Continuous Optimization Cluster - WORKER Executes backtesting jobs assigned by master """ import json import sys import time import subprocess from datetime import datetime from pathlib import Path import os # Configuration WORKSPACE = Path('/root/optimization-cluster') JOBS_DIR = WORKSPACE / 'jobs' RESULTS_DIR = WORKSPACE / 'results' DATA_DIR = WORKSPACE / 'data' BACKTESTER_DIR = WORKSPACE / 'backtester' # Worker settings MAX_PARALLEL = 22 # 70% of 32 cores MEMORY_PER_JOB = '4G' # 4GB per worker class BacktestWorker: """Executes individual backtest jobs""" def __init__(self, job_file): self.job_file = Path(job_file) self.job = self.load_job() self.result = None def load_job(self): """Load job specification""" with open(JOBS_DIR / self.job_file, 'r') as f: return json.load(f) def execute(self): """Run the backtest""" print(f"🔬 Starting backtest: {self.job['id']}") print(f" Indicator: {self.job['indicator']}") print(f" Params: {json.dumps(self.job['params'], indent=2)}") start_time = time.time() try: # Determine which backtester to use if self.job['indicator'] == 'v9_moneyline': result = self.run_v9_backtest() else: raise ValueError(f"Unknown indicator: {self.job['indicator']}") duration = time.time() - start_time # Prepare result self.result = { 'job_id': self.job['id'], 'indicator': self.job['indicator'], 'params': self.job['params'], 'pnl': result['pnl'], 'trades': result['trades'], 'win_rate': result['win_rate'], 'profit_factor': result['profit_factor'], 'max_drawdown': result['max_drawdown'], 'sharpe_ratio': result.get('sharpe_ratio', 0), 'duration_seconds': duration, 'completed_at': datetime.now().isoformat() } # Save result self.save_result() print(f"✅ Backtest complete: ${result['pnl']:.2f} PnL, {result['trades']} trades") print(f" Duration: {duration:.1f}s") return True except Exception as e: print(f"❌ Backtest failed: {e}") # Save error result self.result = { 'job_id': self.job['id'], 'error': str(e), 'completed_at': datetime.now().isoformat() } self.save_result() return False def run_v9_backtest(self): """Execute v9 moneyline backtest""" params = self.job['params'] # Build command cmd = [ 'python3', str(BACKTESTER_DIR / 'backtester_core.py'), '--data', str(DATA_DIR / 'solusdt_5m.csv'), '--indicator', 'v9', '--flip-threshold', str(params['flip_threshold']), '--ma-gap', str(params['ma_gap']), '--momentum-adx', str(params['momentum_adx']), '--long-pos', str(params.get('long_pos', 70)), '--short-pos', str(params.get('short_pos', 25)), '--cooldown', str(params.get('cooldown_bars', 2)), '--output', 'json' ] # Execute result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) if result.returncode != 0: raise RuntimeError(f"Backtest failed: {result.stderr}") # Parse result return json.loads(result.stdout) def save_result(self): """Save result for master to collect""" result_file = RESULTS_DIR / f"{self.job['id']}_result.json" with open(result_file, 'w') as f: json.dump(self.result, f, indent=2) # Copy to master (via rsync) master_host = os.environ.get('MASTER_HOST', 'icke@95.216.52.28') master_dir = f"{master_host}:/home/icke/traderv4/cluster/results/" subprocess.run(['rsync', '-a', str(result_file), master_dir]) # Clean up job file (JOBS_DIR / self.job_file).unlink() def setup_workspace(): """Initialize worker workspace""" WORKSPACE.mkdir(exist_ok=True) JOBS_DIR.mkdir(exist_ok=True) RESULTS_DIR.mkdir(exist_ok=True) DATA_DIR.mkdir(exist_ok=True) (WORKSPACE / 'logs').mkdir(exist_ok=True) print(f"✅ Workspace ready: {WORKSPACE}") def main(): """Main worker entry point""" if len(sys.argv) < 2: print("Usage: worker.py ") sys.exit(1) job_file = sys.argv[1] # Setup setup_workspace() # Execute job worker = BacktestWorker(job_file) success = worker.execute() sys.exit(0 if success else 1) if __name__ == '__main__': main()