- Master controller with job queue and result aggregation - Worker scripts for parallel backtesting (22 workers per server) - SQLite database for strategy ranking and performance tracking - File-based job queue (simple, robust, survives crashes) - Auto-setup script for both EPYC servers - Status dashboard for monitoring progress - Comprehensive deployment guide Architecture: - Master: Job generation, worker coordination, result collection - Worker 1 (pve-nu-monitor01): AMD EPYC 7282, 22 parallel jobs - Worker 2 (srv-bd-host01): AMD EPYC 7302, 22 parallel jobs - Total capacity: ~49,000 backtests/day (44 cores @ 70%) Initial focus: v9 parameter refinement (27 configurations) Target: Find strategies >00/1k P&L (current baseline 92/1k) Files: - cluster/master.py: Main controller (570 lines) - cluster/worker.py: Worker execution script (220 lines) - cluster/setup_cluster.sh: Automated deployment - cluster/status.py: Real-time status dashboard - cluster/README.md: Operational documentation - cluster/DEPLOYMENT.md: Step-by-step deployment guide
372 lines
12 KiB
Python
372 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Continuous Optimization Cluster - MASTER Controller
|
|
Manages job queue, coordinates workers, aggregates results
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import os
|
|
import subprocess
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import sqlite3
|
|
|
|
# Configuration
|
|
WORKERS = {
|
|
'worker1': {
|
|
'host': 'root@10.10.254.106',
|
|
'cores': 22, # 70% of 32
|
|
'workspace': '/root/optimization-cluster',
|
|
},
|
|
'worker2': {
|
|
'host': 'root@10.10.254.106', # Connect through monitor01
|
|
'ssh_hop': 'root@10.20.254.100', # Actual target
|
|
'cores': 22, # 70% of 32
|
|
'workspace': '/root/optimization-cluster',
|
|
}
|
|
}
|
|
|
|
CLUSTER_DIR = Path(__file__).parent
|
|
QUEUE_DIR = CLUSTER_DIR / 'queue'
|
|
RESULTS_DIR = CLUSTER_DIR / 'results'
|
|
DB_PATH = CLUSTER_DIR / 'strategies.db'
|
|
|
|
# Job priorities
|
|
PRIORITY_HIGH = 1 # Known good strategies (v9 refinements)
|
|
PRIORITY_MEDIUM = 2 # New concepts (volume profiles, orderflow)
|
|
PRIORITY_LOW = 3 # Experimental (ML, neural networks)
|
|
|
|
class StrategyDatabase:
|
|
"""SQLite database for strategy performance tracking"""
|
|
|
|
def __init__(self, db_path):
|
|
self.db_path = db_path
|
|
self.init_db()
|
|
|
|
def init_db(self):
|
|
"""Create tables if not exist"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
c = conn.cursor()
|
|
|
|
c.execute('''
|
|
CREATE TABLE IF NOT EXISTS strategies (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT UNIQUE NOT NULL,
|
|
indicator_type TEXT NOT NULL,
|
|
params JSON NOT NULL,
|
|
pnl_per_1k REAL,
|
|
trade_count INTEGER,
|
|
win_rate REAL,
|
|
profit_factor REAL,
|
|
max_drawdown REAL,
|
|
sharpe_ratio REAL,
|
|
tested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
status TEXT DEFAULT 'pending',
|
|
notes TEXT
|
|
)
|
|
''')
|
|
|
|
c.execute('''
|
|
CREATE TABLE IF NOT EXISTS backtest_results (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
strategy_id INTEGER,
|
|
config JSON NOT NULL,
|
|
pnl REAL,
|
|
trades INTEGER,
|
|
win_rate REAL,
|
|
completed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (strategy_id) REFERENCES strategies (id)
|
|
)
|
|
''')
|
|
|
|
c.execute('''
|
|
CREATE TABLE IF NOT EXISTS jobs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
job_file TEXT UNIQUE NOT NULL,
|
|
priority INTEGER DEFAULT 2,
|
|
worker_id TEXT,
|
|
status TEXT DEFAULT 'queued',
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
started_at TIMESTAMP,
|
|
completed_at TIMESTAMP
|
|
)
|
|
''')
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def add_strategy(self, name, indicator_type, params):
|
|
"""Add new strategy to test"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
c = conn.cursor()
|
|
|
|
try:
|
|
c.execute('''
|
|
INSERT INTO strategies (name, indicator_type, params)
|
|
VALUES (?, ?, ?)
|
|
''', (name, indicator_type, json.dumps(params)))
|
|
conn.commit()
|
|
return c.lastrowid
|
|
except sqlite3.IntegrityError:
|
|
# Strategy already exists
|
|
return None
|
|
finally:
|
|
conn.close()
|
|
|
|
def get_top_strategies(self, limit=10):
|
|
"""Get top performing strategies"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
c = conn.cursor()
|
|
|
|
c.execute('''
|
|
SELECT name, indicator_type, pnl_per_1k, trade_count,
|
|
win_rate, profit_factor, max_drawdown
|
|
FROM strategies
|
|
WHERE status = 'completed'
|
|
ORDER BY pnl_per_1k DESC
|
|
LIMIT ?
|
|
''', (limit,))
|
|
|
|
results = c.fetchall()
|
|
conn.close()
|
|
return results
|
|
|
|
class JobQueue:
|
|
"""Manages job queue and worker assignments"""
|
|
|
|
def __init__(self, queue_dir, db):
|
|
self.queue_dir = Path(queue_dir)
|
|
self.queue_dir.mkdir(exist_ok=True)
|
|
self.db = db
|
|
|
|
def create_job(self, indicator, params, priority=PRIORITY_MEDIUM):
|
|
"""Create a new backtest job"""
|
|
job_id = f"{indicator}_{int(time.time() * 1000)}"
|
|
job_file = self.queue_dir / f"{job_id}.json"
|
|
|
|
job = {
|
|
'id': job_id,
|
|
'indicator': indicator,
|
|
'params': params,
|
|
'priority': priority,
|
|
'created_at': datetime.now().isoformat(),
|
|
'status': 'queued'
|
|
}
|
|
|
|
with open(job_file, 'w') as f:
|
|
json.dump(job, f, indent=2)
|
|
|
|
# Log to database
|
|
conn = sqlite3.connect(self.db.db_path)
|
|
c = conn.cursor()
|
|
c.execute('''
|
|
INSERT INTO jobs (job_file, priority, status)
|
|
VALUES (?, ?, 'queued')
|
|
''', (job_file.name, priority))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
print(f"✅ Created job: {job_id} (priority {priority})")
|
|
return job_id
|
|
|
|
def get_next_job(self):
|
|
"""Get highest priority job from queue"""
|
|
jobs = sorted(
|
|
self.queue_dir.glob("*.json"),
|
|
key=lambda f: json.load(open(f)).get('priority', 2)
|
|
)
|
|
|
|
if jobs:
|
|
return jobs[0]
|
|
return None
|
|
|
|
def mark_running(self, job_file, worker_id):
|
|
"""Mark job as running"""
|
|
with open(job_file, 'r') as f:
|
|
job = json.load(f)
|
|
|
|
job['status'] = 'running'
|
|
job['worker_id'] = worker_id
|
|
job['started_at'] = datetime.now().isoformat()
|
|
|
|
with open(job_file, 'w') as f:
|
|
json.dump(job, f, indent=2)
|
|
|
|
# Update database
|
|
conn = sqlite3.connect(self.db.db_path)
|
|
c = conn.cursor()
|
|
c.execute('''
|
|
UPDATE jobs
|
|
SET status = 'running', worker_id = ?, started_at = CURRENT_TIMESTAMP
|
|
WHERE job_file = ?
|
|
''', (worker_id, job_file.name))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
class WorkerManager:
|
|
"""Manages worker coordination"""
|
|
|
|
def __init__(self, workers):
|
|
self.workers = workers
|
|
|
|
def check_worker_status(self, worker_id):
|
|
"""Check if worker is idle"""
|
|
worker = self.workers[worker_id]
|
|
|
|
# Check via SSH if worker has running jobs
|
|
cmd = f"ssh {worker['host']}"
|
|
if 'ssh_hop' in worker:
|
|
cmd = f"ssh {worker['host']} ssh {worker['ssh_hop']}"
|
|
|
|
cmd += " 'pgrep -f backtester || echo IDLE'"
|
|
|
|
try:
|
|
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5)
|
|
return 'IDLE' in result.stdout
|
|
except:
|
|
return False
|
|
|
|
def assign_job(self, worker_id, job_file):
|
|
"""Assign job to worker"""
|
|
worker = self.workers[worker_id]
|
|
|
|
print(f"📤 Assigning {job_file.name} to {worker_id}...")
|
|
|
|
# Copy job to worker
|
|
cmd = f"scp {job_file} {worker['host']}:{worker['workspace']}/jobs/"
|
|
subprocess.run(cmd, shell=True)
|
|
|
|
# Trigger worker execution
|
|
cmd = f"ssh {worker['host']}"
|
|
if 'ssh_hop' in worker:
|
|
cmd = f"ssh {worker['host']} ssh {worker['ssh_hop']}"
|
|
|
|
cmd += f" 'cd {worker['workspace']} && python3 worker.py {job_file.name} > logs/worker.log 2>&1 &'"
|
|
subprocess.run(cmd, shell=True)
|
|
|
|
print(f"✅ Job started on {worker_id}")
|
|
|
|
class ClusterMaster:
|
|
"""Main cluster controller"""
|
|
|
|
def __init__(self):
|
|
self.db = StrategyDatabase(DB_PATH)
|
|
self.queue = JobQueue(QUEUE_DIR, self.db)
|
|
self.workers = WorkerManager(WORKERS)
|
|
self.results_dir = Path(RESULTS_DIR)
|
|
self.results_dir.mkdir(exist_ok=True)
|
|
|
|
def generate_v9_jobs(self):
|
|
"""Generate v9 parameter sweep jobs"""
|
|
print("🔧 Generating v9 parameter sweep jobs...")
|
|
|
|
# Refined parameter grid based on baseline results
|
|
flip_thresholds = [0.5, 0.6, 0.7]
|
|
ma_gaps = [0.30, 0.35, 0.40]
|
|
momentum_adx = [21, 23, 25]
|
|
|
|
job_count = 0
|
|
for flip in flip_thresholds:
|
|
for ma_gap in ma_gaps:
|
|
for adx in momentum_adx:
|
|
params = {
|
|
'flip_threshold': flip,
|
|
'ma_gap': ma_gap,
|
|
'momentum_adx': adx,
|
|
'long_pos': 70,
|
|
'short_pos': 25,
|
|
'cooldown_bars': 2
|
|
}
|
|
|
|
self.queue.create_job('v9_moneyline', params, PRIORITY_HIGH)
|
|
job_count += 1
|
|
|
|
print(f"✅ Created {job_count} v9 refinement jobs")
|
|
|
|
def run_forever(self):
|
|
"""Main control loop - runs 24/7"""
|
|
print("🚀 Cluster Master starting...")
|
|
print(f"📊 Workers: {len(WORKERS)}")
|
|
print(f"💾 Database: {DB_PATH}")
|
|
print(f"📁 Queue: {QUEUE_DIR}")
|
|
|
|
# Generate initial jobs if queue empty
|
|
if not list(QUEUE_DIR.glob("*.json")):
|
|
self.generate_v9_jobs()
|
|
|
|
iteration = 0
|
|
while True:
|
|
iteration += 1
|
|
print(f"\n{'='*60}")
|
|
print(f"🔄 Iteration {iteration} - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
|
|
|
# Check for completed results
|
|
self.collect_results()
|
|
|
|
# Assign jobs to idle workers
|
|
for worker_id in WORKERS:
|
|
if self.workers.check_worker_status(worker_id):
|
|
job = self.queue.get_next_job()
|
|
if job:
|
|
self.queue.mark_running(job, worker_id)
|
|
self.workers.assign_job(worker_id, job)
|
|
|
|
# Show status
|
|
self.show_status()
|
|
|
|
# Sleep before next iteration
|
|
time.sleep(60) # Check every minute
|
|
|
|
def collect_results(self):
|
|
"""Collect completed results from workers"""
|
|
for result_file in self.results_dir.glob("*.json"):
|
|
try:
|
|
with open(result_file, 'r') as f:
|
|
result = json.load(f)
|
|
|
|
# Store in database
|
|
self.store_result(result)
|
|
|
|
# Archive result
|
|
archive_dir = self.results_dir / 'archive'
|
|
archive_dir.mkdir(exist_ok=True)
|
|
result_file.rename(archive_dir / result_file.name)
|
|
|
|
print(f"✅ Processed result: {result['job_id']}")
|
|
except Exception as e:
|
|
print(f"❌ Error processing {result_file}: {e}")
|
|
|
|
def store_result(self, result):
|
|
"""Store backtest result in database"""
|
|
# Implementation here
|
|
pass
|
|
|
|
def show_status(self):
|
|
"""Show current cluster status"""
|
|
queued = len(list(QUEUE_DIR.glob("*.json")))
|
|
|
|
conn = sqlite3.connect(self.db.db_path)
|
|
c = conn.cursor()
|
|
|
|
c.execute("SELECT COUNT(*) FROM jobs WHERE status = 'running'")
|
|
running = c.fetchone()[0]
|
|
|
|
c.execute("SELECT COUNT(*) FROM jobs WHERE status = 'completed'")
|
|
completed = c.fetchone()[0]
|
|
|
|
conn.close()
|
|
|
|
print(f"📊 Status: {queued} queued | {running} running | {completed} completed")
|
|
|
|
# Show top strategies
|
|
top = self.db.get_top_strategies(3)
|
|
if top:
|
|
print("\n🏆 Top 3 Strategies:")
|
|
for i, strat in enumerate(top, 1):
|
|
print(f" {i}. {strat[0]}: ${strat[2]:.2f}/1k ({strat[3]} trades, {strat[4]:.1f}% WR)")
|
|
|
|
if __name__ == '__main__':
|
|
master = ClusterMaster()
|
|
master.run_forever()
|