**SSH Command Fix:**
- CRITICAL: Removed && after background command (&)
- Pattern: 'cmd & echo Started' works, 'cmd && echo' waits forever
- Manually tested: works on direct SSH
- Result: Chunk 0 now starts successfully on worker1 (24 processes running)

**Resumption Logic Fix:**
- CRITICAL: Only count completed/running chunks, not pending
- Query: Added 'AND status IN (completed, running)' filter
- Result: Starts from chunk 0 when no chunks are complete (was skipping to chunk 3)

**Database Cleanup:**
- CRITICAL: Delete pending/failed chunks on coordinator start
- Prevents UNIQUE constraint errors on retry
- Result: Clean slate allows coordinator to assign chunks fresh

**Verification:**
- ✅ Chunk v9_chunk_000000: status='running', assigned_worker='worker1'
- ✅ Worker1: 24 Python processes running backtester
- ✅ Database: Cleaned 3 pending chunks, created 1 running chunk
- ⚠️ Worker2: SSH hop still timing out (separate infrastructure issue)

Files changed:
- cluster/distributed_coordinator.py (3 critical fixes: lines 388-401, 514-533, 507-514)
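
A minimal sketch of the `&` vs `&&` pattern (hypothetical host alias `worker1`; mirrors the fix in `assign_chunk`, not the exact production command):

```python
import subprocess

# 'cmd & echo ok'  -> '&' backgrounds cmd; echo runs immediately and SSH returns.
# 'cmd && echo ok' -> '&&' waits for cmd to EXIT before echoing, so the SSH
#                     session (and subprocess.run) blocks until the worker dies.
cmd = "nohup sleep 300 > /tmp/demo.log 2>&1 &"
ssh_cmd = f"ssh worker1 '{cmd} echo Started_demo'"  # note: no '&&' after the '&'
result = subprocess.run(ssh_cmd, shell=True, capture_output=True, text=True, timeout=60)
print("Started_demo" in result.stdout)  # True when the payload detached correctly
```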
#!/usr/bin/env python3
"""
Distributed Continuous Optimization Coordinator

Extends comprehensive_sweep.py to distribute massive parameter grids
across 2 EPYC servers (64 cores total) for 24/7 strategy discovery.

Architecture:
1. Master generates parameter grid (millions of combinations)
2. Splits into chunks (~10,000 combos per chunk) - see the chunking sketch below
3. Distributes chunks to workers via SSH
4. Workers run modified comprehensive_sweep on their chunk
5. Master aggregates results, identifies top performers
6. Master generates next exploration batch (nearby good configs)
7. Repeat forever - continuous improvement

Integration with Existing System:
- Uses simulator.py and MoneyLineInputs from /home/comprehensive_sweep/backtester/
- Preserves comprehensive_sweep.py output format (CSV with 14 params)
- Works with existing .venv and data files on EPYC
- Backwards compatible - can still run comprehensive_sweep.py standalone
"""

import sqlite3
import subprocess
import json
import time
import itertools
import hashlib
import threading  # ADDED Nov 30, 2025: Background monitoring
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass

# Worker Configuration
WORKERS = {
    'worker1': {
        'host': 'root@10.10.254.106',
        'cores': 32,  # Full 32 threads available
        'workspace': '/home/comprehensive_sweep',
        'venv_path': 'backtester/.venv/bin/activate',  # Relative to workspace
        'ssh_key': None,  # Use default key
    },
    'worker2': {
        'host': 'root@10.20.254.100',
        'cores': 32,  # Full 32 threads available
        'workspace': '/home/backtest_dual/backtest',  # CORRECTED: Actual path on bd-host01
        'venv_path': '.venv/bin/activate',  # CRITICAL FIX (Nov 30): Worker2 has venv at workspace root, not in backtester/
        'ssh_hop': 'root@10.10.254.106',  # Connect through worker1
        'ssh_key': None,
    }
}
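
# NOTE: worker2 has no direct route from the coordinator; every command and
# file copy for it is relayed through worker1 via its 'ssh_hop' entry above.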

CLUSTER_DIR = Path(__file__).parent
RESULTS_DIR = CLUSTER_DIR / 'distributed_results'
DB_PATH = CLUSTER_DIR / 'exploration.db'


@dataclass
class ParameterGrid:
    """Full parameter space for comprehensive sweep"""
    flip_thresholds: List[float]
    ma_gaps: List[float]
    adx_mins: List[int]
    long_pos_maxs: List[int]
    short_pos_mins: List[int]
    cooldowns: List[int]
    position_sizes: List[int]
    tp1_multipliers: List[float]
    tp2_multipliers: List[float]
    sl_multipliers: List[float]
    tp1_close_percents: List[int]
    trailing_multipliers: List[float]
    vol_mins: List[float]
    max_bars_list: List[int]

    def total_combinations(self) -> int:
        """Calculate total parameter space size"""
        return (
            len(self.flip_thresholds) * len(self.ma_gaps) * len(self.adx_mins) *
            len(self.long_pos_maxs) * len(self.short_pos_mins) * len(self.cooldowns) *
            len(self.position_sizes) * len(self.tp1_multipliers) * len(self.tp2_multipliers) *
            len(self.sl_multipliers) * len(self.tp1_close_percents) *
            len(self.trailing_multipliers) * len(self.vol_mins) * len(self.max_bars_list)
        )

    def to_dict(self) -> Dict[str, List]:
        """Convert to dict for JSON serialization"""
        return {
            'flip_thresholds': self.flip_thresholds,
            'ma_gaps': self.ma_gaps,
            'adx_mins': self.adx_mins,
            'long_pos_maxs': self.long_pos_maxs,
            'short_pos_mins': self.short_pos_mins,
            'cooldowns': self.cooldowns,
            'position_sizes': self.position_sizes,
            'tp1_multipliers': self.tp1_multipliers,
            'tp2_multipliers': self.tp2_multipliers,
            'sl_multipliers': self.sl_multipliers,
            'tp1_close_percents': self.tp1_close_percents,
            'trailing_multipliers': self.trailing_multipliers,
            'vol_mins': self.vol_mins,
            'max_bars_list': self.max_bars_list,
        }


class ExplorationDatabase:
    """Track all tested strategies and exploration progress"""

    def __init__(self, db_path: Path):
        self.db_path = db_path
        self.init_db()

    def init_db(self):
        """Create tables"""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Strategies table - all tested configurations
        c.execute('''
            CREATE TABLE IF NOT EXISTS strategies (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                param_hash TEXT UNIQUE NOT NULL,
                indicator_type TEXT NOT NULL,
                params_json TEXT NOT NULL,

                trades INTEGER,
                win_rate REAL,
                total_pnl REAL,
                pnl_per_1k REAL,
                profit_factor REAL,
                max_drawdown REAL,
                sharpe_ratio REAL,

                tested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                worker_id TEXT,
                chunk_id TEXT
            )
        ''')

        # Exploration chunks - work distribution tracking
        c.execute('''
            CREATE TABLE IF NOT EXISTS chunks (
                id TEXT PRIMARY KEY,
                indicator_type TEXT NOT NULL,
                grid_json TEXT NOT NULL,
                chunk_start INTEGER NOT NULL,
                chunk_end INTEGER NOT NULL,
                total_combos INTEGER NOT NULL,

                assigned_worker TEXT,
                status TEXT DEFAULT 'pending',
                started_at TIMESTAMP,
                completed_at TIMESTAMP,

                best_pnl_in_chunk REAL,
                results_csv_path TEXT
            )
        ''')

        # Exploration phases - high-level progress
        c.execute('''
            CREATE TABLE IF NOT EXISTS phases (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                phase_name TEXT NOT NULL,
                indicator_type TEXT NOT NULL,
                grid_json TEXT NOT NULL,
                total_combos INTEGER NOT NULL,

                completed_combos INTEGER DEFAULT 0,
                best_pnl_overall REAL DEFAULT 0,
                best_params_json TEXT,

                started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                estimated_completion TIMESTAMP,
                actual_completion TIMESTAMP
            )
        ''')

        # Create indexes for fast queries
        c.execute('CREATE INDEX IF NOT EXISTS idx_pnl_per_1k ON strategies(pnl_per_1k DESC)')
        c.execute('CREATE INDEX IF NOT EXISTS idx_indicator_type ON strategies(indicator_type)')
        c.execute('CREATE INDEX IF NOT EXISTS idx_chunk_status ON chunks(status)')

        conn.commit()
        conn.close()

    def record_chunk(self, chunk_id: str, indicator_type: str, grid: ParameterGrid,
                     chunk_start: int, chunk_end: int, assigned_worker: str) -> None:
        """Record new chunk assigned to worker"""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        c.execute('''
            INSERT INTO chunks (id, indicator_type, grid_json, chunk_start, chunk_end,
                                total_combos, assigned_worker, status, started_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, 'running', ?)
        ''', (chunk_id, indicator_type, json.dumps(grid.to_dict()), chunk_start, chunk_end,
              chunk_end - chunk_start, assigned_worker, datetime.now()))

        conn.commit()
        conn.close()

    def complete_chunk(self, chunk_id: str, results_csv_path: str, best_pnl: float) -> None:
        """Mark chunk as completed with results"""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        c.execute('''
            UPDATE chunks
            SET status='completed', completed_at=?, results_csv_path=?, best_pnl_in_chunk=?
            WHERE id=?
        ''', (datetime.now(), results_csv_path, best_pnl, chunk_id))

        conn.commit()
        conn.close()

    def import_results_csv(self, csv_path: str, worker_id: str, chunk_id: str) -> int:
        """Import CSV results from comprehensive_sweep into strategies table"""
        import csv

        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        imported = 0
        with open(csv_path, 'r') as f:
            reader = csv.DictReader(f)
            for row in reader:
                # Create parameter hash for deduplication
                params = {k: v for k, v in row.items() if k not in [
                    'rank', 'trades', 'win_rate', 'total_pnl', 'pnl_per_1k',
                    'profit_factor', 'max_drawdown', 'sharpe_ratio'
                ]}
                param_hash = hashlib.sha256(json.dumps(params, sort_keys=True).encode()).hexdigest()

                try:
                    c.execute('''
                        INSERT INTO strategies (
                            param_hash, indicator_type, params_json,
                            trades, win_rate, total_pnl, pnl_per_1k,
                            profit_factor, max_drawdown, sharpe_ratio,
                            worker_id, chunk_id
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        param_hash, 'v9_moneyline', json.dumps(params),
                        int(row['trades']), float(row['win_rate']), float(row['total_pnl']),
                        float(row['pnl_per_1k']), float(row.get('profit_factor', 0)),
                        float(row.get('max_drawdown', 0)), float(row.get('sharpe_ratio', 0)),
                        worker_id, chunk_id
                    ))
                    imported += 1
                except sqlite3.IntegrityError:
                    # Duplicate param_hash - already tested this config
                    pass

        conn.commit()
        conn.close()
        return imported

    def get_top_strategies(self, limit: int = 100) -> List[Dict]:
        """Get top performing strategies across all tested"""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        c.execute('''
            SELECT indicator_type, params_json, trades, win_rate, total_pnl, pnl_per_1k,
                   profit_factor, max_drawdown, sharpe_ratio, tested_at
            FROM strategies
            WHERE trades >= 700  -- Statistical significance
            AND win_rate >= 0.50 AND win_rate <= 0.70  -- Realistic
            AND profit_factor >= 1.2  -- Minimum edge
            ORDER BY pnl_per_1k DESC
            LIMIT ?
        ''', (limit,))

        rows = c.fetchall()
        conn.close()

        results = []
        for row in rows:
            results.append({
                'indicator_type': row[0],
                'params': json.loads(row[1]),
                'trades': row[2],
                'win_rate': row[3],
                'total_pnl': row[4],
                'pnl_per_1k': row[5],
                'profit_factor': row[6],
                'max_drawdown': row[7],
                'sharpe_ratio': row[8],
                'tested_at': row[9],
            })

        return results


class DistributedCoordinator:
    """Coordinates distributed parameter sweeps across EPYC servers"""

    def __init__(self):
        self.db = ExplorationDatabase(DB_PATH)
        RESULTS_DIR.mkdir(parents=True, exist_ok=True)

    def ssh_command(self, worker_id: str, command: str) -> subprocess.CompletedProcess:
        """Execute command on worker via SSH"""
        worker = WORKERS[worker_id]

        # CRITICAL FIX (Dec 1, 2025): Add SSH options to prevent prompts and improve reliability
        ssh_opts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -o ServerAliveInterval=5"

        if 'ssh_hop' in worker:
            # Worker 2 requires hop through worker 1
            # CRITICAL FIX (Nov 29, 2025): Use double-nested quotes for 2-hop SSH
            # Single quotes don't pass the command to the inner SSH properly
            ssh_cmd = f"ssh {ssh_opts} {worker['ssh_hop']} \"ssh {ssh_opts} {worker['host']} '{command}'\""
        else:
            ssh_cmd = f"ssh {ssh_opts} {worker['host']} '{command}'"

        return subprocess.run(ssh_cmd, shell=True, capture_output=True, text=True)
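
    # Illustrative 2-hop expansion for worker2 (assuming command == 'uptime'):
    #   ssh <opts> root@10.10.254.106 "ssh <opts> root@10.20.254.100 'uptime'"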

    def deploy_worker_script(self, worker_id: str) -> bool:
        """Deploy distributed_worker.py to EPYC server"""
        worker = WORKERS[worker_id]
        script_path = CLUSTER_DIR / 'distributed_worker.py'

        # Copy script to worker's comprehensive_sweep directory
        target = f"{worker['workspace']}/backtester/scripts/distributed_worker.py"

        if 'ssh_hop' in worker:
            # Two-hop copy for worker2
            print(f"📤 Copying worker script to {worker_id} via hop...")
            # Copy to worker1 first
            subprocess.run(f"scp {script_path} {WORKERS['worker1']['host']}:/tmp/", shell=True)
            # Then copy from worker1 to worker2
            self.ssh_command('worker1', f"scp /tmp/distributed_worker.py {worker['host']}:{target}")
        else:
            print(f"📤 Copying worker script to {worker_id}...")
            subprocess.run(f"scp {script_path} {worker['host']}:{target}", shell=True)

        print(f"✅ Worker script deployed to {worker_id}")
        return True

    def assign_chunk(self, worker_id: str, chunk_id: str, grid: ParameterGrid,
                     chunk_start: int, chunk_end: int) -> bool:
        """Assign parameter chunk to worker for processing"""
        worker = WORKERS[worker_id]

        # Record in database
        self.db.record_chunk(chunk_id, 'v9_moneyline', grid, chunk_start, chunk_end, worker_id)

        # Create chunk specification JSON
        chunk_spec = {
            'chunk_id': chunk_id,
            'chunk_start': chunk_start,
            'chunk_end': chunk_end,
            'grid': grid.to_dict(),
            'num_workers': worker['cores'],
        }

        chunk_json_path = RESULTS_DIR / f"{chunk_id}_spec.json"
        with open(chunk_json_path, 'w') as f:
            json.dump(chunk_spec, f, indent=2)

        # Copy chunk spec to worker
        target_json = f"{worker['workspace']}/chunk_{chunk_id}.json"
        if 'ssh_hop' in worker:
            # Two-hop copy
            subprocess.run(f"scp {chunk_json_path} {WORKERS['worker1']['host']}:/tmp/", shell=True)
            self.ssh_command('worker1', f"scp /tmp/{chunk_id}_spec.json {worker['host']}:{target_json}")
        else:
            subprocess.run(f"scp {chunk_json_path} {worker['host']}:{target_json}", shell=True)

        # Execute distributed_worker.py on worker
        # CRITICAL FIX (Nov 30): Use per-worker venv_path to support heterogeneous cluster configurations
        #   Worker1: backtester/.venv/bin/activate (venv inside backtester/)
        #   Worker2: .venv/bin/activate (venv at workspace root)
        # PROVEN WORKING PATTERN (Nov 30): Manual SSH commands succeeded with this exact structure
        venv_path = worker.get('venv_path', 'backtester/.venv/bin/activate')  # Default to worker1 pattern

        # Build command exactly as proven in manual tests
        # CRITICAL: Use nohup with explicit background redirect to detach properly
        cmd = (f"cd {worker['workspace']} && "
               f"source {venv_path} && "
               f"nohup python3 backtester/scripts/distributed_worker.py chunk_{chunk_id}.json "
               f"> /tmp/{chunk_id}.log 2>&1 &")

        print(f"🚀 Starting chunk {chunk_id} on {worker_id} ({chunk_end - chunk_start:,} combos)...")

        # CRITICAL FIX (Dec 1, 2025): Add SSH options for reliability and timeout handling
        ssh_opts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -o ServerAliveInterval=5"

        # Execute command and capture result to verify it started
        # CRITICAL FIX (Dec 1, 2025): Remove && because the command ends with & (background)
        # Inside the single quotes, & backgrounds the process and the shell continues to the next command (echo)
        if 'ssh_hop' in worker:
            # Worker 2 requires hop through worker 1
            ssh_cmd = f"ssh {ssh_opts} {worker['ssh_hop']} \"ssh {ssh_opts} {worker['host']} '{cmd} echo Started_chunk_{chunk_id}'\""
        else:
            ssh_cmd = f"ssh {ssh_opts} {worker['host']} '{cmd} echo Started_chunk_{chunk_id}'"

        # DEBUG: Print command for troubleshooting
        print("   DEBUG: Executing SSH command:")
        print(f"   {ssh_cmd[:200]}...")  # First 200 chars

        # Use run() to capture output and verify success
        # CRITICAL FIX (Dec 1, 2025): Increase timeout from 30s to 60s for nested SSH hops
        try:
            result = subprocess.run(
                ssh_cmd,
                shell=True,
                capture_output=True,
                text=True,
                timeout=60  # 60 second timeout (was 30s, too short for 2-hop SSH)
            )

            # Verify worker process started
            # CRITICAL FIX (Dec 1, 2025): Look for marker without spaces (Started_chunk_)
            if f'Started_chunk_{chunk_id}' in result.stdout:
                print(f"✅ Chunk {chunk_id} started on {worker_id} successfully")
                return True
            else:
                print(f"❌ FAILED to start chunk {chunk_id} on {worker_id}")
                print(f"   stdout: {result.stdout}")
                print(f"   stderr: {result.stderr}")
                return False
        except subprocess.TimeoutExpired:
            print(f"⚠️ SSH command timed out for {chunk_id} on {worker_id}")
            print("   This usually means the SSH hop is misconfigured or slow")
            return False

    def collect_results(self, worker_id: str, chunk_id: str) -> Optional[str]:
        """Collect CSV results from worker"""
        worker = WORKERS[worker_id]

        # Check if results file exists on worker (in backtester/ subdirectory)
        results_csv = f"{worker['workspace']}/backtester/chunk_{chunk_id}_results.csv"
        # No quotes around 'exists': ssh_command already wraps the command in
        # single quotes, and nesting another quoted string inside it is fragile
        check_cmd = f"test -f {results_csv} && echo exists"
        result = self.ssh_command(worker_id, check_cmd)

        if 'exists' not in result.stdout:
            return None  # Results not ready yet

        # Copy results back to master
        local_csv = RESULTS_DIR / f"{chunk_id}_results.csv"

        if 'ssh_hop' in worker:
            # Two-hop copy back
            self.ssh_command('worker1', f"scp {worker['host']}:{results_csv} /tmp/")
            subprocess.run(f"scp {WORKERS['worker1']['host']}:/tmp/chunk_{chunk_id}_results.csv {local_csv}", shell=True)
        else:
            subprocess.run(f"scp {worker['host']}:{results_csv} {local_csv}", shell=True)

        print(f"📥 Collected results from {worker_id} chunk {chunk_id}")

        # Import into database
        imported = self.db.import_results_csv(str(local_csv), worker_id, chunk_id)
        print(f"📊 Imported {imported} unique strategies from {chunk_id}")

        # Get best P&L from CSV for chunk tracking
        import csv
        with open(local_csv, 'r') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            best_pnl = max(float(row['pnl_per_1k']) for row in rows) if rows else 0

        self.db.complete_chunk(chunk_id, str(local_csv), best_pnl)

        return str(local_csv)

    def start_comprehensive_exploration(self, chunk_size: int = 10000):
        """Start massive comprehensive parameter sweep"""
        print("=" * 80)
        print("🚀 DISTRIBUTED COMPREHENSIVE EXPLORATION")
        print("=" * 80)
        print()

        # v9 Money Line parameter grid (Nov 30, 2025)
        # 6 swept parameters × 4 values each = 4,096 combinations
        # Focus on core trend-following parameters, fix TP/SL to proven v9 values
        grid = ParameterGrid(
            flip_thresholds=[0.4, 0.5, 0.6, 0.7],
            ma_gaps=[0.20, 0.30, 0.40, 0.50],
            adx_mins=[18, 21, 24, 27],
            long_pos_maxs=[60, 65, 70, 75],
            short_pos_mins=[20, 25, 30, 35],
            cooldowns=[1, 2, 3, 4],

            # Fixed to standard v9 values
            position_sizes=[10000],
            tp1_multipliers=[2.0],
            tp2_multipliers=[4.0],
            sl_multipliers=[3.0],
            tp1_close_percents=[60],
            trailing_multipliers=[1.5],
            vol_mins=[1.0],
            max_bars_list=[500],
        )

        total_combos = grid.total_combinations()

        print(f"📊 Total parameter space: {total_combos:,} combinations")
        print(f"📦 Chunk size: {chunk_size:,} combinations per chunk")
        print(f"🎯 Total chunks: {(total_combos + chunk_size - 1) // chunk_size:,}")
        print(f"⏱️ Estimated time: {(total_combos * 1.6) / (64 * 3600):.1f} hours with 64 cores")
        print()

        # Deploy worker scripts
        for worker_id in WORKERS.keys():
            self.deploy_worker_script(worker_id)

        print()
        print("🔄 Distributing chunks to workers...")
        print()

        # CRITICAL FIX (Dec 1, 2025): Clear pending/failed chunks to allow retry
        # This prevents UNIQUE constraint errors on restart
        conn = sqlite3.connect(self.db.db_path)
        c = conn.cursor()
        c.execute("DELETE FROM chunks WHERE status IN ('pending', 'failed')")
        deleted_count = c.rowcount
        conn.commit()
        conn.close()
        if deleted_count > 0:
            print(f"🗑️ Cleared {deleted_count} pending/failed chunks for retry")
            print()

        # CRITICAL FIX (Dec 1, 2025): Resume from existing COMPLETED chunks only
        # Get max chunk ID from completed/running chunks, not all chunks
        # This allows retrying failed/pending chunks without UNIQUE constraint errors
        conn = sqlite3.connect(self.db.db_path)
        c = conn.cursor()
        c.execute("SELECT id FROM chunks WHERE id LIKE 'v9_chunk_%' AND status IN ('completed', 'running') ORDER BY id DESC LIMIT 1")
        last_chunk = c.fetchone()
        conn.close()

        if last_chunk:
            # Extract counter from the highest completed/running chunk ID
            last_counter = int(last_chunk[0].split('_')[-1])
            chunk_id_counter = last_counter + 1
            chunk_start = chunk_id_counter * chunk_size
            print(f"📋 Resuming from chunk {chunk_id_counter} (last completed/running chunk: {last_counter})")
            print(f"   Starting at combo {chunk_start:,} / {total_combos:,}")
        else:
            chunk_id_counter = 0
            chunk_start = 0
            print("📋 Starting fresh - no completed chunks found")

        # Split work across workers
        active_chunks = {}
        worker_list = list(WORKERS.keys())  # ['worker1', 'worker2']

        while chunk_start < total_combos:
            chunk_end = min(chunk_start + chunk_size, total_combos)
            chunk_id = f"v9_chunk_{chunk_id_counter:06d}"

            # Round-robin assignment across both workers for balanced load
            worker_id = worker_list[chunk_id_counter % len(worker_list)]

            if self.assign_chunk(worker_id, chunk_id, grid, chunk_start, chunk_end):
                active_chunks[chunk_id] = worker_id

            chunk_id_counter += 1
            chunk_start = chunk_end

            # CPU limit: 1 chunk per worker = ~70% CPU usage (16 cores per chunk on 32-core machines)
            if len(active_chunks) >= len(WORKERS) * 1:
                print(f"⏸️ Pausing chunk assignment - {len(active_chunks)} chunks active (70% CPU target)")
                print("⏳ Waiting for chunks to complete...")
                break

        print()
        print(f"✅ Assigned {len(active_chunks)} initial chunks")
        print()
        print("📊 Monitor progress with: python3 cluster/exploration_status.py")
        print("🏆 View top strategies: sqlite3 cluster/exploration.db 'SELECT * FROM strategies ORDER BY pnl_per_1k DESC LIMIT 10'")
        print()
        print("🔄 Starting background monitoring thread...")

        # Start monitoring in background thread (Nov 30, 2025)
        monitor_thread = threading.Thread(
            target=self._monitor_chunks_background,
            args=(grid, chunk_size, total_combos, active_chunks, worker_list,
                  chunk_id_counter, chunk_start, last_counter if last_chunk else None),
            daemon=True  # Dies when the main program exits
        )
        monitor_thread.start()

        print("✅ Monitoring thread started")
        print("   (Monitoring continues in background - check logs or dashboard)")
        print()
        print("=" * 80)

        # Keep coordinator alive so the daemon thread can continue
        # The thread exits when all work is done
        print("💤 Main thread sleeping - monitoring continues in background...")
        print("   Press Ctrl+C to stop coordinator (will stop monitoring)")
        print()

        try:
            monitor_thread.join()  # Wait for monitoring thread to finish
        except KeyboardInterrupt:
            print("\n⚠️ Coordinator interrupted by user")
            print("   Workers will continue running their current chunks")
            print("   Restart coordinator to resume monitoring")

    def _monitor_chunks_background(self, grid, chunk_size, total_combos, active_chunks,
                                   worker_list, chunk_id_counter, chunk_start, last_counter):
        """
        Background monitoring thread to detect completions and assign new chunks.

        This runs continuously until all chunks are processed.
        Uses polling (SSH checks every 60s) to detect when workers complete chunks.

        Args:
            grid: Parameter grid for generating chunks
            chunk_size: Number of combinations per chunk
            total_combos: Total parameter combinations to process
            active_chunks: Dict mapping chunk_id -> worker_id for currently running chunks
            worker_list: List of worker IDs for round-robin assignment
            chunk_id_counter: Current chunk counter (for generating chunk IDs)
            chunk_start: Current position in parameter space
            last_counter: Counter from last existing chunk (for progress calculation)
        """
        poll_interval = 60  # Check every 60 seconds (time is imported at module level)

        print(f"🔄 Monitoring thread started (poll interval: {poll_interval}s)")
        print(f"   Will process {total_combos:,} combinations in chunks of {chunk_size:,}")
        print()

        try:
            while chunk_start < total_combos or active_chunks:
                time.sleep(poll_interval)

                # Check each active chunk for completion
                completed = []
                for chunk_id, worker_id in list(active_chunks.items()):
                    worker = WORKERS[worker_id]
                    workspace = worker['workspace']

                    # Check if results CSV exists on worker
                    results_csv = f"{workspace}/backtester/chunk_{chunk_id}_results.csv"

                    # Use appropriate SSH path for two-hop workers
                    if 'ssh_hop' in worker:
                        check_cmd = f"ssh {WORKERS['worker1']['host']} 'ssh {worker['host']} \"test -f {results_csv} && echo EXISTS\"'"
                    else:
                        check_cmd = f"ssh {worker['host']} 'test -f {results_csv} && echo EXISTS'"

                    result = subprocess.run(check_cmd, shell=True, capture_output=True, text=True)

                    if 'EXISTS' in result.stdout:
                        print(f"✅ Detected completion: {chunk_id} on {worker_id}")

                        try:
                            # Collect results back to coordinator
                            self.collect_results(worker_id, chunk_id)
                            completed.append(chunk_id)
                            print(f"📥 Collected and imported results from {chunk_id}")
                        except Exception as e:
                            print(f"⚠️ Error collecting {chunk_id}: {e}")
                            # Mark as completed anyway to prevent infinite retry
                            completed.append(chunk_id)

                # Remove completed chunks from active tracking
                for chunk_id in completed:
                    del active_chunks[chunk_id]

                # Assign new chunks if we have capacity and work remaining
                # Maintain 1 chunk per worker for the 70% CPU target
                while len(active_chunks) < len(WORKERS) * 1 and chunk_start < total_combos:
                    chunk_end = min(chunk_start + chunk_size, total_combos)
                    chunk_id = f"v9_chunk_{chunk_id_counter:06d}"

                    # Round-robin assignment
                    worker_id = worker_list[chunk_id_counter % len(worker_list)]

                    if self.assign_chunk(worker_id, chunk_id, grid, chunk_start, chunk_end):
                        active_chunks[chunk_id] = worker_id
                        print(f"🎯 Assigned new chunk {chunk_id} to {worker_id}")

                    chunk_id_counter += 1
                    chunk_start = chunk_end

                # Status update
                completed_count = chunk_id_counter - len(active_chunks) - (last_counter + 1 if last_counter is not None else 0)
                total_chunks = (total_combos + chunk_size - 1) // chunk_size
                progress = (completed_count / total_chunks) * 100
                print(f"📊 Progress: {completed_count}/{total_chunks} chunks ({progress:.1f}%) | Active: {len(active_chunks)}")

            print()
            print("=" * 80)
            print("🎉 COMPREHENSIVE EXPLORATION COMPLETE!")
            print("=" * 80)

        except Exception as e:
            print(f"❌ Monitoring thread error: {e}")
            import traceback
            traceback.print_exc()


def main():
    """Main coordinator entry point"""
    import argparse

    parser = argparse.ArgumentParser(description='Distributed continuous optimization coordinator')
    parser.add_argument('--chunk-size', type=int, default=2000,
                        help='Number of combinations per chunk (default: 2000)')
    parser.add_argument('--continuous', action='store_true',
                        help='Run continuously (not implemented yet)')

    args = parser.parse_args()

    coordinator = DistributedCoordinator()
    coordinator.start_comprehensive_exploration(chunk_size=args.chunk_size)


if __name__ == '__main__':
    main()
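
# Usage (illustrative):
#   python3 cluster/distributed_coordinator.py --chunk-size 2000
#   python3 cluster/exploration_status.py   # monitor progress from another shell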