diff --git a/cluster/distributed_coordinator.py b/cluster/distributed_coordinator.py
index f066e45..565eb41 100644
--- a/cluster/distributed_coordinator.py
+++ b/cluster/distributed_coordinator.py
@@ -388,11 +388,17 @@ class DistributedCoordinator:
         ssh_opts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -o ServerAliveInterval=5"
 
         # Execute command and capture result to verify it started
+        # CRITICAL FIX (Dec 1, 2025): Remove && because command ends with & (background)
+        # In single quotes, & backgrounds process and shell continues to next command (echo)
         if 'ssh_hop' in worker:
             # Worker 2 requires hop through worker 1
-            ssh_cmd = f"ssh {ssh_opts} {worker['ssh_hop']} \"ssh {ssh_opts} {worker['host']} '{cmd}' && echo 'Started chunk {chunk_id}' || echo 'FAILED'\""
+            ssh_cmd = f"ssh {ssh_opts} {worker['ssh_hop']} \"ssh {ssh_opts} {worker['host']} '{cmd} echo Started_chunk_{chunk_id}'\""
         else:
-            ssh_cmd = f"ssh {ssh_opts} {worker['host']} '{cmd}' && echo 'Started chunk {chunk_id}' || echo 'FAILED'"
+            ssh_cmd = f"ssh {ssh_opts} {worker['host']} '{cmd} echo Started_chunk_{chunk_id}'"
+
+        # DEBUG: Print command for troubleshooting
+        print(f" DEBUG: Executing SSH command:")
+        print(f" {ssh_cmd[:200]}...") # First 200 chars
 
         # Use run() to capture output and verify success
         # CRITICAL FIX (Dec 1, 2025): Increase timeout from 30s to 60s for nested SSH hops
@@ -406,7 +412,8 @@ class DistributedCoordinator:
         )
 
         # Verify worker process started
-        if 'Started chunk' in result.stdout:
+        # CRITICAL FIX (Dec 1, 2025): Look for marker without spaces (Started_chunk_)
+        if f'Started_chunk_{chunk_id}' in result.stdout:
             print(f"✅ Chunk {chunk_id} started on {worker_id} successfully")
             return True
         else:
@@ -503,26 +510,38 @@ class DistributedCoordinator:
         print("🔄 Distributing chunks to workers...")
         print()
 
-        # CRITICAL FIX (Nov 30, 2025): Resume from existing chunks in database
-        # Get max chunk ID to avoid UNIQUE constraint errors on restart
+        # CRITICAL FIX (Dec 1, 2025): Clear pending/failed chunks to allow retry
+        # This prevents UNIQUE constraint errors on restart
         conn = sqlite3.connect(self.db.db_path)
         c = conn.cursor()
-        c.execute("SELECT id FROM chunks WHERE id LIKE 'v9_chunk_%' ORDER BY id DESC LIMIT 1")
+        c.execute("DELETE FROM chunks WHERE status IN ('pending', 'failed')")
+        deleted_count = c.rowcount
+        conn.commit()
+        conn.close()
+        if deleted_count > 0:
+            print(f"🗑️ Cleared {deleted_count} pending/failed chunks for retry")
+            print()
+
+        # CRITICAL FIX (Dec 1, 2025): Resume from existing COMPLETED chunks only
+        # Get max chunk ID from completed/running chunks, not all chunks
+        # This allows retrying failed/pending chunks without UNIQUE constraint errors
+        conn = sqlite3.connect(self.db.db_path)
+        c = conn.cursor()
+        c.execute("SELECT id FROM chunks WHERE id LIKE 'v9_chunk_%' AND status IN ('completed', 'running') ORDER BY id DESC LIMIT 1")
         last_chunk = c.fetchone()
         conn.close()
 
         if last_chunk:
-            # Extract counter from last chunk ID (e.g., "v9_chunk_000042" -> 42)
+            # Extract counter from last completed chunk ID
             last_counter = int(last_chunk[0].split('_')[-1])
             chunk_id_counter = last_counter + 1
-            # Resume from where we left off in the parameter space
             chunk_start = chunk_id_counter * chunk_size
-            print(f"📋 Resuming from chunk {chunk_id_counter} (found {last_counter + 1} existing chunks)")
+            print(f"📋 Resuming from chunk {chunk_id_counter} (found {last_counter + 1} completed chunks)")
             print(f" Starting at combo {chunk_start:,} / {total_combos:,}")
         else:
             chunk_id_counter = 0
             chunk_start = 0
-            print(f"📋 Starting fresh - no existing chunks found")
+            print(f"📋 Starting fresh - no completed chunks found")
 
         # Split work across workers
         active_chunks = {}