critical: Fix SSH timeout + resumption logic bugs

**SSH Command Fix:**
- CRITICAL: Removed && after background command (&)
- Pattern: 'cmd & echo Started' works, 'cmd && echo' waits forever
- Manually tested: Works perfectly on direct SSH
- Result: Chunk 0 now starts successfully on worker1 (24 processes running)

**Resumption Logic Fix:**
- CRITICAL: Only count completed/running chunks, not pending
- Query: Added 'AND status IN (completed, running)' filter
- Result: Starts from chunk 0 when no chunks complete (was skipping to chunk 3)

**Database Cleanup:**
- CRITICAL: Delete pending/failed chunks on coordinator start
- Prevents UNIQUE constraint errors on retry
- Result: Clean slate allows coordinator to assign chunks fresh

**Verification:**
-  Chunk v9_chunk_000000: status='running', assigned_worker='worker1'
-  Worker1: 24 Python processes running backtester
-  Database: Cleaned 3 pending chunks, created 1 running chunk
- ⚠️  Worker2: SSH hop still timing out (separate infrastructure issue)

Files changed:
- cluster/distributed_coordinator.py (3 critical fixes: line 388-401, 514-533, 507-514)
This commit is contained in:
mindesbunister
2025-12-01 12:56:35 +01:00
parent 1f83a7d7c4
commit 323ef03f5f

View File

@@ -388,11 +388,17 @@ class DistributedCoordinator:
ssh_opts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -o ServerAliveInterval=5"
# Execute command and capture result to verify it started
# CRITICAL FIX (Dec 1, 2025): Remove && because command ends with & (background)
# In single quotes, & backgrounds process and shell continues to next command (echo)
if 'ssh_hop' in worker:
# Worker 2 requires hop through worker 1
ssh_cmd = f"ssh {ssh_opts} {worker['ssh_hop']} \"ssh {ssh_opts} {worker['host']} '{cmd}' && echo 'Started chunk {chunk_id}' || echo 'FAILED'\""
ssh_cmd = f"ssh {ssh_opts} {worker['ssh_hop']} \"ssh {ssh_opts} {worker['host']} '{cmd} echo Started_chunk_{chunk_id}'\""
else:
ssh_cmd = f"ssh {ssh_opts} {worker['host']} '{cmd}' && echo 'Started chunk {chunk_id}' || echo 'FAILED'"
ssh_cmd = f"ssh {ssh_opts} {worker['host']} '{cmd} echo Started_chunk_{chunk_id}'"
# DEBUG: Print command for troubleshooting
print(f" DEBUG: Executing SSH command:")
print(f" {ssh_cmd[:200]}...") # First 200 chars
# Use run() to capture output and verify success
# CRITICAL FIX (Dec 1, 2025): Increase timeout from 30s to 60s for nested SSH hops
@@ -406,7 +412,8 @@ class DistributedCoordinator:
)
# Verify worker process started
if 'Started chunk' in result.stdout:
# CRITICAL FIX (Dec 1, 2025): Look for marker without spaces (Started_chunk_)
if f'Started_chunk_{chunk_id}' in result.stdout:
print(f"✅ Chunk {chunk_id} started on {worker_id} successfully")
return True
else:
@@ -503,26 +510,38 @@ class DistributedCoordinator:
print("🔄 Distributing chunks to workers...")
print()
# CRITICAL FIX (Nov 30, 2025): Resume from existing chunks in database
# Get max chunk ID to avoid UNIQUE constraint errors on restart
# CRITICAL FIX (Dec 1, 2025): Clear pending/failed chunks to allow retry
# This prevents UNIQUE constraint errors on restart
conn = sqlite3.connect(self.db.db_path)
c = conn.cursor()
c.execute("SELECT id FROM chunks WHERE id LIKE 'v9_chunk_%' ORDER BY id DESC LIMIT 1")
c.execute("DELETE FROM chunks WHERE status IN ('pending', 'failed')")
deleted_count = c.rowcount
conn.commit()
conn.close()
if deleted_count > 0:
print(f"🗑️ Cleared {deleted_count} pending/failed chunks for retry")
print()
# CRITICAL FIX (Dec 1, 2025): Resume from existing COMPLETED chunks only
# Get max chunk ID from completed/running chunks, not all chunks
# This allows retrying failed/pending chunks without UNIQUE constraint errors
conn = sqlite3.connect(self.db.db_path)
c = conn.cursor()
c.execute("SELECT id FROM chunks WHERE id LIKE 'v9_chunk_%' AND status IN ('completed', 'running') ORDER BY id DESC LIMIT 1")
last_chunk = c.fetchone()
conn.close()
if last_chunk:
# Extract counter from last chunk ID (e.g., "v9_chunk_000042" -> 42)
# Extract counter from last completed chunk ID
last_counter = int(last_chunk[0].split('_')[-1])
chunk_id_counter = last_counter + 1
# Resume from where we left off in the parameter space
chunk_start = chunk_id_counter * chunk_size
print(f"📋 Resuming from chunk {chunk_id_counter} (found {last_counter + 1} existing chunks)")
print(f"📋 Resuming from chunk {chunk_id_counter} (found {last_counter + 1} completed chunks)")
print(f" Starting at combo {chunk_start:,} / {total_combos:,}")
else:
chunk_id_counter = 0
chunk_start = 0
print(f"📋 Starting fresh - no existing chunks found")
print(f"📋 Starting fresh - no completed chunks found")
# Split work across workers
active_chunks = {}