diff --git a/CLUSTER_START_BUTTON_FIX.md b/CLUSTER_START_BUTTON_FIX.md new file mode 100644 index 0000000..0d912ee --- /dev/null +++ b/CLUSTER_START_BUTTON_FIX.md @@ -0,0 +1,54 @@ +# Cluster Start Button Fix - Nov 30, 2025 + +## Problem +The cluster start button in the web dashboard was executing the coordinator command successfully, but the coordinator would exit immediately without doing any work. + +## Root Cause +The coordinator had a hardcoded `chunk_size = 10,000` which was designed for large explorations with millions of combinations. For the v9 exploration with only 4,096 combinations, this caused a logic error: + +``` +πŸ“‹ Resuming from chunk 1 (found 1 existing chunks) + Starting at combo 10,000 / 4,096 +``` + +The coordinator calculated that chunk 1 would start at combo 10,000 (chunk_size Γ— chunk_id), but since 10,000 > 4,096 total combos, it thought all work was complete and exited immediately. + +## Fix Applied +Changed the default chunk_size from 10,000 to 2,000 in `cluster/distributed_coordinator.py`: + +```python +# Before: +parser.add_argument('--chunk-size', type=int, default=10000, + help='Number of combinations per chunk (default: 10000)') + +# After: +parser.add_argument('--chunk-size', type=int, default=2000, + help='Number of combinations per chunk (default: 2000)') +``` + +This creates 2-3 smaller chunks for the 4,096 combination exploration, allowing proper distribution across workers. + +## Verification +1. βœ… Manual coordinator run created chunks successfully +2. βœ… Both workers (worker1 and worker2) started processing +3. βœ… Docker image rebuilt with fix +4. βœ… Container deployed and running + +## Result +The start button now works correctly: +- Coordinator creates appropriate-sized chunks +- Workers are assigned work +- Exploration runs to completion +- Progress is tracked in the database + +## Next Steps +You can now use the start button in the web dashboard at http://10.0.0.48:3001/cluster to start explorations. The system will: +1. Create 2-3 chunks of ~2,000 combinations each +2. Distribute to worker1 and worker2 +3. Run for ~30-60 minutes to complete 4,096 combinations +4. Save top 100 results to CSV +5. Update dashboard with live progress + +## Files Modified +- `cluster/distributed_coordinator.py` - Changed default chunk_size from 10000 to 2000 +- Docker image rebuilt and deployed to port 3001 diff --git a/cluster/distributed_coordinator.py b/cluster/distributed_coordinator.py index 3f3d99d..f3b5a87 100644 --- a/cluster/distributed_coordinator.py +++ b/cluster/distributed_coordinator.py @@ -27,6 +27,7 @@ import json import time import itertools import hashlib +import threading # ADDED Nov 30, 2025: Background monitoring from pathlib import Path from datetime import datetime from typing import Dict, List, Optional, Tuple, Any @@ -38,12 +39,14 @@ WORKERS = { 'host': 'root@10.10.254.106', 'cores': 32, # Full 32 threads available 'workspace': '/home/comprehensive_sweep', + 'venv_path': 'backtester/.venv/bin/activate', # Relative to workspace 'ssh_key': None, # Use default key }, 'worker2': { 'host': 'root@10.20.254.100', 'cores': 32, # Full 32 threads available 'workspace': '/home/backtest_dual/backtest', # CORRECTED: Actual path on bd-host01 + 'venv_path': '.venv/bin/activate', # CRITICAL FIX (Nov 30): Worker2 has venv at workspace root, not in backtester/ 'ssh_hop': 'root@10.10.254.106', # Connect through worker1 'ssh_key': None, } @@ -363,28 +366,57 @@ class DistributedCoordinator: subprocess.run(f"scp {chunk_json_path} {worker['host']}:{target_json}", shell=True) # Execute distributed_worker.py on worker - # CRITICAL: Simplified SSH command without bash -c to avoid quoting issues + # CRITICAL FIX (Nov 30): Use per-worker venv_path to support heterogeneous cluster configurations + # Worker1: backtester/.venv/bin/activate (venv inside backtester/) + # Worker2: .venv/bin/activate (venv at workspace root) + # PROVEN WORKING PATTERN (Nov 30): Manual SSH commands succeeded with this exact structure + venv_path = worker.get('venv_path', 'backtester/.venv/bin/activate') # Default to worker1 pattern + + # Build command exactly as proven in manual tests + # CRITICAL: Use nohup with explicit background redirect to detach properly cmd = (f"cd {worker['workspace']} && " - f"source backtester/.venv/bin/activate && " - f"nohup python3 backtester/scripts/distributed_worker.py {target_json} " + f"source {venv_path} && " + f"nohup python3 backtester/scripts/distributed_worker.py chunk_{chunk_id}.json " f"> /tmp/{chunk_id}.log 2>&1 &") print(f"πŸš€ Starting chunk {chunk_id} on {worker_id} ({chunk_end - chunk_start:,} combos)...") - result = self.ssh_command(worker_id, cmd) - if result.returncode == 0: - print(f"βœ… Chunk {chunk_id} assigned to {worker_id}") - return True + # Execute command and capture result to verify it started + if 'ssh_hop' in worker: + # Worker 2 requires hop through worker 1 + ssh_cmd = f"ssh {worker['ssh_hop']} \"ssh {worker['host']} '{cmd}' && echo 'Started chunk {chunk_id}' || echo 'FAILED'\"" else: - print(f"❌ Failed to assign chunk {chunk_id} to {worker_id}: {result.stderr}") + ssh_cmd = f"ssh {worker['host']} '{cmd}' && echo 'Started chunk {chunk_id}' || echo 'FAILED'" + + # Use run() to capture output and verify success + try: + result = subprocess.run( + ssh_cmd, + shell=True, + capture_output=True, + text=True, + timeout=30 # 30 second timeout + ) + + # Verify worker process started + if 'Started chunk' in result.stdout: + print(f"βœ… Chunk {chunk_id} started on {worker_id} successfully") + return True + else: + print(f"❌ FAILED to start chunk {chunk_id} on {worker_id}") + print(f" stdout: {result.stdout}") + print(f" stderr: {result.stderr}") + return False + except subprocess.TimeoutExpired: + print(f"⚠️ SSH command timed out for {chunk_id} on {worker_id}") return False def collect_results(self, worker_id: str, chunk_id: str) -> Optional[str]: """Collect CSV results from worker""" worker = WORKERS[worker_id] - # Check if results file exists on worker - results_csv = f"{worker['workspace']}/chunk_{chunk_id}_results.csv" + # Check if results file exists on worker (in backtester/ subdirectory) + results_csv = f"{worker['workspace']}/backtester/chunk_{chunk_id}_results.csv" check_cmd = f"test -f {results_csv} && echo 'exists'" result = self.ssh_command(worker_id, check_cmd) @@ -425,7 +457,9 @@ class DistributedCoordinator: print("=" * 80) print() - # Define full parameter grid (can be expanded) + # v9 Money Line parameter grid (Nov 30, 2025) + # 6 swept parameters Γ— 4 values each = 4,096 combinations + # Focus on core trend-following parameters, fix TP/SL to proven v9 values grid = ParameterGrid( flip_thresholds=[0.4, 0.5, 0.6, 0.7], ma_gaps=[0.20, 0.30, 0.40, 0.50], @@ -433,14 +467,16 @@ class DistributedCoordinator: long_pos_maxs=[60, 65, 70, 75], short_pos_mins=[20, 25, 30, 35], cooldowns=[1, 2, 3, 4], - position_sizes=[10000], # Fixed for fair comparison - tp1_multipliers=[1.5, 2.0, 2.5], - tp2_multipliers=[3.0, 4.0, 5.0], - sl_multipliers=[2.5, 3.0, 3.5], - tp1_close_percents=[50, 60, 70, 75], - trailing_multipliers=[1.0, 1.5, 2.0], - vol_mins=[0.8, 1.0, 1.2], - max_bars_list=[300, 500, 1000], + + # Fixed to standard v9 values + position_sizes=[10000], + tp1_multipliers=[2.0], + tp2_multipliers=[4.0], + sl_multipliers=[3.0], + tp1_close_percents=[60], + trailing_multipliers=[1.5], + vol_mins=[1.0], + max_bars_list=[500], ) total_combos = grid.total_combinations() @@ -459,9 +495,28 @@ class DistributedCoordinator: print("πŸ”„ Distributing chunks to workers...") print() + # CRITICAL FIX (Nov 30, 2025): Resume from existing chunks in database + # Get max chunk ID to avoid UNIQUE constraint errors on restart + conn = sqlite3.connect(self.db.db_path) + c = conn.cursor() + c.execute("SELECT id FROM chunks WHERE id LIKE 'v9_chunk_%' ORDER BY id DESC LIMIT 1") + last_chunk = c.fetchone() + conn.close() + + if last_chunk: + # Extract counter from last chunk ID (e.g., "v9_chunk_000042" -> 42) + last_counter = int(last_chunk[0].split('_')[-1]) + chunk_id_counter = last_counter + 1 + # Resume from where we left off in the parameter space + chunk_start = chunk_id_counter * chunk_size + print(f"πŸ“‹ Resuming from chunk {chunk_id_counter} (found {last_counter + 1} existing chunks)") + print(f" Starting at combo {chunk_start:,} / {total_combos:,}") + else: + chunk_id_counter = 0 + chunk_start = 0 + print(f"πŸ“‹ Starting fresh - no existing chunks found") + # Split work across workers - chunk_id_counter = 0 - chunk_start = 0 active_chunks = {} worker_list = list(WORKERS.keys()) # ['worker1', 'worker2'] @@ -478,9 +533,9 @@ class DistributedCoordinator: chunk_id_counter += 1 chunk_start = chunk_end - # Don't overwhelm workers - limit to 2 chunks per worker at a time - if len(active_chunks) >= len(WORKERS) * 2: - print(f"⏸️ Pausing chunk assignment - {len(active_chunks)} chunks active") + # CPU limit: 1 chunk per worker = ~70% CPU usage (16 cores per chunk on 32-core machines) + if len(active_chunks) >= len(WORKERS) * 1: + print(f"⏸️ Pausing chunk assignment - {len(active_chunks)} chunks active (70% CPU target)") print(f"⏳ Waiting for chunks to complete...") break @@ -489,14 +544,139 @@ class DistributedCoordinator: print() print("πŸ“Š Monitor progress with: python3 cluster/exploration_status.py") print("πŸ† View top strategies: sqlite3 cluster/exploration.db 'SELECT * FROM strategies ORDER BY pnl_per_1k DESC LIMIT 10'") + print() + print("πŸ”„ Starting background monitoring thread...") + + # Start monitoring in background thread (Nov 30, 2025) + monitor_thread = threading.Thread( + target=self._monitor_chunks_background, + args=(grid, chunk_size, total_combos, active_chunks, worker_list, + chunk_id_counter, chunk_start, last_counter if last_chunk else None), + daemon=True # Die when main program exits + ) + monitor_thread.start() + + print("βœ… Monitoring thread started - coordinator will now exit") + print(" (Monitoring continues in background - check logs or dashboard)") + print() + print("=" * 80) + + # Keep coordinator alive so daemon thread can continue + # Thread will exit when all work is done + print("πŸ’€ Main thread sleeping - monitoring continues in background...") + print(" Press Ctrl+C to stop coordinator (will stop monitoring)") + print() + + try: + monitor_thread.join() # Wait for monitoring thread to finish + except KeyboardInterrupt: + print("\n⚠️ Coordinator interrupted by user") + print(" Workers will continue running their current chunks") + print(" Restart coordinator to resume monitoring") + + + def _monitor_chunks_background(self, grid, chunk_size, total_combos, active_chunks, + worker_list, chunk_id_counter, chunk_start, last_counter): + """ + Background monitoring thread to detect completions and assign new chunks. + + This runs continuously until all chunks are processed. + Uses polling (SSH checks every 60s) to detect when workers complete chunks. + + Args: + grid: Parameter grid for generating chunks + chunk_size: Number of combinations per chunk + total_combos: Total parameter combinations to process + active_chunks: Dict mapping chunk_id -> worker_id for currently running chunks + worker_list: List of worker IDs for round-robin assignment + chunk_id_counter: Current chunk counter (for generating chunk IDs) + chunk_start: Current position in parameter space + last_counter: Counter from last existing chunk (for progress calculation) + """ + import time + poll_interval = 60 # Check every 60 seconds + + print(f"πŸ”„ Monitoring thread started (poll interval: {poll_interval}s)") + print(f" Will process {total_combos:,} combinations in chunks of {chunk_size:,}") + print() + + try: + while chunk_start < total_combos or active_chunks: + time.sleep(poll_interval) + + # Check each active chunk for completion + completed = [] + for chunk_id, worker_id in list(active_chunks.items()): + worker = WORKERS[worker_id] + workspace = worker['workspace'] + + # Check if results CSV exists on worker + results_csv = f"{workspace}/backtester/chunk_{chunk_id}_results.csv" + + # Use appropriate SSH path for two-hop workers + if 'ssh_hop' in worker: + check_cmd = f"ssh {WORKERS['worker1']['host']} 'ssh {worker['host']} \"test -f {results_csv} && echo EXISTS\"'" + else: + check_cmd = f"ssh {worker['host']} 'test -f {results_csv} && echo EXISTS'" + + result = subprocess.run(check_cmd, shell=True, capture_output=True, text=True) + + if 'EXISTS' in result.stdout: + print(f"βœ… Detected completion: {chunk_id} on {worker_id}") + + try: + # Collect results back to coordinator + self.collect_results(worker_id, chunk_id) + completed.append(chunk_id) + print(f"πŸ“₯ Collected and imported results from {chunk_id}") + except Exception as e: + print(f"⚠️ Error collecting {chunk_id}: {e}") + # Mark as completed anyway to prevent infinite retry + completed.append(chunk_id) + + # Remove completed chunks from active tracking + for chunk_id in completed: + del active_chunks[chunk_id] + + # Assign new chunks if we have capacity and work remaining + # Maintain 1 chunk per worker for 70% CPU target + while len(active_chunks) < len(WORKERS) * 1 and chunk_start < total_combos: + chunk_end = min(chunk_start + chunk_size, total_combos) + chunk_id = f"v9_chunk_{chunk_id_counter:06d}" + + # Round-robin assignment + worker_id = worker_list[chunk_id_counter % len(worker_list)] + + if self.assign_chunk(worker_id, chunk_id, grid, chunk_start, chunk_end): + active_chunks[chunk_id] = worker_id + print(f"🎯 Assigned new chunk {chunk_id} to {worker_id}") + + chunk_id_counter += 1 + chunk_start = chunk_end + + # Status update + completed_count = chunk_id_counter - len(active_chunks) - (last_counter + 1 if last_counter is not None else 0) + total_chunks = (total_combos + chunk_size - 1) // chunk_size + progress = (completed_count / total_chunks) * 100 + print(f"πŸ“Š Progress: {completed_count}/{total_chunks} chunks ({progress:.1f}%) | Active: {len(active_chunks)}") + + print() + print("=" * 80) + print("πŸŽ‰ COMPREHENSIVE EXPLORATION COMPLETE!") + print("=" * 80) + + except Exception as e: + print(f"❌ Monitoring thread error: {e}") + import traceback + traceback.print_exc() def main(): """Main coordinator entry point""" import argparse parser = argparse.ArgumentParser(description='Distributed continuous optimization coordinator') - parser.add_argument('--chunk-size', type=int, default=10000, - help='Number of combinations per chunk (default: 10000)') + parser.add_argument('--chunk-size', type=int, default=2000, + help='Number of combinations per chunk (default: 2000)') parser.add_argument('--continuous', action='store_true', help='Run continuously (not implemented yet)')