trading_bot_v4/distributed_sweep/coordinator.py
mindesbunister cc56b72df2 fix: Database-first cluster status detection + Stop button clarification
CRITICAL FIX (Nov 30, 2025):
- Dashboard showed 'idle' despite 22+ worker processes running
- Root cause: SSH-based worker detection timing out
- Solution: Check database for running chunks FIRST

Changes:
1. app/api/cluster/status/route.ts:
   - Query exploration database before SSH detection
   - If running chunks exist, mark workers 'active' even if SSH fails
   - Override worker status: 'offline' → 'active' when chunks running
   - Log: 'Cluster status: ACTIVE (database shows running chunks)'
   - Database is the source of truth; SSH is used only for supplementary metrics (see the sketch below)

2. app/cluster/page.tsx:
   - Stop button ALREADY EXISTS (conditionally shown)
   - Shows Start when status='idle', Stop when status='active'
   - No code changes needed - fixed by status detection

Result:
- Dashboard now shows 'ACTIVE' with 2 workers (correct)
- Workers show 'active' status (was 'offline')
- Stop button automatically visible when cluster active
- System resilient to SSH timeouts/network issues

Verified:
- Container restarted: Nov 30 21:18 UTC
- API tested: Returns status='active', activeWorkers=2
- Logs confirm: Database-first logic working
- Workers confirmed running: 22+ processes on worker1, additional worker processes on worker2
2025-11-30 22:23:01 +01:00
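
The database-first check described in change 1 boils down to logic like the following. This is a minimal Python sketch for illustration only: the actual change is TypeScript in app/api/cluster/status/route.ts, and the table, column, and field names used here are assumptions, not the real schema.

import sqlite3

def cluster_status(db_path: str, ssh_worker_status: dict) -> dict:
    """Decide cluster status from the exploration database, not from SSH probes."""
    conn = sqlite3.connect(db_path)
    # Hypothetical query: any chunk still marked 'running' means a sweep is active.
    running_chunks = conn.execute(
        "SELECT COUNT(*) FROM jobs WHERE status = 'running'"
    ).fetchone()[0]
    conn.close()

    if running_chunks > 0:
        # Database is the source of truth: override workers that the SSH probe
        # reported as 'offline' (e.g. because the probe timed out).
        workers = {name: 'active' for name in ssh_worker_status}
        return {'status': 'active', 'activeWorkers': len(workers), 'workers': workers}

    return {'status': 'idle', 'activeWorkers': 0, 'workers': ssh_worker_status}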


#!/usr/bin/env python3
"""
Distributed Sweep Coordinator for 2 EPYC Servers

Splits comprehensive parameter sweeps across both EPYC servers to maximize
throughput. Works with the existing backtester infrastructure.

Architecture:
- Coordinator runs on the local machine
- Generates job chunks (1000 configs each)
- Distributes chunks to both EPYC servers via SSH
- Collects results and aggregates them into a master CSV
- Runs 24/7 to continuously test new indicator combinations

Usage:
    python3 coordinator.py --sweep comprehensive          # Create a full sweep
    python3 coordinator.py --sweep custom --params params.json
    python3 coordinator.py --run                          # Run the coordinator loop
    python3 coordinator.py --status                       # Check progress
"""
import subprocess
import json
import time
import sqlite3
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional, Tuple
import argparse


class DistributedCoordinator:
    """Coordinates distributed backtesting across 2 EPYC servers."""

    def __init__(self):
        self.worker1 = "root@10.10.254.106"
        self.worker2 = "root@10.20.254.100"  # Reached via worker1 (two-hop SSH)
        self.remote_path = "/home/comprehensive_sweep/backtester"
        self.db_path = Path(__file__).parent / "sweep_results.db"
        self.chunk_size = 1000  # Configs per job chunk
        self._init_database()

    def _init_database(self):
        """Initialize SQLite database for tracking progress."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Jobs table
        c.execute('''
            CREATE TABLE IF NOT EXISTS jobs (
                job_id TEXT PRIMARY KEY,
                worker TEXT,
                status TEXT,             -- queued, running, completed, failed
                chunk_start INTEGER,
                chunk_end INTEGER,
                configs_tested INTEGER,
                created_at TEXT,
                started_at TEXT,
                completed_at TEXT
            )
        ''')

        # Results table
        c.execute('''
            CREATE TABLE IF NOT EXISTS results (
                config_id INTEGER PRIMARY KEY,
                trades INTEGER,
                win_rate REAL,
                total_pnl REAL,
                pnl_per_1k REAL,
                params TEXT,             -- JSON
                tested_at TEXT
            )
        ''')

        # Sweep metadata
        c.execute('''
            CREATE TABLE IF NOT EXISTS sweeps (
                sweep_id TEXT PRIMARY KEY,
                sweep_type TEXT,
                total_configs INTEGER,
                configs_completed INTEGER,
                best_pnl_per_1k REAL,
                started_at TEXT,
                completed_at TEXT
            )
        ''')

        conn.commit()
        conn.close()
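
    # For manual inspection of sweep results (example only, assuming the
    # sqlite3 CLI is available on the coordinator host):
    #   sqlite3 sweep_results.db \
    #     "SELECT pnl_per_1k, trades, win_rate FROM results ORDER BY pnl_per_1k DESC LIMIT 5;"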

    def create_sweep(self, sweep_type: str, total_configs: int) -> str:
        """Create a new sweep job."""
        sweep_id = f"sweep_{sweep_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        c.execute('''
            INSERT INTO sweeps (sweep_id, sweep_type, total_configs,
                                configs_completed, started_at)
            VALUES (?, ?, ?, 0, ?)
        ''', (sweep_id, sweep_type, total_configs, datetime.now().isoformat()))
        conn.commit()
        conn.close()

        return sweep_id

    def generate_job_chunks(self, sweep_id: str, total_configs: int):
        """Split sweep into job chunks for distribution."""
        num_chunks = (total_configs + self.chunk_size - 1) // self.chunk_size

        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        for i in range(num_chunks):
            chunk_start = i * self.chunk_size
            chunk_end = min((i + 1) * self.chunk_size, total_configs)
            job_id = f"{sweep_id}_chunk_{i:04d}"

            # Alternate chunks between the two workers (worker1 takes even indices)
            worker = self.worker1 if i % 2 == 0 else self.worker2

            c.execute('''
                INSERT INTO jobs (job_id, worker, status, chunk_start,
                                  chunk_end, created_at)
                VALUES (?, ?, 'queued', ?, ?, ?)
            ''', (job_id, worker, chunk_start, chunk_end,
                  datetime.now().isoformat()))

        conn.commit()
        conn.close()

        print(f"✅ Created {num_chunks} job chunks")
        print(f"   Worker 1: {num_chunks - num_chunks // 2} chunks")
        print(f"   Worker 2: {num_chunks // 2} chunks")

    def dispatch_jobs(self):
        """Dispatch queued jobs to workers."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Get queued jobs (at most 10 per cycle)
        c.execute('''
            SELECT job_id, worker, chunk_start, chunk_end
            FROM jobs
            WHERE status = 'queued'
            LIMIT 10
        ''')
        jobs = c.fetchall()
        conn.close()

        if not jobs:
            return 0

        for job_id, worker, chunk_start, chunk_end in jobs:
            self._start_job_on_worker(job_id, worker, chunk_start, chunk_end)

        return len(jobs)

    def _start_job_on_worker(self, job_id: str, worker: str,
                             chunk_start: int, chunk_end: int):
        """Start a job on the specified worker."""
        try:
            # Job spec the worker picks up from its jobs/ directory
            job_spec = {
                'job_id': job_id,
                'chunk_start': chunk_start,
                'chunk_end': chunk_end,
                'timestamp': datetime.now().isoformat()
            }
            job_json = json.dumps(job_spec)

            # Write the job file on the worker. Pipe the JSON through stdin so
            # the quotes inside it survive the shell (echo-inside-ssh mangles them).
            remote_file = f"{self.remote_path}/jobs/{job_id}.json"
            if worker == self.worker2:
                # Two-hop for worker2 (only reachable via worker1); stdin is
                # forwarded through both SSH hops.
                cmd = ["ssh", self.worker1,
                       f"ssh {self.worker2} 'cat > {remote_file}'"]
            else:
                cmd = ["ssh", worker, f"cat > {remote_file}"]
            subprocess.run(cmd, input=job_json, text=True, check=True)

            # Mark the job as running
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute('''
                UPDATE jobs
                SET status = 'running', started_at = ?
                WHERE job_id = ?
            ''', (datetime.now().isoformat(), job_id))
            conn.commit()
            conn.close()

            print(f"✅ Started job {job_id} on {worker}")

        except Exception as e:
            print(f"❌ Failed to start job {job_id}: {e}")
            conn = sqlite3.connect(self.db_path)
            c = conn.cursor()
            c.execute("UPDATE jobs SET status = 'failed' WHERE job_id = ?",
                      (job_id,))
            conn.commit()
            conn.close()

    def collect_results(self):
        """Collect completed results from workers."""
        for worker in [self.worker1, self.worker2]:
            self._collect_from_worker(worker)

    def _collect_from_worker(self, worker: str):
        """Collect results from a specific worker."""
        try:
            # List completed result files
            if worker == self.worker2:
                cmd = f"ssh {self.worker1} \"ssh {self.worker2} 'ls {self.remote_path}/results/*.json 2>/dev/null'\""
            else:
                cmd = f"ssh {worker} \"ls {self.remote_path}/results/*.json 2>/dev/null\""
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            if result.returncode != 0:
                return

            result_files = result.stdout.strip().split('\n')
            for result_file in result_files:
                if not result_file:
                    continue

                # Extract job_id from filename
                job_id = Path(result_file).stem

                # Copy result file locally
                local_file = Path(f"/tmp/{job_id}.json")
                if worker == self.worker2:
                    # Two-hop copy: stage the file on worker1, then pull it
                    # from worker1 down to the coordinator.
                    subprocess.run(
                        f"ssh {self.worker1} \"scp {self.worker2}:{result_file} /tmp/\"",
                        shell=True, check=True
                    )
                    subprocess.run(
                        f"scp {self.worker1}:/tmp/{job_id}.json {local_file}",
                        shell=True, check=True
                    )
                else:
                    subprocess.run(
                        f"scp {worker}:{result_file} {local_file}",
                        shell=True, check=True
                    )

                # Parse and store results
                with open(local_file) as f:
                    results = json.load(f)
                self._store_results(job_id, results)

                # Delete the remote result file
                if worker == self.worker2:
                    subprocess.run(
                        f"ssh {self.worker1} \"ssh {self.worker2} 'rm {result_file}'\"",
                        shell=True
                    )
                else:
                    subprocess.run(f"ssh {worker} \"rm {result_file}\"", shell=True)

                # Delete the local temp file
                local_file.unlink()

                print(f"✅ Collected results from {job_id}")

        except Exception as e:
            print(f"❌ Error collecting from {worker}: {e}")

    def _store_results(self, job_id: str, results: Dict):
        """Store results in the database."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Store each config result
        for result in results['configs']:
            c.execute('''
                INSERT OR REPLACE INTO results
                (config_id, trades, win_rate, total_pnl, pnl_per_1k,
                 params, tested_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                result['config_id'],
                result['trades'],
                result['win_rate'],
                result['total_pnl'],
                result['pnl_per_1k'],
                json.dumps(result['params']),
                datetime.now().isoformat()
            ))

        # Update job status
        c.execute('''
            UPDATE jobs
            SET status = 'completed',
                configs_tested = ?,
                completed_at = ?
            WHERE job_id = ?
        ''', (len(results['configs']), datetime.now().isoformat(), job_id))

        conn.commit()
        conn.close()

    def get_status(self) -> Dict:
        """Get current sweep status."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()

        # Job counts
        c.execute('SELECT status, COUNT(*) FROM jobs GROUP BY status')
        job_counts = dict(c.fetchall())

        # Top results
        c.execute('''
            SELECT pnl_per_1k, trades, win_rate, params
            FROM results
            ORDER BY pnl_per_1k DESC
            LIMIT 5
        ''')
        top_results = c.fetchall()

        # Active sweeps
        c.execute('''
            SELECT sweep_id, total_configs, configs_completed
            FROM sweeps
            WHERE completed_at IS NULL
        ''')
        active_sweeps = c.fetchall()

        conn.close()

        return {
            'jobs': job_counts,
            'top_results': top_results,
            'active_sweeps': active_sweeps
        }

    def run_forever(self):
        """Run the coordinator loop indefinitely."""
        print("🚀 Starting Distributed Sweep Coordinator")
        print("=" * 80)

        while True:
            try:
                # Dispatch new jobs
                dispatched = self.dispatch_jobs()

                # Collect completed results
                self.collect_results()

                # Show status
                status = self.get_status()
                print(f"\n📊 Status: {datetime.now().strftime('%H:%M:%S')}")
                print(f"   Dispatched this cycle: {dispatched}")
                print(f"   Queued:    {status['jobs'].get('queued', 0)}")
                print(f"   Running:   {status['jobs'].get('running', 0)}")
                print(f"   Completed: {status['jobs'].get('completed', 0)}")

                if status['top_results']:
                    print(f"\n🏆 Top Result: ${status['top_results'][0][0]:.2f}/1k")

                # Sleep before the next cycle
                time.sleep(60)

            except KeyboardInterrupt:
                print("\n\n⏹️ Coordinator stopped")
                break
            except Exception as e:
                print(f"❌ Error in coordinator loop: {e}")
                time.sleep(60)


def main():
    parser = argparse.ArgumentParser(description='Distributed Sweep Coordinator')
    parser.add_argument('--sweep', choices=['comprehensive', 'custom'],
                        help='Start a new sweep')
    parser.add_argument('--status', action='store_true',
                        help='Show current status')
    parser.add_argument('--run', action='store_true',
                        help='Run coordinator loop')
    args = parser.parse_args()

    coordinator = DistributedCoordinator()

    if args.status:
        status = coordinator.get_status()
        print("📊 SWEEP STATUS")
        print("=" * 80)
        print(f"Queued:    {status['jobs'].get('queued', 0)}")
        print(f"Running:   {status['jobs'].get('running', 0)}")
        print(f"Completed: {status['jobs'].get('completed', 0)}")
        print()
        if status['top_results']:
            print("🏆 TOP 5 RESULTS:")
            for i, (pnl, trades, wr, params) in enumerate(status['top_results'], 1):
                print(f"{i}. ${pnl:.2f}/1k - {trades} trades @ {wr:.1f}% WR")

    elif args.sweep:
        # Calculate total configs (example grid for the comprehensive sweep).
        # This should match your actual parameter grid.
        total_configs = 5 * 3 * 4 * 4 * 4 * 4 * 3 * 3 * 3 * 4  # 414,720 configs
        sweep_id = coordinator.create_sweep(args.sweep, total_configs)
        coordinator.generate_job_chunks(sweep_id, total_configs)
        print(f"✅ Sweep {sweep_id} created")
        print(f"   Total configs: {total_configs:,}")
        print(f"   Run 'python3 coordinator.py --run' to start processing")

    elif args.run:
        coordinator.run_forever()

    else:
        parser.print_help()


if __name__ == '__main__':
    main()