#!/usr/bin/env python3 """ Real-time web dashboard for distributed parameter exploration cluster. Shows worker status, chunk progress, top strategies, and system metrics. """ from flask import Flask, render_template_string import sqlite3 import subprocess import time from datetime import datetime app = Flask(__name__) WORKERS = { 'worker1': { 'host': 'root@10.10.254.106', 'cores': 32, 'name': 'Worker1' }, 'worker2': { 'host': 'root@10.20.254.100', 'ssh_hop': 'root@10.10.254.106', 'cores': 32, 'name': 'Worker2 (via hop)' } } HTML_TEMPLATE = """ Cluster Dashboard

🚀 Parameter Exploration Cluster

Real-time distributed backtesting dashboard

📊 Exploration Progress

{{ progress_pct }}%
{{ tested_combos:,}} / {{ total_combos:,}}
Total Chunks {{ total_chunks }}
Completed {{ completed_chunks }} ({{ completed_pct }}%)
Running {{ running_chunks }}
Pending {{ pending_chunks }}
Est. Completion {{ est_hours }}h remaining

🖥️ Worker Status

{% for worker_id, worker_data in workers.items() %}
{{ worker_data.name }}
{{ worker_data.status_text }}
CPU Usage {{ worker_data.cpu }}%
Processes {{ worker_data.processes }}
Active Chunks {{ worker_data.active_chunks }}
{% endfor %}

🏆 Top 10 Strategies

{% if top_strategies %} {% for strat in top_strategies %} {% endfor %}
Rank Parameters PnL per 1k Win Rate Profit Factor Trades
#{{ loop.index }} flip={{ strat.flip_threshold }}, gap={{ strat.ma_gap }}, adx={{ strat.momentum_adx }}, pos={{ strat.momentum_long_pos }}/{{ strat.momentum_short_pos }} ${{ "%.2f"|format(strat.pnl_per_1k) }} {{ "%.1f"|format(strat.win_rate) }}% {{ "%.2f"|format(strat.profit_factor) }} {{ strat.total_trades }}
{% else %}

⏳ Processing combinations... Results will appear when chunks complete.
First chunk running now - check back in a few minutes!

{% endif %}
Last updated: {{ timestamp }}
Auto-refreshes every 30 seconds
""" def get_worker_status(worker_id): """Get real-time status from a worker via SSH.""" worker = WORKERS[worker_id] try: # Get CPU usage if 'ssh_hop' in worker: cpu_cmd = f"ssh {WORKERS['worker1']['host']} 'ssh {worker['host']} \"top -bn1 | grep Cpu\"'" else: cpu_cmd = f"ssh {worker['host']} 'top -bn1 | grep Cpu'" cpu_result = subprocess.run(cpu_cmd, shell=True, capture_output=True, text=True, timeout=5) cpu_line = cpu_result.stdout.strip() # Parse: %Cpu(s): 90.1 us, ... if 'Cpu' in cpu_line: cpu_pct = float(cpu_line.split(':')[1].split('us')[0].strip()) else: cpu_pct = 0.0 # Get process count if 'ssh_hop' in worker: proc_cmd = f"ssh {WORKERS['worker1']['host']} 'ssh {worker['host']} \"ps aux | grep chunk_v9 | grep python | wc -l\"'" else: proc_cmd = f"ssh {worker['host']} 'ps aux | grep chunk_v9 | grep python | wc -l'" proc_result = subprocess.run(proc_cmd, shell=True, capture_output=True, text=True, timeout=5) processes = int(proc_result.stdout.strip()) # Determine status if processes > 0 and cpu_pct > 50: status = 'running' status_text = 'RUNNING' elif processes > 0: status = 'running' status_text = 'STARTING' else: status = 'idle' status_text = 'IDLE' return { 'name': worker.get('name', worker_id), 'cpu': round(cpu_pct, 1), 'processes': processes, 'status': status, 'status_text': status_text, 'active_chunks': 0 # Will be filled from DB } except Exception as e: return { 'name': worker.get('name', worker_id), 'cpu': 0.0, 'processes': 0, 'status': 'idle', 'status_text': 'ERROR', 'active_chunks': 0 } @app.route('/') def dashboard(): """Render the dashboard.""" # Connect to database conn = sqlite3.connect('exploration.db') c = conn.cursor() # Get chunk statistics c.execute("SELECT COUNT(*), status FROM chunks GROUP BY status") chunk_stats = {row[1]: row[0] for row in c.fetchall()} total_chunks = sum(chunk_stats.values()) completed_chunks = chunk_stats.get('completed', 0) running_chunks = chunk_stats.get('running', 0) pending_chunks = chunk_stats.get('pending', 0) # Get active chunks per worker c.execute("SELECT assigned_worker, COUNT(*) FROM chunks WHERE status='running' GROUP BY assigned_worker") active_per_worker = {row[0]: row[1] for row in c.fetchall()} # Get total combinations from chunks table c.execute("SELECT SUM(total_combos) FROM chunks") total_combos_result = c.fetchone()[0] total_combos = total_combos_result if total_combos_result else 4096 # Get tested strategies count (if strategies table exists) try: c.execute("SELECT COUNT(*) FROM strategies") tested_combos = c.fetchone()[0] except sqlite3.OperationalError: tested_combos = 0 # Get top strategies (if table exists) top_strategies = [] try: c.execute(""" SELECT * FROM strategies WHERE total_trades >= 700 ORDER BY pnl_per_1k DESC LIMIT 10 """) columns = [desc[0] for desc in c.description] for row in c.fetchall(): strat = dict(zip(columns, row)) top_strategies.append(strat) except sqlite3.OperationalError: pass # Table doesn't exist yet conn.close() # Calculate progress progress_pct = round((tested_combos / total_combos) * 100, 2) if total_combos > 0 else 0 completed_pct = round((completed_chunks / total_chunks) * 100, 1) if total_chunks > 0 else 0 # Estimate time remaining if completed_chunks > 0 and running_chunks > 0: avg_time_per_chunk = 1.5 # hours (rough estimate) est_hours = round(pending_chunks * avg_time_per_chunk / max(running_chunks, 1), 1) else: est_hours = "N/A" # Get worker status workers = {} for worker_id in WORKERS.keys(): worker_data = get_worker_status(worker_id) worker_data['active_chunks'] = active_per_worker.get(worker_id, 0) workers[worker_id] = worker_data # Render template return render_template_string( HTML_TEMPLATE, progress_pct=progress_pct, tested_combos=tested_combos, total_combos=total_combos, total_chunks=total_chunks, completed_chunks=completed_chunks, completed_pct=completed_pct, running_chunks=running_chunks, pending_chunks=pending_chunks, est_hours=est_hours, workers=workers, top_strategies=top_strategies, timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) if __name__ == '__main__': print("🌐 Starting web dashboard on http://0.0.0.0:5000") print(" Access from any browser on your network") print(" Auto-refreshes every 30 seconds") print() app.run(host='0.0.0.0', port=5000, debug=False)