fix: EPYC cluster SSH timeout - increase timeout 30s→60s + add SSH options

CRITICAL FIX (Dec 1, 2025): Cluster start was failing with 'operation failed'

Problem:
- SSH commands timing out after 30s (too short for 2-hop SSH to worker2)
- Missing SSH options caused prompts/delays
- Result: Coordinator failed to start worker processes

Solution:
- Increased timeout from 30s to 60s for nested SSH hops
- Added SSH options: -o StrictHostKeyChecking=no -o ConnectTimeout=10
- Applied options to both ssh_command() and worker startup commands

Verification (Dec 1, 09:40):
- Worker1: 23 processes running (chunk 0-2000)
- Worker2: 24 processes running (chunk 2000-4000)
- Cluster status: ACTIVE with 2 workers
- Both chunks processing successfully

Files changed:
- cluster/distributed_coordinator.py (lines 302-314, 388-414)
This commit is contained in:
mindesbunister
2025-12-01 09:41:42 +01:00
parent 549fe8e077
commit ef371a19b9

View File

@@ -303,13 +303,16 @@ class DistributedCoordinator:
"""Execute command on worker via SSH""" """Execute command on worker via SSH"""
worker = WORKERS[worker_id] worker = WORKERS[worker_id]
# CRITICAL FIX (Dec 1, 2025): Add SSH options to prevent prompts and improve reliability
ssh_opts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -o ServerAliveInterval=5"
if 'ssh_hop' in worker: if 'ssh_hop' in worker:
# Worker 2 requires hop through worker 1 # Worker 2 requires hop through worker 1
# CRITICAL FIX (Nov 29, 2025): Use double-nested quotes for 2-hop SSH # CRITICAL FIX (Nov 29, 2025): Use double-nested quotes for 2-hop SSH
# Single quotes don't pass command to inner SSH properly # Single quotes don't pass command to inner SSH properly
ssh_cmd = f"ssh {worker['ssh_hop']} \"ssh {worker['host']} '{command}'\"" ssh_cmd = f"ssh {ssh_opts} {worker['ssh_hop']} \"ssh {ssh_opts} {worker['host']} '{command}'\""
else: else:
ssh_cmd = f"ssh {worker['host']} '{command}'" ssh_cmd = f"ssh {ssh_opts} {worker['host']} '{command}'"
return subprocess.run(ssh_cmd, shell=True, capture_output=True, text=True) return subprocess.run(ssh_cmd, shell=True, capture_output=True, text=True)
@@ -381,21 +384,25 @@ class DistributedCoordinator:
        print(f"🚀 Starting chunk {chunk_id} on {worker_id} ({chunk_end - chunk_start:,} combos)...")

        # CRITICAL FIX (Dec 1, 2025): Add SSH options for reliability and timeout handling
        ssh_opts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -o ServerAliveInterval=5"

        # Execute command and capture result to verify it started
        if 'ssh_hop' in worker:
            # Worker 2 requires hop through worker 1
            ssh_cmd = f"ssh {ssh_opts} {worker['ssh_hop']} \"ssh {ssh_opts} {worker['host']} '{cmd}' && echo 'Started chunk {chunk_id}' || echo 'FAILED'\""
        else:
            ssh_cmd = f"ssh {ssh_opts} {worker['host']} '{cmd}' && echo 'Started chunk {chunk_id}' || echo 'FAILED'"

        # Use run() to capture output and verify success
        # CRITICAL FIX (Dec 1, 2025): Increase timeout from 30s to 60s for nested SSH hops
        try:
            result = subprocess.run(
                ssh_cmd,
                shell=True,
                capture_output=True,
                text=True,
                timeout=60  # 60 second timeout (was 30s, too short for 2-hop SSH)
            )

            # Verify worker process started
@@ -409,6 +416,7 @@ class DistributedCoordinator:
            return False
        except subprocess.TimeoutExpired:
            print(f"⚠️ SSH command timed out for {chunk_id} on {worker_id}")
            print(f"   This usually means SSH hop is misconfigured or slow")
            return False

    def collect_results(self, worker_id: str, chunk_id: str) -> Optional[str]: