#!/usr/bin/env python3
"""
Continuous Optimization Cluster - MASTER Controller

Manages the job queue, coordinates workers, and aggregates results.
"""
import itertools
import json
import shlex
import subprocess
import time
from datetime import datetime
from pathlib import Path
import sqlite3

# Configuration
WORKERS = {
    'worker1': {
        'host': 'root@10.10.254.106',
        'cores': 22,  # 70% of 32
        'workspace': '/root/optimization-cluster',
    },
    'worker2': {
        'host': 'root@10.10.254.106',     # Jump host (monitor01)
        'ssh_hop': 'root@10.20.254.100',  # Actual target, reached via the jump host
        'cores': 22,  # 70% of 32
        'workspace': '/root/optimization-cluster',
    },
}

CLUSTER_DIR = Path(__file__).parent
QUEUE_DIR = CLUSTER_DIR / 'queue'
RESULTS_DIR = CLUSTER_DIR / 'results'
DB_PATH = CLUSTER_DIR / 'strategies.db'

# Job priorities (lower number = higher priority)
PRIORITY_HIGH = 1    # Known good strategies (v9 refinements)
PRIORITY_MEDIUM = 2  # New concepts (volume profiles, orderflow)
PRIORITY_LOW = 3     # Experimental (ML, neural networks)


class StrategyDatabase:
    """SQLite database for strategy performance tracking"""

    def __init__(self, db_path):
        self.db_path = db_path
        self.init_db()

    def init_db(self):
        """Create tables if they do not already exist"""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        c.execute('''
            CREATE TABLE IF NOT EXISTS strategies (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE NOT NULL,
                indicator_type TEXT NOT NULL,
                params JSON NOT NULL,
                pnl_per_1k REAL,
                trade_count INTEGER,
                win_rate REAL,
                profit_factor REAL,
                max_drawdown REAL,
                sharpe_ratio REAL,
                tested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                status TEXT DEFAULT 'pending',
                notes TEXT
            )
        ''')
        c.execute('''
            CREATE TABLE IF NOT EXISTS backtest_results (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                strategy_id INTEGER,
                config JSON NOT NULL,
                pnl REAL,
                trades INTEGER,
                win_rate REAL,
                completed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (strategy_id) REFERENCES strategies (id)
            )
        ''')
        c.execute('''
            CREATE TABLE IF NOT EXISTS jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                job_file TEXT UNIQUE NOT NULL,
                priority INTEGER DEFAULT 2,
                worker_id TEXT,
                status TEXT DEFAULT 'queued',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                started_at TIMESTAMP,
                completed_at TIMESTAMP
            )
        ''')
        conn.commit()
        conn.close()

    def add_strategy(self, name, indicator_type, params):
        """Add a new strategy to test; returns its row id, or None if the name already exists"""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        try:
            c.execute('''
                INSERT INTO strategies (name, indicator_type, params)
                VALUES (?, ?, ?)
            ''', (name, indicator_type, json.dumps(params)))
            conn.commit()
            return c.lastrowid
        except sqlite3.IntegrityError:
            # Strategy already exists (name is UNIQUE)
            return None
        finally:
            conn.close()

    def get_top_strategies(self, limit=10):
        """Get the top performing completed strategies, best PnL first"""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        c.execute('''
            SELECT name, indicator_type, pnl_per_1k, trade_count,
                   win_rate, profit_factor, max_drawdown
            FROM strategies
            WHERE status = 'completed'
            ORDER BY pnl_per_1k DESC
            LIMIT ?
        ''', (limit,))
        results = c.fetchall()
        conn.close()
        return results
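
# Ad-hoc usage sketch for StrategyDatabase (illustrative only -- the path and
# parameter values below are made up, not part of the cluster protocol):
#
#     db = StrategyDatabase(Path('/tmp/strategies.db'))
#     db.add_strategy('v9_moneyline_demo', 'v9_moneyline',
#                     {'flip_threshold': 0.6, 'ma_gap': 0.35, 'momentum_adx': 23})
#     for name, indicator, pnl, trades, win_rate, pf, dd in db.get_top_strategies(5):
#         print(f"{name}: ${pnl:.2f}/1k over {trades} trades")
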
class JobQueue:
    """Manages the on-disk job queue, mirrored into the jobs table"""

    def __init__(self, queue_dir, db):
        self.queue_dir = Path(queue_dir)
        self.queue_dir.mkdir(exist_ok=True)
        self.db = db
        # Monotonic counter appended to job ids so jobs created within the
        # same millisecond still get unique names
        self._seq = itertools.count()

    def create_job(self, indicator, params, priority=PRIORITY_MEDIUM):
        """Create a new backtest job"""
        job_id = f"{indicator}_{int(time.time() * 1000)}_{next(self._seq)}"
        job_file = self.queue_dir / f"{job_id}.json"

        job = {
            'id': job_id,
            'indicator': indicator,
            'params': params,
            'priority': priority,
            'created_at': datetime.now().isoformat(),
            'status': 'queued',
        }
        with open(job_file, 'w') as f:
            json.dump(job, f, indent=2)

        # Log to database
        conn = sqlite3.connect(self.db.db_path)
        c = conn.cursor()
        c.execute('''
            INSERT INTO jobs (job_file, priority, status)
            VALUES (?, ?, 'queued')
        ''', (job_file.name, priority))
        conn.commit()
        conn.close()

        print(f"āœ… Created job: {job_id} (priority {priority})")
        return job_id

    def get_next_job(self):
        """Return the highest-priority job file still in 'queued' state, or None.

        Running jobs keep their file in the queue directory until they
        complete, so filter on status to avoid handing the same job to two
        workers.
        """
        candidates = []
        for path in self.queue_dir.glob("*.json"):
            with open(path) as f:
                job = json.load(f)
            if job.get('status', 'queued') == 'queued':
                candidates.append((job.get('priority', PRIORITY_MEDIUM), path))
        return min(candidates)[1] if candidates else None

    def mark_running(self, job_file, worker_id):
        """Mark a job as running, both on disk and in the database"""
        with open(job_file, 'r') as f:
            job = json.load(f)

        job['status'] = 'running'
        job['worker_id'] = worker_id
        job['started_at'] = datetime.now().isoformat()

        with open(job_file, 'w') as f:
            json.dump(job, f, indent=2)

        # Update database
        conn = sqlite3.connect(self.db.db_path)
        c = conn.cursor()
        c.execute('''
            UPDATE jobs
            SET status = 'running', worker_id = ?, started_at = CURRENT_TIMESTAMP
            WHERE job_file = ?
        ''', (worker_id, job_file.name))
        conn.commit()
        conn.close()


class WorkerManager:
    """Manages worker coordination over SSH"""

    def __init__(self, workers):
        self.workers = workers

    def _ssh_cmd(self, worker, remote_cmd):
        """Build an ssh command that runs remote_cmd on the worker's target host.

        For hopped workers, 'host' is the jump host and 'ssh_hop' the actual
        target. ProxyJump (OpenSSH 7.3+) routes through the jump host, and
        shlex.quote keeps the command from being executed on it.
        """
        if 'ssh_hop' in worker:
            return (f"ssh -o ProxyJump={worker['host']} {worker['ssh_hop']} "
                    f"{shlex.quote(remote_cmd)}")
        return f"ssh {worker['host']} {shlex.quote(remote_cmd)}"

    def check_worker_status(self, worker_id):
        """Return True if the worker is idle (no backtester process running)"""
        worker = self.workers[worker_id]
        cmd = self._ssh_cmd(worker, 'pgrep -f backtester || echo IDLE')
        try:
            result = subprocess.run(cmd, shell=True, capture_output=True,
                                    text=True, timeout=5)
            return 'IDLE' in result.stdout
        except (subprocess.TimeoutExpired, OSError):
            # Treat unreachable workers as busy rather than assigning into the void
            return False

    def assign_job(self, worker_id, job_file):
        """Copy a job file to a worker and start it there"""
        worker = self.workers[worker_id]
        target = worker.get('ssh_hop', worker['host'])
        print(f"šŸ“¤ Assigning {job_file.name} to {worker_id}...")

        # Copy the job to the worker, routing through the jump host if needed
        scp_cmd = f"scp {job_file} {target}:{worker['workspace']}/jobs/"
        if 'ssh_hop' in worker:
            scp_cmd = (f"scp -o ProxyJump={worker['host']} "
                       f"{job_file} {target}:{worker['workspace']}/jobs/")
        subprocess.run(scp_cmd, shell=True, check=False)

        # Launch the worker in the background; nohup plus redirection lets
        # the ssh session return immediately
        remote = (f"cd {worker['workspace']} && "
                  f"nohup python3 worker.py {job_file.name} > logs/worker.log 2>&1 &")
        subprocess.run(self._ssh_cmd(worker, remote), shell=True, check=False)

        print(f"āœ… Job started on {worker_id}")
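
# Worker-side contract (a sketch inferred from this controller, since
# worker.py itself lives on the workers): the master copies a job JSON into
# <workspace>/jobs/ and runs `python3 worker.py <job>.json`. The worker is
# expected to write a result JSON carrying at least the job's 'id' (as
# 'job_id') plus its performance fields, and to push that file back into this
# master's results/ directory -- collect_results() only globs locally and
# never pulls. For example, from the worker:
#
#     scp results/<job_id>.json root@<master-host>:/root/optimization-cluster/results/
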
class ClusterMaster:
    """Main cluster controller"""

    def __init__(self):
        self.db = StrategyDatabase(DB_PATH)
        self.queue = JobQueue(QUEUE_DIR, self.db)
        self.workers = WorkerManager(WORKERS)
        self.results_dir = Path(RESULTS_DIR)
        self.results_dir.mkdir(exist_ok=True)

    def generate_v9_jobs(self):
        """Generate v9 parameter sweep jobs"""
        print("šŸ”§ Generating v9 parameter sweep jobs...")

        # Refined parameter grid based on baseline results: 3 x 3 x 3 = 27 jobs
        flip_thresholds = [0.5, 0.6, 0.7]
        ma_gaps = [0.30, 0.35, 0.40]
        momentum_adx = [21, 23, 25]

        job_count = 0
        for flip in flip_thresholds:
            for ma_gap in ma_gaps:
                for adx in momentum_adx:
                    params = {
                        'flip_threshold': flip,
                        'ma_gap': ma_gap,
                        'momentum_adx': adx,
                        'long_pos': 70,
                        'short_pos': 25,
                        'cooldown_bars': 2,
                    }
                    self.queue.create_job('v9_moneyline', params, PRIORITY_HIGH)
                    job_count += 1

        print(f"āœ… Created {job_count} v9 refinement jobs")

    def run_forever(self):
        """Main control loop - runs 24/7"""
        print("šŸš€ Cluster Master starting...")
        print(f"šŸ“Š Workers: {len(WORKERS)}")
        print(f"šŸ’¾ Database: {DB_PATH}")
        print(f"šŸ“ Queue: {QUEUE_DIR}")

        # Generate initial jobs if the queue is empty
        if not list(QUEUE_DIR.glob("*.json")):
            self.generate_v9_jobs()

        iteration = 0
        while True:
            iteration += 1
            print(f"\n{'=' * 60}")
            print(f"šŸ”„ Iteration {iteration} - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

            # Collect any results workers have pushed back
            self.collect_results()

            # Hand the highest-priority queued job to each idle worker
            for worker_id in WORKERS:
                if self.workers.check_worker_status(worker_id):
                    job = self.queue.get_next_job()
                    if job:
                        self.queue.mark_running(job, worker_id)
                        self.workers.assign_job(worker_id, job)

            # Show status
            self.show_status()

            # Sleep before next iteration
            time.sleep(60)  # Check every minute

    def collect_results(self):
        """Collect completed results from the results directory"""
        for result_file in self.results_dir.glob("*.json"):
            try:
                with open(result_file, 'r') as f:
                    result = json.load(f)

                # Store in database
                self.store_result(result)

                # Remove the finished job's file from the queue directory so
                # it can never be reassigned
                queue_file = QUEUE_DIR / f"{result['job_id']}.json"
                if queue_file.exists():
                    queue_file.unlink()

                # Archive the raw result file
                archive_dir = self.results_dir / 'archive'
                archive_dir.mkdir(exist_ok=True)
                result_file.rename(archive_dir / result_file.name)

                print(f"āœ… Processed result: {result['job_id']}")
            except Exception as e:
                print(f"āŒ Error processing {result_file}: {e}")

    def store_result(self, result):
        """Store a backtest result in the database.

        NOTE: apart from 'job_id', the field names read from `result` below
        are assumptions about the worker's result schema; adjust them to
        match whatever worker.py actually emits.
        """
        conn = sqlite3.connect(self.db.db_path)
        c = conn.cursor()
        c.execute('''
            INSERT INTO backtest_results (strategy_id, config, pnl, trades, win_rate)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            result.get('strategy_id'),              # assumed field
            json.dumps(result.get('params', {})),   # assumed field
            result.get('pnl'),                      # assumed field
            result.get('trades'),                   # assumed field
            result.get('win_rate'),                 # assumed field
        ))
        c.execute('''
            UPDATE jobs
            SET status = 'completed', completed_at = CURRENT_TIMESTAMP
            WHERE job_file = ?
        ''', (f"{result['job_id']}.json",))
        conn.commit()
        conn.close()

    def show_status(self):
        """Show current cluster status"""
        conn = sqlite3.connect(self.db.db_path)
        c = conn.cursor()
        counts = {}
        for status in ('queued', 'running', 'completed'):
            c.execute("SELECT COUNT(*) FROM jobs WHERE status = ?", (status,))
            counts[status] = c.fetchone()[0]
        conn.close()

        print(f"šŸ“Š Status: {counts['queued']} queued | "
              f"{counts['running']} running | {counts['completed']} completed")

        # Show top strategies
        top = self.db.get_top_strategies(3)
        if top:
            print("\nšŸ† Top 3 Strategies:")
            for i, strat in enumerate(top, 1):
                print(f"  {i}. {strat[0]}: ${strat[2]:.2f}/1k "
                      f"({strat[3]} trades, {strat[4]:.1f}% WR)")


if __name__ == '__main__':
    master = ClusterMaster()
    master.run_forever()
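
# Deployment note (a hedged example -- the script name 'master.py' is an
# assumption, and any process supervisor such as systemd would do instead):
#
#     nohup python3 master.py > master.log 2>&1 &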