#!/usr/bin/env python3 """ Exploration Status Monitor Shows real-time progress of distributed parameter sweep across EPYC servers. """ import sqlite3 import subprocess from pathlib import Path from datetime import datetime from typing import Dict, List CLUSTER_DIR = Path(__file__).parent DB_PATH = CLUSTER_DIR / 'exploration.db' WORKERS = { 'worker1': {'host': 'root@10.10.254.106'}, 'worker2': {'host': 'root@10.20.254.100', 'ssh_hop': 'root@10.10.254.106'}, } def check_worker_status(worker_id: str) -> Dict: """Check if worker has active processes""" worker = WORKERS[worker_id] if 'ssh_hop' in worker: cmd = f"ssh {worker['ssh_hop']} ssh {worker['host']} 'pgrep -f distributed_worker.py | wc -l'" else: cmd = f"ssh {worker['host']} 'pgrep -f distributed_worker.py | wc -l'" try: result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5) active_processes = int(result.stdout.strip()) return {'status': 'running' if active_processes > 0 else 'idle', 'processes': active_processes} except Exception as e: return {'status': 'unreachable', 'error': str(e)} def print_status(): """Print comprehensive status""" if not DB_PATH.exists(): print("❌ Database not found. Run distributed_coordinator.py first.") return conn = sqlite3.connect(DB_PATH) c = conn.cursor() print("=" * 80) print("πŸ“Š DISTRIBUTED EXPLORATION STATUS") print("=" * 80) print() # Worker status print("πŸ–₯️ WORKER STATUS:") print() for worker_id in WORKERS.keys(): status = check_worker_status(worker_id) status_emoji = "🟒" if status['status'] == 'running' else "βšͺ" print(f" {status_emoji} {worker_id}: {status['status']}", end="") if status['status'] == 'running': print(f" ({status['processes']} processes)") else: print() print() # Chunk progress c.execute("SELECT status, COUNT(*) FROM chunks GROUP BY status") chunk_stats = dict(c.fetchall()) total_chunks = sum(chunk_stats.values()) if chunk_stats else 0 completed = chunk_stats.get('completed', 0) running = chunk_stats.get('running', 0) pending = chunk_stats.get('pending', 0) if total_chunks > 0: print("πŸ“¦ CHUNK PROGRESS:") print() print(f" Total chunks: {total_chunks:,}") print(f" βœ… Completed: {completed:,} ({completed/total_chunks*100:.1f}%)") print(f" πŸ”„ Running: {running:,}") print(f" ⏸️ Pending: {pending:,}") print() # Completion estimate c.execute("SELECT SUM(total_combos) FROM chunks") total_combos = c.fetchone()[0] or 0 c.execute("SELECT SUM(total_combos) FROM chunks WHERE status='completed'") completed_combos = c.fetchone()[0] or 0 if total_combos > 0: pct_complete = (completed_combos / total_combos) * 100 print(f" πŸ“Š Parameter space: {completed_combos:,} / {total_combos:,} tested ({pct_complete:.1f}%)") print() # Strategy statistics c.execute("SELECT COUNT(*) FROM strategies") total_strategies = c.fetchone()[0] if total_strategies > 0: print("🎯 STRATEGY RESULTS:") print() print(f" Total tested: {total_strategies:,} unique configurations") print() # Top 10 strategies c.execute(''' SELECT params_json, trades, win_rate, pnl_per_1k, profit_factor, max_drawdown, tested_at FROM strategies WHERE trades >= 700 AND win_rate >= 0.50 AND win_rate <= 0.70 AND profit_factor >= 1.2 ORDER BY pnl_per_1k DESC LIMIT 10 ''') top_strategies = c.fetchall() if top_strategies: print(" πŸ† TOP 10 STRATEGIES:") print() print(" " + "-" * 76) print(f" {'Rank':<6} {'P&L/1k':<10} {'Trades':<8} {'WR%':<7} {'PF':<7} {'DD%':<8} {'Tested':<15}") print(" " + "-" * 76) for i, row in enumerate(top_strategies, 1): params, trades, wr, pnl, pf, dd, tested = row tested_date = datetime.fromisoformat(tested).strftime('%Y-%m-%d %H:%M') print(f" {i:<6} ${pnl:<9.2f} {trades:<8} {wr*100:<6.1f}% {pf:<6.2f} {dd:<7.1f}% {tested_date}") print(" " + "-" * 76) print() # Show best config details best = top_strategies[0] params_json = best[0] import json params = json.loads(params_json) print(" πŸ’Ž BEST CONFIGURATION:") print() print(f" P&L: ${best[3]:.2f} per $1k") print(f" Trades: {best[1]}, Win Rate: {best[2]*100:.1f}%, Profit Factor: {best[4]:.2f}") print(f" Max Drawdown: {best[5]:.1f}%") print() print(" Parameters:") for key, value in params.items(): print(f" {key}: {value}") else: print(" ⏳ No validated strategies yet (need 700+ trades with realistic metrics)") else: print("⏳ No strategies tested yet") print() print("=" * 80) conn.close() def main(): """Main entry point""" import argparse parser = argparse.ArgumentParser(description='Monitor distributed exploration progress') parser.add_argument('--watch', action='store_true', help='Watch mode (refresh every 30s)') args = parser.parse_args() if args.watch: import time try: while True: print("\033[2J\033[H") # Clear screen print_status() print() print("Press Ctrl+C to exit watch mode...") time.sleep(30) except KeyboardInterrupt: print("\n\nπŸ‘‹ Exiting watch mode") else: print_status() if __name__ == '__main__': main()