#!/usr/bin/env python3
"""
Distributed Continuous Optimization Coordinator

Extends comprehensive_sweep.py to distribute massive parameter grids across
2 EPYC servers (64 cores total) for 24/7 strategy discovery.

Architecture:
1. Master generates parameter grid (millions of combinations)
2. Splits into chunks (~10,000 combos per chunk)
3. Distributes chunks to workers via SSH
4. Workers run modified comprehensive_sweep on their chunk
5. Master aggregates results, identifies top performers
6. Master generates next exploration batch (nearby good configs)
7. Repeat forever - continuous improvement

Integration with Existing System:
- Uses simulator.py and MoneyLineInputs from /home/comprehensive_sweep/backtester/
- Preserves comprehensive_sweep.py output format (CSV with 14 params)
- Works with existing .venv and data files on EPYC
- Backwards compatible - can still run comprehensive_sweep.py standalone
"""

import csv
import hashlib
import itertools
import json
import shlex
import sqlite3
import subprocess
import time
from contextlib import contextmanager
from dataclasses import dataclass, fields
from datetime import datetime
from pathlib import Path, PurePosixPath
from typing import Any, Dict, Iterator, List, Optional, Tuple

# Worker Configuration
# NOTE: 'ssh_key' is not consulted anywhere in this module; SSH falls back to
# whatever key the local ssh client picks by default.
WORKERS = {
    'worker1': {
        'host': 'root@10.10.254.106',
        'cores': 32,  # Full 32 threads available
        'workspace': '/home/comprehensive_sweep',
        'ssh_key': None,  # Use default key
    },
    'worker2': {
        'host': 'root@10.20.254.100',
        'cores': 32,  # Full 32 threads available
        'workspace': '/home/backtest_dual/backtest',  # CORRECTED: Actual path on bd-host01
        'ssh_hop': 'root@10.10.254.106',  # Connect through worker1
        'ssh_key': None,
    }
}

CLUSTER_DIR = Path(__file__).parent
RESULTS_DIR = CLUSTER_DIR / 'distributed_results'
DB_PATH = CLUSTER_DIR / 'exploration.db'

# CSV columns that hold results rather than strategy parameters. Everything
# else in a results row is treated as a parameter and feeds the dedup hash.
_METRIC_COLUMNS = frozenset({
    'rank', 'trades', 'win_rate', 'total_pnl', 'pnl_per_1k',
    'profit_factor', 'max_drawdown', 'sharpe_ratio',
})


def _now_text() -> str:
    """Return the current local time as 'YYYY-MM-DD HH:MM:SS.ffffff'.

    This is the same text sqlite3's implicit datetime adapter used to store;
    binding it explicitly avoids relying on that adapter, which is deprecated
    since Python 3.12.
    """
    return datetime.now().isoformat(sep=' ')


def _parse_float(value: Optional[str], default: float = 0.0) -> float:
    """Parse an optional CSV cell as float, using *default* when it is blank.

    csv.DictReader yields '' for an empty cell and None for a cell missing at
    the end of a short row; both map to *default*. Non-numeric text still
    raises ValueError.
    """
    if value is None:
        return default
    text = str(value).strip()
    return float(text) if text else default


@dataclass
class ParameterGrid:
    """Full parameter space for comprehensive sweep"""
    flip_thresholds: List[float]
    ma_gaps: List[float]
    adx_mins: List[int]
    long_pos_maxs: List[int]
    short_pos_mins: List[int]
    cooldowns: List[int]
    position_sizes: List[int]
    tp1_multipliers: List[float]
    tp2_multipliers: List[float]
    sl_multipliers: List[float]
    tp1_close_percents: List[int]
    trailing_multipliers: List[float]
    vol_mins: List[float]
    max_bars_list: List[int]

    def total_combinations(self) -> int:
        """Return the size of the grid: the product of all axis lengths."""
        total = 1
        for values in self.to_dict().values():
            total *= len(values)
        return total

    def to_dict(self) -> Dict[str, List]:
        """Return the axes as a name -> values dict, in declaration order."""
        return {axis.name: getattr(self, axis.name) for axis in fields(self)}


class ExplorationDatabase:
    """Track all tested strategies and exploration progress"""

    def __init__(self, db_path: Path):
        self.db_path = db_path
        self.init_db()

    @contextmanager
    def _connection(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that commits on success and always closes.

        On an exception the open transaction is rolled back before the
        connection is closed.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commit on success, rollback on error
                yield conn
        finally:
            conn.close()

    def init_db(self):
        """Create the strategies/chunks/phases tables and indexes if missing."""
        with self._connection() as conn:
            c = conn.cursor()

            # Strategies table - all tested configurations
            c.execute('''
                CREATE TABLE IF NOT EXISTS strategies (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    param_hash TEXT UNIQUE NOT NULL,
                    indicator_type TEXT NOT NULL,
                    params_json TEXT NOT NULL,
                    trades INTEGER,
                    win_rate REAL,
                    total_pnl REAL,
                    pnl_per_1k REAL,
                    profit_factor REAL,
                    max_drawdown REAL,
                    sharpe_ratio REAL,
                    tested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    worker_id TEXT,
                    chunk_id TEXT
                )
            ''')

            # Exploration chunks - work distribution tracking
            c.execute('''
                CREATE TABLE IF NOT EXISTS chunks (
                    id TEXT PRIMARY KEY,
                    indicator_type TEXT NOT NULL,
                    grid_json TEXT NOT NULL,
                    chunk_start INTEGER NOT NULL,
                    chunk_end INTEGER NOT NULL,
                    total_combos INTEGER NOT NULL,
                    assigned_worker TEXT,
                    status TEXT DEFAULT 'pending',
                    started_at TIMESTAMP,
                    completed_at TIMESTAMP,
                    best_pnl_in_chunk REAL,
                    results_csv_path TEXT
                )
            ''')

            # Exploration phases - high-level progress
            c.execute('''
                CREATE TABLE IF NOT EXISTS phases (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    phase_name TEXT NOT NULL,
                    indicator_type TEXT NOT NULL,
                    grid_json TEXT NOT NULL,
                    total_combos INTEGER NOT NULL,
                    completed_combos INTEGER DEFAULT 0,
                    best_pnl_overall REAL DEFAULT 0,
                    best_params_json TEXT,
                    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    estimated_completion TIMESTAMP,
                    actual_completion TIMESTAMP
                )
            ''')

            # Create indexes for fast queries
            c.execute('CREATE INDEX IF NOT EXISTS idx_pnl_per_1k ON strategies(pnl_per_1k DESC)')
            c.execute('CREATE INDEX IF NOT EXISTS idx_indicator_type ON strategies(indicator_type)')
            c.execute('CREATE INDEX IF NOT EXISTS idx_chunk_status ON chunks(status)')

    def record_chunk(self, chunk_id: str, indicator_type: str, grid: ParameterGrid,
                     chunk_start: int, chunk_end: int, assigned_worker: str) -> None:
        """Insert a chunk row with status 'running' and started_at = now.

        Raises:
            sqlite3.IntegrityError: if *chunk_id* is already present (the id
                is the table's primary key).
        """
        with self._connection() as conn:
            conn.execute('''
                INSERT INTO chunks (id, indicator_type, grid_json, chunk_start, chunk_end,
                                    total_combos, assigned_worker, status, started_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, 'running', ?)
            ''', (chunk_id, indicator_type, json.dumps(grid.to_dict()),
                  chunk_start, chunk_end, chunk_end - chunk_start,
                  assigned_worker, _now_text()))

    def complete_chunk(self, chunk_id: str, results_csv_path: str, best_pnl: float) -> None:
        """Mark a chunk 'completed' and store its CSV path and best P&L."""
        with self._connection() as conn:
            conn.execute('''
                UPDATE chunks
                SET status='completed', completed_at=?, results_csv_path=?, best_pnl_in_chunk=?
                WHERE id=?
            ''', (_now_text(), results_csv_path, best_pnl, chunk_id))

    def fail_chunk(self, chunk_id: str) -> None:
        """Mark a chunk 'failed' so it is not left looking like it is running."""
        with self._connection() as conn:
            conn.execute("UPDATE chunks SET status='failed' WHERE id=?", (chunk_id,))

    def import_results_csv(self, csv_path: str, worker_id: str, chunk_id: str) -> int:
        """Import a results CSV into the strategies table.

        Every non-metric column is treated as a parameter. Rows whose
        parameter hash is already stored are skipped. 'trades', 'win_rate',
        'total_pnl' and 'pnl_per_1k' are required; 'profit_factor',
        'max_drawdown' and 'sharpe_ratio' default to 0 when blank or absent.

        Returns:
            Number of rows actually inserted (duplicates excluded).

        Raises:
            KeyError / ValueError / TypeError: if a required column is
                missing or not numeric; nothing from the file is committed.
        """
        imported = 0
        # newline='' is what the csv module expects for files it reads.
        with self._connection() as conn, open(csv_path, 'r', newline='') as f:
            for row in csv.DictReader(f):
                # A None key holds overflow cells from rows longer than the
                # header; it is not a parameter and cannot be sorted with
                # string keys, so leave it out.
                params = {
                    key: value for key, value in row.items()
                    if key is not None and key not in _METRIC_COLUMNS
                }
                # sort_keys makes the hash independent of column order.
                param_hash = hashlib.sha256(
                    json.dumps(params, sort_keys=True).encode()
                ).hexdigest()

                try:
                    conn.execute('''
                        INSERT INTO strategies (
                            param_hash, indicator_type, params_json,
                            trades, win_rate, total_pnl, pnl_per_1k,
                            profit_factor, max_drawdown, sharpe_ratio,
                            worker_id, chunk_id
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        param_hash,
                        'v9_moneyline',
                        json.dumps(params),
                        int(row['trades']),
                        float(row['win_rate']),
                        float(row['total_pnl']),
                        float(row['pnl_per_1k']),
                        _parse_float(row.get('profit_factor')),
                        _parse_float(row.get('max_drawdown')),
                        _parse_float(row.get('sharpe_ratio')),
                        worker_id,
                        chunk_id,
                    ))
                except sqlite3.IntegrityError:
                    # Duplicate param_hash - already tested this config
                    continue
                imported += 1
        return imported

    def get_top_strategies(self, limit: int = 100) -> List[Dict]:
        """Return up to *limit* stored strategies, best pnl_per_1k first.

        Only rows with trades >= 700, 0.50 <= win_rate <= 0.70 and
        profit_factor >= 1.2 are considered. Each result dict carries the
        decoded parameters under 'params'.
        """
        with self._connection() as conn:
            rows = conn.execute('''
                SELECT indicator_type, params_json, trades, win_rate, total_pnl, pnl_per_1k,
                       profit_factor, max_drawdown, sharpe_ratio, tested_at
                FROM strategies
                WHERE trades >= 700  -- Statistical significance
                  AND win_rate >= 0.50 AND win_rate <= 0.70  -- Realistic
                  AND profit_factor >= 1.2  -- Minimum edge
                ORDER BY pnl_per_1k DESC
                LIMIT ?
            ''', (limit,)).fetchall()

        return [
            {
                'indicator_type': row[0],
                'params': json.loads(row[1]),
                'trades': row[2],
                'win_rate': row[3],
                'total_pnl': row[4],
                'pnl_per_1k': row[5],
                'profit_factor': row[6],
                'max_drawdown': row[7],
                'sharpe_ratio': row[8],
                'tested_at': row[9],
            }
            for row in rows
        ]


class DistributedCoordinator:
    """Coordinates distributed parameter sweeps across EPYC servers"""

    def __init__(self):
        self.db = ExplorationDatabase(DB_PATH)
        RESULTS_DIR.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _ssh_argv(host: str, command: str, hop: Optional[str] = None) -> List[str]:
        """Build the argv that runs *command* on *host*, optionally via *hop*.

        No local shell is involved. For a hop, the inner 'ssh host command'
        is quoted with shlex.quote so the hop's shell hands *command* to the
        inner ssh unchanged, even when it contains quotes, '&&' or '&'.
        """
        if hop is None:
            return ['ssh', host, command]
        inner = ' '.join(shlex.quote(part) for part in ('ssh', host, command))
        return ['ssh', hop, inner]

    @staticmethod
    def _run(argv: List[str]) -> subprocess.CompletedProcess:
        """Run *argv* without a shell, capturing stdout/stderr as text.

        A missing executable is reported as exit status 127 (what a shell
        would return) instead of raising, so callers only check returncode.
        """
        try:
            return subprocess.run(argv, capture_output=True, text=True)
        except OSError as exc:
            return subprocess.CompletedProcess(argv, 127, stdout='', stderr=str(exc))

    @staticmethod
    def _succeeded(result: subprocess.CompletedProcess, action: str) -> bool:
        """Return True on exit status 0; otherwise print the failure and return False."""
        if result.returncode == 0:
            return True
        print(f"❌ {action} failed (exit {result.returncode}): {result.stderr.strip()}")
        return False

    def _push_file(self, worker_id: str, local_path: Path, remote_path: str) -> bool:
        """Copy a local file to *remote_path* on the worker; True on success.

        Workers with 'ssh_hop' are reached in two steps: the file is staged
        in /tmp on the hop host, then copied from there to the worker.
        """
        worker = WORKERS[worker_id]
        hop = worker.get('ssh_hop')
        destination = f"{worker['host']}:{remote_path}"
        if hop is None:
            return self._succeeded(
                self._run(['scp', str(local_path), destination]),
                f"scp to {worker_id}")

        staging = f"/tmp/{local_path.name}"
        staged = self._run(['scp', str(local_path), f"{hop}:{staging}"])
        if not self._succeeded(staged, f"scp to hop for {worker_id}"):
            return False
        relay = f"scp {shlex.quote(staging)} {shlex.quote(destination)}"
        return self._succeeded(
            self._run(self._ssh_argv(hop, relay)),
            f"scp from hop to {worker_id}")

    def _pull_file(self, worker_id: str, remote_path: str, local_path: Path) -> bool:
        """Copy *remote_path* from the worker to *local_path*; True on success.

        Workers with 'ssh_hop' are reached in two steps: the hop host first
        pulls the file into its /tmp, then the master copies it from there.
        """
        worker = WORKERS[worker_id]
        hop = worker.get('ssh_hop')
        source = f"{worker['host']}:{remote_path}"
        if hop is None:
            return self._succeeded(
                self._run(['scp', source, str(local_path)]),
                f"scp from {worker_id}")

        staging = f"/tmp/{PurePosixPath(remote_path).name}"
        relay = f"scp {shlex.quote(source)} {shlex.quote(staging)}"
        if not self._succeeded(self._run(self._ssh_argv(hop, relay)),
                               f"scp from {worker_id} to hop"):
            return False
        return self._succeeded(
            self._run(['scp', f"{hop}:{staging}", str(local_path)]),
            f"scp from hop for {worker_id}")

    def ssh_command(self, worker_id: str, command: str) -> subprocess.CompletedProcess:
        """Execute *command* on the worker via SSH and return the result.

        Workers configured with 'ssh_hop' are reached through that host.
        stdout and stderr are captured as text on the returned object.
        """
        worker = WORKERS[worker_id]
        return self._run(self._ssh_argv(worker['host'], command, worker.get('ssh_hop')))

    def deploy_worker_script(self, worker_id: str) -> bool:
        """Copy distributed_worker.py into the worker's backtester/scripts dir.

        Returns:
            True if every copy step succeeded, False otherwise.
        """
        worker = WORKERS[worker_id]
        script_path = CLUSTER_DIR / 'distributed_worker.py'
        target = f"{worker['workspace']}/backtester/scripts/distributed_worker.py"

        if 'ssh_hop' in worker:
            print(f"📤 Copying worker script to {worker_id} via hop...")
        else:
            print(f"📤 Copying worker script to {worker_id}...")

        if not self._push_file(worker_id, script_path, target):
            print(f"❌ Failed to deploy worker script to {worker_id}")
            return False

        print(f"✅ Worker script deployed to {worker_id}")
        return True

    def assign_chunk(self, worker_id: str, chunk_id: str, grid: ParameterGrid,
                     chunk_start: int, chunk_end: int) -> bool:
        """Record a chunk, ship its spec to the worker and launch it there.

        The chunk is stored as 'running' first; if copying the spec or
        starting the worker fails it is flipped to 'failed'.

        Returns:
            True if the launch command exited with status 0, else False.
        """
        worker = WORKERS[worker_id]

        # Record in database
        self.db.record_chunk(chunk_id, 'v9_moneyline', grid, chunk_start, chunk_end, worker_id)

        # Create chunk specification JSON
        chunk_spec = {
            'chunk_id': chunk_id,
            'chunk_start': chunk_start,
            'chunk_end': chunk_end,
            'grid': grid.to_dict(),
            'num_workers': worker['cores'],
        }
        chunk_json_path = RESULTS_DIR / f"{chunk_id}_spec.json"
        with open(chunk_json_path, 'w') as f:
            json.dump(chunk_spec, f, indent=2)

        # Copy chunk spec to worker
        target_json = f"{worker['workspace']}/chunk_{chunk_id}.json"
        if not self._push_file(worker_id, chunk_json_path, target_json):
            print(f"❌ Failed to assign chunk {chunk_id} to {worker_id}: could not copy chunk spec")
            self.db.fail_chunk(chunk_id)
            return False

        # Execute distributed_worker.py on worker. The trailing '&' puts the
        # job in the background on the worker so the ssh call can return.
        cmd = (f"cd {shlex.quote(worker['workspace'])} && "
               f"source backtester/.venv/bin/activate && "
               f"nohup python3 backtester/scripts/distributed_worker.py {shlex.quote(target_json)} "
               f"> /tmp/{chunk_id}.log 2>&1 &")

        print(f"🚀 Starting chunk {chunk_id} on {worker_id} ({chunk_end - chunk_start:,} combos)...")
        result = self.ssh_command(worker_id, cmd)

        if result.returncode == 0:
            print(f"✅ Chunk {chunk_id} assigned to {worker_id}")
            return True

        print(f"❌ Failed to assign chunk {chunk_id} to {worker_id}: {result.stderr}")
        self.db.fail_chunk(chunk_id)
        return False

    def collect_results(self, worker_id: str, chunk_id: str) -> Optional[str]:
        """Fetch a finished chunk's CSV, import it and mark the chunk completed.

        Returns:
            The local CSV path, or None when the results file is not on the
            worker yet or could not be copied back.

        NOTE(review): readiness is judged only by the file existing on the
        worker; whether the worker has finished writing it cannot be told
        from here.
        """
        worker = WORKERS[worker_id]

        # Check if results file exists on worker
        results_csv = f"{worker['workspace']}/chunk_{chunk_id}_results.csv"
        check_cmd = f"test -f {shlex.quote(results_csv)} && echo 'exists'"
        result = self.ssh_command(worker_id, check_cmd)
        if 'exists' not in result.stdout:
            return None  # Results not ready yet

        # Copy results back to master
        local_csv = RESULTS_DIR / f"{chunk_id}_results.csv"
        if not self._pull_file(worker_id, results_csv, local_csv):
            return None

        print(f"📥 Collected results from {worker_id} chunk {chunk_id}")

        # Import into database
        imported = self.db.import_results_csv(str(local_csv), worker_id, chunk_id)
        print(f"📊 Imported {imported} unique strategies from {chunk_id}")

        # Get best P&L from CSV for chunk tracking
        with open(local_csv, 'r', newline='') as f:
            best_pnl = max((float(row['pnl_per_1k']) for row in csv.DictReader(f)), default=0)

        self.db.complete_chunk(chunk_id, str(local_csv), best_pnl)
        return str(local_csv)

    def start_comprehensive_exploration(self, chunk_size: int = 10000):
        """Deploy the worker script and dispatch the first batch of chunks.

        Chunks are handed out round-robin until two per worker are active,
        then the method returns. It does not poll for completion or dispatch
        the remaining chunks; nothing in this module calls collect_results.

        NOTE(review): chunk ids restart at v9_chunk_000000 on every call, so
        running this again against the same exploration.db raises
        sqlite3.IntegrityError from record_chunk.
        """
        print("=" * 80)
        print("🚀 DISTRIBUTED COMPREHENSIVE EXPLORATION")
        print("=" * 80)
        print()

        # Define full parameter grid (can be expanded)
        grid = ParameterGrid(
            flip_thresholds=[0.4, 0.5, 0.6, 0.7],
            ma_gaps=[0.20, 0.30, 0.40, 0.50],
            adx_mins=[18, 21, 24, 27],
            long_pos_maxs=[60, 65, 70, 75],
            short_pos_mins=[20, 25, 30, 35],
            cooldowns=[1, 2, 3, 4],
            position_sizes=[10000],  # Fixed for fair comparison
            tp1_multipliers=[1.5, 2.0, 2.5],
            tp2_multipliers=[3.0, 4.0, 5.0],
            sl_multipliers=[2.5, 3.0, 3.5],
            tp1_close_percents=[50, 60, 70, 75],
            trailing_multipliers=[1.0, 1.5, 2.0],
            vol_mins=[0.8, 1.0, 1.2],
            max_bars_list=[300, 500, 1000],
        )

        total_combos = grid.total_combinations()
        total_cores = sum(worker['cores'] for worker in WORKERS.values())

        print(f"📊 Total parameter space: {total_combos:,} combinations")
        print(f"📦 Chunk size: {chunk_size:,} combinations per chunk")
        print(f"🎯 Total chunks: {(total_combos + chunk_size - 1) // chunk_size:,}")
        # 1.6 is the per-combination cost in seconds assumed by the estimate.
        print(f"⏱️ Estimated time: {(total_combos * 1.6) / (total_cores * 3600):.1f} hours "
              f"with {total_cores} cores")
        print()

        # Deploy worker scripts; only workers that received the script get work
        worker_list = [worker_id for worker_id in WORKERS
                       if self.deploy_worker_script(worker_id)]
        if not worker_list:
            print("❌ Worker script could not be deployed to any worker - aborting")
            return

        print()
        print("🔄 Distributing chunks to workers...")
        print()

        # Split work across workers
        chunk_id_counter = 0
        chunk_start = 0
        active_chunks = {}
        failed_chunks = []

        while chunk_start < total_combos:
            chunk_end = min(chunk_start + chunk_size, total_combos)
            chunk_id = f"v9_chunk_{chunk_id_counter:06d}"

            # Round-robin assignment across workers for balanced load
            worker_id = worker_list[chunk_id_counter % len(worker_list)]

            if self.assign_chunk(worker_id, chunk_id, grid, chunk_start, chunk_end):
                active_chunks[chunk_id] = worker_id
            else:
                failed_chunks.append(chunk_id)

            chunk_id_counter += 1
            chunk_start = chunk_end

            # Don't overwhelm workers - limit to 2 chunks per worker at a time
            if len(active_chunks) >= len(worker_list) * 2:
                print(f"⏸️ Pausing chunk assignment - {len(active_chunks)} chunks active")
                print("⏳ Waiting for chunks to complete...")
                break

        print()
        print(f"✅ Assigned {len(active_chunks)} initial chunks")
        if failed_chunks:
            print(f"❌ {len(failed_chunks)} chunks could not be assigned: {', '.join(failed_chunks)}")
        print()
        print("📊 Monitor progress with: python3 cluster/exploration_status.py")
        print("🏆 View top strategies: sqlite3 cluster/exploration.db "
              "'SELECT * FROM strategies ORDER BY pnl_per_1k DESC LIMIT 10'")


def main():
    """Main coordinator entry point"""
    import argparse

    parser = argparse.ArgumentParser(description='Distributed continuous optimization coordinator')
    parser.add_argument('--chunk-size', type=int, default=10000,
                        help='Number of combinations per chunk (default: 10000)')
    parser.add_argument('--continuous', action='store_true',
                        help='Run continuously (not implemented yet)')

    args = parser.parse_args()

    if args.continuous:
        # The flag is accepted for forward compatibility only.
        print("⚠️ --continuous is not implemented yet; dispatching the initial batch only")

    coordinator = DistributedCoordinator()
    coordinator.start_comprehensive_exploration(chunk_size=args.chunk_size)


if __name__ == '__main__':
    main()