Files
trading_bot_v4/cluster/exploration_status.py
mindesbunister cc56b72df2 fix: Database-first cluster status detection + Stop button clarification
CRITICAL FIX (Nov 30, 2025):
- Dashboard showed 'idle' despite 22+ worker processes running
- Root cause: SSH-based worker detection timing out
- Solution: Check database for running chunks FIRST

Changes:
1. app/api/cluster/status/route.ts:
   - Query exploration database before SSH detection
   - If running chunks exist, mark workers 'active' even if SSH fails
   - Override worker status: 'offline' → 'active' when chunks running
   - Log: ' Cluster status: ACTIVE (database shows running chunks)'
   - Database is source of truth, SSH only for supplementary metrics

2. app/cluster/page.tsx:
   - Stop button ALREADY EXISTS (conditionally shown)
   - Shows Start when status='idle', Stop when status='active'
   - No code changes needed - fixed by status detection

Result:
- Dashboard now shows 'ACTIVE' with 2 workers (correct)
- Workers show 'active' status (was 'offline')
- Stop button automatically visible when cluster active
- System resilient to SSH timeouts/network issues

Verified:
- Container restarted: Nov 30 21:18 UTC
- API tested: Returns status='active', activeWorkers=2
- Logs confirm: Database-first logic working
- Workers confirmed running: 22+ processes on worker1, workers on worker2
2025-11-30 22:23:01 +01:00

184 lines
6.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Exploration Status Monitor
Shows real-time progress of distributed parameter sweep across EPYC servers.
"""
import sqlite3
import subprocess
from pathlib import Path
from datetime import datetime
from typing import Dict, List
# Directory containing this script; the exploration database sits beside it.
CLUSTER_DIR = Path(__file__).parent
DB_PATH = CLUSTER_DIR.joinpath('exploration.db')

# SSH targets keyed by worker id. An entry that carries 'ssh_hop' is reached
# by connecting to the hop host first and running ssh to 'host' from there.
WORKERS = dict(
    worker1=dict(host='root@10.10.254.106'),
    worker2=dict(host='root@10.20.254.100', ssh_hop='root@10.10.254.106'),
)
def check_worker_status(worker_id: str) -> Dict:
    """Report whether a worker host has distributed_worker.py processes running.

    The process count is read over SSH, going through the worker's
    ``ssh_hop`` host first when its ``WORKERS`` entry defines one.

    Args:
        worker_id: Key into ``WORKERS``.

    Returns:
        ``{'status': 'running' or 'idle', 'processes': <count>}`` when a
        count could be read, otherwise
        ``{'status': 'unreachable', 'error': <message>}``.

    Raises:
        KeyError: If ``worker_id`` is not a key of ``WORKERS``.
    """
    worker = WORKERS[worker_id]
    remote_cmd = 'pgrep -f distributed_worker.py | wc -l'

    # Build an argument vector rather than a shell string: nothing is
    # re-parsed by a local shell, and the remote side still receives the
    # same command text as before.
    argv = ['ssh']
    if 'ssh_hop' in worker:
        argv += [worker['ssh_hop'], 'ssh']
    argv += [worker['host'], remote_cmd]

    try:
        result = subprocess.run(argv, capture_output=True, text=True, timeout=5)
    except (subprocess.SubprocessError, OSError, ValueError) as e:
        # Timeout, ssh binary missing, or undecodable output.
        return {'status': 'unreachable', 'error': str(e)}

    output = result.stdout.strip()
    try:
        active_processes = int(output)
    except ValueError:
        # No usable count came back (typically ssh itself failed). Surface
        # ssh's own diagnostics instead of an int() parsing message.
        detail = result.stderr.strip() or (
            f"unexpected output {output!r} (exit code {result.returncode})"
        )
        return {'status': 'unreachable', 'error': detail}

    return {
        'status': 'running' if active_processes > 0 else 'idle',
        'processes': active_processes,
    }
def print_status():
    """Print the full exploration report to stdout.

    Sections, in order: worker status (probed via ``check_worker_status``),
    chunk progress, and strategy results, the latter two read from the
    SQLite database at ``DB_PATH``. If the database file does not exist, a
    hint is printed and nothing else happens.

    The database connection is closed even when a query or a formatting
    step raises, so repeated calls (watch mode) do not leak connections.
    """
    if not DB_PATH.exists():
        print("❌ Database not found. Run distributed_coordinator.py first.")
        return

    conn = sqlite3.connect(DB_PATH)
    try:
        c = conn.cursor()

        print("=" * 80)
        print("📊 DISTRIBUTED EXPLORATION STATUS")
        print("=" * 80)
        print()

        _print_worker_status()
        _print_chunk_progress(c)
        _print_strategy_results(c)

        print()
        print("=" * 80)
    finally:
        conn.close()


def _print_worker_status():
    """Print one status line per worker configured in ``WORKERS``."""
    print("🖥️ WORKER STATUS:")
    print()
    for worker_id in WORKERS:
        status = check_worker_status(worker_id)
        is_running = status['status'] == 'running'
        # NOTE(review): the marker for non-running workers is an empty
        # string here - confirm whether a glyph was intended.
        status_emoji = "🟢" if is_running else ""
        print(f" {status_emoji} {worker_id}: {status['status']}", end="")
        if is_running:
            print(f" ({status['processes']} processes)")
        else:
            print()
    print()


def _print_chunk_progress(c):
    """Print chunk counts by status and the share of combos completed.

    Prints nothing when the ``chunks`` table has no rows.
    """
    c.execute("SELECT status, COUNT(*) FROM chunks GROUP BY status")
    chunk_stats = dict(c.fetchall())
    total_chunks = sum(chunk_stats.values())
    if total_chunks <= 0:
        return

    completed = chunk_stats.get('completed', 0)
    running = chunk_stats.get('running', 0)
    pending = chunk_stats.get('pending', 0)

    print("📦 CHUNK PROGRESS:")
    print()
    print(f" Total chunks: {total_chunks:,}")
    print(f" ✅ Completed: {completed:,} ({completed/total_chunks*100:.1f}%)")
    print(f" 🔄 Running: {running:,}")
    print(f" ⏸️ Pending: {pending:,}")
    print()

    # Completion estimate. SUM() yields NULL when there is nothing to add,
    # hence the "or 0".
    c.execute("SELECT SUM(total_combos) FROM chunks")
    total_combos = c.fetchone()[0] or 0
    c.execute("SELECT SUM(total_combos) FROM chunks WHERE status='completed'")
    completed_combos = c.fetchone()[0] or 0
    if total_combos > 0:
        pct_complete = (completed_combos / total_combos) * 100
        print(f" 📊 Parameter space: {completed_combos:,} / {total_combos:,} tested ({pct_complete:.1f}%)")
        print()


def _print_strategy_results(c):
    """Print the strategy count, the top-10 table and the best configuration."""
    import json

    c.execute("SELECT COUNT(*) FROM strategies")
    total_strategies = c.fetchone()[0]
    if total_strategies <= 0:
        print("⏳ No strategies tested yet")
        return

    print("🎯 STRATEGY RESULTS:")
    print()
    print(f" Total tested: {total_strategies:,} unique configurations")
    print()

    # Top 10 strategies that pass the trade-count / win-rate / profit-factor
    # filters, ranked by P&L per $1k.
    c.execute('''
        SELECT params_json, trades, win_rate, pnl_per_1k, profit_factor,
        max_drawdown, tested_at
        FROM strategies
        WHERE trades >= 700
        AND win_rate >= 0.50 AND win_rate <= 0.70
        AND profit_factor >= 1.2
        ORDER BY pnl_per_1k DESC
        LIMIT 10
    ''')
    top_strategies = c.fetchall()
    if not top_strategies:
        print(" ⏳ No validated strategies yet (need 700+ trades with realistic metrics)")
        return

    print(" 🏆 TOP 10 STRATEGIES:")
    print()
    print(" " + "-" * 76)
    print(f" {'Rank':<6} {'P&L/1k':<10} {'Trades':<8} {'WR%':<7} {'PF':<7} {'DD%':<8} {'Tested':<15}")
    print(" " + "-" * 76)
    for i, row in enumerate(top_strategies, 1):
        _params, trades, wr, pnl, pf, dd, tested = row
        tested_date = datetime.fromisoformat(tested).strftime('%Y-%m-%d %H:%M')
        print(f" {i:<6} ${pnl:<9.2f} {trades:<8} {wr*100:<6.1f}% {pf:<6.2f} {dd:<7.1f}% {tested_date}")
    print(" " + "-" * 76)
    print()

    # Show best config details (first row of the ranked result).
    best = top_strategies[0]
    params = json.loads(best[0])
    print(" 💎 BEST CONFIGURATION:")
    print()
    print(f" P&L: ${best[3]:.2f} per $1k")
    print(f" Trades: {best[1]}, Win Rate: {best[2]*100:.1f}%, Profit Factor: {best[4]:.2f}")
    print(f" Max Drawdown: {best[5]:.1f}%")
    print()
    print(" Parameters:")
    for key, value in params.items():
        print(f" {key}: {value}")
def main():
    """Parse command-line options and print the status once or on a loop.

    Without flags the report is printed a single time. With ``--watch`` the
    screen is cleared and the report reprinted every 30 seconds until the
    user presses Ctrl+C.
    """
    import argparse

    cli = argparse.ArgumentParser(description='Monitor distributed exploration progress')
    cli.add_argument('--watch', action='store_true', help='Watch mode (refresh every 30s)')
    options = cli.parse_args()

    # One-shot mode: print and leave.
    if not options.watch:
        print_status()
        return

    import time

    try:
        while True:
            print("\033[2J\033[H")  # Clear screen
            print_status()
            print()
            print("Press Ctrl+C to exit watch mode...")
            time.sleep(30)
    except KeyboardInterrupt:
        print("\n\n👋 Exiting watch mode")
# Run the monitor only when this file is executed as a script.
if __name__ == "__main__":
    main()