#!/usr/bin/env python3
"""
Distributed Sweep Coordinator for 2 EPYC Servers

Splits comprehensive parameter sweeps across both EPYC servers to maximize
throughput. Works with the existing backtester infrastructure.

Architecture:
- Coordinator runs on the local machine
- Generates job chunks (1000 configs each)
- Distributes chunks to both EPYC servers via SSH
- Collects results and aggregates them into a master SQLite database (sweep_results.db)
- Runs 24/7 to continuously test new indicator combinations

Usage:
    python3 coordinator.py --sweep comprehensive        # Run full sweep
    python3 coordinator.py --sweep custom --params params.json
    python3 coordinator.py --status                     # Check progress
    python3 coordinator.py --run                        # Start the coordinator loop

The job/result file formats this coordinator assumes are sketched in the
protocol notes at the end of this file.
"""

import argparse
import json
import sqlite3
import subprocess
import time
from datetime import datetime
from pathlib import Path
from typing import Dict


class DistributedCoordinator:
    """Coordinates distributed backtesting across 2 EPYC servers."""

    def __init__(self):
        self.worker1 = "root@10.10.254.106"
        self.worker2 = "root@10.20.254.100"  # Only reachable via worker1 (two-hop SSH)
        self.remote_path = "/home/comprehensive_sweep/backtester"
        self.db_path = Path(__file__).parent / "sweep_results.db"
        self.chunk_size = 1000  # Configs per job chunk
        self._init_database()

    def _init_database(self):
        """Initialize the SQLite database used to track progress."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Jobs table
        c.execute('''
            CREATE TABLE IF NOT EXISTS jobs (
                job_id TEXT PRIMARY KEY,
                worker TEXT,
                status TEXT,  -- queued, running, completed, failed
                chunk_start INTEGER,
                chunk_end INTEGER,
                configs_tested INTEGER,
                created_at TEXT,
                started_at TEXT,
                completed_at TEXT
            )
        ''')

        # Results table
        c.execute('''
            CREATE TABLE IF NOT EXISTS results (
                config_id INTEGER PRIMARY KEY,
                trades INTEGER,
                win_rate REAL,
                total_pnl REAL,
                pnl_per_1k REAL,
                params TEXT,  -- JSON
                tested_at TEXT
            )
        ''')

        # Sweep metadata
        c.execute('''
            CREATE TABLE IF NOT EXISTS sweeps (
                sweep_id TEXT PRIMARY KEY,
                sweep_type TEXT,
                total_configs INTEGER,
                configs_completed INTEGER,
                best_pnl_per_1k REAL,
                started_at TEXT,
                completed_at TEXT
            )
        ''')

        conn.commit()
        conn.close()

    def create_sweep(self, sweep_type: str, total_configs: int) -> str:
        """Create a new sweep record and return its id."""
        sweep_id = f"sweep_{sweep_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        c.execute('''
            INSERT INTO sweeps (sweep_id, sweep_type, total_configs, configs_completed, started_at)
            VALUES (?, ?, ?, 0, ?)
        ''', (sweep_id, sweep_type, total_configs, datetime.now().isoformat()))
        conn.commit()
        conn.close()

        return sweep_id

    def generate_job_chunks(self, sweep_id: str, total_configs: int):
        """Split a sweep into job chunks for distribution."""
        num_chunks = (total_configs + self.chunk_size - 1) // self.chunk_size

        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        for i in range(num_chunks):
            chunk_start = i * self.chunk_size
            chunk_end = min((i + 1) * self.chunk_size, total_configs)
            job_id = f"{sweep_id}_chunk_{i:04d}"
            # Alternate chunks between the two workers (round-robin)
            worker = self.worker1 if i % 2 == 0 else self.worker2

            c.execute('''
                INSERT INTO jobs (job_id, worker, status, chunk_start, chunk_end, created_at)
                VALUES (?, ?, 'queued', ?, ?, ?)
            ''', (job_id, worker, chunk_start, chunk_end, datetime.now().isoformat()))

        conn.commit()
        conn.close()

        print(f"✅ Created {num_chunks} job chunks")
        print(f"   Worker 1: {(num_chunks + 1) // 2} chunks")
        print(f"   Worker 2: {num_chunks // 2} chunks")

    def dispatch_jobs(self):
        """Dispatch queued jobs to workers. Returns the number of jobs dispatched."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Get queued jobs (dispatch at most 10 per cycle)
        c.execute('''
            SELECT job_id, worker, chunk_start, chunk_end
            FROM jobs
            WHERE status = 'queued'
            LIMIT 10
        ''')
        jobs = c.fetchall()
        conn.close()

        if not jobs:
            return 0

        for job_id, worker, chunk_start, chunk_end in jobs:
            self._start_job_on_worker(job_id, worker, chunk_start, chunk_end)

        return len(jobs)

    def _start_job_on_worker(self, job_id: str, worker: str, chunk_start: int, chunk_end: int):
        """Start a job on the specified worker."""
        try:
            # Build the job spec to place on the worker
            job_spec = {
                'job_id': job_id,
                'chunk_start': chunk_start,
                'chunk_end': chunk_end,
                'timestamp': datetime.now().isoformat()
            }
            job_json = json.dumps(job_spec)

            # Write the job file. The JSON is streamed over stdin so it never has
            # to survive nested shell quoting (worker2 is only reachable via worker1).
            if worker == self.worker2:
                # Two-hop for worker2
                cmd = ["ssh", self.worker1,
                       f"ssh {self.worker2} 'cat > {self.remote_path}/jobs/{job_id}.json'"]
            else:
                cmd = ["ssh", worker, f"cat > {self.remote_path}/jobs/{job_id}.json"]
            subprocess.run(cmd, input=job_json, text=True, check=True)

            # Update job status
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute('''
                UPDATE jobs SET status = 'running', started_at = ?
                WHERE job_id = ?
            ''', (datetime.now().isoformat(), job_id))
            conn.commit()
            conn.close()

            print(f"✅ Started job {job_id} on {worker}")

        except Exception as e:
            print(f"❌ Failed to start job {job_id}: {e}")
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute('''
                UPDATE jobs SET status = 'failed'
                WHERE job_id = ?
            ''', (job_id,))
            conn.commit()
            conn.close()

    def collect_results(self):
        """Collect completed results from both workers."""
        for worker in [self.worker1, self.worker2]:
            self._collect_from_worker(worker)

    def _collect_from_worker(self, worker: str):
        """Collect results from a specific worker."""
        try:
            # List completed result files
            if worker == self.worker2:
                cmd = f"ssh {self.worker1} \"ssh {self.worker2} 'ls {self.remote_path}/results/*.json 2>/dev/null'\""
            else:
                cmd = f"ssh {worker} \"ls {self.remote_path}/results/*.json 2>/dev/null\""

            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            if result.returncode != 0:
                return

            result_files = result.stdout.strip().split('\n')

            for result_file in result_files:
                if not result_file:
                    continue

                # Extract job_id from the filename
                job_id = Path(result_file).stem

                # Copy the result file to the coordinator
                local_file = Path(f"/tmp/{job_id}.json")

                if worker == self.worker2:
                    # Two hops: stage the file on worker1, then pull it from worker1
                    subprocess.run(
                        f"ssh {self.worker1} \"scp {self.worker2}:{result_file} /tmp/{job_id}.json\"",
                        shell=True, check=True
                    )
                    subprocess.run(
                        f"scp {self.worker1}:/tmp/{job_id}.json {local_file}",
                        shell=True, check=True
                    )
                else:
                    subprocess.run(
                        f"scp {worker}:{result_file} {local_file}",
                        shell=True, check=True
                    )

                # Parse and store results
                with open(local_file) as f:
                    results = json.load(f)

                self._store_results(job_id, results)

                # Delete the remote result file
                if worker == self.worker2:
                    subprocess.run(
                        f"ssh {self.worker1} \"ssh {self.worker2} 'rm {result_file}'\"",
                        shell=True
                    )
                else:
                    subprocess.run(f"ssh {worker} \"rm {result_file}\"", shell=True)

                # Delete the local temp file
                local_file.unlink()

                print(f"✅ Collected results from {job_id}")

        except Exception as e:
            print(f"❌ Error collecting from {worker}: {e}")

    def _store_results(self, job_id: str, results: Dict):
        """Store a job's results in the database and mark the job completed."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Store each config result
        for result in results['configs']:
            c.execute('''
                INSERT OR REPLACE INTO results
                (config_id, trades, win_rate, total_pnl, pnl_per_1k, params, tested_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                result['config_id'],
                result['trades'],
                result['win_rate'],
                result['total_pnl'],
                result['pnl_per_1k'],
                json.dumps(result['params']),
                datetime.now().isoformat()
            ))

        # Update job status
        c.execute('''
            UPDATE jobs SET status = 'completed', configs_tested = ?, completed_at = ?
            WHERE job_id = ?
        ''', (len(results['configs']), datetime.now().isoformat(), job_id))

        conn.commit()
        conn.close()

    def get_status(self) -> Dict:
        """Get current sweep status."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Job counts
        c.execute('SELECT status, COUNT(*) FROM jobs GROUP BY status')
        job_counts = dict(c.fetchall())

        # Top results
        c.execute('''
            SELECT pnl_per_1k, trades, win_rate, params
            FROM results
            ORDER BY pnl_per_1k DESC
            LIMIT 5
        ''')
        top_results = c.fetchall()

        # Active sweeps
        c.execute('''
            SELECT sweep_id, total_configs, configs_completed
            FROM sweeps
            WHERE completed_at IS NULL
        ''')
        active_sweeps = c.fetchall()

        conn.close()

        return {
            'jobs': job_counts,
            'top_results': top_results,
            'active_sweeps': active_sweeps
        }

    def run_forever(self):
        """Run the coordinator loop indefinitely."""
        print("🚀 Starting Distributed Sweep Coordinator")
        print("=" * 80)

        while True:
            try:
                # Dispatch new jobs
                dispatched = self.dispatch_jobs()

                # Collect completed results
                self.collect_results()

                # Show status
                status = self.get_status()
                print(f"\n📊 Status: {datetime.now().strftime('%H:%M:%S')}")
                print(f"   Dispatched: {dispatched}")
                print(f"   Queued:     {status['jobs'].get('queued', 0)}")
                print(f"   Running:    {status['jobs'].get('running', 0)}")
                print(f"   Completed:  {status['jobs'].get('completed', 0)}")

                if status['top_results']:
                    print(f"\n🏆 Top Result: ${status['top_results'][0][0]:.2f}/1k")

                # Sleep before the next cycle
                time.sleep(60)

            except KeyboardInterrupt:
                print("\n\n⏹️  Coordinator stopped")
                break
            except Exception as e:
                print(f"❌ Error in coordinator loop: {e}")
                time.sleep(60)


def main():
    parser = argparse.ArgumentParser(description='Distributed Sweep Coordinator')
    parser.add_argument('--sweep', choices=['comprehensive', 'custom'],
                        help='Start a new sweep')
    parser.add_argument('--params',
                        help='JSON file defining the parameter grid for --sweep custom')
    parser.add_argument('--status', action='store_true',
                        help='Show current status')
    parser.add_argument('--run', action='store_true',
                        help='Run coordinator loop')
    args = parser.parse_args()

    coordinator = DistributedCoordinator()

    if args.status:
        status = coordinator.get_status()
        print("📊 SWEEP STATUS")
        print("=" * 80)
        print(f"Queued:    {status['jobs'].get('queued', 0)}")
        print(f"Running:   {status['jobs'].get('running', 0)}")
        print(f"Completed: {status['jobs'].get('completed', 0)}")
        print()

        if status['top_results']:
            print("🏆 TOP 5 RESULTS:")
            for i, (pnl, trades, wr, params) in enumerate(status['top_results'], 1):
                print(f"{i}. ${pnl:.2f}/1k - {trades} trades @ {wr:.1f}% WR")

    elif args.sweep:
        # Total configs for the comprehensive grid; this must match the actual
        # parameter grid used by the backtester.
        total_configs = 5 * 3 * 4 * 4 * 4 * 4 * 3 * 3 * 3 * 4  # 414,720 configs
        sweep_id = coordinator.create_sweep(args.sweep, total_configs)
        coordinator.generate_job_chunks(sweep_id, total_configs)
        print(f"✅ Sweep {sweep_id} created")
        print(f"   Total configs: {total_configs:,}")
        print("   Run 'python3 coordinator.py --run' to start processing")

    elif args.run:
        coordinator.run_forever()

    else:
        parser.print_help()


if __name__ == '__main__':
    main()
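
# ---------------------------------------------------------------------------
# Protocol notes (assumptions, for reference only).
# The worker-side runner is part of the existing backtester infrastructure and
# is not defined in this file; the formats below are only what this coordinator
# implies, not an authoritative spec.
#
# Job spec written by the coordinator to <remote_path>/jobs/<job_id>.json
# (see _start_job_on_worker):
#     {"job_id": "<sweep_id>_chunk_0000", "chunk_start": 0, "chunk_end": 1000,
#      "timestamp": "<ISO-8601>"}
#
# Result file the coordinator expects back in <remote_path>/results/<job_id>.json
# (field names inferred from _store_results; values are illustrative):
#     {"configs": [{"config_id": 0, "trades": 42, "win_rate": 55.0,
#                   "total_pnl": 120.0, "pnl_per_1k": 12.0, "params": {...}}]}
# ---------------------------------------------------------------------------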