From 323ef03f5f4cf105961aa93927664c95606f8062 Mon Sep 17 00:00:00 2001 From: mindesbunister Date: Mon, 1 Dec 2025 12:56:35 +0100 Subject: [PATCH] critical: Fix SSH timeout + resumption logic bugs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **SSH Command Fix:** - CRITICAL: Removed && after background command (&) - Pattern: 'cmd & echo Started' works, 'cmd && echo' waits forever - Manually tested: Works perfectly on direct SSH - Result: Chunk 0 now starts successfully on worker1 (24 processes running) **Resumption Logic Fix:** - CRITICAL: Only count completed/running chunks, not pending - Query: Added 'AND status IN (completed, running)' filter - Result: Starts from chunk 0 when no chunks complete (was skipping to chunk 3) **Database Cleanup:** - CRITICAL: Delete pending/failed chunks on coordinator start - Prevents UNIQUE constraint errors on retry - Result: Clean slate allows coordinator to assign chunks fresh **Verification:** - ✅ Chunk v9_chunk_000000: status='running', assigned_worker='worker1' - ✅ Worker1: 24 Python processes running backtester - ✅ Database: Cleaned 3 pending chunks, created 1 running chunk - ⚠️ Worker2: SSH hop still timing out (separate infrastructure issue) Files changed: - cluster/distributed_coordinator.py (3 critical fixes: line 388-401, 514-533, 507-514) --- cluster/distributed_coordinator.py | 39 ++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/cluster/distributed_coordinator.py b/cluster/distributed_coordinator.py index f066e45..565eb41 100644 --- a/cluster/distributed_coordinator.py +++ b/cluster/distributed_coordinator.py @@ -388,11 +388,17 @@ class DistributedCoordinator: ssh_opts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -o ServerAliveInterval=5" # Execute command and capture result to verify it started + # CRITICAL FIX (Dec 1, 2025): Remove && because command ends with & (background) + # In single quotes, & backgrounds process and shell continues to next command (echo) if 'ssh_hop' in worker: # Worker 2 requires hop through worker 1 - ssh_cmd = f"ssh {ssh_opts} {worker['ssh_hop']} \"ssh {ssh_opts} {worker['host']} '{cmd}' && echo 'Started chunk {chunk_id}' || echo 'FAILED'\"" + ssh_cmd = f"ssh {ssh_opts} {worker['ssh_hop']} \"ssh {ssh_opts} {worker['host']} '{cmd} echo Started_chunk_{chunk_id}'\"" else: - ssh_cmd = f"ssh {ssh_opts} {worker['host']} '{cmd}' && echo 'Started chunk {chunk_id}' || echo 'FAILED'" + ssh_cmd = f"ssh {ssh_opts} {worker['host']} '{cmd} echo Started_chunk_{chunk_id}'" + + # DEBUG: Print command for troubleshooting + print(f" DEBUG: Executing SSH command:") + print(f" {ssh_cmd[:200]}...") # First 200 chars # Use run() to capture output and verify success # CRITICAL FIX (Dec 1, 2025): Increase timeout from 30s to 60s for nested SSH hops @@ -406,7 +412,8 @@ class DistributedCoordinator: ) # Verify worker process started - if 'Started chunk' in result.stdout: + # CRITICAL FIX (Dec 1, 2025): Look for marker without spaces (Started_chunk_) + if f'Started_chunk_{chunk_id}' in result.stdout: print(f"✅ Chunk {chunk_id} started on {worker_id} successfully") return True else: @@ -503,26 +510,38 @@ class DistributedCoordinator: print("🔄 Distributing chunks to workers...") print() - # CRITICAL FIX (Nov 30, 2025): Resume from existing chunks in database - # Get max chunk ID to avoid UNIQUE constraint errors on restart + # CRITICAL FIX (Dec 1, 2025): Clear pending/failed chunks to allow retry + # This prevents UNIQUE constraint errors on restart conn = sqlite3.connect(self.db.db_path) c = conn.cursor() - c.execute("SELECT id FROM chunks WHERE id LIKE 'v9_chunk_%' ORDER BY id DESC LIMIT 1") + c.execute("DELETE FROM chunks WHERE status IN ('pending', 'failed')") + deleted_count = c.rowcount + conn.commit() + conn.close() + if deleted_count > 0: + print(f"🗑️ Cleared {deleted_count} pending/failed chunks for retry") + print() + + # CRITICAL FIX (Dec 1, 2025): Resume from existing COMPLETED chunks only + # Get max chunk ID from completed/running chunks, not all chunks + # This allows retrying failed/pending chunks without UNIQUE constraint errors + conn = sqlite3.connect(self.db.db_path) + c = conn.cursor() + c.execute("SELECT id FROM chunks WHERE id LIKE 'v9_chunk_%' AND status IN ('completed', 'running') ORDER BY id DESC LIMIT 1") last_chunk = c.fetchone() conn.close() if last_chunk: - # Extract counter from last chunk ID (e.g., "v9_chunk_000042" -> 42) + # Extract counter from last completed chunk ID last_counter = int(last_chunk[0].split('_')[-1]) chunk_id_counter = last_counter + 1 - # Resume from where we left off in the parameter space chunk_start = chunk_id_counter * chunk_size - print(f"📋 Resuming from chunk {chunk_id_counter} (found {last_counter + 1} existing chunks)") + print(f"📋 Resuming from chunk {chunk_id_counter} (found {last_counter + 1} completed chunks)") print(f" Starting at combo {chunk_start:,} / {total_combos:,}") else: chunk_id_counter = 0 chunk_start = 0 - print(f"📋 Starting fresh - no existing chunks found") + print(f"📋 Starting fresh - no completed chunks found") # Split work across workers active_chunks = {}