#!/usr/bin/env python3
"""
Real-time web dashboard for distributed parameter exploration cluster.
Shows worker status, chunk progress, top strategies, and system metrics.
"""
import shlex
import sqlite3
import subprocess
import time
from datetime import datetime

from flask import Flask, render_template_string
# Flask application object: the dashboard route registers against it and the
# __main__ guard at the bottom of the file starts its built-in server.
app = Flask(__name__)
# Cluster inventory, keyed by worker id.
#   host    - SSH destination used when probing the worker
#   ssh_hop - optional; when present the worker is probed through a nested
#             SSH session started from this intermediate host
#   cores   - core count (not read by the code visible in this file)
#   name    - label returned in the worker's status record
WORKERS = dict(
    worker1=dict(
        host='root@10.10.254.106',
        cores=32,
        name='Worker1',
    ),
    worker2=dict(
        host='root@10.20.254.100',
        ssh_hop='root@10.10.254.106',
        cores=32,
        name='Worker2 (via hop)',
    ),
)
# Jinja2 template rendered by the dashboard view via render_template_string().
# Context variables: progress_pct, tested_combos, total_combos, total_chunks,
# completed_chunks, completed_pct, running_chunks, pending_chunks, est_hours
# (a number of hours, or the string "N/A"), workers (id -> status dict),
# top_strategies (list of row dicts) and timestamp.
#
# Notes:
# - Jinja2 does not accept Python format specs inside {{ }}; thousands
#   separators are produced with str.format instead.
# - The meta refresh tag is what actually implements the advertised
#   30-second auto-refresh.
# - The "h remaining" suffix is only appended when est_hours is numeric, so
#   the fallback renders as "N/A" rather than "N/Ah remaining".
HTML_TEMPLATE = """
<meta http-equiv="refresh" content="30">
Cluster Dashboard
🚀 Parameter Exploration Cluster
Real-time distributed backtesting dashboard
📊 Exploration Progress
{{ progress_pct }}%
{{ "{:,}".format(tested_combos) }} / {{ "{:,}".format(total_combos) }}
Total Chunks
{{ total_chunks }}
Completed
{{ completed_chunks }} ({{ completed_pct }}%)
Running
{{ running_chunks }}
Pending
{{ pending_chunks }}
Est. Completion
{% if est_hours is number %}{{ est_hours }}h remaining{% else %}{{ est_hours }}{% endif %}
🖥️ Worker Status
{% for worker_id, worker_data in workers.items() %}
{{ worker_data.name }} — {{ worker_data.status_text }}
CPU Usage
{{ worker_data.cpu }}%
Processes
{{ worker_data.processes }}
Active Chunks
{{ worker_data.active_chunks }}
{% endfor %}
🏆 Top 10 Strategies
{% if top_strategies %}
| Rank |
Parameters |
PnL per 1k |
Win Rate |
Profit Factor |
Trades |
{% for strat in top_strategies %}
| #{{ loop.index }} |
flip={{ strat.flip_threshold }},
gap={{ strat.ma_gap }},
adx={{ strat.momentum_adx }},
pos={{ strat.momentum_long_pos }}/{{ strat.momentum_short_pos }}
|
${{ "%.2f"|format(strat.pnl_per_1k) }}
|
{{ "%.1f"|format(strat.win_rate) }}% |
{{ "%.2f"|format(strat.profit_factor) }} |
{{ strat.total_trades }} |
{% endfor %}
{% else %}
⏳ Processing combinations... Results will appear when chunks complete.
First chunk running now - check back in a few minutes!
{% endif %}
Last updated: {{ timestamp }}
Auto-refreshes every 30 seconds
"""
# Remote command that prints top's CPU summary line.
_CPU_CMD = 'top -bn1 | grep Cpu'

# Remote command that counts chunk_v9 Python processes. The "[c]" bracket
# expression still matches "chunk_v9" but not its own text, so the remote
# shell and grep processes that carry this command line are not counted.
# With a plain "grep chunk_v9" the wrapper shell can match both greps and an
# idle host would report at least one process.
_PROCESS_COUNT_CMD = "ps aux | grep '[c]hunk_v9' | grep python | wc -l"

# Seconds allowed for each SSH round trip.
_SSH_TIMEOUT_SECONDS = 5


def _ssh_argv(worker, remote_cmd):
    """Return the argv list that runs *remote_cmd* on *worker* over SSH."""
    if 'ssh_hop' in worker:
        # The hop's shell re-parses the command line, so the inner command is
        # quoted to arrive at the target host as a single argument.
        nested = f"ssh {worker['host']} {shlex.quote(remote_cmd)}"
        return ['ssh', worker['ssh_hop'], nested]
    return ['ssh', worker['host'], remote_cmd]


def _run_on_worker(worker, remote_cmd):
    """Run *remote_cmd* on *worker* and return its stripped stdout."""
    # An argv list (no local shell) means a timeout kills ssh itself rather
    # than an intermediate shell process.
    completed = subprocess.run(
        _ssh_argv(worker, remote_cmd),
        capture_output=True,
        text=True,
        timeout=_SSH_TIMEOUT_SECONDS,
    )
    return completed.stdout.strip()


def _parse_cpu_percent(top_line):
    """Return the user-space CPU percentage from a ``%Cpu(s): 90.1 us, ...`` line.

    Returns 0.0 when the line does not contain a CPU summary.
    """
    if 'Cpu' not in top_line:
        return 0.0
    user_field = top_line.split(':')[1].split('us')[0].strip()
    # Some locales print "90,1" instead of "90.1".
    return float(user_field.replace(',', '.'))


def get_worker_status(worker_id):
    """Get real-time status from a worker via SSH.

    Args:
        worker_id: Key into ``WORKERS``. An unknown id raises ``KeyError``.

    Returns:
        A dict with keys ``name``, ``cpu`` (percent, one decimal),
        ``processes`` (count of chunk_v9 Python processes), ``status``
        (``'running'`` or ``'idle'``), ``status_text`` (``'RUNNING'``,
        ``'STARTING'``, ``'IDLE'`` or ``'ERROR'``) and ``active_chunks``
        (always 0 here; the caller fills it in from the database).

        Probing is best-effort: any failure (timeout, SSH error, unparsable
        output) is logged and reported as an ``'ERROR'`` record instead of
        being raised.
    """
    worker = WORKERS[worker_id]
    display_name = worker.get('name', worker_id)

    try:
        cpu_pct = _parse_cpu_percent(_run_on_worker(worker, _CPU_CMD))
        processes = int(_run_on_worker(worker, _PROCESS_COUNT_CMD))
    except Exception as exc:
        # Deliberately broad: a broken worker must not take the dashboard down.
        app.logger.warning("Status probe for %s failed: %r", worker_id, exc)
        return {
            'name': display_name,
            'cpu': 0.0,
            'processes': 0,
            'status': 'idle',
            'status_text': 'ERROR',
            'active_chunks': 0
        }

    if processes > 0:
        status = 'running'
        # Processes exist but CPU is still low: treat as ramping up.
        status_text = 'RUNNING' if cpu_pct > 50 else 'STARTING'
    else:
        status = 'idle'
        status_text = 'IDLE'

    return {
        'name': display_name,
        'cpu': round(cpu_pct, 1),
        'processes': processes,
        'status': status,
        'status_text': status_text,
        'active_chunks': 0  # Will be filled from DB
    }
@app.route('/')
def dashboard():
    """Render the dashboard.

    Reads chunk and strategy statistics from ``exploration.db`` (resolved
    against the current working directory), probes every worker in
    ``WORKERS`` and renders ``HTML_TEMPLATE``.

    The ``strategies`` table is optional: while it does not exist the tested
    count is 0 and the top-strategy list is empty. Errors on the ``chunks``
    table propagate to Flask, but the connection is always closed first.

    Returns:
        The rendered HTML page as a string.
    """
    conn = sqlite3.connect('exploration.db')
    try:
        c = conn.cursor()

        # Chunk counts per status.
        c.execute("SELECT COUNT(*), status FROM chunks GROUP BY status")
        chunk_stats = {status: count for count, status in c.fetchall()}

        # Running chunks per worker.
        c.execute("SELECT assigned_worker, COUNT(*) FROM chunks WHERE status='running' GROUP BY assigned_worker")
        active_per_worker = dict(c.fetchall())

        # Total combinations; fall back to 4096 when the sum is NULL or 0.
        c.execute("SELECT SUM(total_combos) FROM chunks")
        total_combos = c.fetchone()[0] or 4096

        # Tested strategies count (table may not exist yet).
        try:
            c.execute("SELECT COUNT(*) FROM strategies")
            tested_combos = c.fetchone()[0]
        except sqlite3.OperationalError:
            tested_combos = 0

        # Top strategies (table may not exist yet).
        top_strategies = []
        try:
            c.execute("""
SELECT * FROM strategies
WHERE total_trades >= 700
ORDER BY pnl_per_1k DESC
LIMIT 10
""")
            columns = [desc[0] for desc in c.description]
            top_strategies = [dict(zip(columns, row)) for row in c.fetchall()]
        except sqlite3.OperationalError:
            pass  # Table doesn't exist yet
    finally:
        # Close even when a query fails so the handle is not leaked.
        conn.close()

    total_chunks = sum(chunk_stats.values())
    completed_chunks = chunk_stats.get('completed', 0)
    running_chunks = chunk_stats.get('running', 0)
    pending_chunks = chunk_stats.get('pending', 0)

    # Calculate progress
    progress_pct = round((tested_combos / total_combos) * 100, 2) if total_combos > 0 else 0
    completed_pct = round((completed_chunks / total_chunks) * 100, 1) if total_chunks > 0 else 0

    # Estimate time remaining: pending chunks spread over the chunks that are
    # currently running, at a fixed per-chunk duration.
    if completed_chunks > 0 and running_chunks > 0:
        avg_time_per_chunk = 1.5  # hours (rough estimate)
        est_hours = round(pending_chunks * avg_time_per_chunk / running_chunks, 1)
    else:
        est_hours = "N/A"

    # Live worker status, with the per-worker running-chunk count from the DB.
    workers = {}
    for worker_id in WORKERS:
        worker_data = get_worker_status(worker_id)
        worker_data['active_chunks'] = active_per_worker.get(worker_id, 0)
        workers[worker_id] = worker_data

    return render_template_string(
        HTML_TEMPLATE,
        progress_pct=progress_pct,
        tested_combos=tested_combos,
        total_combos=total_combos,
        total_chunks=total_chunks,
        completed_chunks=completed_chunks,
        completed_pct=completed_pct,
        running_chunks=running_chunks,
        pending_chunks=pending_chunks,
        est_hours=est_hours,
        workers=workers,
        top_strategies=top_strategies,
        timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    )
if __name__ == '__main__':
    # Startup banner; the trailing empty entry prints the blank separator line.
    startup_banner = (
        "🌐 Starting web dashboard on http://0.0.0.0:5000",
        " Access from any browser on your network",
        " Auto-refreshes every 30 seconds",
        "",
    )
    for banner_line in startup_banner:
        print(banner_line)

    # Listen on all interfaces, port 5000, with the Flask debugger disabled.
    app.run(host='0.0.0.0', port=5000, debug=False)