Files
trading_bot_v4/cluster/distributed_coordinator.py
mindesbunister 323ef03f5f critical: Fix SSH timeout + resumption logic bugs
**SSH Command Fix:**
- CRITICAL: Removed && after background command (&)
- Pattern: 'cmd & echo Started' works, 'cmd && echo' waits forever
- Manually tested: Works perfectly on direct SSH
- Result: Chunk 0 now starts successfully on worker1 (24 processes running)

**Resumption Logic Fix:**
- CRITICAL: Only count completed/running chunks, not pending
- Query: Added 'AND status IN (completed, running)' filter
- Result: Starts from chunk 0 when no chunks complete (was skipping to chunk 3)

**Database Cleanup:**
- CRITICAL: Delete pending/failed chunks on coordinator start
- Prevents UNIQUE constraint errors on retry
- Result: Clean slate allows coordinator to assign chunks fresh

**Verification:**
- ✅ Chunk v9_chunk_000000: status='running', assigned_worker='worker1'
- ✅ Worker1: 24 Python processes running backtester
- ✅ Database: Cleaned 3 pending chunks, created 1 running chunk
- ⚠️  Worker2: SSH hop still timing out (separate infrastructure issue)

Files changed:
- cluster/distributed_coordinator.py (3 critical fixes: line 388-401, 514-533, 507-514)
2025-12-01 12:56:35 +01:00

717 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Distributed Continuous Optimization Coordinator
Extends comprehensive_sweep.py to distribute massive parameter grids
across 2 EPYC servers (64 cores total) for 24/7 strategy discovery.
Architecture:
1. Master generates parameter grid (millions of combinations)
2. Splits into chunks (~10,000 combos per chunk)
3. Distributes chunks to workers via SSH
4. Workers run modified comprehensive_sweep on their chunk
5. Master aggregates results, identifies top performers
6. Master generates next exploration batch (nearby good configs)
7. Repeat forever - continuous improvement
Integration with Existing System:
- Uses simulator.py and MoneyLineInputs from /home/comprehensive_sweep/backtester/
- Preserves comprehensive_sweep.py output format (CSV with 14 params)
- Works with existing .venv and data files on EPYC
- Backwards compatible - can still run comprehensive_sweep.py standalone
"""
import hashlib
import itertools
import json
import shlex
import sqlite3
import subprocess
import threading  # ADDED Nov 30, 2025: Background monitoring
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
# Worker configuration, keyed by worker id. Per-worker fields:
#   host      - SSH target (user@address) of the worker
#   cores     - sent to the worker as 'num_workers' in every chunk spec
#   workspace - remote directory the coordinator cd's into before launching
#   venv_path - virtualenv activate script, relative to workspace
#   ssh_hop   - optional; when present, SSH to this worker goes through that host
#   ssh_key   - not referenced by the code visible in this file
WORKERS = {
    'worker1': {
        'host': 'root@10.10.254.106',
        'cores': 32,  # Full 32 threads available
        'workspace': '/home/comprehensive_sweep',
        'venv_path': 'backtester/.venv/bin/activate',  # Relative to workspace
        'ssh_key': None,  # Use default key
    },
    'worker2': {
        'host': 'root@10.20.254.100',
        'cores': 32,  # Full 32 threads available
        'workspace': '/home/backtest_dual/backtest',  # CORRECTED: Actual path on bd-host01
        'venv_path': '.venv/bin/activate',  # CRITICAL FIX (Nov 30): Worker2 has venv at workspace root, not in backtester/
        'ssh_hop': 'root@10.10.254.106',  # Connect through worker1 (same address as worker1's host)
        'ssh_key': None,
    }
}

# Directory containing this module; results and the database live next to it.
CLUSTER_DIR = Path(__file__).parent
# Local directory for chunk spec JSON files and result CSVs copied back from workers.
RESULTS_DIR = CLUSTER_DIR / 'distributed_results'
# SQLite database file holding the strategies / chunks / phases tables.
DB_PATH = CLUSTER_DIR / 'exploration.db'
@dataclass
class ParameterGrid:
    """Candidate values for each of the 14 sweep parameters.

    Each attribute lists the values to try for one parameter; the full
    search space is the Cartesian product of all fourteen lists.
    """
    flip_thresholds: List[float]
    ma_gaps: List[float]
    adx_mins: List[int]
    long_pos_maxs: List[int]
    short_pos_mins: List[int]
    cooldowns: List[int]
    position_sizes: List[int]
    tp1_multipliers: List[float]
    tp2_multipliers: List[float]
    sl_multipliers: List[float]
    tp1_close_percents: List[int]
    trailing_multipliers: List[float]
    vol_mins: List[float]
    max_bars_list: List[int]

    def total_combinations(self) -> int:
        """Return the size of the Cartesian product of all value lists."""
        size = 1
        for candidates in self.to_dict().values():
            size *= len(candidates)
        return size

    def to_dict(self) -> Dict[str, List]:
        """Return a name -> value-list mapping suitable for JSON serialization.

        The lists are the instance's own objects (not copies), in declaration
        order.
        """
        ordered_names = (
            'flip_thresholds', 'ma_gaps', 'adx_mins', 'long_pos_maxs',
            'short_pos_mins', 'cooldowns', 'position_sizes',
            'tp1_multipliers', 'tp2_multipliers', 'sl_multipliers',
            'tp1_close_percents', 'trailing_multipliers', 'vol_mins',
            'max_bars_list',
        )
        return {name: getattr(self, name) for name in ordered_names}
class ExplorationDatabase:
    """Track all tested strategies and exploration progress in SQLite.

    Every method opens its own connection to ``db_path`` and closes it before
    returning, including when a statement raises.
    """

    # CSV columns that carry rank/metrics; every other column is treated as a
    # strategy parameter and contributes to the deduplication hash.
    _RESULT_COLUMNS = frozenset((
        'rank', 'trades', 'win_rate', 'total_pnl', 'pnl_per_1k',
        'profit_factor', 'max_drawdown', 'sharpe_ratio',
    ))

    # Columns returned by get_top_strategies, in SELECT order.
    _TOP_COLUMNS = (
        'indicator_type', 'params_json', 'trades', 'win_rate', 'total_pnl',
        'pnl_per_1k', 'profit_factor', 'max_drawdown', 'sharpe_ratio',
        'tested_at',
    )

    def __init__(self, db_path: Path):
        """Remember the database location and create the schema if missing."""
        self.db_path = db_path
        self.init_db()

    @staticmethod
    def _timestamp() -> str:
        """Return the current local time as 'YYYY-MM-DD HH:MM:SS[.ffffff]'.

        Binding a string instead of a ``datetime`` avoids the default sqlite3
        datetime adapter, which is deprecated since Python 3.12, while
        storing the same text that adapter produced.
        """
        return datetime.now().isoformat(sep=' ')

    @staticmethod
    def _optional_float(row: Dict[str, Any], key: str, default: float = 0.0) -> float:
        """Convert an optional metric cell; missing or blank cells give *default*."""
        value = row.get(key)
        if value is None or value == '':
            return default
        return float(value)

    def _write(self, sql: str, params: tuple) -> None:
        """Execute one write statement and commit, always closing the connection."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute(sql, params)
            conn.commit()
        finally:
            conn.close()

    def init_db(self):
        """Create the strategies, chunks and phases tables plus indexes (idempotent)."""
        conn = sqlite3.connect(self.db_path)
        try:
            c = conn.cursor()
            # Strategies table - all tested configurations
            c.execute('''
                CREATE TABLE IF NOT EXISTS strategies (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    param_hash TEXT UNIQUE NOT NULL,
                    indicator_type TEXT NOT NULL,
                    params_json TEXT NOT NULL,
                    trades INTEGER,
                    win_rate REAL,
                    total_pnl REAL,
                    pnl_per_1k REAL,
                    profit_factor REAL,
                    max_drawdown REAL,
                    sharpe_ratio REAL,
                    tested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    worker_id TEXT,
                    chunk_id TEXT
                )
            ''')
            # Exploration chunks - work distribution tracking
            c.execute('''
                CREATE TABLE IF NOT EXISTS chunks (
                    id TEXT PRIMARY KEY,
                    indicator_type TEXT NOT NULL,
                    grid_json TEXT NOT NULL,
                    chunk_start INTEGER NOT NULL,
                    chunk_end INTEGER NOT NULL,
                    total_combos INTEGER NOT NULL,
                    assigned_worker TEXT,
                    status TEXT DEFAULT 'pending',
                    started_at TIMESTAMP,
                    completed_at TIMESTAMP,
                    best_pnl_in_chunk REAL,
                    results_csv_path TEXT
                )
            ''')
            # Exploration phases - high-level progress
            c.execute('''
                CREATE TABLE IF NOT EXISTS phases (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    phase_name TEXT NOT NULL,
                    indicator_type TEXT NOT NULL,
                    grid_json TEXT NOT NULL,
                    total_combos INTEGER NOT NULL,
                    completed_combos INTEGER DEFAULT 0,
                    best_pnl_overall REAL DEFAULT 0,
                    best_params_json TEXT,
                    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    estimated_completion TIMESTAMP,
                    actual_completion TIMESTAMP
                )
            ''')
            # Create indexes for fast queries
            c.execute('CREATE INDEX IF NOT EXISTS idx_pnl_per_1k ON strategies(pnl_per_1k DESC)')
            c.execute('CREATE INDEX IF NOT EXISTS idx_indicator_type ON strategies(indicator_type)')
            c.execute('CREATE INDEX IF NOT EXISTS idx_chunk_status ON chunks(status)')
            conn.commit()
        finally:
            conn.close()

    def record_chunk(self, chunk_id: str, indicator_type: str, grid: 'ParameterGrid',
                     chunk_start: int, chunk_end: int, assigned_worker: str) -> None:
        """Insert a new chunk row with status 'running' and the current start time.

        Args:
            chunk_id: Primary key of the chunk row.
            indicator_type: Stored verbatim in the row.
            grid: Object whose ``to_dict()`` result is stored as JSON.
            chunk_start: First combination index of the chunk.
            chunk_end: End combination index; ``chunk_end - chunk_start`` is
                stored as ``total_combos``.
            assigned_worker: Worker id the chunk was handed to.

        Raises:
            sqlite3.IntegrityError: if a row with ``chunk_id`` already exists.
        """
        self._write('''
            INSERT INTO chunks (id, indicator_type, grid_json, chunk_start, chunk_end,
                                total_combos, assigned_worker, status, started_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, 'running', ?)
        ''', (chunk_id, indicator_type, json.dumps(grid.to_dict()), chunk_start, chunk_end,
              chunk_end - chunk_start, assigned_worker, self._timestamp()))

    def complete_chunk(self, chunk_id: str, results_csv_path: str, best_pnl: float) -> None:
        """Mark a chunk 'completed' and store its CSV path, best P&L and finish time."""
        self._write('''
            UPDATE chunks
            SET status='completed', completed_at=?, results_csv_path=?, best_pnl_in_chunk=?
            WHERE id=?
        ''', (self._timestamp(), results_csv_path, best_pnl, chunk_id))

    def import_results_csv(self, csv_path: str, worker_id: str, chunk_id: str) -> int:
        """Import CSV results from comprehensive_sweep into the strategies table.

        Rows are deduplicated on a SHA-256 hash of their parameter columns
        (all columns except rank and the metric columns); rows whose hash is
        already stored are skipped. Blank or missing ``profit_factor``,
        ``max_drawdown`` and ``sharpe_ratio`` cells are stored as 0.0.

        The import is all-or-nothing: if a row cannot be converted, the
        exception propagates and nothing from this file is committed.

        Returns:
            Number of rows newly inserted.

        Raises:
            KeyError / ValueError: if a required metric column (trades,
                win_rate, total_pnl, pnl_per_1k) is missing or malformed.
        """
        import csv

        imported = 0
        conn = sqlite3.connect(self.db_path)
        try:
            c = conn.cursor()
            # newline='' is what the csv module expects for correct handling
            # of quoted fields containing line breaks.
            with open(csv_path, 'r', newline='') as f:
                for row in csv.DictReader(f):
                    params = {k: v for k, v in row.items()
                              if k not in self._RESULT_COLUMNS}
                    # sort_keys makes the hash independent of column order
                    param_hash = hashlib.sha256(
                        json.dumps(params, sort_keys=True).encode()
                    ).hexdigest()
                    try:
                        c.execute('''
                            INSERT INTO strategies (
                                param_hash, indicator_type, params_json,
                                trades, win_rate, total_pnl, pnl_per_1k,
                                profit_factor, max_drawdown, sharpe_ratio,
                                worker_id, chunk_id
                            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        ''', (
                            param_hash, 'v9_moneyline', json.dumps(params),
                            int(row['trades']), float(row['win_rate']),
                            float(row['total_pnl']), float(row['pnl_per_1k']),
                            self._optional_float(row, 'profit_factor'),
                            self._optional_float(row, 'max_drawdown'),
                            self._optional_float(row, 'sharpe_ratio'),
                            worker_id, chunk_id
                        ))
                    except sqlite3.IntegrityError:
                        # Duplicate param_hash - already tested this config
                        continue
                    imported += 1
            conn.commit()
        finally:
            conn.close()
        return imported

    def get_top_strategies(self, limit: int = 100) -> List[Dict]:
        """Return up to *limit* stored strategies ordered by pnl_per_1k, best first.

        Only strategies with at least 700 trades, a win rate between 0.50 and
        0.70 inclusive, and a profit factor of at least 1.2 are considered.
        Each result dict has the stored columns, with ``params_json`` decoded
        under the key ``'params'``.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute('''
                SELECT indicator_type, params_json, trades, win_rate, total_pnl, pnl_per_1k,
                       profit_factor, max_drawdown, sharpe_ratio, tested_at
                FROM strategies
                WHERE trades >= 700  -- Statistical significance
                  AND win_rate >= 0.50 AND win_rate <= 0.70  -- Realistic
                  AND profit_factor >= 1.2  -- Minimum edge
                ORDER BY pnl_per_1k DESC
                LIMIT ?
            ''', (limit,)).fetchall()
        finally:
            conn.close()

        results = []
        for row in rows:
            record = dict(zip(self._TOP_COLUMNS, row))
            # Expose decoded parameters as 'params', keeping the column position
            entry = {'indicator_type': record['indicator_type'],
                     'params': json.loads(record['params_json'])}
            for column in self._TOP_COLUMNS[2:]:
                entry[column] = record[column]
            results.append(entry)
        return results
class DistributedCoordinator:
    """Coordinates distributed parameter sweeps across EPYC servers.

    Remote commands are executed with ``ssh`` and files are moved with
    ``scp``; workers that declare an ``ssh_hop`` are reached through that
    intermediate host. Commands are passed to ``subprocess`` as argument
    lists (no local shell), and the command for a hopped worker is quoted
    with ``shlex.quote`` so it survives the extra shell on the hop host.

    NOTE(review): chunk ids are handed out from a linear counter. A chunk
    whose launch fails is marked 'failed' but the counter still advances, so
    that range is only retried after a restart and only if no later chunk is
    already completed/running. Chunks left 'running' by a previous
    coordinator process are counted for the resume position but are not
    re-attached to monitoring.
    """

    # SSH options shared by every remote command: never prompt for host keys,
    # give up quickly on unreachable hosts, and probe the connection so a
    # dead link is noticed.
    SSH_OPTS = (
        '-o', 'StrictHostKeyChecking=no',
        '-o', 'ConnectTimeout=10',
        '-o', 'ServerAliveInterval=5',
    )

    def __init__(self):
        """Open (and create if needed) the exploration DB and results directory."""
        self.db = ExplorationDatabase(DB_PATH)
        RESULTS_DIR.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # SSH / SCP plumbing
    # ------------------------------------------------------------------
    @classmethod
    def _build_ssh_argv(cls, worker: Dict[str, Any], command: str) -> List[str]:
        """Return the argv that runs *command* in a shell on *worker*.

        For a direct worker, ``command`` is handed to ssh as one argument, so
        the remote shell receives it verbatim. For a hopped worker the hop
        host's shell must re-parse the inner ssh invocation, so ``command``
        is shell-quoted once for that extra level.
        """
        argv = ['ssh', *cls.SSH_OPTS]
        if 'ssh_hop' in worker:
            inner = ' '.join(['ssh', *cls.SSH_OPTS, worker['host'], shlex.quote(command)])
            return argv + [worker['ssh_hop'], inner]
        return argv + [worker['host'], command]

    @classmethod
    def _run_on_hop(cls, worker: Dict[str, Any], command: str) -> subprocess.CompletedProcess:
        """Run *command* on the worker's ``ssh_hop`` host and capture its output."""
        return subprocess.run(
            ['ssh', *cls.SSH_OPTS, worker['ssh_hop'], command],
            capture_output=True, text=True,
        )

    def ssh_command(self, worker_id: str, command: str,
                    timeout: Optional[float] = None) -> subprocess.CompletedProcess:
        """Execute command on worker via SSH.

        Args:
            worker_id: Key into ``WORKERS``.
            command: Shell command line for the worker's remote shell.
            timeout: Optional limit in seconds; ``None`` (default) waits
                until ssh exits.

        Returns:
            The completed process with ``stdout``/``stderr`` captured as text.

        Raises:
            subprocess.TimeoutExpired: only when *timeout* is given and exceeded.
        """
        return subprocess.run(
            self._build_ssh_argv(WORKERS[worker_id], command),
            capture_output=True, text=True, timeout=timeout,
        )

    def _copy_to_worker(self, worker_id: str, local_path: Path, remote_path: str) -> bool:
        """Copy a local file to *remote_path* on the worker; return True on success.

        Hopped workers get the file staged in ``/tmp`` on the hop host first,
        then relayed from there.
        """
        worker = WORKERS[worker_id]
        if 'ssh_hop' not in worker:
            direct = subprocess.run(['scp', str(local_path), f"{worker['host']}:{remote_path}"])
            return direct.returncode == 0

        staged = f"/tmp/{Path(local_path).name}"
        to_hop = subprocess.run(['scp', str(local_path), f"{worker['ssh_hop']}:/tmp/"])
        if to_hop.returncode != 0:
            return False
        destination = f"{worker['host']}:{remote_path}"
        relay = self._run_on_hop(
            worker, f"scp {shlex.quote(staged)} {shlex.quote(destination)}")
        return relay.returncode == 0

    def _copy_from_worker(self, worker_id: str, remote_path: str, local_path: Path) -> bool:
        """Copy *remote_path* from the worker to *local_path*; return True on success.

        Hopped workers have the file pulled into ``/tmp`` on the hop host
        first, then fetched from there.
        """
        worker = WORKERS[worker_id]
        if 'ssh_hop' not in worker:
            direct = subprocess.run(['scp', f"{worker['host']}:{remote_path}", str(local_path)])
            return direct.returncode == 0

        source = f"{worker['host']}:{remote_path}"
        relay = self._run_on_hop(worker, f"scp {shlex.quote(source)} /tmp/")
        if relay.returncode != 0:
            return False
        staged = f"/tmp/{remote_path.rsplit('/', 1)[-1]}"
        from_hop = subprocess.run(['scp', f"{worker['ssh_hop']}:{staged}", str(local_path)])
        return from_hop.returncode == 0

    # ------------------------------------------------------------------
    # Worker operations
    # ------------------------------------------------------------------
    def deploy_worker_script(self, worker_id: str) -> bool:
        """Deploy distributed_worker.py to EPYC server.

        Returns:
            True when the copy succeeded, False (after printing a message)
            when any scp step failed.
        """
        worker = WORKERS[worker_id]
        script_path = CLUSTER_DIR / 'distributed_worker.py'
        # Copy script to worker's comprehensive_sweep directory
        target = f"{worker['workspace']}/backtester/scripts/distributed_worker.py"

        if 'ssh_hop' in worker:
            print(f"📤 Copying worker script to {worker_id} via hop...")
        else:
            print(f"📤 Copying worker script to {worker_id}...")

        if not self._copy_to_worker(worker_id, script_path, target):
            print(f"❌ FAILED to deploy worker script to {worker_id}")
            return False
        print(f"✅ Worker script deployed to {worker_id}")
        return True

    def _mark_chunk_failed(self, chunk_id: str) -> None:
        """Set a chunk row's status to 'failed'.

        The start-up cleanup deletes 'failed' rows, so a chunk that never
        launched is not mistaken for running work on the next start.
        """
        conn = sqlite3.connect(self.db.db_path)
        try:
            conn.execute("UPDATE chunks SET status='failed' WHERE id=?", (chunk_id,))
            conn.commit()
        finally:
            conn.close()

    def _launch_chunk(self, worker_id: str, chunk_id: str, grid: 'ParameterGrid',
                      chunk_start: int, chunk_end: int) -> bool:
        """Ship the chunk spec to the worker and start distributed_worker.py on it."""
        worker = WORKERS[worker_id]

        # Create chunk specification JSON
        chunk_spec = {
            'chunk_id': chunk_id,
            'chunk_start': chunk_start,
            'chunk_end': chunk_end,
            'grid': grid.to_dict(),
            'num_workers': worker['cores'],
        }
        chunk_json_path = RESULTS_DIR / f"{chunk_id}_spec.json"
        with open(chunk_json_path, 'w') as f:
            json.dump(chunk_spec, f, indent=2)

        # Copy chunk spec to worker; launching without it cannot succeed
        target_json = f"{worker['workspace']}/chunk_{chunk_id}.json"
        if not self._copy_to_worker(worker_id, chunk_json_path, target_json):
            print(f"❌ FAILED to copy chunk spec {chunk_id} to {worker_id}")
            return False

        # Per-worker venv_path supports heterogeneous cluster layouts
        # (default matches worker1: venv inside backtester/).
        venv_path = worker.get('venv_path', 'backtester/.venv/bin/activate')

        # The worker is started with nohup in the background ('&'); the echo
        # follows the '&' directly (no '&&') so the remote shell prints the
        # marker and returns instead of waiting for the worker.
        # NOTE(review): because the whole 'cd && source && nohup ...' list is
        # backgrounded, the marker shows that the remote shell ran the line,
        # not that the worker process is alive.
        cmd = (f"cd {worker['workspace']} && "
               f"source {venv_path} && "
               f"nohup python3 backtester/scripts/distributed_worker.py chunk_{chunk_id}.json "
               f"> /tmp/{chunk_id}.log 2>&1 & "
               f"echo Started_chunk_{chunk_id}")

        print(f"🚀 Starting chunk {chunk_id} on {worker_id} ({chunk_end - chunk_start:,} combos)...")

        # DEBUG: Print command for troubleshooting
        ssh_cmd = ' '.join(self._build_ssh_argv(worker, cmd))
        print(f" DEBUG: Executing SSH command:")
        print(f" {ssh_cmd[:200]}...")  # First 200 chars

        try:
            # 60 second timeout (30s was too short for 2-hop SSH)
            result = self.ssh_command(worker_id, cmd, timeout=60)
        except subprocess.TimeoutExpired:
            print(f"⚠️ SSH command timed out for {chunk_id} on {worker_id}")
            print(f" This usually means SSH hop is misconfigured or slow")
            return False

        # Verify the marker (written without spaces) came back
        if f'Started_chunk_{chunk_id}' in result.stdout:
            print(f"✅ Chunk {chunk_id} started on {worker_id} successfully")
            return True

        print(f"❌ FAILED to start chunk {chunk_id} on {worker_id}")
        print(f" stdout: {result.stdout}")
        print(f" stderr: {result.stderr}")
        return False

    def assign_chunk(self, worker_id: str, chunk_id: str, grid: 'ParameterGrid',
                     chunk_start: int, chunk_end: int) -> bool:
        """Assign parameter chunk to worker for processing.

        The chunk is recorded as 'running' first; if shipping the spec or
        starting the worker fails (or raises), the row is flipped to 'failed'.

        Returns:
            True when the start marker was received from the worker.

        Raises:
            sqlite3.IntegrityError: if *chunk_id* is already recorded.
        """
        # Record in database
        self.db.record_chunk(chunk_id, 'v9_moneyline', grid, chunk_start, chunk_end, worker_id)

        started = False
        try:
            started = self._launch_chunk(worker_id, chunk_id, grid, chunk_start, chunk_end)
        finally:
            if not started:
                self._mark_chunk_failed(chunk_id)
        return started

    def collect_results(self, worker_id: str, chunk_id: str) -> Optional[str]:
        """Collect CSV results from worker.

        Returns:
            Local path of the copied CSV, or ``None`` when the results file
            does not exist on the worker yet.

        Raises:
            RuntimeError: if the results file exists but could not be copied.
        """
        import csv

        worker = WORKERS[worker_id]
        # Check if results file exists on worker (in backtester/ subdirectory)
        results_csv = f"{worker['workspace']}/backtester/chunk_{chunk_id}_results.csv"
        probe = self.ssh_command(worker_id, f"test -f {results_csv} && echo 'exists'")
        if 'exists' not in probe.stdout:
            return None  # Results not ready yet

        # Copy results back to master; never fall through to a stale local file
        local_csv = RESULTS_DIR / f"{chunk_id}_results.csv"
        if not self._copy_from_worker(worker_id, results_csv, local_csv):
            raise RuntimeError(f"Failed to copy {results_csv} from {worker_id}")
        print(f"📥 Collected results from {worker_id} chunk {chunk_id}")

        # Import into database
        imported = self.db.import_results_csv(str(local_csv), worker_id, chunk_id)
        print(f"📊 Imported {imported} unique strategies from {chunk_id}")

        # Get best P&L from CSV for chunk tracking
        with open(local_csv, 'r') as f:
            rows = list(csv.DictReader(f))
        best_pnl = max(float(row['pnl_per_1k']) for row in rows) if rows else 0
        self.db.complete_chunk(chunk_id, str(local_csv), best_pnl)
        return str(local_csv)

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def start_comprehensive_exploration(self, chunk_size: int = 10000):
        """Start massive comprehensive parameter sweep.

        Deploys the worker script, clears pending/failed chunk rows, resumes
        after the highest completed/running chunk id, assigns the initial
        chunks and then blocks on a background monitoring thread until all
        work is done or the user interrupts.
        """
        print("=" * 80)
        print("🚀 DISTRIBUTED COMPREHENSIVE EXPLORATION")
        print("=" * 80)
        print()

        # v9 Money Line parameter grid (Nov 30, 2025)
        # 6 swept parameters × 4 values each = 4,096 combinations
        # Focus on core trend-following parameters, fix TP/SL to proven v9 values
        grid = ParameterGrid(
            flip_thresholds=[0.4, 0.5, 0.6, 0.7],
            ma_gaps=[0.20, 0.30, 0.40, 0.50],
            adx_mins=[18, 21, 24, 27],
            long_pos_maxs=[60, 65, 70, 75],
            short_pos_mins=[20, 25, 30, 35],
            cooldowns=[1, 2, 3, 4],
            # Fixed to standard v9 values
            position_sizes=[10000],
            tp1_multipliers=[2.0],
            tp2_multipliers=[4.0],
            sl_multipliers=[3.0],
            tp1_close_percents=[60],
            trailing_multipliers=[1.5],
            vol_mins=[1.0],
            max_bars_list=[500],
        )

        total_combos = grid.total_combinations()
        print(f"📊 Total parameter space: {total_combos:,} combinations")
        print(f"📦 Chunk size: {chunk_size:,} combinations per chunk")
        print(f"🎯 Total chunks: {(total_combos + chunk_size - 1) // chunk_size:,}")
        print(f"⏱️ Estimated time: {(total_combos * 1.6) / (64 * 3600):.1f} hours with 64 cores")
        print()

        # Deploy worker scripts (failures are reported by deploy_worker_script)
        for worker_id in WORKERS.keys():
            self.deploy_worker_script(worker_id)
        print()
        print("🔄 Distributing chunks to workers...")
        print()

        conn = sqlite3.connect(self.db.db_path)
        try:
            c = conn.cursor()
            # Clear pending/failed chunks so their ids can be inserted again
            # without UNIQUE constraint errors.
            c.execute("DELETE FROM chunks WHERE status IN ('pending', 'failed')")
            deleted_count = c.rowcount
            conn.commit()

            # Resume after the highest chunk id that is completed or running;
            # pending/failed chunks do not count.
            c.execute("SELECT id FROM chunks WHERE id LIKE 'v9_chunk_%' AND status IN ('completed', 'running') ORDER BY id DESC LIMIT 1")
            last_chunk = c.fetchone()
        finally:
            conn.close()

        if deleted_count > 0:
            print(f"🗑️ Cleared {deleted_count} pending/failed chunks for retry")
            print()

        if last_chunk:
            # Extract counter from last completed chunk ID
            last_counter = int(last_chunk[0].split('_')[-1])
            chunk_id_counter = last_counter + 1
            chunk_start = chunk_id_counter * chunk_size
            print(f"📋 Resuming from chunk {chunk_id_counter} (found {last_counter + 1} completed chunks)")
            print(f" Starting at combo {chunk_start:,} / {total_combos:,}")
        else:
            last_counter = None
            chunk_id_counter = 0
            chunk_start = 0
            print(f"📋 Starting fresh - no completed chunks found")

        # Split work across workers
        active_chunks = {}
        worker_list = list(WORKERS.keys())  # ['worker1', 'worker2']

        while chunk_start < total_combos:
            chunk_end = min(chunk_start + chunk_size, total_combos)
            chunk_id = f"v9_chunk_{chunk_id_counter:06d}"
            # Round-robin assignment across both workers for balanced load
            worker_id = worker_list[chunk_id_counter % len(worker_list)]
            if self.assign_chunk(worker_id, chunk_id, grid, chunk_start, chunk_end):
                active_chunks[chunk_id] = worker_id
            # Advance even when the launch failed so the loop always terminates
            chunk_id_counter += 1
            chunk_start = chunk_end

            # CPU limit: 1 chunk per worker = ~70% CPU usage (16 cores per chunk on 32-core machines)
            if len(active_chunks) >= len(WORKERS) * 1:
                print(f"⏸️ Pausing chunk assignment - {len(active_chunks)} chunks active (70% CPU target)")
                print(f"⏳ Waiting for chunks to complete...")
                break

        print()
        print(f"✅ Assigned {len(active_chunks)} initial chunks")
        print()
        print("📊 Monitor progress with: python3 cluster/exploration_status.py")
        print("🏆 View top strategies: sqlite3 cluster/exploration.db 'SELECT * FROM strategies ORDER BY pnl_per_1k DESC LIMIT 10'")
        print()
        print("🔄 Starting background monitoring thread...")

        # Start monitoring in background thread (Nov 30, 2025)
        monitor_thread = threading.Thread(
            target=self._monitor_chunks_background,
            args=(grid, chunk_size, total_combos, active_chunks, worker_list,
                  chunk_id_counter, chunk_start, last_counter),
            daemon=True  # Die when main program exits
        )
        monitor_thread.start()

        print("✅ Monitoring thread started - coordinator will now exit")
        print(" (Monitoring continues in background - check logs or dashboard)")
        print()
        print("=" * 80)

        # Keep coordinator alive so daemon thread can continue
        # Thread will exit when all work is done
        print("💤 Main thread sleeping - monitoring continues in background...")
        print(" Press Ctrl+C to stop coordinator (will stop monitoring)")
        print()
        try:
            monitor_thread.join()  # Wait for monitoring thread to finish
        except KeyboardInterrupt:
            print("\n⚠️ Coordinator interrupted by user")
            print(" Workers will continue running their current chunks")
            print(" Restart coordinator to resume monitoring")

    def _monitor_chunks_background(self, grid, chunk_size, total_combos, active_chunks,
                                   worker_list, chunk_id_counter, chunk_start, last_counter):
        """
        Background monitoring thread to detect completions and assign new chunks.

        Runs until every chunk has been handed out and no chunk is active.
        Polls over SSH every 60 seconds for each active chunk's results CSV.

        Args:
            grid: Parameter grid for generating chunks
            chunk_size: Number of combinations per chunk
            total_combos: Total parameter combinations to process
            active_chunks: Dict mapping chunk_id -> worker_id for currently running chunks
            worker_list: List of worker IDs for round-robin assignment
            chunk_id_counter: Current chunk counter (for generating chunk IDs)
            chunk_start: Current position in parameter space
            last_counter: Counter from last existing chunk (for progress
                calculation), or None when starting fresh
        """
        poll_interval = 60  # Check every 60 seconds
        print(f"🔄 Monitoring thread started (poll interval: {poll_interval}s)")
        print(f" Will process {total_combos:,} combinations in chunks of {chunk_size:,}")
        print()

        # Chunks that existed before this run do not count towards its progress
        resumed_offset = last_counter + 1 if last_counter is not None else 0
        total_chunks = (total_combos + chunk_size - 1) // chunk_size

        try:
            while chunk_start < total_combos or active_chunks:
                time.sleep(poll_interval)

                # Check each active chunk for completion
                completed = []
                for chunk_id, worker_id in list(active_chunks.items()):
                    worker = WORKERS[worker_id]
                    results_csv = f"{worker['workspace']}/backtester/chunk_{chunk_id}_results.csv"
                    # Go through ssh_command so the probe uses the shared SSH
                    # options and hop handling.
                    probe = self.ssh_command(worker_id, f"test -f {results_csv} && echo EXISTS")
                    if 'EXISTS' not in probe.stdout:
                        continue

                    print(f"✅ Detected completion: {chunk_id} on {worker_id}")
                    try:
                        # Collect results back to coordinator
                        self.collect_results(worker_id, chunk_id)
                        print(f"📥 Collected and imported results from {chunk_id}")
                    except Exception as e:
                        # Deliberate best-effort: drop the chunk anyway to
                        # prevent infinite retry.
                        print(f"⚠️ Error collecting {chunk_id}: {e}")
                    completed.append(chunk_id)

                # Remove completed chunks from active tracking
                for chunk_id in completed:
                    del active_chunks[chunk_id]

                # Assign new chunks if we have capacity and work remaining
                # Maintain 1 chunk per worker for 70% CPU target
                while len(active_chunks) < len(WORKERS) * 1 and chunk_start < total_combos:
                    chunk_end = min(chunk_start + chunk_size, total_combos)
                    chunk_id = f"v9_chunk_{chunk_id_counter:06d}"
                    # Round-robin assignment
                    worker_id = worker_list[chunk_id_counter % len(worker_list)]
                    if self.assign_chunk(worker_id, chunk_id, grid, chunk_start, chunk_end):
                        active_chunks[chunk_id] = worker_id
                        print(f"🎯 Assigned new chunk {chunk_id} to {worker_id}")
                    # Advance even when the launch failed so the loop terminates
                    chunk_id_counter += 1
                    chunk_start = chunk_end

                # Status update: chunks handed out this run that are no longer active
                completed_count = chunk_id_counter - len(active_chunks) - resumed_offset
                progress = (completed_count / total_chunks) * 100
                print(f"📊 Progress: {completed_count}/{total_chunks} chunks ({progress:.1f}%) | Active: {len(active_chunks)}")

            print()
            print("=" * 80)
            print("🎉 COMPREHENSIVE EXPLORATION COMPLETE!")
            print("=" * 80)
        except Exception as e:
            # Top-level boundary of the thread: report and let the thread end
            print(f"❌ Monitoring thread error: {e}")
            import traceback
            traceback.print_exc()
def main():
    """Parse the command-line options and launch the distributed sweep."""
    import argparse

    cli = argparse.ArgumentParser(
        description='Distributed continuous optimization coordinator',
    )
    # (flags, keyword settings) for every supported option
    option_table = (
        (('--chunk-size',), {
            'type': int,
            'default': 2000,
            'help': 'Number of combinations per chunk (default: 2000)',
        }),
        (('--continuous',), {
            'action': 'store_true',
            'help': 'Run continuously (not implemented yet)',
        }),
    )
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)
    options = cli.parse_args()

    # --continuous is accepted but not acted upon here
    DistributedCoordinator().start_comprehensive_exploration(chunk_size=options.chunk_size)


if __name__ == '__main__':
    main()