#!/usr/bin/env python3 """ Distributed Continuous Optimization Coordinator Extends comprehensive_sweep.py to distribute massive parameter grids across 2 EPYC servers (64 cores total) for 24/7 strategy discovery. Architecture: 1. Master generates parameter grid (millions of combinations) 2. Splits into chunks (~10,000 combos per chunk) 3. Distributes chunks to workers via SSH 4. Workers run modified comprehensive_sweep on their chunk 5. Master aggregates results, identifies top performers 6. Master generates next exploration batch (nearby good configs) 7. Repeat forever - continuous improvement Integration with Existing System: - Uses simulator.py and MoneyLineInputs from /home/comprehensive_sweep/backtester/ - Preserves comprehensive_sweep.py output format (CSV with 14 params) - Works with existing .venv and data files on EPYC - Backwards compatible - can still run comprehensive_sweep.py standalone """ import sqlite3 import subprocess import json import time import itertools import hashlib import threading # ADDED Nov 30, 2025: Background monitoring from pathlib import Path from datetime import datetime from typing import Dict, List, Optional, Tuple, Any from dataclasses import dataclass # Worker Configuration WORKERS = { 'worker1': { 'host': 'root@10.10.254.106', 'cores': 32, # Full 32 threads available 'workspace': '/home/comprehensive_sweep', 'venv_path': 'backtester/.venv/bin/activate', # Relative to workspace 'ssh_key': None, # Use default key }, 'worker2': { 'host': 'root@10.20.254.100', 'cores': 32, # Full 32 threads available 'workspace': '/home/backtest_dual/backtest', # CORRECTED: Actual path on bd-host01 'venv_path': '.venv/bin/activate', # CRITICAL FIX (Nov 30): Worker2 has venv at workspace root, not in backtester/ 'ssh_hop': 'root@10.10.254.106', # Connect through worker1 'ssh_key': None, } } CLUSTER_DIR = Path(__file__).parent RESULTS_DIR = CLUSTER_DIR / 'distributed_results' DB_PATH = CLUSTER_DIR / 'exploration.db' @dataclass class ParameterGrid: """Full parameter space for comprehensive sweep""" flip_thresholds: List[float] ma_gaps: List[float] adx_mins: List[int] long_pos_maxs: List[int] short_pos_mins: List[int] cooldowns: List[int] position_sizes: List[int] tp1_multipliers: List[float] tp2_multipliers: List[float] sl_multipliers: List[float] tp1_close_percents: List[int] trailing_multipliers: List[float] vol_mins: List[float] max_bars_list: List[int] def total_combinations(self) -> int: """Calculate total parameter space size""" return ( len(self.flip_thresholds) * len(self.ma_gaps) * len(self.adx_mins) * len(self.long_pos_maxs) * len(self.short_pos_mins) * len(self.cooldowns) * len(self.position_sizes) * len(self.tp1_multipliers) * len(self.tp2_multipliers) * len(self.sl_multipliers) * len(self.tp1_close_percents) * len(self.trailing_multipliers) * len(self.vol_mins) * len(self.max_bars_list) ) def to_dict(self) -> Dict[str, List]: """Convert to dict for JSON serialization""" return { 'flip_thresholds': self.flip_thresholds, 'ma_gaps': self.ma_gaps, 'adx_mins': self.adx_mins, 'long_pos_maxs': self.long_pos_maxs, 'short_pos_mins': self.short_pos_mins, 'cooldowns': self.cooldowns, 'position_sizes': self.position_sizes, 'tp1_multipliers': self.tp1_multipliers, 'tp2_multipliers': self.tp2_multipliers, 'sl_multipliers': self.sl_multipliers, 'tp1_close_percents': self.tp1_close_percents, 'trailing_multipliers': self.trailing_multipliers, 'vol_mins': self.vol_mins, 'max_bars_list': self.max_bars_list, } class ExplorationDatabase: """Track all tested strategies and exploration progress""" def __init__(self, db_path: Path): self.db_path = db_path self.init_db() def init_db(self): """Create tables""" conn = sqlite3.connect(self.db_path) c = conn.cursor() # Strategies table - all tested configurations c.execute(''' CREATE TABLE IF NOT EXISTS strategies ( id INTEGER PRIMARY KEY AUTOINCREMENT, param_hash TEXT UNIQUE NOT NULL, indicator_type TEXT NOT NULL, params_json TEXT NOT NULL, trades INTEGER, win_rate REAL, total_pnl REAL, pnl_per_1k REAL, profit_factor REAL, max_drawdown REAL, sharpe_ratio REAL, tested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, worker_id TEXT, chunk_id TEXT ) ''') # Exploration chunks - work distribution tracking c.execute(''' CREATE TABLE IF NOT EXISTS chunks ( id TEXT PRIMARY KEY, indicator_type TEXT NOT NULL, grid_json TEXT NOT NULL, chunk_start INTEGER NOT NULL, chunk_end INTEGER NOT NULL, total_combos INTEGER NOT NULL, assigned_worker TEXT, status TEXT DEFAULT 'pending', started_at TIMESTAMP, completed_at TIMESTAMP, best_pnl_in_chunk REAL, results_csv_path TEXT ) ''') # Exploration phases - high-level progress c.execute(''' CREATE TABLE IF NOT EXISTS phases ( id INTEGER PRIMARY KEY AUTOINCREMENT, phase_name TEXT NOT NULL, indicator_type TEXT NOT NULL, grid_json TEXT NOT NULL, total_combos INTEGER NOT NULL, completed_combos INTEGER DEFAULT 0, best_pnl_overall REAL DEFAULT 0, best_params_json TEXT, started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, estimated_completion TIMESTAMP, actual_completion TIMESTAMP ) ''') # Create indexes for fast queries c.execute('CREATE INDEX IF NOT EXISTS idx_pnl_per_1k ON strategies(pnl_per_1k DESC)') c.execute('CREATE INDEX IF NOT EXISTS idx_indicator_type ON strategies(indicator_type)') c.execute('CREATE INDEX IF NOT EXISTS idx_chunk_status ON chunks(status)') conn.commit() conn.close() def record_chunk(self, chunk_id: str, indicator_type: str, grid: ParameterGrid, chunk_start: int, chunk_end: int, assigned_worker: str) -> None: """Record new chunk assigned to worker""" conn = sqlite3.connect(self.db_path) c = conn.cursor() c.execute(''' INSERT INTO chunks (id, indicator_type, grid_json, chunk_start, chunk_end, total_combos, assigned_worker, status, started_at) VALUES (?, ?, ?, ?, ?, ?, ?, 'running', ?) ''', (chunk_id, indicator_type, json.dumps(grid.to_dict()), chunk_start, chunk_end, chunk_end - chunk_start, assigned_worker, datetime.now())) conn.commit() conn.close() def complete_chunk(self, chunk_id: str, results_csv_path: str, best_pnl: float) -> None: """Mark chunk as completed with results""" conn = sqlite3.connect(self.db_path) c = conn.cursor() c.execute(''' UPDATE chunks SET status='completed', completed_at=?, results_csv_path=?, best_pnl_in_chunk=? WHERE id=? ''', (datetime.now(), results_csv_path, best_pnl, chunk_id)) conn.commit() conn.close() def import_results_csv(self, csv_path: str, worker_id: str, chunk_id: str) -> int: """Import CSV results from comprehensive_sweep into strategies table""" import csv conn = sqlite3.connect(self.db_path) c = conn.cursor() imported = 0 with open(csv_path, 'r') as f: reader = csv.DictReader(f) for row in reader: # Create parameter hash for deduplication params = {k: v for k, v in row.items() if k not in [ 'rank', 'trades', 'win_rate', 'total_pnl', 'pnl_per_1k', 'profit_factor', 'max_drawdown', 'sharpe_ratio' ]} param_hash = hashlib.sha256(json.dumps(params, sort_keys=True).encode()).hexdigest() try: c.execute(''' INSERT INTO strategies ( param_hash, indicator_type, params_json, trades, win_rate, total_pnl, pnl_per_1k, profit_factor, max_drawdown, sharpe_ratio, worker_id, chunk_id ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( param_hash, 'v9_moneyline', json.dumps(params), int(row['trades']), float(row['win_rate']), float(row['total_pnl']), float(row['pnl_per_1k']), float(row.get('profit_factor', 0)), float(row.get('max_drawdown', 0)), float(row.get('sharpe_ratio', 0)), worker_id, chunk_id )) imported += 1 except sqlite3.IntegrityError: # Duplicate param_hash - already tested this config pass conn.commit() conn.close() return imported def get_top_strategies(self, limit: int = 100) -> List[Dict]: """Get top performing strategies across all tested""" conn = sqlite3.connect(self.db_path) c = conn.cursor() c.execute(''' SELECT indicator_type, params_json, trades, win_rate, total_pnl, pnl_per_1k, profit_factor, max_drawdown, sharpe_ratio, tested_at FROM strategies WHERE trades >= 700 -- Statistical significance AND win_rate >= 0.50 AND win_rate <= 0.70 -- Realistic AND profit_factor >= 1.2 -- Minimum edge ORDER BY pnl_per_1k DESC LIMIT ? ''', (limit,)) rows = c.fetchall() conn.close() results = [] for row in rows: results.append({ 'indicator_type': row[0], 'params': json.loads(row[1]), 'trades': row[2], 'win_rate': row[3], 'total_pnl': row[4], 'pnl_per_1k': row[5], 'profit_factor': row[6], 'max_drawdown': row[7], 'sharpe_ratio': row[8], 'tested_at': row[9], }) return results class DistributedCoordinator: """Coordinates distributed parameter sweeps across EPYC servers""" def __init__(self): self.db = ExplorationDatabase(DB_PATH) RESULTS_DIR.mkdir(parents=True, exist_ok=True) def ssh_command(self, worker_id: str, command: str) -> subprocess.CompletedProcess: """Execute command on worker via SSH""" worker = WORKERS[worker_id] # CRITICAL FIX (Dec 1, 2025): Add SSH options to prevent prompts and improve reliability ssh_opts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -o ServerAliveInterval=5" if 'ssh_hop' in worker: # Worker 2 requires hop through worker 1 # CRITICAL FIX (Nov 29, 2025): Use double-nested quotes for 2-hop SSH # Single quotes don't pass command to inner SSH properly ssh_cmd = f"ssh {ssh_opts} {worker['ssh_hop']} \"ssh {ssh_opts} {worker['host']} '{command}'\"" else: ssh_cmd = f"ssh {ssh_opts} {worker['host']} '{command}'" return subprocess.run(ssh_cmd, shell=True, capture_output=True, text=True) def deploy_worker_script(self, worker_id: str) -> bool: """Deploy distributed_worker.py to EPYC server""" worker = WORKERS[worker_id] script_path = CLUSTER_DIR / 'distributed_worker.py' # Copy script to worker's comprehensive_sweep directory target = f"{worker['workspace']}/backtester/scripts/distributed_worker.py" if 'ssh_hop' in worker: # Two-hop copy for worker2 print(f"πŸ“€ Copying worker script to {worker_id} via hop...") # Copy to worker1 first subprocess.run(f"scp {script_path} {WORKERS['worker1']['host']}:/tmp/", shell=True) # Then copy from worker1 to worker2 self.ssh_command('worker1', f"scp /tmp/distributed_worker.py {worker['host']}:{target}") else: print(f"πŸ“€ Copying worker script to {worker_id}...") subprocess.run(f"scp {script_path} {worker['host']}:{target}", shell=True) print(f"βœ… Worker script deployed to {worker_id}") return True def assign_chunk(self, worker_id: str, chunk_id: str, grid: ParameterGrid, chunk_start: int, chunk_end: int) -> bool: """Assign parameter chunk to worker for processing""" worker = WORKERS[worker_id] # Record in database self.db.record_chunk(chunk_id, 'v9_moneyline', grid, chunk_start, chunk_end, worker_id) # Create chunk specification JSON chunk_spec = { 'chunk_id': chunk_id, 'chunk_start': chunk_start, 'chunk_end': chunk_end, 'grid': grid.to_dict(), 'num_workers': worker['cores'], } chunk_json_path = RESULTS_DIR / f"{chunk_id}_spec.json" with open(chunk_json_path, 'w') as f: json.dump(chunk_spec, f, indent=2) # Copy chunk spec to worker target_json = f"{worker['workspace']}/chunk_{chunk_id}.json" if 'ssh_hop' in worker: # Two-hop copy subprocess.run(f"scp {chunk_json_path} {WORKERS['worker1']['host']}:/tmp/", shell=True) self.ssh_command('worker1', f"scp /tmp/{chunk_id}_spec.json {worker['host']}:{target_json}") else: subprocess.run(f"scp {chunk_json_path} {worker['host']}:{target_json}", shell=True) # Execute distributed_worker.py on worker # CRITICAL FIX (Nov 30): Use per-worker venv_path to support heterogeneous cluster configurations # Worker1: backtester/.venv/bin/activate (venv inside backtester/) # Worker2: .venv/bin/activate (venv at workspace root) # PROVEN WORKING PATTERN (Nov 30): Manual SSH commands succeeded with this exact structure venv_path = worker.get('venv_path', 'backtester/.venv/bin/activate') # Default to worker1 pattern # Build command exactly as proven in manual tests # CRITICAL: Use nohup with explicit background redirect to detach properly cmd = (f"cd {worker['workspace']} && " f"source {venv_path} && " f"nohup python3 backtester/scripts/distributed_worker.py chunk_{chunk_id}.json " f"> /tmp/{chunk_id}.log 2>&1 &") print(f"πŸš€ Starting chunk {chunk_id} on {worker_id} ({chunk_end - chunk_start:,} combos)...") # CRITICAL FIX (Dec 1, 2025): Add SSH options for reliability and timeout handling ssh_opts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -o ServerAliveInterval=5" # Execute command and capture result to verify it started # CRITICAL FIX (Dec 1, 2025): Remove && because command ends with & (background) # In single quotes, & backgrounds process and shell continues to next command (echo) if 'ssh_hop' in worker: # Worker 2 requires hop through worker 1 ssh_cmd = f"ssh {ssh_opts} {worker['ssh_hop']} \"ssh {ssh_opts} {worker['host']} '{cmd} echo Started_chunk_{chunk_id}'\"" else: ssh_cmd = f"ssh {ssh_opts} {worker['host']} '{cmd} echo Started_chunk_{chunk_id}'" # DEBUG: Print command for troubleshooting print(f" DEBUG: Executing SSH command:") print(f" {ssh_cmd[:200]}...") # First 200 chars # Use run() to capture output and verify success # CRITICAL FIX (Dec 1, 2025): Increase timeout from 30s to 60s for nested SSH hops try: result = subprocess.run( ssh_cmd, shell=True, capture_output=True, text=True, timeout=60 # 60 second timeout (was 30s, too short for 2-hop SSH) ) # Verify worker process started # CRITICAL FIX (Dec 1, 2025): Look for marker without spaces (Started_chunk_) if f'Started_chunk_{chunk_id}' in result.stdout: print(f"βœ… Chunk {chunk_id} started on {worker_id} successfully") return True else: print(f"❌ FAILED to start chunk {chunk_id} on {worker_id}") print(f" stdout: {result.stdout}") print(f" stderr: {result.stderr}") return False except subprocess.TimeoutExpired: print(f"⚠️ SSH command timed out for {chunk_id} on {worker_id}") print(f" This usually means SSH hop is misconfigured or slow") return False def collect_results(self, worker_id: str, chunk_id: str) -> Optional[str]: """Collect CSV results from worker""" worker = WORKERS[worker_id] # Check if results file exists on worker (in backtester/ subdirectory) results_csv = f"{worker['workspace']}/backtester/chunk_{chunk_id}_results.csv" check_cmd = f"test -f {results_csv} && echo 'exists'" result = self.ssh_command(worker_id, check_cmd) if 'exists' not in result.stdout: return None # Results not ready yet # Copy results back to master local_csv = RESULTS_DIR / f"{chunk_id}_results.csv" if 'ssh_hop' in worker: # Two-hop copy back self.ssh_command('worker1', f"scp {worker['host']}:{results_csv} /tmp/") subprocess.run(f"scp {WORKERS['worker1']['host']}:/tmp/chunk_{chunk_id}_results.csv {local_csv}", shell=True) else: subprocess.run(f"scp {worker['host']}:{results_csv} {local_csv}", shell=True) print(f"πŸ“₯ Collected results from {worker_id} chunk {chunk_id}") # Import into database imported = self.db.import_results_csv(str(local_csv), worker_id, chunk_id) print(f"πŸ“Š Imported {imported} unique strategies from {chunk_id}") # Get best P&L from CSV for chunk tracking import csv with open(local_csv, 'r') as f: reader = csv.DictReader(f) rows = list(reader) best_pnl = max(float(row['pnl_per_1k']) for row in rows) if rows else 0 self.db.complete_chunk(chunk_id, str(local_csv), best_pnl) return str(local_csv) def start_comprehensive_exploration(self, chunk_size: int = 10000): """Start massive comprehensive parameter sweep""" print("=" * 80) print("πŸš€ DISTRIBUTED COMPREHENSIVE EXPLORATION") print("=" * 80) print() # v9 Money Line parameter grid (Nov 30, 2025) # 6 swept parameters Γ— 4 values each = 4,096 combinations # Focus on core trend-following parameters, fix TP/SL to proven v9 values grid = ParameterGrid( flip_thresholds=[0.4, 0.5, 0.6, 0.7], ma_gaps=[0.20, 0.30, 0.40, 0.50], adx_mins=[18, 21, 24, 27], long_pos_maxs=[60, 65, 70, 75], short_pos_mins=[20, 25, 30, 35], cooldowns=[1, 2, 3, 4], # Fixed to standard v9 values position_sizes=[10000], tp1_multipliers=[2.0], tp2_multipliers=[4.0], sl_multipliers=[3.0], tp1_close_percents=[60], trailing_multipliers=[1.5], vol_mins=[1.0], max_bars_list=[500], ) total_combos = grid.total_combinations() print(f"πŸ“Š Total parameter space: {total_combos:,} combinations") print(f"πŸ“¦ Chunk size: {chunk_size:,} combinations per chunk") print(f"🎯 Total chunks: {(total_combos + chunk_size - 1) // chunk_size:,}") print(f"⏱️ Estimated time: {(total_combos * 1.6) / (64 * 3600):.1f} hours with 64 cores") print() # Deploy worker scripts for worker_id in WORKERS.keys(): self.deploy_worker_script(worker_id) print() print("πŸ”„ Distributing chunks to workers...") print() # CRITICAL FIX (Dec 1, 2025): Clear pending/failed chunks to allow retry # This prevents UNIQUE constraint errors on restart conn = sqlite3.connect(self.db.db_path) c = conn.cursor() c.execute("DELETE FROM chunks WHERE status IN ('pending', 'failed')") deleted_count = c.rowcount conn.commit() conn.close() if deleted_count > 0: print(f"πŸ—‘οΈ Cleared {deleted_count} pending/failed chunks for retry") print() # CRITICAL FIX (Dec 1, 2025): Resume from existing COMPLETED chunks only # Get max chunk ID from completed/running chunks, not all chunks # This allows retrying failed/pending chunks without UNIQUE constraint errors conn = sqlite3.connect(self.db.db_path) c = conn.cursor() c.execute("SELECT id FROM chunks WHERE id LIKE 'v9_chunk_%' AND status IN ('completed', 'running') ORDER BY id DESC LIMIT 1") last_chunk = c.fetchone() conn.close() if last_chunk: # Extract counter from last completed chunk ID last_counter = int(last_chunk[0].split('_')[-1]) chunk_id_counter = last_counter + 1 chunk_start = chunk_id_counter * chunk_size print(f"πŸ“‹ Resuming from chunk {chunk_id_counter} (found {last_counter + 1} completed chunks)") print(f" Starting at combo {chunk_start:,} / {total_combos:,}") else: chunk_id_counter = 0 chunk_start = 0 print(f"πŸ“‹ Starting fresh - no completed chunks found") # Split work across workers active_chunks = {} worker_list = list(WORKERS.keys()) # ['worker1', 'worker2'] while chunk_start < total_combos: chunk_end = min(chunk_start + chunk_size, total_combos) chunk_id = f"v9_chunk_{chunk_id_counter:06d}" # Round-robin assignment across both workers for balanced load worker_id = worker_list[chunk_id_counter % len(worker_list)] if self.assign_chunk(worker_id, chunk_id, grid, chunk_start, chunk_end): active_chunks[chunk_id] = worker_id chunk_id_counter += 1 chunk_start = chunk_end # CPU limit: 1 chunk per worker = ~70% CPU usage (16 cores per chunk on 32-core machines) if len(active_chunks) >= len(WORKERS) * 1: print(f"⏸️ Pausing chunk assignment - {len(active_chunks)} chunks active (70% CPU target)") print(f"⏳ Waiting for chunks to complete...") break print() print(f"βœ… Assigned {len(active_chunks)} initial chunks") print() print("πŸ“Š Monitor progress with: python3 cluster/exploration_status.py") print("πŸ† View top strategies: sqlite3 cluster/exploration.db 'SELECT * FROM strategies ORDER BY pnl_per_1k DESC LIMIT 10'") print() print("πŸ”„ Starting background monitoring thread...") # Start monitoring in background thread (Nov 30, 2025) monitor_thread = threading.Thread( target=self._monitor_chunks_background, args=(grid, chunk_size, total_combos, active_chunks, worker_list, chunk_id_counter, chunk_start, last_counter if last_chunk else None), daemon=True # Die when main program exits ) monitor_thread.start() print("βœ… Monitoring thread started - coordinator will now exit") print(" (Monitoring continues in background - check logs or dashboard)") print() print("=" * 80) # Keep coordinator alive so daemon thread can continue # Thread will exit when all work is done print("πŸ’€ Main thread sleeping - monitoring continues in background...") print(" Press Ctrl+C to stop coordinator (will stop monitoring)") print() try: monitor_thread.join() # Wait for monitoring thread to finish except KeyboardInterrupt: print("\n⚠️ Coordinator interrupted by user") print(" Workers will continue running their current chunks") print(" Restart coordinator to resume monitoring") def _monitor_chunks_background(self, grid, chunk_size, total_combos, active_chunks, worker_list, chunk_id_counter, chunk_start, last_counter): """ Background monitoring thread to detect completions and assign new chunks. This runs continuously until all chunks are processed. Uses polling (SSH checks every 60s) to detect when workers complete chunks. Args: grid: Parameter grid for generating chunks chunk_size: Number of combinations per chunk total_combos: Total parameter combinations to process active_chunks: Dict mapping chunk_id -> worker_id for currently running chunks worker_list: List of worker IDs for round-robin assignment chunk_id_counter: Current chunk counter (for generating chunk IDs) chunk_start: Current position in parameter space last_counter: Counter from last existing chunk (for progress calculation) """ import time poll_interval = 60 # Check every 60 seconds print(f"πŸ”„ Monitoring thread started (poll interval: {poll_interval}s)") print(f" Will process {total_combos:,} combinations in chunks of {chunk_size:,}") print() try: while chunk_start < total_combos or active_chunks: time.sleep(poll_interval) # Check each active chunk for completion completed = [] for chunk_id, worker_id in list(active_chunks.items()): worker = WORKERS[worker_id] workspace = worker['workspace'] # Check if results CSV exists on worker results_csv = f"{workspace}/backtester/chunk_{chunk_id}_results.csv" # Use appropriate SSH path for two-hop workers if 'ssh_hop' in worker: check_cmd = f"ssh {WORKERS['worker1']['host']} 'ssh {worker['host']} \"test -f {results_csv} && echo EXISTS\"'" else: check_cmd = f"ssh {worker['host']} 'test -f {results_csv} && echo EXISTS'" result = subprocess.run(check_cmd, shell=True, capture_output=True, text=True) if 'EXISTS' in result.stdout: print(f"βœ… Detected completion: {chunk_id} on {worker_id}") try: # Collect results back to coordinator self.collect_results(worker_id, chunk_id) completed.append(chunk_id) print(f"πŸ“₯ Collected and imported results from {chunk_id}") except Exception as e: print(f"⚠️ Error collecting {chunk_id}: {e}") # Mark as completed anyway to prevent infinite retry completed.append(chunk_id) # Remove completed chunks from active tracking for chunk_id in completed: del active_chunks[chunk_id] # Assign new chunks if we have capacity and work remaining # Maintain 1 chunk per worker for 70% CPU target while len(active_chunks) < len(WORKERS) * 1 and chunk_start < total_combos: chunk_end = min(chunk_start + chunk_size, total_combos) chunk_id = f"v9_chunk_{chunk_id_counter:06d}" # Round-robin assignment worker_id = worker_list[chunk_id_counter % len(worker_list)] if self.assign_chunk(worker_id, chunk_id, grid, chunk_start, chunk_end): active_chunks[chunk_id] = worker_id print(f"🎯 Assigned new chunk {chunk_id} to {worker_id}") chunk_id_counter += 1 chunk_start = chunk_end # Status update completed_count = chunk_id_counter - len(active_chunks) - (last_counter + 1 if last_counter is not None else 0) total_chunks = (total_combos + chunk_size - 1) // chunk_size progress = (completed_count / total_chunks) * 100 print(f"πŸ“Š Progress: {completed_count}/{total_chunks} chunks ({progress:.1f}%) | Active: {len(active_chunks)}") print() print("=" * 80) print("πŸŽ‰ COMPREHENSIVE EXPLORATION COMPLETE!") print("=" * 80) except Exception as e: print(f"❌ Monitoring thread error: {e}") import traceback traceback.print_exc() def main(): """Main coordinator entry point""" import argparse parser = argparse.ArgumentParser(description='Distributed continuous optimization coordinator') parser.add_argument('--chunk-size', type=int, default=2000, help='Number of combinations per chunk (default: 2000)') parser.add_argument('--continuous', action='store_true', help='Run continuously (not implemented yet)') args = parser.parse_args() coordinator = DistributedCoordinator() coordinator.start_comprehensive_exploration(chunk_size=args.chunk_size) if __name__ == '__main__': main()