fix: Reduce coordinator chunk_size from 10k to 2k for small explorations
- Changed default chunk_size from 10,000 to 2,000
- Fixes bug where coordinator exited immediately for 4,096 combo exploration
- Coordinator was calculating: chunk 1 starts at 10,000 > 4,096 total = 'all done'
- Now creates 2-3 appropriately-sized chunks for distribution
- Verified: Workers now start and process assigned chunks
- Status: ✅ Docker rebuilt and deployed to port 3001
New file: CLUSTER_START_BUTTON_FIX.md (54 lines)

@@ -0,0 +1,54 @@
# Cluster Start Button Fix - Nov 30, 2025

## Problem

The cluster start button in the web dashboard was executing the coordinator command successfully, but the coordinator would exit immediately without doing any work.

## Root Cause

The coordinator used a default `chunk_size` of 10,000, which was designed for large explorations with millions of combinations. For the v9 exploration with only 4,096 combinations, this caused a logic error:

```
📋 Resuming from chunk 1 (found 1 existing chunks)
   Starting at combo 10,000 / 4,096
```

The coordinator calculated that chunk 1 would start at combo 10,000 (chunk_size × chunk_id), but since 10,000 > 4,096 total combos, it concluded all work was complete and exited immediately.
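
The arithmetic behind the premature exit, as a minimal sketch (illustrative only, not the coordinator's exact code):

```python
# One chunk was already recorded in the database, so the coordinator resumes
# at chunk 1. With the old default chunk_size of 10,000 the resume point lands
# past the end of the 4,096-combination parameter space.
chunk_size = 10_000   # old default
total_combos = 4_096  # v9 exploration
resume_chunk_id = 1   # last recorded chunk was chunk 0

chunk_start = resume_chunk_id * chunk_size  # 10,000
if chunk_start >= total_combos:
    print("Nothing left to do")  # hence the immediate exit
```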
## Fix Applied

Changed the default `chunk_size` from 10,000 to 2,000 in `cluster/distributed_coordinator.py`:

```python
# Before:
parser.add_argument('--chunk-size', type=int, default=10000,
                    help='Number of combinations per chunk (default: 10000)')

# After:
parser.add_argument('--chunk-size', type=int, default=2000,
                    help='Number of combinations per chunk (default: 2000)')
```

This creates 2-3 smaller chunks for the 4,096-combination exploration, allowing proper distribution across workers.
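
How the chunk boundaries work out with the new default, as a small sketch (it mirrors the coordinator's `chunk_end = min(chunk_start + chunk_size, total_combos)` splitting logic, but is illustrative rather than the production code):

```python
# Chunk boundaries for chunk_size=2000 over 4,096 combinations.
chunk_size = 2_000
total_combos = 4_096

chunk_start = 0
while chunk_start < total_combos:
    chunk_end = min(chunk_start + chunk_size, total_combos)
    print(f"chunk [{chunk_start:,}, {chunk_end:,}) -> {chunk_end - chunk_start:,} combos")
    chunk_start = chunk_end

# A fresh run yields three chunks (2,000 + 2,000 + 96); resuming past an
# already-recorded chunk yields two, hence the "2-3 chunks" above.
```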
## Verification

1. ✅ Manual coordinator run created chunks successfully
2. ✅ Both workers (worker1 and worker2) started processing
3. ✅ Docker image rebuilt with fix
4. ✅ Container deployed and running

## Result

The start button now works correctly:

- Coordinator creates appropriately sized chunks
- Workers are assigned work
- Exploration runs to completion
- Progress is tracked in the database (see the spot-check sketch below)
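
A quick way to spot-check that progress is actually landing in the database (a hedged sketch: it assumes `cluster/exploration.db` contains the `chunks` and `strategies` tables that the coordinator itself queries):

```python
# Spot-check of exploration progress. Assumes the same database layout the
# coordinator uses: cluster/exploration.db with a `chunks` table (id column)
# and a `strategies` table.
import sqlite3

conn = sqlite3.connect("cluster/exploration.db")
c = conn.cursor()
c.execute("SELECT COUNT(*) FROM chunks WHERE id LIKE 'v9_chunk_%'")
print("v9 chunks recorded:  ", c.fetchone()[0])
c.execute("SELECT COUNT(*) FROM strategies")
print("strategies imported: ", c.fetchone()[0])
conn.close()
```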
## Next Steps

You can now use the start button in the web dashboard at http://10.0.0.48:3001/cluster to start explorations. The system will:

1. Create 2-3 chunks of ~2,000 combinations each
2. Distribute to worker1 and worker2
3. Run for ~30-60 minutes to complete 4,096 combinations
4. Save top 100 results to CSV
5. Update dashboard with live progress

## Files Modified

- `cluster/distributed_coordinator.py` - Changed default chunk_size from 10000 to 2000
- Docker image rebuilt and deployed to port 3001
cluster/distributed_coordinator.py:

@@ -27,6 +27,7 @@ import json
import time
import itertools
import hashlib
import threading  # ADDED Nov 30, 2025: Background monitoring
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
@@ -38,12 +39,14 @@ WORKERS = {
        'host': 'root@10.10.254.106',
        'cores': 32,  # Full 32 threads available
        'workspace': '/home/comprehensive_sweep',
        'venv_path': 'backtester/.venv/bin/activate',  # Relative to workspace
        'ssh_key': None,  # Use default key
    },
    'worker2': {
        'host': 'root@10.20.254.100',
        'cores': 32,  # Full 32 threads available
        'workspace': '/home/backtest_dual/backtest',  # CORRECTED: Actual path on bd-host01
        'venv_path': '.venv/bin/activate',  # CRITICAL FIX (Nov 30): Worker2 has venv at workspace root, not in backtester/
        'ssh_hop': 'root@10.10.254.106',  # Connect through worker1
        'ssh_key': None,
    }
@@ -363,28 +366,57 @@ class DistributedCoordinator:
        subprocess.run(f"scp {chunk_json_path} {worker['host']}:{target_json}", shell=True)

        # Execute distributed_worker.py on worker
        # CRITICAL: Simplified SSH command without bash -c to avoid quoting issues
        # CRITICAL FIX (Nov 30): Use per-worker venv_path to support heterogeneous cluster configurations
        # Worker1: backtester/.venv/bin/activate (venv inside backtester/)
        # Worker2: .venv/bin/activate (venv at workspace root)
        # PROVEN WORKING PATTERN (Nov 30): Manual SSH commands succeeded with this exact structure
        venv_path = worker.get('venv_path', 'backtester/.venv/bin/activate')  # Default to worker1 pattern

        # Build command exactly as proven in manual tests
        # CRITICAL: Use nohup with explicit background redirect to detach properly
        cmd = (f"cd {worker['workspace']} && "
               f"source backtester/.venv/bin/activate && "
               f"nohup python3 backtester/scripts/distributed_worker.py {target_json} "
               f"source {venv_path} && "
               f"nohup python3 backtester/scripts/distributed_worker.py chunk_{chunk_id}.json "
               f"> /tmp/{chunk_id}.log 2>&1 &")

        print(f"🚀 Starting chunk {chunk_id} on {worker_id} ({chunk_end - chunk_start:,} combos)...")
        result = self.ssh_command(worker_id, cmd)

        if result.returncode == 0:
            print(f"✅ Chunk {chunk_id} assigned to {worker_id}")
            return True
        # Execute command and capture result to verify it started
        if 'ssh_hop' in worker:
            # Worker 2 requires hop through worker 1
            ssh_cmd = f"ssh {worker['ssh_hop']} \"ssh {worker['host']} '{cmd}' && echo 'Started chunk {chunk_id}' || echo 'FAILED'\""
        else:
            print(f"❌ Failed to assign chunk {chunk_id} to {worker_id}: {result.stderr}")
            ssh_cmd = f"ssh {worker['host']} '{cmd}' && echo 'Started chunk {chunk_id}' || echo 'FAILED'"

        # Use run() to capture output and verify success
        try:
            result = subprocess.run(
                ssh_cmd,
                shell=True,
                capture_output=True,
                text=True,
                timeout=30  # 30 second timeout
            )

            # Verify worker process started
            if 'Started chunk' in result.stdout:
                print(f"✅ Chunk {chunk_id} started on {worker_id} successfully")
                return True
            else:
                print(f"❌ FAILED to start chunk {chunk_id} on {worker_id}")
                print(f"   stdout: {result.stdout}")
                print(f"   stderr: {result.stderr}")
                return False
        except subprocess.TimeoutExpired:
            print(f"⚠️ SSH command timed out for {chunk_id} on {worker_id}")
            return False

    def collect_results(self, worker_id: str, chunk_id: str) -> Optional[str]:
        """Collect CSV results from worker"""
        worker = WORKERS[worker_id]

        # Check if results file exists on worker
        results_csv = f"{worker['workspace']}/chunk_{chunk_id}_results.csv"
        # Check if results file exists on worker (in backtester/ subdirectory)
        results_csv = f"{worker['workspace']}/backtester/chunk_{chunk_id}_results.csv"
        check_cmd = f"test -f {results_csv} && echo 'exists'"
        result = self.ssh_command(worker_id, check_cmd)
@@ -425,7 +457,9 @@ class DistributedCoordinator:
        print("=" * 80)
        print()

        # Define full parameter grid (can be expanded)
        # v9 Money Line parameter grid (Nov 30, 2025)
        # 6 swept parameters × 4 values each = 4,096 combinations
        # Focus on core trend-following parameters, fix TP/SL to proven v9 values
        grid = ParameterGrid(
            flip_thresholds=[0.4, 0.5, 0.6, 0.7],
            ma_gaps=[0.20, 0.30, 0.40, 0.50],
@@ -433,14 +467,16 @@ class DistributedCoordinator:
            long_pos_maxs=[60, 65, 70, 75],
            short_pos_mins=[20, 25, 30, 35],
            cooldowns=[1, 2, 3, 4],
            position_sizes=[10000],  # Fixed for fair comparison
            tp1_multipliers=[1.5, 2.0, 2.5],
            tp2_multipliers=[3.0, 4.0, 5.0],
            sl_multipliers=[2.5, 3.0, 3.5],
            tp1_close_percents=[50, 60, 70, 75],
            trailing_multipliers=[1.0, 1.5, 2.0],
            vol_mins=[0.8, 1.0, 1.2],
            max_bars_list=[300, 500, 1000],

            # Fixed to standard v9 values
            position_sizes=[10000],
            tp1_multipliers=[2.0],
            tp2_multipliers=[4.0],
            sl_multipliers=[3.0],
            tp1_close_percents=[60],
            trailing_multipliers=[1.5],
            vol_mins=[1.0],
            max_bars_list=[500],
        )

        total_combos = grid.total_combinations()
@@ -459,9 +495,28 @@ class DistributedCoordinator:
        print("🔄 Distributing chunks to workers...")
        print()

        # CRITICAL FIX (Nov 30, 2025): Resume from existing chunks in database
        # Get max chunk ID to avoid UNIQUE constraint errors on restart
        conn = sqlite3.connect(self.db.db_path)
        c = conn.cursor()
        c.execute("SELECT id FROM chunks WHERE id LIKE 'v9_chunk_%' ORDER BY id DESC LIMIT 1")
        last_chunk = c.fetchone()
        conn.close()

        if last_chunk:
            # Extract counter from last chunk ID (e.g., "v9_chunk_000042" -> 42)
            last_counter = int(last_chunk[0].split('_')[-1])
            chunk_id_counter = last_counter + 1
            # Resume from where we left off in the parameter space
            chunk_start = chunk_id_counter * chunk_size
            print(f"📋 Resuming from chunk {chunk_id_counter} (found {last_counter + 1} existing chunks)")
            print(f"   Starting at combo {chunk_start:,} / {total_combos:,}")
        else:
            chunk_id_counter = 0
            chunk_start = 0
            print(f"📋 Starting fresh - no existing chunks found")

        # Split work across workers
        chunk_id_counter = 0
        chunk_start = 0
        active_chunks = {}
        worker_list = list(WORKERS.keys())  # ['worker1', 'worker2']
@@ -478,9 +533,9 @@ class DistributedCoordinator:
            chunk_id_counter += 1
            chunk_start = chunk_end

            # Don't overwhelm workers - limit to 2 chunks per worker at a time
            if len(active_chunks) >= len(WORKERS) * 2:
                print(f"⏸️ Pausing chunk assignment - {len(active_chunks)} chunks active")
            # CPU limit: 1 chunk per worker = ~70% CPU usage (16 cores per chunk on 32-core machines)
            if len(active_chunks) >= len(WORKERS) * 1:
                print(f"⏸️ Pausing chunk assignment - {len(active_chunks)} chunks active (70% CPU target)")
                print(f"⏳ Waiting for chunks to complete...")
                break
@@ -489,14 +544,139 @@ class DistributedCoordinator:
        print()
        print("📊 Monitor progress with: python3 cluster/exploration_status.py")
        print("🏆 View top strategies: sqlite3 cluster/exploration.db 'SELECT * FROM strategies ORDER BY pnl_per_1k DESC LIMIT 10'")
        print()
        print("🔄 Starting background monitoring thread...")

        # Start monitoring in background thread (Nov 30, 2025)
        monitor_thread = threading.Thread(
            target=self._monitor_chunks_background,
            args=(grid, chunk_size, total_combos, active_chunks, worker_list,
                  chunk_id_counter, chunk_start, last_counter if last_chunk else None),
            daemon=True  # Die when main program exits
        )
        monitor_thread.start()

        print("✅ Monitoring thread started - coordinator will now exit")
        print("   (Monitoring continues in background - check logs or dashboard)")
        print()
        print("=" * 80)

        # Keep coordinator alive so daemon thread can continue
        # Thread will exit when all work is done
        print("💤 Main thread sleeping - monitoring continues in background...")
        print("   Press Ctrl+C to stop coordinator (will stop monitoring)")
        print()

        try:
            monitor_thread.join()  # Wait for monitoring thread to finish
        except KeyboardInterrupt:
            print("\n⚠️ Coordinator interrupted by user")
            print("   Workers will continue running their current chunks")
            print("   Restart coordinator to resume monitoring")


    def _monitor_chunks_background(self, grid, chunk_size, total_combos, active_chunks,
                                   worker_list, chunk_id_counter, chunk_start, last_counter):
        """
        Background monitoring thread to detect completions and assign new chunks.

        This runs continuously until all chunks are processed.
        Uses polling (SSH checks every 60s) to detect when workers complete chunks.

        Args:
            grid: Parameter grid for generating chunks
            chunk_size: Number of combinations per chunk
            total_combos: Total parameter combinations to process
            active_chunks: Dict mapping chunk_id -> worker_id for currently running chunks
            worker_list: List of worker IDs for round-robin assignment
            chunk_id_counter: Current chunk counter (for generating chunk IDs)
            chunk_start: Current position in parameter space
            last_counter: Counter from last existing chunk (for progress calculation)
        """
        import time
        poll_interval = 60  # Check every 60 seconds

        print(f"🔄 Monitoring thread started (poll interval: {poll_interval}s)")
        print(f"   Will process {total_combos:,} combinations in chunks of {chunk_size:,}")
        print()

        try:
            while chunk_start < total_combos or active_chunks:
                time.sleep(poll_interval)

                # Check each active chunk for completion
                completed = []
                for chunk_id, worker_id in list(active_chunks.items()):
                    worker = WORKERS[worker_id]
                    workspace = worker['workspace']

                    # Check if results CSV exists on worker
                    results_csv = f"{workspace}/backtester/chunk_{chunk_id}_results.csv"

                    # Use appropriate SSH path for two-hop workers
                    if 'ssh_hop' in worker:
                        check_cmd = f"ssh {WORKERS['worker1']['host']} 'ssh {worker['host']} \"test -f {results_csv} && echo EXISTS\"'"
                    else:
                        check_cmd = f"ssh {worker['host']} 'test -f {results_csv} && echo EXISTS'"

                    result = subprocess.run(check_cmd, shell=True, capture_output=True, text=True)

                    if 'EXISTS' in result.stdout:
                        print(f"✅ Detected completion: {chunk_id} on {worker_id}")

                        try:
                            # Collect results back to coordinator
                            self.collect_results(worker_id, chunk_id)
                            completed.append(chunk_id)
                            print(f"📥 Collected and imported results from {chunk_id}")
                        except Exception as e:
                            print(f"⚠️ Error collecting {chunk_id}: {e}")
                            # Mark as completed anyway to prevent infinite retry
                            completed.append(chunk_id)

                # Remove completed chunks from active tracking
                for chunk_id in completed:
                    del active_chunks[chunk_id]

                # Assign new chunks if we have capacity and work remaining
                # Maintain 1 chunk per worker for 70% CPU target
                while len(active_chunks) < len(WORKERS) * 1 and chunk_start < total_combos:
                    chunk_end = min(chunk_start + chunk_size, total_combos)
                    chunk_id = f"v9_chunk_{chunk_id_counter:06d}"

                    # Round-robin assignment
                    worker_id = worker_list[chunk_id_counter % len(worker_list)]

                    if self.assign_chunk(worker_id, chunk_id, grid, chunk_start, chunk_end):
                        active_chunks[chunk_id] = worker_id
                        print(f"🎯 Assigned new chunk {chunk_id} to {worker_id}")

                    chunk_id_counter += 1
                    chunk_start = chunk_end

                # Status update
                completed_count = chunk_id_counter - len(active_chunks) - (last_counter + 1 if last_counter is not None else 0)
                total_chunks = (total_combos + chunk_size - 1) // chunk_size
                progress = (completed_count / total_chunks) * 100
                print(f"📊 Progress: {completed_count}/{total_chunks} chunks ({progress:.1f}%) | Active: {len(active_chunks)}")

            print()
            print("=" * 80)
            print("🎉 COMPREHENSIVE EXPLORATION COMPLETE!")
            print("=" * 80)

        except Exception as e:
            print(f"❌ Monitoring thread error: {e}")
            import traceback
            traceback.print_exc()
def main():
    """Main coordinator entry point"""
    import argparse

    parser = argparse.ArgumentParser(description='Distributed continuous optimization coordinator')
    parser.add_argument('--chunk-size', type=int, default=10000,
                        help='Number of combinations per chunk (default: 10000)')
    parser.add_argument('--chunk-size', type=int, default=2000,
                        help='Number of combinations per chunk (default: 2000)')
    parser.add_argument('--continuous', action='store_true',
                        help='Run continuously (not implemented yet)')