trading_bot_v4/cluster/master.py
mindesbunister 2a8e04fe57 feat: Continuous optimization cluster for 2 EPYC servers
- Master controller with job queue and result aggregation
- Worker scripts for parallel backtesting (22 workers per server)
- SQLite database for strategy ranking and performance tracking
- File-based job queue (simple, robust, survives crashes)
- Auto-setup script for both EPYC servers
- Status dashboard for monitoring progress
- Comprehensive deployment guide

Architecture:
- Master: Job generation, worker coordination, result collection
- Worker 1 (pve-nu-monitor01): AMD EPYC 7282, 22 parallel jobs
- Worker 2 (srv-bd-host01): AMD EPYC 7302, 22 parallel jobs
- Total capacity: ~49,000 backtests/day (44 cores @ 70%)
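
A quick sanity check of that throughput figure (a rough sketch; the ~78 s average runtime
per backtest is an assumption inferred back from the stated numbers, not a measurement):

workers = 22 + 22                  # parallel slots across both EPYC hosts
seconds_per_backtest = 78          # assumed average runtime per job
per_day = workers * 86_400 // seconds_per_backtest
print(per_day)                     # ~48,700/day, consistent with "~49,000 backtests/day"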

Initial focus: v9 parameter refinement (27 configurations)
Target: Find strategies >00/1k P&L (current baseline 92/1k)

Files:
- cluster/master.py: Main controller (570 lines)
- cluster/worker.py: Worker execution script (220 lines)
- cluster/setup_cluster.sh: Automated deployment
- cluster/status.py: Real-time status dashboard
- cluster/README.md: Operational documentation
- cluster/DEPLOYMENT.md: Step-by-step deployment guide
2025-11-29 22:34:52 +01:00

372 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Continuous Optimization Cluster - MASTER Controller
Manages job queue, coordinates workers, aggregates results
"""
import json
import time
import os
import subprocess
from datetime import datetime
from pathlib import Path
import sqlite3
# Configuration
WORKERS = {
    'worker1': {
        'host': 'root@10.10.254.106',
        'cores': 22,  # 70% of 32
        'workspace': '/root/optimization-cluster',
    },
    'worker2': {
        'host': 'root@10.10.254.106',  # Connect through monitor01
        'ssh_hop': 'root@10.20.254.100',  # Actual target
        'cores': 22,  # 70% of 32
        'workspace': '/root/optimization-cluster',
    }
}
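
# Note on topology: worker2 is reached via worker1's host. Jobs and commands for it
# are sent to 10.10.254.106 and hop from there to 10.20.254.100 through a nested ssh
# (see WorkerManager.check_worker_status and assign_job below).
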
CLUSTER_DIR = Path(__file__).parent
QUEUE_DIR = CLUSTER_DIR / 'queue'
RESULTS_DIR = CLUSTER_DIR / 'results'
DB_PATH = CLUSTER_DIR / 'strategies.db'
# Job priorities
PRIORITY_HIGH = 1 # Known good strategies (v9 refinements)
PRIORITY_MEDIUM = 2 # New concepts (volume profiles, orderflow)
PRIORITY_LOW = 3 # Experimental (ML, neural networks)
class StrategyDatabase:
"""SQLite database for strategy performance tracking"""
def __init__(self, db_path):
self.db_path = db_path
self.init_db()
def init_db(self):
"""Create tables if not exist"""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute('''
CREATE TABLE IF NOT EXISTS strategies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
indicator_type TEXT NOT NULL,
params JSON NOT NULL,
pnl_per_1k REAL,
trade_count INTEGER,
win_rate REAL,
profit_factor REAL,
max_drawdown REAL,
sharpe_ratio REAL,
tested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status TEXT DEFAULT 'pending',
notes TEXT
)
''')
c.execute('''
CREATE TABLE IF NOT EXISTS backtest_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
strategy_id INTEGER,
config JSON NOT NULL,
pnl REAL,
trades INTEGER,
win_rate REAL,
completed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (strategy_id) REFERENCES strategies (id)
)
''')
c.execute('''
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_file TEXT UNIQUE NOT NULL,
priority INTEGER DEFAULT 2,
worker_id TEXT,
status TEXT DEFAULT 'queued',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
started_at TIMESTAMP,
completed_at TIMESTAMP
)
''')
conn.commit()
conn.close()
def add_strategy(self, name, indicator_type, params):
"""Add new strategy to test"""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
try:
c.execute('''
INSERT INTO strategies (name, indicator_type, params)
VALUES (?, ?, ?)
''', (name, indicator_type, json.dumps(params)))
conn.commit()
return c.lastrowid
except sqlite3.IntegrityError:
# Strategy already exists
return None
finally:
conn.close()
def get_top_strategies(self, limit=10):
"""Get top performing strategies"""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute('''
SELECT name, indicator_type, pnl_per_1k, trade_count,
win_rate, profit_factor, max_drawdown
FROM strategies
WHERE status = 'completed'
ORDER BY pnl_per_1k DESC
LIMIT ?
''', (limit,))
results = c.fetchall()
conn.close()
return results
class JobQueue:
"""Manages job queue and worker assignments"""
def __init__(self, queue_dir, db):
self.queue_dir = Path(queue_dir)
self.queue_dir.mkdir(exist_ok=True)
self.db = db
def create_job(self, indicator, params, priority=PRIORITY_MEDIUM):
"""Create a new backtest job"""
job_id = f"{indicator}_{int(time.time() * 1000)}"
job_file = self.queue_dir / f"{job_id}.json"
job = {
'id': job_id,
'indicator': indicator,
'params': params,
'priority': priority,
'created_at': datetime.now().isoformat(),
'status': 'queued'
}
with open(job_file, 'w') as f:
json.dump(job, f, indent=2)
# Log to database
conn = sqlite3.connect(self.db.db_path)
c = conn.cursor()
c.execute('''
INSERT INTO jobs (job_file, priority, status)
VALUES (?, ?, 'queued')
''', (job_file.name, priority))
conn.commit()
conn.close()
print(f"✅ Created job: {job_id} (priority {priority})")
return job_id
    def get_next_job(self):
        """Get the highest-priority job that is still queued (skip running jobs)"""
        queued = []
        for job_file in self.queue_dir.glob("*.json"):
            with open(job_file) as f:
                job = json.load(f)
            if job.get('status', 'queued') == 'queued':
                queued.append((job.get('priority', 2), job_file))
        if not queued:
            return None
        # Lowest priority number first (1 = high)
        return min(queued)[1]
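
    # Because the queue is just JSON files on disk, a master restart or crash loses
    # nothing: any job file still sitting in queue/ is picked up again on the next
    # pass of the main loop (the "survives crashes" property from the commit message).
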
    def mark_running(self, job_file, worker_id):
        """Mark job as running"""
        with open(job_file, 'r') as f:
            job = json.load(f)
        job['status'] = 'running'
        job['worker_id'] = worker_id
        job['started_at'] = datetime.now().isoformat()
        with open(job_file, 'w') as f:
            json.dump(job, f, indent=2)
        # Update database
        conn = sqlite3.connect(self.db.db_path)
        c = conn.cursor()
        c.execute('''
            UPDATE jobs
            SET status = 'running', worker_id = ?, started_at = CURRENT_TIMESTAMP
            WHERE job_file = ?
        ''', (worker_id, job_file.name))
        conn.commit()
        conn.close()

class WorkerManager:
"""Manages worker coordination"""
def __init__(self, workers):
self.workers = workers
def check_worker_status(self, worker_id):
"""Check if worker is idle"""
worker = self.workers[worker_id]
# Check via SSH if worker has running jobs
cmd = f"ssh {worker['host']}"
if 'ssh_hop' in worker:
cmd = f"ssh {worker['host']} ssh {worker['ssh_hop']}"
cmd += " 'pgrep -f backtester || echo IDLE'"
try:
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5)
return 'IDLE' in result.stdout
except:
return False
    def assign_job(self, worker_id, job_file):
        """Assign job to worker"""
        worker = self.workers[worker_id]
        print(f"📤 Assigning {job_file.name} to {worker_id}...")
        # Copy job to worker
        cmd = f"scp {job_file} {worker['host']}:{worker['workspace']}/jobs/"
        subprocess.run(cmd, shell=True)
        # Trigger worker execution
        cmd = f"ssh {worker['host']}"
        if 'ssh_hop' in worker:
            cmd = f"ssh {worker['host']} ssh {worker['ssh_hop']}"
        cmd += f" 'cd {worker['workspace']} && python3 worker.py {job_file.name} > logs/worker.log 2>&1 &'"
        subprocess.run(cmd, shell=True)
        print(f"✅ Job started on {worker_id}")
class ClusterMaster:
"""Main cluster controller"""
def __init__(self):
self.db = StrategyDatabase(DB_PATH)
self.queue = JobQueue(QUEUE_DIR, self.db)
self.workers = WorkerManager(WORKERS)
self.results_dir = Path(RESULTS_DIR)
self.results_dir.mkdir(exist_ok=True)
    def generate_v9_jobs(self):
        """Generate v9 parameter sweep jobs"""
        print("🔧 Generating v9 parameter sweep jobs...")
        # Refined parameter grid based on baseline results
        flip_thresholds = [0.5, 0.6, 0.7]
        ma_gaps = [0.30, 0.35, 0.40]
        momentum_adx = [21, 23, 25]
        job_count = 0
        for flip in flip_thresholds:
            for ma_gap in ma_gaps:
                for adx in momentum_adx:
                    params = {
                        'flip_threshold': flip,
                        'ma_gap': ma_gap,
                        'momentum_adx': adx,
                        'long_pos': 70,
                        'short_pos': 25,
                        'cooldown_bars': 2
                    }
                    self.queue.create_job('v9_moneyline', params, PRIORITY_HIGH)
                    job_count += 1
        print(f"✅ Created {job_count} v9 refinement jobs")
    def run_forever(self):
        """Main control loop - runs 24/7"""
        print("🚀 Cluster Master starting...")
        print(f"📊 Workers: {len(WORKERS)}")
        print(f"💾 Database: {DB_PATH}")
        print(f"📁 Queue: {QUEUE_DIR}")
        # Generate initial jobs if queue empty
        if not list(QUEUE_DIR.glob("*.json")):
            self.generate_v9_jobs()
        iteration = 0
        while True:
            iteration += 1
            print(f"\n{'='*60}")
            print(f"🔄 Iteration {iteration} - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            # Check for completed results
            self.collect_results()
            # Assign jobs to idle workers
            for worker_id in WORKERS:
                if self.workers.check_worker_status(worker_id):
                    job = self.queue.get_next_job()
                    if job:
                        self.queue.mark_running(job, worker_id)
                        self.workers.assign_job(worker_id, job)
            # Show status
            self.show_status()
            # Sleep before next iteration
            time.sleep(60)  # Check every minute

    def collect_results(self):
        """Collect completed results from workers"""
        for result_file in self.results_dir.glob("*.json"):
            try:
                with open(result_file, 'r') as f:
                    result = json.load(f)
                # Store in database
                self.store_result(result)
                # Archive result
                archive_dir = self.results_dir / 'archive'
                archive_dir.mkdir(exist_ok=True)
                result_file.rename(archive_dir / result_file.name)
                print(f"✅ Processed result: {result['job_id']}")
            except Exception as e:
                print(f"❌ Error processing {result_file}: {e}")

    def store_result(self, result):
        """Store backtest result in database.
        Assumes the worker reports 'params', 'pnl', 'trades', 'win_rate' (unverified here)."""
        conn = sqlite3.connect(self.db.db_path)
        c = conn.cursor()
        c.execute('INSERT INTO backtest_results (config, pnl, trades, win_rate) VALUES (?, ?, ?, ?)',
                  (json.dumps(result.get('params', {})), result.get('pnl'),
                   result.get('trades'), result.get('win_rate')))
        conn.commit()
        conn.close()

    def show_status(self):
        """Show current cluster status"""
        queued = len(list(QUEUE_DIR.glob("*.json")))
        conn = sqlite3.connect(self.db.db_path)
        c = conn.cursor()
        c.execute("SELECT COUNT(*) FROM jobs WHERE status = 'running'")
        running = c.fetchone()[0]
        c.execute("SELECT COUNT(*) FROM jobs WHERE status = 'completed'")
        completed = c.fetchone()[0]
        conn.close()
        print(f"📊 Status: {queued} queued | {running} running | {completed} completed")
        # Show top strategies
        top = self.db.get_top_strategies(3)
        if top:
            print("\n🏆 Top 3 Strategies:")
            for i, strat in enumerate(top, 1):
                print(f" {i}. {strat[0]}: ${strat[2]:.2f}/1k ({strat[3]} trades, {strat[4]:.1f}% WR)")

if __name__ == '__main__':
    master = ClusterMaster()
    master.run_forever()
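
# To enqueue additional jobs from another script, the JobQueue API can be reused
# directly. A minimal sketch (not part of the original workflow; parameter values
# are illustrative only):
#
#   from master import ClusterMaster, PRIORITY_MEDIUM
#   m = ClusterMaster()
#   m.queue.create_job('v9_moneyline',
#                      {'flip_threshold': 0.6, 'ma_gap': 0.35, 'momentum_adx': 23},
#                      PRIORITY_MEDIUM)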