CRITICAL FIX (Nov 30, 2025):
- Dashboard showed 'idle' despite 22+ worker processes running
- Root cause: SSH-based worker detection timing out
- Solution: Check database for running chunks FIRST
Changes:
1. app/api/cluster/status/route.ts:
- Query exploration database before SSH detection
- If running chunks exist, mark workers 'active' even if SSH fails
- Override worker status: 'offline' → 'active' when chunks running
- Log: '✅ Cluster status: ACTIVE (database shows running chunks)'
- Database is the source of truth; SSH is used only for supplementary metrics (see the sketch after this list)
2. app/cluster/page.tsx:
- Stop button ALREADY EXISTS (conditionally shown)
- Shows Start when status='idle', Stop when status='active'
- No code changes needed; resolved by the status-detection fix in (1)
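For illustration only, a minimal Python sketch of the database-first idea described in (1). The actual fix lives in the TypeScript route; this sketch reads the coordinator's sweep_results.db jobs table instead of the dashboard's exploration database, and ssh_probe is a hypothetical stand-in for the SSH detection path:

import sqlite3

def cluster_status(db_path="sweep_results.db", ssh_probe=None):
    """Report 'active' if the database shows running chunks, even when SSH times out."""
    conn = sqlite3.connect(db_path)
    running = conn.execute(
        "SELECT worker, COUNT(*) FROM jobs WHERE status = 'running' GROUP BY worker"
    ).fetchall()
    conn.close()

    if running:
        # Database is the source of truth: running chunks => workers are active,
        # regardless of whether the SSH probe succeeds.
        return {"status": "active",
                "activeWorkers": len(running),
                "workers": {worker: "active" for worker, _count in running}}

    # No running chunks recorded; only now fall back to SSH for supplementary metrics.
    ssh_info = ssh_probe() if ssh_probe else {}
    return {"status": "idle", "activeWorkers": 0, "workers": ssh_info}
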
Result:
- Dashboard now shows 'ACTIVE' with 2 workers (correct)
- Workers show 'active' status (was 'offline')
- Stop button automatically visible when cluster active
- System resilient to SSH timeouts/network issues
Verified:
- Container restarted: Nov 30 21:18 UTC
- API tested: Returns status='active', activeWorkers=2
- Logs confirm: Database-first logic working
- Workers confirmed running: 22+ processes on worker1, plus active processes on worker2
coordinator.py (Python, 420 lines, 14 KiB):
#!/usr/bin/env python3
"""
Distributed Sweep Coordinator for 2 EPYC Servers

Splits comprehensive parameter sweeps across both EPYC servers
to maximize throughput. Works with existing backtester infrastructure.

Architecture:
- Coordinator runs on local machine
- Generates job chunks (1000 configs each)
- Distributes chunks to both EPYC servers via SSH
- Collects results and aggregates into master CSV
- Runs 24/7 to continuously test new indicator combinations

Usage:
    python3 coordinator.py --sweep comprehensive            # Create full sweep
    python3 coordinator.py --sweep custom --params params.json
    python3 coordinator.py --run                             # Run coordinator loop
    python3 coordinator.py --status                          # Check progress
"""

import subprocess
import json
import time
import sqlite3
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional, Tuple
import argparse


class DistributedCoordinator:
    """Coordinates distributed backtesting across 2 EPYC servers."""

    def __init__(self):
        self.worker1 = "root@10.10.254.106"
        self.worker2 = "root@10.20.254.100"  # Via worker1
        self.remote_path = "/home/comprehensive_sweep/backtester"
        self.db_path = Path(__file__).parent / "sweep_results.db"
        self.chunk_size = 1000  # Configs per job chunk

        self._init_database()

    def _init_database(self):
        """Initialize SQLite database for tracking progress."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Jobs table
        c.execute('''
            CREATE TABLE IF NOT EXISTS jobs (
                job_id TEXT PRIMARY KEY,
                worker TEXT,
                status TEXT,  -- queued, running, completed, failed
                chunk_start INTEGER,
                chunk_end INTEGER,
                configs_tested INTEGER,
                created_at TEXT,
                started_at TEXT,
                completed_at TEXT
            )
        ''')

        # Results table
        c.execute('''
            CREATE TABLE IF NOT EXISTS results (
                config_id INTEGER PRIMARY KEY,
                trades INTEGER,
                win_rate REAL,
                total_pnl REAL,
                pnl_per_1k REAL,
                params TEXT,  -- JSON
                tested_at TEXT
            )
        ''')

        # Sweep metadata
        c.execute('''
            CREATE TABLE IF NOT EXISTS sweeps (
                sweep_id TEXT PRIMARY KEY,
                sweep_type TEXT,
                total_configs INTEGER,
                configs_completed INTEGER,
                best_pnl_per_1k REAL,
                started_at TEXT,
                completed_at TEXT
            )
        ''')

        conn.commit()
        conn.close()

    def create_sweep(self, sweep_type: str, total_configs: int) -> str:
        """Create a new sweep job."""
        sweep_id = f"sweep_{sweep_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        c.execute('''
            INSERT INTO sweeps (sweep_id, sweep_type, total_configs,
                                configs_completed, started_at)
            VALUES (?, ?, ?, 0, ?)
        ''', (sweep_id, sweep_type, total_configs, datetime.now().isoformat()))
        conn.commit()
        conn.close()

        return sweep_id

    def generate_job_chunks(self, sweep_id: str, total_configs: int):
        """Split sweep into job chunks for distribution."""
        num_chunks = (total_configs + self.chunk_size - 1) // self.chunk_size

        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        for i in range(num_chunks):
            chunk_start = i * self.chunk_size
            chunk_end = min((i + 1) * self.chunk_size, total_configs)

            job_id = f"{sweep_id}_chunk_{i:04d}"
            worker = self.worker1 if i % 2 == 0 else self.worker2

            c.execute('''
                INSERT INTO jobs (job_id, worker, status, chunk_start,
                                  chunk_end, created_at)
                VALUES (?, ?, 'queued', ?, ?, ?)
            ''', (job_id, worker, chunk_start, chunk_end,
                  datetime.now().isoformat()))

        conn.commit()
        conn.close()

        # Even-numbered chunks go to worker1, so it gets the extra chunk when
        # num_chunks is odd
        print(f"✅ Created {num_chunks} job chunks")
        print(f"   Worker 1: {(num_chunks + 1) // 2} chunks")
        print(f"   Worker 2: {num_chunks // 2} chunks")

    def dispatch_jobs(self):
        """Dispatch queued jobs to workers."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Get queued jobs
        c.execute('''
            SELECT job_id, worker, chunk_start, chunk_end
            FROM jobs
            WHERE status = 'queued'
            LIMIT 10
        ''')

        jobs = c.fetchall()
        conn.close()

        if not jobs:
            return 0

        for job_id, worker, chunk_start, chunk_end in jobs:
            self._start_job_on_worker(job_id, worker, chunk_start, chunk_end)

        return len(jobs)

    def _start_job_on_worker(self, job_id: str, worker: str,
                             chunk_start: int, chunk_end: int):
        """Start a job on specified worker."""
        try:
            # Create job file on worker
            job_spec = {
                'job_id': job_id,
                'chunk_start': chunk_start,
                'chunk_end': chunk_end,
                'timestamp': datetime.now().isoformat()
            }

            job_json = json.dumps(job_spec)

            # Write job file, streaming the JSON over stdin so embedded quotes
            # survive the nested shell quoting
            job_file = f"{self.remote_path}/jobs/{job_id}.json"
            if worker == self.worker2:
                # Two-hop for worker2
                cmd = ["ssh", self.worker1, f"ssh {self.worker2} 'cat > {job_file}'"]
            else:
                cmd = ["ssh", worker, f"cat > {job_file}"]

            subprocess.run(cmd, input=job_json, text=True, check=True)

            # Update job status
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute('''
                UPDATE jobs
                SET status = 'running', started_at = ?
                WHERE job_id = ?
            ''', (datetime.now().isoformat(), job_id))
            conn.commit()
            conn.close()

            print(f"✅ Started job {job_id} on {worker}")

        except Exception as e:
            print(f"❌ Failed to start job {job_id}: {e}")

            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute('''
                UPDATE jobs SET status = 'failed' WHERE job_id = ?
            ''', (job_id,))
            conn.commit()
            conn.close()

    def collect_results(self):
        """Collect completed results from workers."""
        for worker in [self.worker1, self.worker2]:
            self._collect_from_worker(worker)

    def _collect_from_worker(self, worker: str):
        """Collect results from a specific worker."""
        try:
            # List completed result files
            if worker == self.worker2:
                cmd = f"ssh {self.worker1} \"ssh {self.worker2} 'ls {self.remote_path}/results/*.json 2>/dev/null'\""
            else:
                cmd = f"ssh {worker} \"ls {self.remote_path}/results/*.json 2>/dev/null\""

            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

            if result.returncode != 0:
                return

            result_files = result.stdout.strip().split('\n')

            for result_file in result_files:
                if not result_file:
                    continue

                # Extract job_id from filename
                job_id = Path(result_file).stem

                # Copy result file locally
                local_file = Path(f"/tmp/{job_id}.json")

                if worker == self.worker2:
                    # Two-hop copy: stage the file on worker1, then pull it to
                    # the coordinator (the second scp must run locally)
                    subprocess.run(
                        f"ssh {self.worker1} \"scp {self.worker2}:{result_file} /tmp/\"",
                        shell=True, check=True
                    )
                    subprocess.run(
                        f"scp {self.worker1}:/tmp/{job_id}.json {local_file}",
                        shell=True, check=True
                    )
                else:
                    subprocess.run(
                        f"scp {worker}:{result_file} {local_file}",
                        shell=True, check=True
                    )

                # Parse and store results
                with open(local_file) as f:
                    results = json.load(f)

                self._store_results(job_id, results)

                # Delete remote result file
                if worker == self.worker2:
                    subprocess.run(
                        f"ssh {self.worker1} \"ssh {self.worker2} 'rm {result_file}'\"",
                        shell=True
                    )
                else:
                    subprocess.run(f"ssh {worker} \"rm {result_file}\"", shell=True)

                # Delete local temp file
                local_file.unlink()

                print(f"✅ Collected results from {job_id}")

        except Exception as e:
            print(f"❌ Error collecting from {worker}: {e}")

    def _store_results(self, job_id: str, results: Dict):
        """Store results in database."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Store each config result
        for result in results['configs']:
            c.execute('''
                INSERT OR REPLACE INTO results
                (config_id, trades, win_rate, total_pnl, pnl_per_1k,
                 params, tested_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                result['config_id'],
                result['trades'],
                result['win_rate'],
                result['total_pnl'],
                result['pnl_per_1k'],
                json.dumps(result['params']),
                datetime.now().isoformat()
            ))

        # Update job status
        c.execute('''
            UPDATE jobs
            SET status = 'completed',
                configs_tested = ?,
                completed_at = ?
            WHERE job_id = ?
        ''', (len(results['configs']), datetime.now().isoformat(), job_id))

        conn.commit()
        conn.close()

    def get_status(self) -> Dict:
        """Get current sweep status."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Job counts
        c.execute('SELECT status, COUNT(*) FROM jobs GROUP BY status')
        job_counts = dict(c.fetchall())

        # Top results
        c.execute('''
            SELECT pnl_per_1k, trades, win_rate, params
            FROM results
            ORDER BY pnl_per_1k DESC
            LIMIT 5
        ''')
        top_results = c.fetchall()

        # Active sweeps
        c.execute('''
            SELECT sweep_id, total_configs, configs_completed
            FROM sweeps
            WHERE completed_at IS NULL
        ''')
        active_sweeps = c.fetchall()

        conn.close()

        return {
            'jobs': job_counts,
            'top_results': top_results,
            'active_sweeps': active_sweeps
        }

    def run_forever(self):
        """Run coordinator loop indefinitely."""
        print("🚀 Starting Distributed Sweep Coordinator")
        print("=" * 80)

        while True:
            try:
                # Dispatch new jobs
                dispatched = self.dispatch_jobs()

                # Collect completed results
                self.collect_results()

                # Show status
                status = self.get_status()

                print(f"\n📊 Status: {datetime.now().strftime('%H:%M:%S')}")
                print(f"   Queued: {status['jobs'].get('queued', 0)}")
                print(f"   Running: {status['jobs'].get('running', 0)}")
                print(f"   Completed: {status['jobs'].get('completed', 0)}")

                if status['top_results']:
                    print(f"\n🏆 Top Result: ${status['top_results'][0][0]:.2f}/1k")

                # Sleep before next cycle
                time.sleep(60)

            except KeyboardInterrupt:
                print("\n\n⏹️ Coordinator stopped")
                break
            except Exception as e:
                print(f"❌ Error in coordinator loop: {e}")
                time.sleep(60)


def main():
    parser = argparse.ArgumentParser(description='Distributed Sweep Coordinator')
    parser.add_argument('--sweep', choices=['comprehensive', 'custom'],
                        help='Start a new sweep')
    parser.add_argument('--status', action='store_true',
                        help='Show current status')
    parser.add_argument('--run', action='store_true',
                        help='Run coordinator loop')

    args = parser.parse_args()

    coordinator = DistributedCoordinator()

    if args.status:
        status = coordinator.get_status()
        print("📊 SWEEP STATUS")
        print("=" * 80)
        print(f"Queued: {status['jobs'].get('queued', 0)}")
        print(f"Running: {status['jobs'].get('running', 0)}")
        print(f"Completed: {status['jobs'].get('completed', 0)}")
        print()

        if status['top_results']:
            print("🏆 TOP 5 RESULTS:")
            for i, (pnl, trades, wr, params) in enumerate(status['top_results'], 1):
                print(f"{i}. ${pnl:.2f}/1k - {trades} trades @ {wr:.1f}% WR")

    elif args.sweep:
        # Calculate total configs (example for comprehensive)
        # This should match your actual parameter grid
        total_configs = 5 * 3 * 4 * 4 * 4 * 4 * 3 * 3 * 3 * 4  # 414,720 configs

        sweep_id = coordinator.create_sweep(args.sweep, total_configs)
        coordinator.generate_job_chunks(sweep_id, total_configs)

        print(f"✅ Sweep {sweep_id} created")
        print(f"   Total configs: {total_configs:,}")
        print(f"   Run 'python3 coordinator.py --run' to start processing")

    elif args.run:
        coordinator.run_forever()

    else:
        parser.print_help()


if __name__ == '__main__':
    main()