fix: Database-first cluster status detection + Stop button clarification
CRITICAL FIX (Nov 30, 2025):
- Dashboard showed 'idle' despite 22+ worker processes running
- Root cause: SSH-based worker detection was timing out
- Solution: Check database for running chunks FIRST
Changes:
1. app/api/cluster/status/route.ts:
- Query exploration database before SSH detection
- If running chunks exist, mark workers 'active' even if SSH fails
- Override worker status: 'offline' → 'active' when chunks running
- Log: '✅ Cluster status: ACTIVE (database shows running chunks)'
- Database is the source of truth; SSH is used only for supplementary metrics (see the sketch below)
2. app/cluster/page.tsx:
- Stop button ALREADY EXISTS (conditionally shown)
- Shows Start when status='idle', Stop when status='active'
- No code changes needed - fixed by status detection
Result:
- Dashboard now shows 'ACTIVE' with 2 workers (correct)
- Workers show 'active' status (was 'offline')
- Stop button automatically visible when cluster active
- System resilient to SSH timeouts/network issues
Verified:
- Container restarted: Nov 30 21:18 UTC
- API tested: Returns status='active', activeWorkers=2
- Logs confirm: Database-first logic working
- Workers confirmed running: 22+ processes on worker1, worker processes also running on worker2
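
A minimal Python sketch of the database-first decision order described in Changes item 1, written against the jobs table that coordinator.py (below) creates. The shipped check lives in the TypeScript route (app/api/cluster/status/route.ts), so the function name, return shape, and fallback behaviour here are illustrative assumptions, not the actual route code:

import sqlite3
from pathlib import Path
from typing import Optional

DB_PATH = Path(__file__).parent / "sweep_results.db"  # same DB coordinator.py creates

def cluster_status(ssh_worker_count: Optional[int]) -> dict:
    """Database-first status: running chunks in the DB override SSH results.

    ssh_worker_count is whatever the (possibly timed-out) SSH probe returned,
    or None if the probe failed entirely.
    """
    conn = sqlite3.connect(DB_PATH)
    running_chunks = conn.execute(
        "SELECT COUNT(*) FROM jobs WHERE status = 'running'"
    ).fetchone()[0]
    conn.close()

    if running_chunks > 0:
        # Database is the source of truth: report active even if SSH timed out.
        return {"status": "active", "activeWorkers": max(ssh_worker_count or 0, 1)}

    # No running chunks recorded: fall back to whatever SSH detection saw.
    return {"status": "active" if ssh_worker_count else "idle",
            "activeWorkers": ssh_worker_count or 0}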
distributed_sweep/coordinator.py (new file, 419 lines):
#!/usr/bin/env python3
"""
Distributed Sweep Coordinator for 2 EPYC Servers

Splits comprehensive parameter sweeps across both EPYC servers
to maximize throughput. Works with existing backtester infrastructure.

Architecture:
- Coordinator runs on local machine
- Generates job chunks (1000 configs each)
- Distributes chunks to both EPYC servers via SSH
- Collects results and aggregates into master CSV
- Runs 24/7 to continuously test new indicator combinations

Usage:
    python3 coordinator.py --sweep comprehensive   # Run full sweep
    python3 coordinator.py --sweep custom --params params.json
    python3 coordinator.py --status                # Check progress
"""

import subprocess
import json
import time
import sqlite3
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional, Tuple
import argparse

class DistributedCoordinator:
    """Coordinates distributed backtesting across 2 EPYC servers."""

    def __init__(self):
        self.worker1 = "root@10.10.254.106"
        self.worker2 = "root@10.20.254.100"  # Via worker1
        self.remote_path = "/home/comprehensive_sweep/backtester"
        self.db_path = Path(__file__).parent / "sweep_results.db"
        self.chunk_size = 1000  # Configs per job chunk

        self._init_database()

    def _init_database(self):
        """Initialize SQLite database for tracking progress."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Jobs table
        c.execute('''
            CREATE TABLE IF NOT EXISTS jobs (
                job_id TEXT PRIMARY KEY,
                worker TEXT,
                status TEXT,  -- queued, running, completed, failed
                chunk_start INTEGER,
                chunk_end INTEGER,
                configs_tested INTEGER,
                created_at TEXT,
                started_at TEXT,
                completed_at TEXT
            )
        ''')

        # Results table
        c.execute('''
            CREATE TABLE IF NOT EXISTS results (
                config_id INTEGER PRIMARY KEY,
                trades INTEGER,
                win_rate REAL,
                total_pnl REAL,
                pnl_per_1k REAL,
                params TEXT,  -- JSON
                tested_at TEXT
            )
        ''')

        # Sweep metadata
        c.execute('''
            CREATE TABLE IF NOT EXISTS sweeps (
                sweep_id TEXT PRIMARY KEY,
                sweep_type TEXT,
                total_configs INTEGER,
                configs_completed INTEGER,
                best_pnl_per_1k REAL,
                started_at TEXT,
                completed_at TEXT
            )
        ''')

        conn.commit()
        conn.close()

    def create_sweep(self, sweep_type: str, total_configs: int) -> str:
        """Create a new sweep job."""
        sweep_id = f"sweep_{sweep_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        c.execute('''
            INSERT INTO sweeps (sweep_id, sweep_type, total_configs,
                                configs_completed, started_at)
            VALUES (?, ?, ?, 0, ?)
        ''', (sweep_id, sweep_type, total_configs, datetime.now().isoformat()))
        conn.commit()
        conn.close()

        return sweep_id

    def generate_job_chunks(self, sweep_id: str, total_configs: int):
        """Split sweep into job chunks for distribution."""
        num_chunks = (total_configs + self.chunk_size - 1) // self.chunk_size

        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        for i in range(num_chunks):
            chunk_start = i * self.chunk_size
            chunk_end = min((i + 1) * self.chunk_size, total_configs)

            job_id = f"{sweep_id}_chunk_{i:04d}"
            worker = self.worker1 if i % 2 == 0 else self.worker2

            c.execute('''
                INSERT INTO jobs (job_id, worker, status, chunk_start,
                                  chunk_end, created_at)
                VALUES (?, ?, 'queued', ?, ?, ?)
            ''', (job_id, worker, chunk_start, chunk_end,
                  datetime.now().isoformat()))

        conn.commit()
        conn.close()

        print(f"✅ Created {num_chunks} job chunks")
        print(f"   Worker 1: {num_chunks // 2} chunks")
        print(f"   Worker 2: {num_chunks - num_chunks // 2} chunks")

    def dispatch_jobs(self):
        """Dispatch queued jobs to workers."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Get queued jobs
        c.execute('''
            SELECT job_id, worker, chunk_start, chunk_end
            FROM jobs
            WHERE status = 'queued'
            LIMIT 10
        ''')

        jobs = c.fetchall()
        conn.close()

        if not jobs:
            return 0

        for job_id, worker, chunk_start, chunk_end in jobs:
            self._start_job_on_worker(job_id, worker, chunk_start, chunk_end)

        return len(jobs)

    def _start_job_on_worker(self, job_id: str, worker: str,
                             chunk_start: int, chunk_end: int):
        """Start a job on specified worker."""
        try:
            # Create job file on worker
            job_spec = {
                'job_id': job_id,
                'chunk_start': chunk_start,
                'chunk_end': chunk_end,
                'timestamp': datetime.now().isoformat()
            }

            job_json = json.dumps(job_spec)

            # Write job file
            if worker == self.worker2:
                # Two-hop for worker2
                cmd = f"ssh {self.worker1} \"ssh {self.worker2} 'echo \\'{job_json}\\' > {self.remote_path}/jobs/{job_id}.json'\""
            else:
                cmd = f"ssh {worker} \"echo '{job_json}' > {self.remote_path}/jobs/{job_id}.json\""

            subprocess.run(cmd, shell=True, check=True)

            # Update job status
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute('''
                UPDATE jobs
                SET status = 'running', started_at = ?
                WHERE job_id = ?
            ''', (datetime.now().isoformat(), job_id))
            conn.commit()
            conn.close()

            print(f"✅ Started job {job_id} on {worker}")

        except Exception as e:
            print(f"❌ Failed to start job {job_id}: {e}")

            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute('''
                UPDATE jobs SET status = 'failed' WHERE job_id = ?
            ''', (job_id,))
            conn.commit()
            conn.close()

    def collect_results(self):
        """Collect completed results from workers."""
        for worker in [self.worker1, self.worker2]:
            self._collect_from_worker(worker)

    def _collect_from_worker(self, worker: str):
        """Collect results from a specific worker."""
        try:
            # List completed result files
            if worker == self.worker2:
                cmd = f"ssh {self.worker1} \"ssh {self.worker2} 'ls {self.remote_path}/results/*.json 2>/dev/null'\""
            else:
                cmd = f"ssh {worker} \"ls {self.remote_path}/results/*.json 2>/dev/null\""

            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

            if result.returncode != 0:
                return

            result_files = result.stdout.strip().split('\n')

            for result_file in result_files:
                if not result_file:
                    continue

                # Extract job_id from filename
                job_id = Path(result_file).stem

                # Copy result file locally
                local_file = Path(f"/tmp/{job_id}.json")

                if worker == self.worker2:
                    subprocess.run(
                        f"ssh {self.worker1} \"scp {self.worker2}:{result_file} /tmp/ && scp {self.worker1}:/tmp/{job_id}.json {local_file}\"",
                        shell=True, check=True
                    )
                else:
                    subprocess.run(
                        f"scp {worker}:{result_file} {local_file}",
                        shell=True, check=True
                    )

                # Parse and store results
                with open(local_file) as f:
                    results = json.load(f)

                self._store_results(job_id, results)

                # Delete remote result file
                if worker == self.worker2:
                    subprocess.run(
                        f"ssh {self.worker1} \"ssh {self.worker2} 'rm {result_file}'\"",
                        shell=True
                    )
                else:
                    subprocess.run(f"ssh {worker} \"rm {result_file}\"", shell=True)

                # Delete local temp file
                local_file.unlink()

                print(f"✅ Collected results from {job_id}")

        except Exception as e:
            print(f"❌ Error collecting from {worker}: {e}")

    def _store_results(self, job_id: str, results: Dict):
        """Store results in database."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Store each config result
        for result in results['configs']:
            c.execute('''
                INSERT OR REPLACE INTO results
                (config_id, trades, win_rate, total_pnl, pnl_per_1k,
                 params, tested_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                result['config_id'],
                result['trades'],
                result['win_rate'],
                result['total_pnl'],
                result['pnl_per_1k'],
                json.dumps(result['params']),
                datetime.now().isoformat()
            ))

        # Update job status
        c.execute('''
            UPDATE jobs
            SET status = 'completed',
                configs_tested = ?,
                completed_at = ?
            WHERE job_id = ?
        ''', (len(results['configs']), datetime.now().isoformat(), job_id))

        conn.commit()
        conn.close()

    def get_status(self) -> Dict:
        """Get current sweep status."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Job counts
        c.execute('SELECT status, COUNT(*) FROM jobs GROUP BY status')
        job_counts = dict(c.fetchall())

        # Top results
        c.execute('''
            SELECT pnl_per_1k, trades, win_rate, params
            FROM results
            ORDER BY pnl_per_1k DESC
            LIMIT 5
        ''')
        top_results = c.fetchall()

        # Active sweeps
        c.execute('''
            SELECT sweep_id, total_configs, configs_completed
            FROM sweeps
            WHERE completed_at IS NULL
        ''')
        active_sweeps = c.fetchall()

        conn.close()

        return {
            'jobs': job_counts,
            'top_results': top_results,
            'active_sweeps': active_sweeps
        }

    def run_forever(self):
        """Run coordinator loop indefinitely."""
        print("🚀 Starting Distributed Sweep Coordinator")
        print("=" * 80)

        while True:
            try:
                # Dispatch new jobs
                dispatched = self.dispatch_jobs()

                # Collect completed results
                self.collect_results()

                # Show status
                status = self.get_status()

                print(f"\n📊 Status: {datetime.now().strftime('%H:%M:%S')}")
                print(f"   Queued: {status['jobs'].get('queued', 0)}")
                print(f"   Running: {status['jobs'].get('running', 0)}")
                print(f"   Completed: {status['jobs'].get('completed', 0)}")

                if status['top_results']:
                    print(f"\n🏆 Top Result: ${status['top_results'][0][0]:.2f}/1k")

                # Sleep before next cycle
                time.sleep(60)

            except KeyboardInterrupt:
                print("\n\n⏹️ Coordinator stopped")
                break
            except Exception as e:
                print(f"❌ Error in coordinator loop: {e}")
                time.sleep(60)

def main():
    parser = argparse.ArgumentParser(description='Distributed Sweep Coordinator')
    parser.add_argument('--sweep', choices=['comprehensive', 'custom'],
                        help='Start a new sweep')
    parser.add_argument('--status', action='store_true',
                        help='Show current status')
    parser.add_argument('--run', action='store_true',
                        help='Run coordinator loop')

    args = parser.parse_args()

    coordinator = DistributedCoordinator()

    if args.status:
        status = coordinator.get_status()
        print("📊 SWEEP STATUS")
        print("=" * 80)
        print(f"Queued: {status['jobs'].get('queued', 0)}")
        print(f"Running: {status['jobs'].get('running', 0)}")
        print(f"Completed: {status['jobs'].get('completed', 0)}")
        print()

        if status['top_results']:
            print("🏆 TOP 5 RESULTS:")
            for i, (pnl, trades, wr, params) in enumerate(status['top_results'], 1):
                print(f"{i}. ${pnl:.2f}/1k - {trades} trades @ {wr:.1f}% WR")

    elif args.sweep:
        # Calculate total configs (example for comprehensive)
        # This should match your actual parameter grid
        total_configs = 5 * 3 * 4 * 4 * 4 * 4 * 3 * 3 * 3 * 4  # 414,720 configs

        sweep_id = coordinator.create_sweep(args.sweep, total_configs)
        coordinator.generate_job_chunks(sweep_id, total_configs)

        print(f"✅ Sweep {sweep_id} created")
        print(f"   Total configs: {total_configs:,}")
        print(f"   Run 'python3 coordinator.py --run' to start processing")

    elif args.run:
        coordinator.run_forever()

    else:
        parser.print_help()


if __name__ == '__main__':
    main()
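
For reference, the result-file shape that _store_results() expects from a worker, inferred from the code above; the worker-side producer is not part of this commit, and the field values and parameter names shown are purely illustrative:

# Hypothetical contents of results/<job_id>.json as read by _collect_from_worker()
# and consumed by _store_results(): a top-level 'configs' list whose entries map
# onto the columns of the results table.
example_result_file = {
    "configs": [
        {
            "config_id": 12345,
            "trades": 87,
            "win_rate": 61.3,
            "total_pnl": 412.50,
            "pnl_per_1k": 18.75,
            "params": {"rsi_period": 14, "ema_fast": 9},  # stored as JSON text
        }
    ]
}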