diff --git a/.gitignore b/.gitignore index 4a7b5a1..a5cad6e 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,4 @@ temp/ # Build artifacts dist/ +.backtester/ diff --git a/app/api/cluster/status/route.ts b/app/api/cluster/status/route.ts new file mode 100644 index 0000000..2b06371 --- /dev/null +++ b/app/api/cluster/status/route.ts @@ -0,0 +1,206 @@ +import { NextRequest, NextResponse } from 'next/server' +import { exec } from 'child_process' +import { promisify } from 'util' +import fs from 'fs/promises' +import path from 'path' + +const execAsync = promisify(exec) + +export const dynamic = 'force-dynamic' + +interface WorkerStatus { + name: string + host: string + cpuUsage: number + loadAverage: string + activeProcesses: number + status: 'active' | 'idle' | 'offline' +} + +interface ChunkResult { + rank: number + pnl_per_1k: number + win_rate: number + trades: number + profit_factor: number + max_drawdown: number + params: { + flip_threshold: number + ma_gap: number + adx_min: number + long_pos_max: number + short_pos_min: number + } +} + +async function getWorkerStatus(workerName: string, sshCommand: string): Promise { + try { + // Get CPU usage + const cpuCmd = `${sshCommand} "top -bn1 | grep 'Cpu(s)' | awk '{print 100-\\$8}'"` + const { stdout: cpuOut } = await execAsync(cpuCmd) + const cpuUsage = parseFloat(cpuOut.trim()) || 0 + + // Get load average + const loadCmd = `${sshCommand} "uptime | awk -F'load average:' '{print \\$2}'"` + const { stdout: loadOut } = await execAsync(loadCmd) + const loadAverage = loadOut.trim() + + // Get worker processes + const procCmd = `${sshCommand} "ps aux | grep distributed_worker | grep -v grep | wc -l"` + const { stdout: procOut } = await execAsync(procCmd) + const activeProcesses = parseInt(procOut.trim()) || 0 + + const status: 'active' | 'idle' | 'offline' = + activeProcesses > 0 ? 'active' : + cpuUsage > 10 ? 'active' : 'idle' + + return { + name: workerName, + host: sshCommand.includes('10.20.254.100') ? 'bd-host01 (32 cores)' : 'pve-nu-monitor01 (32 cores)', + cpuUsage, + loadAverage, + activeProcesses, + status + } + } catch (error) { + return { + name: workerName, + host: sshCommand.includes('10.20.254.100') ? 
'bd-host01' : 'pve-nu-monitor01', + cpuUsage: 0, + loadAverage: 'N/A', + activeProcesses: 0, + status: 'offline' + } + } +} + +async function getLatestResults(): Promise { + try { + // Try to get results from bd-host01 + const cmd = 'ssh root@10.10.254.106 "ssh root@10.20.254.100 \'ls -t /home/backtest_dual/backtest/chunk_*_results.csv 2>/dev/null | head -1\'"' + const { stdout } = await execAsync(cmd) + const csvPath = stdout.trim() + + if (!csvPath) { + return [] + } + + // Download and parse CSV + const downloadCmd = `ssh root@10.10.254.106 "scp root@10.20.254.100:${csvPath} /tmp/latest_results.csv" && scp root@10.10.254.106:/tmp/latest_results.csv /tmp/cluster_results.csv` + await execAsync(downloadCmd) + + const csvContent = await fs.readFile('/tmp/cluster_results.csv', 'utf-8') + const lines = csvContent.split('\n').slice(1, 11) // Skip header, get top 10 + + const results: ChunkResult[] = [] + for (const line of lines) { + if (!line.trim()) continue + + const cols = line.split(',') + if (cols.length < 22) continue + + results.push({ + rank: parseInt(cols[0]), + pnl_per_1k: parseFloat(cols[4]), + win_rate: parseFloat(cols[2]), + trades: parseInt(cols[1]), + profit_factor: parseFloat(cols[5]), + max_drawdown: parseFloat(cols[6]), + params: { + flip_threshold: parseFloat(cols[8]), + ma_gap: parseFloat(cols[9]), + adx_min: parseFloat(cols[10]), + long_pos_max: parseFloat(cols[11]), + short_pos_min: parseFloat(cols[12]) + } + }) + } + + return results + } catch (error) { + console.error('Error fetching results:', error) + return [] + } +} + +function generateRecommendation(results: ChunkResult[]): string { + if (results.length === 0) { + return "Cluster is processing parameter combinations. Check back soon for optimization recommendations." + } + + const best = results[0] + const avgWinRate = results.reduce((sum, r) => sum + r.win_rate, 0) / results.length + const avgPnL = results.reduce((sum, r) => sum + r.pnl_per_1k, 0) / results.length + + let recommendation = `🎯 **Top Strategy Found:**\n\n` + recommendation += `- **Expected Profit:** $${best.pnl_per_1k.toFixed(2)} per $1,000 capital\n` + recommendation += `- **Win Rate:** ${(best.win_rate * 100).toFixed(1)}%\n` + recommendation += `- **Profit Factor:** ${best.profit_factor.toFixed(2)}x\n` + recommendation += `- **Max Drawdown:** $${Math.abs(best.max_drawdown).toFixed(2)}\n\n` + + recommendation += `πŸ“Š **Optimal Parameters:**\n` + recommendation += `- Flip Threshold: ${best.params.flip_threshold}%\n` + recommendation += `- MA Gap: ${best.params.ma_gap}\n` + recommendation += `- Min ADX: ${best.params.adx_min}\n` + recommendation += `- Long Max Position: ${best.params.long_pos_max}%\n` + recommendation += `- Short Min Position: ${best.params.short_pos_min}%\n\n` + + if (best.pnl_per_1k > avgPnL * 1.5) { + recommendation += `βœ… **Action:** This strategy shows exceptional performance (${((best.pnl_per_1k / avgPnL) * 100 - 100).toFixed(0)}% better than average). Consider implementing these parameters in production.` + } else if (best.win_rate > 0.6) { + recommendation += `βœ… **Action:** Strong win rate detected. This configuration provides consistent results with good risk management.` + } else { + recommendation += `⚠️ **Action:** Continue exploration. 
Current top performer needs more validation across different market conditions.` + } + + return recommendation +} + +export async function GET(request: NextRequest) { + try { + // Get status from both workers + const [worker1Status, worker2Status] = await Promise.all([ + getWorkerStatus('worker1', 'ssh root@10.10.254.106'), + getWorkerStatus('worker2', 'ssh root@10.10.254.106 "ssh root@10.20.254.100"') + ]) + + const workers = [worker1Status, worker2Status] + const totalCPU = workers.reduce((sum, w) => sum + w.cpuUsage, 0) / workers.length + const totalProcesses = workers.reduce((sum, w) => sum + w.activeProcesses, 0) + const activeWorkers = workers.filter(w => w.status === 'active').length + + // Get latest results + const topStrategies = await getLatestResults() + const recommendation = generateRecommendation(topStrategies) + + return NextResponse.json({ + cluster: { + totalCores: 64, + activeCores: Math.round(totalCPU * 0.64), // 70% of 64 cores + cpuUsage: totalCPU, + activeWorkers, + totalWorkers: 2, + workerProcesses: totalProcesses, + status: activeWorkers > 0 ? 'active' : 'idle' + }, + workers, + exploration: { + totalCombinations: 11943936, + combinationsPerChunk: 10000, + totalChunks: 1195, + chunksCompleted: topStrategies.length > 0 ? 1 : 0, + currentChunk: topStrategies.length > 0 ? 'completed' : 'v9_chunk_000000', + progress: topStrategies.length > 0 ? 0.08 : 0.05 // Rough estimate + }, + topStrategies: topStrategies.slice(0, 5), + recommendation, + lastUpdate: new Date().toISOString() + }) + } catch (error: any) { + console.error('Cluster status error:', error) + return NextResponse.json({ + error: 'Failed to fetch cluster status', + details: error.message + }, { status: 500 }) + } +} diff --git a/app/cluster/page.tsx b/app/cluster/page.tsx new file mode 100644 index 0000000..5f1a5ac --- /dev/null +++ b/app/cluster/page.tsx @@ -0,0 +1,273 @@ +'use client' + +import { useEffect, useState } from 'react' + +interface ClusterStatus { + cluster: { + totalCores: number + activeCores: number + cpuUsage: number + activeWorkers: number + totalWorkers: number + workerProcesses: number + status: string + } + workers: Array<{ + name: string + host: string + cpuUsage: number + loadAverage: string + activeProcesses: number + status: string + }> + exploration: { + totalCombinations: number + combinationsPerChunk: number + totalChunks: number + chunksCompleted: number + currentChunk: string + progress: number + } + topStrategies: Array<{ + rank: number + pnl_per_1k: number + win_rate: number + trades: number + profit_factor: number + max_drawdown: number + params: { + flip_threshold: number + ma_gap: number + adx_min: number + long_pos_max: number + short_pos_min: number + } + }> + recommendation: string + lastUpdate: string +} + +export default function ClusterPage() { + const [status, setStatus] = useState(null) + const [loading, setLoading] = useState(true) + const [error, setError] = useState(null) + + const fetchStatus = async () => { + try { + const res = await fetch('/api/cluster/status') + if (!res.ok) throw new Error('Failed to fetch') + const data = await res.json() + setStatus(data) + setError(null) + } catch (err: any) { + setError(err.message) + } finally { + setLoading(false) + } + } + + useEffect(() => { + fetchStatus() + const interval = setInterval(fetchStatus, 30000) // Refresh every 30s + return () => clearInterval(interval) + }, []) + + if (loading) { + return ( +
+
+

πŸ–₯️ EPYC Cluster Status

+
Loading cluster status...
+
+
+ ) + } + + if (error) { + return ( +
+
+

πŸ–₯️ EPYC Cluster Status

+
+

Error: {error}

+
+
+
+ ) + } + + if (!status) return null + + const getStatusColor = (statusStr: string) => { + if (statusStr === 'active') return 'text-green-400' + if (statusStr === 'idle') return 'text-yellow-400' + return 'text-red-400' + } + + const getStatusBg = (statusStr: string) => { + if (statusStr === 'active') return 'bg-green-900/20 border-green-500' + if (statusStr === 'idle') return 'bg-yellow-900/20 border-yellow-500' + return 'bg-red-900/20 border-red-500' + } + + return ( +
+
+
+

πŸ–₯️ EPYC Cluster Status

+ +
+ + {/* Cluster Overview */} +
+

Cluster Overview

+
+
+
Status
+
+ {status.cluster.status.toUpperCase()} +
+
+
+
CPU Usage
+
{status.cluster.cpuUsage.toFixed(1)}%
+
+
+
Active Cores
+
{status.cluster.activeCores} / {status.cluster.totalCores}
+
+
+
Workers
+
{status.cluster.activeWorkers} / {status.cluster.totalWorkers}
+
+
+
+ + {/* Worker Details */} +
+ {status.workers.map((worker) => ( +
+

{worker.name}

+
{worker.host}
+
+
+ CPU: + {worker.cpuUsage.toFixed(1)}% +
+
+ Load: + {worker.loadAverage} +
+
+ Processes: + {worker.activeProcesses} +
+
+
+ ))} +
+ + {/* Exploration Progress */} +
+

πŸ“Š Parameter Exploration

+
+
+
Total Space
+
{status.exploration.totalCombinations.toLocaleString()}
+
+
+
Chunks Completed
+
{status.exploration.chunksCompleted} / {status.exploration.totalChunks}
+
+
+
Current Chunk
+
{status.exploration.currentChunk}
+
+
+
+
+
+
+ {(status.exploration.progress * 100).toFixed(2)}% complete +
+
+ + {/* Recommendation */} + {status.recommendation && ( +
+

🎯 AI Recommendation

+
+ {status.recommendation} +
+
+ )} + + {/* Top Strategies */} + {status.topStrategies.length > 0 && ( +
+

πŸ† Top Strategies

+
+ {status.topStrategies.map((strategy) => ( +
+
+
#{strategy.rank}
+
+
+ ${strategy.pnl_per_1k.toFixed(2)} +
+
per $1k
+
+
+
+
+ Win Rate:{' '} + {(strategy.win_rate * 100).toFixed(1)}% +
+
+ Trades:{' '} + {strategy.trades} +
+
+ PF:{' '} + {strategy.profit_factor.toFixed(2)}x +
+
+ Max DD:{' '} + + ${Math.abs(strategy.max_drawdown).toFixed(0)} + +
+
+
+ + Show Parameters + +
+
flip: {strategy.params.flip_threshold}
+
ma_gap: {strategy.params.ma_gap}
+
adx: {strategy.params.adx_min}
+
long_pos: {strategy.params.long_pos_max}
+
short_pos: {strategy.params.short_pos_min}
+
+
+
+ ))} +
+
+ )} + +
+ Last updated: {new Date(status.lastUpdate).toLocaleString()} +
+
+
+ ) +} diff --git a/cluster/CLUSTER_SETUP.md b/cluster/CLUSTER_SETUP.md new file mode 100644 index 0000000..a8df996 --- /dev/null +++ b/cluster/CLUSTER_SETUP.md @@ -0,0 +1,339 @@ +# EPYC Cluster Setup and Access Guide + +## Overview +Two AMD EPYC 16-core servers running distributed parameter exploration for trading bot optimization. + +**Total Capacity:** 64 cores processing 12M parameter combinations + +--- + +## Server Access + +### Worker1: pve-nu-monitor01 (Direct SSH) +```bash +# Direct access from srvdocker02 +ssh root@10.10.254.106 + +# Specs +- Hostname: pve-nu-monitor01 +- IP: 10.10.254.106 +- CPU: AMD EPYC 7282 16-Core Processor (32 cores with hyperthreading) +- Location: /home/comprehensive_sweep/backtester/ +``` + +### Worker2: bd-host01 (SSH Hop Required) +```bash +# Access via 2-hop through worker1 +ssh root@10.10.254.106 "ssh root@10.20.254.100 'COMMAND'" + +# SCP via 2-hop +scp FILE root@10.10.254.106:/tmp/ +ssh root@10.10.254.106 "scp /tmp/FILE root@10.20.254.100:/path/" + +# Specs +- Hostname: bd-host01 +- IP: 10.20.254.100 (only accessible from worker1) +- CPU: AMD EPYC 7282 16-Core Processor (32 cores with hyperthreading) +- Location: /home/backtest_dual/backtest/ +``` + +### Coordinator: srvdocker02 (Local) +```bash +# Running on trading bot server +cd /home/icke/traderv4/cluster/ + +# Specs +- Hostname: srvdocker02 +- Role: Orchestrates distributed sweep, hosts trading bot +- Database: SQLite at /home/icke/traderv4/cluster/exploration.db +``` + +--- + +## Directory Structure + +### Worker1 Structure +``` +/home/comprehensive_sweep/backtester/ +β”œβ”€β”€ data/ +β”‚ └── solusdt_5m_aug_nov.csv # OHLCV data +β”œβ”€β”€ indicators/ +β”‚ └── money_line.py # Money Line indicator +β”œβ”€β”€ scripts/ +β”‚ └── distributed_worker.py # Worker script +β”œβ”€β”€ simulator.py # Backtesting engine +β”œβ”€β”€ data_loader.py # Data loading utilities +└── .venv/ # Python environment +``` + +### Worker2 Structure +``` +/home/backtest_dual/backtest/ +β”œβ”€β”€ backtester/ +β”‚ β”œβ”€β”€ data/ +β”‚ β”‚ └── solusdt_5m.csv # OHLCV data (copied from worker1) +β”‚ β”œβ”€β”€ indicators/ +β”‚ β”‚ └── money_line.py +β”‚ β”œβ”€β”€ scripts/ +β”‚ β”‚ └── distributed_worker.py # Modified for bd-host01 +β”‚ β”œβ”€β”€ simulator.py +β”‚ └── data_loader.py +└── .venv/ # Python environment +``` + +### Coordinator Structure +``` +/home/icke/traderv4/cluster/ +β”œβ”€β”€ distributed_coordinator.py # Main orchestrator +β”œβ”€β”€ distributed_worker.py # Worker script (template for worker1) +β”œβ”€β”€ distributed_worker_bd_clean.py # Worker script (template for worker2) +β”œβ”€β”€ monitor_bd_host01.sh # Monitoring script +β”œβ”€β”€ exploration.db # Chunk tracking database +└── chunk_*.json # Chunk specifications +``` + +--- + +## How It Works + +### 1. Coordinator (srvdocker02) +- Splits 12M parameter space into chunks (10,000 combos each) +- Stores chunk assignments in SQLite database +- Deploys chunk specs and worker scripts via SSH/SCP +- Starts workers via SSH with nohup (background execution) +- Monitors chunk completion and collects results + +### 2. Workers (EPYCs) +- Each processes assigned chunks independently +- Uses multiprocessing.Pool with **70% CPU limit** (22 cores) +- Outputs results to CSV files in their workspace +- Logs progress to /tmp/v9_chunk_XXXXXX.log + +### 3. 
Results Collection +- Workers save to: `chunk_v9_chunk_XXXXXX_results.csv` +- Coordinator can fetch results via SCP +- Trading bot API endpoint serves results to web UI + +--- + +## Common Operations + +### Start Distributed Sweep +```bash +cd /home/icke/traderv4/cluster/ + +# Clear old chunks and start fresh +rm -f exploration.db +nohup python3 distributed_coordinator.py > sweep.log 2>&1 & + +# Monitor progress +tail -f sweep.log +``` + +### Monitor Worker Status +```bash +# Check worker1 +ssh root@10.10.254.106 "top -bn1 | grep Cpu && ps aux | grep distributed_worker | wc -l" + +# Check worker2 (via hop) +ssh root@10.10.254.106 "ssh root@10.20.254.100 'top -bn1 | grep Cpu && ps aux | grep distributed_worker | wc -l'" + +# Use monitoring script +/home/icke/traderv4/cluster/monitor_bd_host01.sh +``` + +### Fetch Results +```bash +# Worker1 results +scp root@10.10.254.106:/home/comprehensive_sweep/backtester/chunk_*_results.csv ./ + +# Worker2 results (2-hop) +ssh root@10.10.254.106 "scp root@10.20.254.100:/home/backtest_dual/backtest/chunk_*_results.csv /tmp/" +scp root@10.10.254.106:/tmp/chunk_*_results.csv ./ +``` + +### View Results in Web UI +```bash +# Access cluster status page +http://localhost:3001/cluster +# or +https://tradervone.v4.dedyn.io/cluster + +# Shows: +- Real-time CPU usage and worker status +- Exploration progress +- Top 5 strategies with parameters +- AI recommendations for next actions +``` + +### Kill All Workers +```bash +# Kill worker1 +ssh root@10.10.254.106 "pkill -f distributed_worker" + +# Kill worker2 +ssh root@10.10.254.106 "ssh root@10.20.254.100 'pkill -f distributed_worker'" + +# Kill coordinator +pkill -f distributed_coordinator +``` + +--- + +## CPU Limit Configuration + +### Why 70%? +- Prevents server overload +- Leaves headroom for system operations +- Balances throughput vs stability + +### Implementation +Both worker scripts limit CPU via multiprocessing.Pool: +```python +# In distributed_worker.py and distributed_worker_bd_clean.py +max_workers = max(1, int(num_workers * 0.7)) # 70% of 32 cores = 22 + +with mp.Pool(processes=max_workers) as pool: + # Processing happens here +``` + +**Expected CPU Usage:** 67-72% user time on each EPYC + +--- + +## Troubleshooting + +### Worker Not Starting +```bash +# Check worker logs +ssh root@10.10.254.106 "tail -100 /tmp/v9_chunk_*.log" +ssh root@10.10.254.106 "ssh root@10.20.254.100 'tail -100 /tmp/v9_chunk_*.log'" + +# Common issues: +# 1. Import errors - check sys.path and module structure +# 2. Data file missing - verify solusdt_5m*.csv exists +# 3. Virtual env activation failed - check .venv/bin/activate path +``` + +### SSH Hop Issues (Worker2) +```bash +# Test 2-hop connectivity +ssh root@10.10.254.106 "ssh root@10.20.254.100 'echo SUCCESS'" + +# If fails, check: +# - Worker1 can reach worker2: ssh root@10.10.254.106 "ping -c 3 10.20.254.100" +# - SSH keys are set up between worker1 and worker2 +``` + +### Python Bytecode Cache Issues +```bash +# Clear .pyc files if code changes don't take effect +find /home/icke/traderv4/cluster -name "*.pyc" -delete +find /home/icke/traderv4/cluster -name "__pycache__" -type d -exec rm -rf {} + +``` + +### Database Lock Issues +```bash +# If coordinator fails to start due to DB lock +cd /home/icke/traderv4/cluster/ +pkill -f distributed_coordinator # Kill any running coordinators +rm -f exploration.db # Delete database +# Then restart coordinator +``` + +--- + +## Parameter Space + +**Total Combinations:** 11,943,936 + +**14 Parameters:** +1. 
flip_threshold: 0.4, 0.5, 0.6, 0.7 (4 values) +2. ma_gap: 0.20, 0.30, 0.40, 0.50 (4 values) +3. adx_min: 18, 21, 24, 27 (4 values) +4. long_pos_max: 60, 65, 70, 75 (4 values) +5. short_pos_min: 20, 25, 30, 35 (4 values) +6. cooldown: 1, 2, 3, 4 (4 values) +7. position_size: 0.1-1.0 in 0.1 increments (10 values) +8. tp1_mult: 1.5-3.0 in 0.5 increments (4 values) +9. tp2_mult: 3.0-6.0 in 1.0 increments (4 values) +10. sl_mult: 2.0-4.0 in 0.5 increments (5 values) +11. tp1_close_pct: 0.5-0.8 in 0.1 increments (4 values) +12. trailing_mult: 1.0-2.5 in 0.5 increments (4 values) +13. vol_min: 0.8-1.4 in 0.2 increments (4 values) +14. max_bars: 10, 15, 20, 25 (4 values) + +**Chunk Size:** 10,000 combinations +**Total Chunks:** 1,195 + +--- + +## Web UI Integration + +### API Endpoint +```typescript +// GET /api/cluster/status +// Returns: +{ + cluster: { + totalCores: 64, + activeCores: 45, + cpuUsage: 70.5, + activeWorkers: 2, + status: "active" + }, + workers: [...], + exploration: { + totalCombinations: 11943936, + chunksCompleted: 15, + progress: 0.0126 + }, + topStrategies: [...], + recommendation: "AI-generated action items" +} +``` + +### Frontend Page +- Location: `/home/icke/traderv4/app/cluster/page.tsx` +- Auto-refreshes every 30 seconds +- Shows real-time cluster status +- Displays top strategies with parameters +- Provides AI recommendations + +--- + +## Files Created/Modified + +**New Files:** +- `cluster/distributed_coordinator.py` - Main orchestrator (510 lines) +- `cluster/distributed_worker.py` - Worker script for worker1 (271 lines) +- `cluster/distributed_worker_bd_clean.py` - Worker script for worker2 (275 lines) +- `cluster/monitor_bd_host01.sh` - Monitoring script +- `app/api/cluster/status/route.ts` - API endpoint for web UI (274 lines) +- `app/cluster/page.tsx` - Web UI page (258 lines) +- `cluster/CLUSTER_SETUP.md` - This documentation + +**Modified Files:** +- Docker rebuilt with new API endpoint and cluster page + +--- + +## Next Steps + +1. **Monitor first chunk completion** (~10-30 min) +2. **Analyze top strategies** via web UI at `/cluster` +3. **Scale to full sweep** - all 1,195 chunks across both EPYCs +4. **Implement best parameters** in production trading bot +5. **Iterate** - refine grid based on results + +--- + +## Notes + +- **70% CPU limit ensures system stability** while maximizing throughput +- **Coordinator is stateless** - stores all state in SQLite, can restart anytime +- **Workers are autonomous** - process chunks independently, no coordination needed +- **Results are immutable** - each chunk produces one CSV, never overwritten +- **Web UI provides actionable insights** - no manual CSV analysis needed + +**Last Updated:** November 30, 2025 diff --git a/cluster/distributed_coordinator.py b/cluster/distributed_coordinator.py new file mode 100644 index 0000000..3f3d99d --- /dev/null +++ b/cluster/distributed_coordinator.py @@ -0,0 +1,509 @@ +#!/usr/bin/env python3 +""" +Distributed Continuous Optimization Coordinator + +Extends comprehensive_sweep.py to distribute massive parameter grids +across 2 EPYC servers (64 cores total) for 24/7 strategy discovery. + +Architecture: +1. Master generates parameter grid (millions of combinations) +2. Splits into chunks (~10,000 combos per chunk) +3. Distributes chunks to workers via SSH +4. Workers run modified comprehensive_sweep on their chunk +5. Master aggregates results, identifies top performers +6. Master generates next exploration batch (nearby good configs) +7. 
Repeat forever - continuous improvement + +Integration with Existing System: +- Uses simulator.py and MoneyLineInputs from /home/comprehensive_sweep/backtester/ +- Preserves comprehensive_sweep.py output format (CSV with 14 params) +- Works with existing .venv and data files on EPYC +- Backwards compatible - can still run comprehensive_sweep.py standalone +""" + +import sqlite3 +import subprocess +import json +import time +import itertools +import hashlib +from pathlib import Path +from datetime import datetime +from typing import Dict, List, Optional, Tuple, Any +from dataclasses import dataclass + +# Worker Configuration +WORKERS = { + 'worker1': { + 'host': 'root@10.10.254.106', + 'cores': 32, # Full 32 threads available + 'workspace': '/home/comprehensive_sweep', + 'ssh_key': None, # Use default key + }, + 'worker2': { + 'host': 'root@10.20.254.100', + 'cores': 32, # Full 32 threads available + 'workspace': '/home/backtest_dual/backtest', # CORRECTED: Actual path on bd-host01 + 'ssh_hop': 'root@10.10.254.106', # Connect through worker1 + 'ssh_key': None, + } +} + +CLUSTER_DIR = Path(__file__).parent +RESULTS_DIR = CLUSTER_DIR / 'distributed_results' +DB_PATH = CLUSTER_DIR / 'exploration.db' + +@dataclass +class ParameterGrid: + """Full parameter space for comprehensive sweep""" + flip_thresholds: List[float] + ma_gaps: List[float] + adx_mins: List[int] + long_pos_maxs: List[int] + short_pos_mins: List[int] + cooldowns: List[int] + position_sizes: List[int] + tp1_multipliers: List[float] + tp2_multipliers: List[float] + sl_multipliers: List[float] + tp1_close_percents: List[int] + trailing_multipliers: List[float] + vol_mins: List[float] + max_bars_list: List[int] + + def total_combinations(self) -> int: + """Calculate total parameter space size""" + return ( + len(self.flip_thresholds) * len(self.ma_gaps) * len(self.adx_mins) * + len(self.long_pos_maxs) * len(self.short_pos_mins) * len(self.cooldowns) * + len(self.position_sizes) * len(self.tp1_multipliers) * len(self.tp2_multipliers) * + len(self.sl_multipliers) * len(self.tp1_close_percents) * + len(self.trailing_multipliers) * len(self.vol_mins) * len(self.max_bars_list) + ) + + def to_dict(self) -> Dict[str, List]: + """Convert to dict for JSON serialization""" + return { + 'flip_thresholds': self.flip_thresholds, + 'ma_gaps': self.ma_gaps, + 'adx_mins': self.adx_mins, + 'long_pos_maxs': self.long_pos_maxs, + 'short_pos_mins': self.short_pos_mins, + 'cooldowns': self.cooldowns, + 'position_sizes': self.position_sizes, + 'tp1_multipliers': self.tp1_multipliers, + 'tp2_multipliers': self.tp2_multipliers, + 'sl_multipliers': self.sl_multipliers, + 'tp1_close_percents': self.tp1_close_percents, + 'trailing_multipliers': self.trailing_multipliers, + 'vol_mins': self.vol_mins, + 'max_bars_list': self.max_bars_list, + } + +class ExplorationDatabase: + """Track all tested strategies and exploration progress""" + + def __init__(self, db_path: Path): + self.db_path = db_path + self.init_db() + + def init_db(self): + """Create tables""" + conn = sqlite3.connect(self.db_path) + c = conn.cursor() + + # Strategies table - all tested configurations + c.execute(''' + CREATE TABLE IF NOT EXISTS strategies ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + param_hash TEXT UNIQUE NOT NULL, + indicator_type TEXT NOT NULL, + params_json TEXT NOT NULL, + + trades INTEGER, + win_rate REAL, + total_pnl REAL, + pnl_per_1k REAL, + profit_factor REAL, + max_drawdown REAL, + sharpe_ratio REAL, + + tested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + worker_id TEXT, + 
chunk_id TEXT + ) + ''') + + # Exploration chunks - work distribution tracking + c.execute(''' + CREATE TABLE IF NOT EXISTS chunks ( + id TEXT PRIMARY KEY, + indicator_type TEXT NOT NULL, + grid_json TEXT NOT NULL, + chunk_start INTEGER NOT NULL, + chunk_end INTEGER NOT NULL, + total_combos INTEGER NOT NULL, + + assigned_worker TEXT, + status TEXT DEFAULT 'pending', + started_at TIMESTAMP, + completed_at TIMESTAMP, + + best_pnl_in_chunk REAL, + results_csv_path TEXT + ) + ''') + + # Exploration phases - high-level progress + c.execute(''' + CREATE TABLE IF NOT EXISTS phases ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + phase_name TEXT NOT NULL, + indicator_type TEXT NOT NULL, + grid_json TEXT NOT NULL, + total_combos INTEGER NOT NULL, + + completed_combos INTEGER DEFAULT 0, + best_pnl_overall REAL DEFAULT 0, + best_params_json TEXT, + + started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + estimated_completion TIMESTAMP, + actual_completion TIMESTAMP + ) + ''') + + # Create indexes for fast queries + c.execute('CREATE INDEX IF NOT EXISTS idx_pnl_per_1k ON strategies(pnl_per_1k DESC)') + c.execute('CREATE INDEX IF NOT EXISTS idx_indicator_type ON strategies(indicator_type)') + c.execute('CREATE INDEX IF NOT EXISTS idx_chunk_status ON chunks(status)') + + conn.commit() + conn.close() + + def record_chunk(self, chunk_id: str, indicator_type: str, grid: ParameterGrid, + chunk_start: int, chunk_end: int, assigned_worker: str) -> None: + """Record new chunk assigned to worker""" + conn = sqlite3.connect(self.db_path) + c = conn.cursor() + + c.execute(''' + INSERT INTO chunks (id, indicator_type, grid_json, chunk_start, chunk_end, + total_combos, assigned_worker, status, started_at) + VALUES (?, ?, ?, ?, ?, ?, ?, 'running', ?) + ''', (chunk_id, indicator_type, json.dumps(grid.to_dict()), chunk_start, chunk_end, + chunk_end - chunk_start, assigned_worker, datetime.now())) + + conn.commit() + conn.close() + + def complete_chunk(self, chunk_id: str, results_csv_path: str, best_pnl: float) -> None: + """Mark chunk as completed with results""" + conn = sqlite3.connect(self.db_path) + c = conn.cursor() + + c.execute(''' + UPDATE chunks + SET status='completed', completed_at=?, results_csv_path=?, best_pnl_in_chunk=? + WHERE id=? + ''', (datetime.now(), results_csv_path, best_pnl, chunk_id)) + + conn.commit() + conn.close() + + def import_results_csv(self, csv_path: str, worker_id: str, chunk_id: str) -> int: + """Import CSV results from comprehensive_sweep into strategies table""" + import csv + + conn = sqlite3.connect(self.db_path) + c = conn.cursor() + + imported = 0 + with open(csv_path, 'r') as f: + reader = csv.DictReader(f) + for row in reader: + # Create parameter hash for deduplication + params = {k: v for k, v in row.items() if k not in [ + 'rank', 'trades', 'win_rate', 'total_pnl', 'pnl_per_1k', + 'profit_factor', 'max_drawdown', 'sharpe_ratio' + ]} + param_hash = hashlib.sha256(json.dumps(params, sort_keys=True).encode()).hexdigest() + + try: + c.execute(''' + INSERT INTO strategies ( + param_hash, indicator_type, params_json, + trades, win_rate, total_pnl, pnl_per_1k, + profit_factor, max_drawdown, sharpe_ratio, + worker_id, chunk_id + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ ''', ( + param_hash, 'v9_moneyline', json.dumps(params), + int(row['trades']), float(row['win_rate']), float(row['total_pnl']), + float(row['pnl_per_1k']), float(row.get('profit_factor', 0)), + float(row.get('max_drawdown', 0)), float(row.get('sharpe_ratio', 0)), + worker_id, chunk_id + )) + imported += 1 + except sqlite3.IntegrityError: + # Duplicate param_hash - already tested this config + pass + + conn.commit() + conn.close() + return imported + + def get_top_strategies(self, limit: int = 100) -> List[Dict]: + """Get top performing strategies across all tested""" + conn = sqlite3.connect(self.db_path) + c = conn.cursor() + + c.execute(''' + SELECT indicator_type, params_json, trades, win_rate, total_pnl, pnl_per_1k, + profit_factor, max_drawdown, sharpe_ratio, tested_at + FROM strategies + WHERE trades >= 700 -- Statistical significance + AND win_rate >= 0.50 AND win_rate <= 0.70 -- Realistic + AND profit_factor >= 1.2 -- Minimum edge + ORDER BY pnl_per_1k DESC + LIMIT ? + ''', (limit,)) + + rows = c.fetchall() + conn.close() + + results = [] + for row in rows: + results.append({ + 'indicator_type': row[0], + 'params': json.loads(row[1]), + 'trades': row[2], + 'win_rate': row[3], + 'total_pnl': row[4], + 'pnl_per_1k': row[5], + 'profit_factor': row[6], + 'max_drawdown': row[7], + 'sharpe_ratio': row[8], + 'tested_at': row[9], + }) + + return results + +class DistributedCoordinator: + """Coordinates distributed parameter sweeps across EPYC servers""" + + def __init__(self): + self.db = ExplorationDatabase(DB_PATH) + RESULTS_DIR.mkdir(parents=True, exist_ok=True) + + def ssh_command(self, worker_id: str, command: str) -> subprocess.CompletedProcess: + """Execute command on worker via SSH""" + worker = WORKERS[worker_id] + + if 'ssh_hop' in worker: + # Worker 2 requires hop through worker 1 + # CRITICAL FIX (Nov 29, 2025): Use double-nested quotes for 2-hop SSH + # Single quotes don't pass command to inner SSH properly + ssh_cmd = f"ssh {worker['ssh_hop']} \"ssh {worker['host']} '{command}'\"" + else: + ssh_cmd = f"ssh {worker['host']} '{command}'" + + return subprocess.run(ssh_cmd, shell=True, capture_output=True, text=True) + + def deploy_worker_script(self, worker_id: str) -> bool: + """Deploy distributed_worker.py to EPYC server""" + worker = WORKERS[worker_id] + script_path = CLUSTER_DIR / 'distributed_worker.py' + + # Copy script to worker's comprehensive_sweep directory + target = f"{worker['workspace']}/backtester/scripts/distributed_worker.py" + + if 'ssh_hop' in worker: + # Two-hop copy for worker2 + print(f"πŸ“€ Copying worker script to {worker_id} via hop...") + # Copy to worker1 first + subprocess.run(f"scp {script_path} {WORKERS['worker1']['host']}:/tmp/", shell=True) + # Then copy from worker1 to worker2 + self.ssh_command('worker1', f"scp /tmp/distributed_worker.py {worker['host']}:{target}") + else: + print(f"πŸ“€ Copying worker script to {worker_id}...") + subprocess.run(f"scp {script_path} {worker['host']}:{target}", shell=True) + + print(f"βœ… Worker script deployed to {worker_id}") + return True + + def assign_chunk(self, worker_id: str, chunk_id: str, grid: ParameterGrid, + chunk_start: int, chunk_end: int) -> bool: + """Assign parameter chunk to worker for processing""" + worker = WORKERS[worker_id] + + # Record in database + self.db.record_chunk(chunk_id, 'v9_moneyline', grid, chunk_start, chunk_end, worker_id) + + # Create chunk specification JSON + chunk_spec = { + 'chunk_id': chunk_id, + 'chunk_start': chunk_start, + 'chunk_end': chunk_end, + 'grid': 
grid.to_dict(), + 'num_workers': worker['cores'], + } + + chunk_json_path = RESULTS_DIR / f"{chunk_id}_spec.json" + with open(chunk_json_path, 'w') as f: + json.dump(chunk_spec, f, indent=2) + + # Copy chunk spec to worker + target_json = f"{worker['workspace']}/chunk_{chunk_id}.json" + if 'ssh_hop' in worker: + # Two-hop copy + subprocess.run(f"scp {chunk_json_path} {WORKERS['worker1']['host']}:/tmp/", shell=True) + self.ssh_command('worker1', f"scp /tmp/{chunk_id}_spec.json {worker['host']}:{target_json}") + else: + subprocess.run(f"scp {chunk_json_path} {worker['host']}:{target_json}", shell=True) + + # Execute distributed_worker.py on worker + # CRITICAL: Simplified SSH command without bash -c to avoid quoting issues + cmd = (f"cd {worker['workspace']} && " + f"source backtester/.venv/bin/activate && " + f"nohup python3 backtester/scripts/distributed_worker.py {target_json} " + f"> /tmp/{chunk_id}.log 2>&1 &") + + print(f"πŸš€ Starting chunk {chunk_id} on {worker_id} ({chunk_end - chunk_start:,} combos)...") + result = self.ssh_command(worker_id, cmd) + + if result.returncode == 0: + print(f"βœ… Chunk {chunk_id} assigned to {worker_id}") + return True + else: + print(f"❌ Failed to assign chunk {chunk_id} to {worker_id}: {result.stderr}") + return False + + def collect_results(self, worker_id: str, chunk_id: str) -> Optional[str]: + """Collect CSV results from worker""" + worker = WORKERS[worker_id] + + # Check if results file exists on worker + results_csv = f"{worker['workspace']}/chunk_{chunk_id}_results.csv" + check_cmd = f"test -f {results_csv} && echo 'exists'" + result = self.ssh_command(worker_id, check_cmd) + + if 'exists' not in result.stdout: + return None # Results not ready yet + + # Copy results back to master + local_csv = RESULTS_DIR / f"{chunk_id}_results.csv" + + if 'ssh_hop' in worker: + # Two-hop copy back + self.ssh_command('worker1', f"scp {worker['host']}:{results_csv} /tmp/") + subprocess.run(f"scp {WORKERS['worker1']['host']}:/tmp/chunk_{chunk_id}_results.csv {local_csv}", shell=True) + else: + subprocess.run(f"scp {worker['host']}:{results_csv} {local_csv}", shell=True) + + print(f"πŸ“₯ Collected results from {worker_id} chunk {chunk_id}") + + # Import into database + imported = self.db.import_results_csv(str(local_csv), worker_id, chunk_id) + print(f"πŸ“Š Imported {imported} unique strategies from {chunk_id}") + + # Get best P&L from CSV for chunk tracking + import csv + with open(local_csv, 'r') as f: + reader = csv.DictReader(f) + rows = list(reader) + best_pnl = max(float(row['pnl_per_1k']) for row in rows) if rows else 0 + + self.db.complete_chunk(chunk_id, str(local_csv), best_pnl) + + return str(local_csv) + + def start_comprehensive_exploration(self, chunk_size: int = 10000): + """Start massive comprehensive parameter sweep""" + print("=" * 80) + print("πŸš€ DISTRIBUTED COMPREHENSIVE EXPLORATION") + print("=" * 80) + print() + + # Define full parameter grid (can be expanded) + grid = ParameterGrid( + flip_thresholds=[0.4, 0.5, 0.6, 0.7], + ma_gaps=[0.20, 0.30, 0.40, 0.50], + adx_mins=[18, 21, 24, 27], + long_pos_maxs=[60, 65, 70, 75], + short_pos_mins=[20, 25, 30, 35], + cooldowns=[1, 2, 3, 4], + position_sizes=[10000], # Fixed for fair comparison + tp1_multipliers=[1.5, 2.0, 2.5], + tp2_multipliers=[3.0, 4.0, 5.0], + sl_multipliers=[2.5, 3.0, 3.5], + tp1_close_percents=[50, 60, 70, 75], + trailing_multipliers=[1.0, 1.5, 2.0], + vol_mins=[0.8, 1.0, 1.2], + max_bars_list=[300, 500, 1000], + ) + + total_combos = grid.total_combinations() + + print(f"πŸ“Š 
Total parameter space: {total_combos:,} combinations") + print(f"πŸ“¦ Chunk size: {chunk_size:,} combinations per chunk") + print(f"🎯 Total chunks: {(total_combos + chunk_size - 1) // chunk_size:,}") + print(f"⏱️ Estimated time: {(total_combos * 1.6) / (64 * 3600):.1f} hours with 64 cores") + print() + + # Deploy worker scripts + for worker_id in WORKERS.keys(): + self.deploy_worker_script(worker_id) + + print() + print("πŸ”„ Distributing chunks to workers...") + print() + + # Split work across workers + chunk_id_counter = 0 + chunk_start = 0 + active_chunks = {} + worker_list = list(WORKERS.keys()) # ['worker1', 'worker2'] + + while chunk_start < total_combos: + chunk_end = min(chunk_start + chunk_size, total_combos) + chunk_id = f"v9_chunk_{chunk_id_counter:06d}" + + # Round-robin assignment across both workers for balanced load + worker_id = worker_list[chunk_id_counter % len(worker_list)] + + if self.assign_chunk(worker_id, chunk_id, grid, chunk_start, chunk_end): + active_chunks[chunk_id] = worker_id + + chunk_id_counter += 1 + chunk_start = chunk_end + + # Don't overwhelm workers - limit to 2 chunks per worker at a time + if len(active_chunks) >= len(WORKERS) * 2: + print(f"⏸️ Pausing chunk assignment - {len(active_chunks)} chunks active") + print(f"⏳ Waiting for chunks to complete...") + break + + print() + print(f"βœ… Assigned {len(active_chunks)} initial chunks") + print() + print("πŸ“Š Monitor progress with: python3 cluster/exploration_status.py") + print("πŸ† View top strategies: sqlite3 cluster/exploration.db 'SELECT * FROM strategies ORDER BY pnl_per_1k DESC LIMIT 10'") + +def main(): + """Main coordinator entry point""" + import argparse + + parser = argparse.ArgumentParser(description='Distributed continuous optimization coordinator') + parser.add_argument('--chunk-size', type=int, default=10000, + help='Number of combinations per chunk (default: 10000)') + parser.add_argument('--continuous', action='store_true', + help='Run continuously (not implemented yet)') + + args = parser.parse_args() + + coordinator = DistributedCoordinator() + coordinator.start_comprehensive_exploration(chunk_size=args.chunk_size) + +if __name__ == '__main__': + main() diff --git a/cluster/distributed_worker.py b/cluster/distributed_worker.py new file mode 100644 index 0000000..eca498b --- /dev/null +++ b/cluster/distributed_worker.py @@ -0,0 +1,272 @@ +#!/usr/bin/env python3 +""" +Distributed Worker for Comprehensive Sweep + +Runs on EPYC server, executes parameter sweep chunk using existing +comprehensive_sweep.py architecture (simulator.py + MoneyLineInputs). 
+ +Integration with Existing System: +- Uses same simulator.py, indicators, data_loader +- Works with existing .venv Python environment +- Outputs same CSV format as comprehensive_sweep.py +- Can run standalone or as part of distributed cluster + +Usage: + python3 distributed_worker.py /path/to/chunk_spec.json +""" + +import sys +import json +import itertools +import multiprocessing as mp +from pathlib import Path +from datetime import datetime +import csv + +# Import from existing comprehensive_sweep infrastructure +# Match comprehensive_sweep.py import pattern +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +from backtester.simulator import simulate_money_line, TradeConfig +from backtester.data_loader import load_csv +from backtester.indicators.money_line import MoneyLineInputs + +def test_config(args): + """Test single parameter configuration (matches comprehensive_sweep.py signature)""" + config_id, params, data_slice = args + + # Unpack parameters (14-dimensional grid) + flip_thresh, ma_gap, adx_min, long_pos, short_pos, cooldown, \ + pos_size, tp1_mult, tp2_mult, sl_mult, tp1_close, trail_mult, \ + vol_min, max_bars = params + + # Create MoneyLineInputs + inputs = MoneyLineInputs( + flip_threshold_percent=flip_thresh, + ma_gap_threshold=ma_gap, + momentum_min_adx=adx_min, + momentum_long_max_pos=long_pos, + momentum_short_min_pos=short_pos, + cooldown_bars=cooldown, + momentum_spacing=3, # Fixed (not in grid) + momentum_cooldown=2, # Fixed (not in grid) + ) + + # Create TradeConfig + config = TradeConfig( + position_size=pos_size, + atr_multiplier_tp1=tp1_mult, + atr_multiplier_tp2=tp2_mult, + atr_multiplier_sl=sl_mult, + take_profit_1_size_percent=tp1_close, + trailing_atr_multiplier=trail_mult, + max_bars_per_trade=max_bars, + ) + + # Quality filter (matches comprehensive_sweep.py) + quality_filter = { + 'min_adx': 15, + 'min_volume_ratio': vol_min, + } + + # Run simulation + try: + results = simulate_money_line( + data_slice.data, + data_slice.symbol, + inputs, + config, + quality_filter + ) + + # Extract metrics + trades = len(results.trades) + win_rate = results.win_rate if trades > 0 else 0 + total_pnl = results.total_pnl + pnl_per_1k = (total_pnl / pos_size * 1000) if pos_size > 0 else 0 + profit_factor = results.profit_factor if hasattr(results, 'profit_factor') else 0 + max_drawdown = abs(results.max_drawdown) if hasattr(results, 'max_drawdown') else 0 + sharpe = results.sharpe_ratio if hasattr(results, 'sharpe_ratio') else 0 + + return (config_id, trades, win_rate, total_pnl, pnl_per_1k, + profit_factor, max_drawdown, sharpe, params) + + except Exception as e: + print(f"Error testing config {config_id}: {e}") + return (config_id, 0, 0, 0, 0, 0, 0, 0, params) + +def process_chunk(chunk_spec_path: str): + """Process parameter chunk specified in JSON file""" + + # Load chunk specification + with open(chunk_spec_path, 'r') as f: + spec = json.load(f) + + chunk_id = spec['chunk_id'] + chunk_start = spec['chunk_start'] + chunk_end = spec['chunk_end'] + grid = spec['grid'] + num_workers = spec['num_workers'] + + # Limit to 70% of available cores (user request) + max_workers = max(1, int(num_workers * 0.7)) + + print(f"🎯 Processing chunk: {chunk_id}") + print(f"πŸ“Š Range: {chunk_start:,} to {chunk_end:,} ({chunk_end - chunk_start:,} combinations)") + print(f"βš™οΈ Workers: {max_workers} cores (70% of {num_workers} available)") + print() + + # Load data (same as comprehensive_sweep.py) + data_path = Path(__file__).parent.parent / 'data' / 'solusdt_5m_aug_nov.csv' + 
print(f"πŸ“ˆ Loading data from {data_path}...") + data_slice = load_csv(data_path, 'SOL-PERP', '5m') + print(f"βœ… Loaded {len(data_slice.data):,} rows") + print() + + # Generate ALL parameter combinations (same order as comprehensive_sweep.py) + param_lists = [ + grid['flip_thresholds'], + grid['ma_gaps'], + grid['adx_mins'], + grid['long_pos_maxs'], + grid['short_pos_mins'], + grid['cooldowns'], + grid['position_sizes'], + grid['tp1_multipliers'], + grid['tp2_multipliers'], + grid['sl_multipliers'], + grid['tp1_close_percents'], + grid['trailing_multipliers'], + grid['vol_mins'], + grid['max_bars_list'], + ] + + print("πŸ”’ Generating parameter combinations...") + all_combos = list(itertools.product(*param_lists)) + total_combos = len(all_combos) + print(f"βœ… Generated {total_combos:,} total combinations") + + # Extract chunk slice + chunk_combos = all_combos[chunk_start:chunk_end] + print(f"βœ‚οΈ Extracted chunk slice: {len(chunk_combos):,} combinations") + print() + + # Prepare arguments for test_config + args_list = [ + (chunk_start + i, combo, data_slice) + for i, combo in enumerate(chunk_combos) + ] + + # Run multiprocessing sweep (same as comprehensive_sweep.py) + print(f"πŸš€ Starting sweep with {num_workers} workers...") + print() + + results = [] + completed = 0 + best_pnl = float('-inf') + best_config = None + + with mp.Pool(processes=max_workers) as pool: + for result in pool.imap_unordered(test_config, args_list, chunksize=10): + results.append(result) + completed += 1 + + # Track best + if result[4] > best_pnl: # pnl_per_1k + best_pnl = result[4] + best_config = result + + # Progress every 100 configs + if completed % 100 == 0: + pct = (completed / len(chunk_combos)) * 100 + print(f"⏳ Progress: {completed:,}/{len(chunk_combos):,} ({pct:.1f}%) - " + f"Best so far: ${best_pnl:.2f}/1k") + + print() + print(f"βœ… Chunk {chunk_id} complete!") + print(f"πŸ“Š Tested {len(results):,} configurations") + print(f"πŸ† Best P&L: ${best_pnl:.2f} per $1k") + print() + + # Sort by profitability + results.sort(key=lambda x: x[4], reverse=True) + + # Save results to CSV (same format as comprehensive_sweep.py) + output_file = Path(__file__).parent.parent / f'chunk_{chunk_id}_results.csv' + + print(f"πŸ’Ύ Saving results to {output_file}...") + + with open(output_file, 'w', newline='') as f: + writer = csv.writer(f) + + # Header + writer.writerow([ + 'rank', 'trades', 'win_rate', 'total_pnl', 'pnl_per_1k', + 'profit_factor', 'max_drawdown', 'sharpe_ratio', + 'flip_threshold', 'ma_gap', 'adx_min', 'long_pos_max', 'short_pos_min', + 'cooldown', 'position_size', 'tp1_mult', 'tp2_mult', 'sl_mult', + 'tp1_close_pct', 'trailing_mult', 'vol_min', 'max_bars' + ]) + + # Write all results + for rank, result in enumerate(results, 1): + config_id, trades, win_rate, total_pnl, pnl_per_1k, \ + profit_factor, max_drawdown, sharpe, params = result + + writer.writerow([ + rank, trades, f'{win_rate:.4f}', f'{total_pnl:.2f}', f'{pnl_per_1k:.2f}', + f'{profit_factor:.3f}', f'{max_drawdown:.2f}', f'{sharpe:.3f}', + *params + ]) + + print(f"βœ… Results saved!") + print() + + # Print top 10 + print("πŸ† Top 10 configurations:") + print() + for i, result in enumerate(results[:10], 1): + config_id, trades, win_rate, total_pnl, pnl_per_1k, \ + profit_factor, max_drawdown, sharpe, params = result + + print(f"{i:2d}. 
${pnl_per_1k:7.2f}/1k | " + f"{trades:4d} trades | {win_rate*100:5.1f}% WR | " + f"PF {profit_factor:.2f} | DD {max_drawdown:.1f}%") + + print() + print(f"βœ… Chunk {chunk_id} processing complete!") + + return output_file + +def main(): + """Worker entry point""" + if len(sys.argv) < 2: + print("Usage: python3 distributed_worker.py ") + sys.exit(1) + + chunk_spec_path = sys.argv[1] + + if not Path(chunk_spec_path).exists(): + print(f"Error: Chunk spec file not found: {chunk_spec_path}") + sys.exit(1) + + print("=" * 80) + print("πŸ”§ DISTRIBUTED WORKER") + print("=" * 80) + print() + + start_time = datetime.now() + + output_file = process_chunk(chunk_spec_path) + + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + + print() + print("=" * 80) + print(f"⏱️ Total time: {duration:.1f} seconds ({duration/60:.1f} minutes)") + print(f"πŸ“„ Results: {output_file}") + print("=" * 80) + +if __name__ == '__main__': + main() diff --git a/cluster/distributed_worker_bd.py b/cluster/distributed_worker_bd.py new file mode 100644 index 0000000..fd5c031 --- /dev/null +++ b/cluster/distributed_worker_bd.py @@ -0,0 +1,280 @@ +#!/usr/bin/env python3 +""" +Distributed worker process for comprehensive parameter exploration +Runs on remote EPYC servers - Modified for bd-host01 directory structure +""" + +import sys +import json +import itertools +import multiprocessing as mp +from pathlib import Path +from datetime import datetime +import csv + +# Add backtester to path for bd-host01 structure +sys.path.insert(0, str(Path(__file__).parent / 'backtester')) + +from backtester.simulator import simulate_money_line, MoneyLineInputs, TradeConfig +from backtester.data_loader import load_csv + +# Rest of the file stays the same as distributed_worker.py +- Works with existing .venv Python environment +- Outputs same CSV format as comprehensive_sweep.py +- Can run standalone or as part of distributed cluster + +Usage: + python3 distributed_worker.py /path/to/chunk_spec.json +""" + +import sys +import json +import itertools +import multiprocessing as mp +from pathlib import Path +from datetime import datetime +import csv + +# Import from existing comprehensive_sweep infrastructure +# These paths work because script runs from /home/comprehensive_sweep/backtester/scripts/ +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from simulator import simulate_money_line, MoneyLineInputs, TradeConfig +from data_loader import load_csv + +def test_config(args): + """Test single parameter configuration (matches comprehensive_sweep.py signature)""" + config_id, params, data_slice = args + + # Unpack parameters (14-dimensional grid) + flip_thresh, ma_gap, adx_min, long_pos, short_pos, cooldown, \ + pos_size, tp1_mult, tp2_mult, sl_mult, tp1_close, trail_mult, \ + vol_min, max_bars = params + + # Create MoneyLineInputs + inputs = MoneyLineInputs( + flip_threshold_percent=flip_thresh, + ma_gap_threshold=ma_gap, + momentum_min_adx=adx_min, + momentum_long_max_pos=long_pos, + momentum_short_min_pos=short_pos, + cooldown_bars=cooldown, + momentum_spacing=3, # Fixed (not in grid) + momentum_cooldown=2, # Fixed (not in grid) + ) + + # Create TradeConfig + config = TradeConfig( + position_size=pos_size, + atr_multiplier_tp1=tp1_mult, + atr_multiplier_tp2=tp2_mult, + atr_multiplier_sl=sl_mult, + take_profit_1_size_percent=tp1_close, + trailing_atr_multiplier=trail_mult, + max_bars_per_trade=max_bars, + ) + + # Quality filter (matches comprehensive_sweep.py) + quality_filter = { + 'min_adx': 15, + 
'min_volume_ratio': vol_min, + } + + # Run simulation + try: + results = simulate_money_line( + data_slice.data, + data_slice.symbol, + inputs, + config, + quality_filter + ) + + # Extract metrics + trades = len(results.trades) + win_rate = results.win_rate if trades > 0 else 0 + total_pnl = results.total_pnl + pnl_per_1k = (total_pnl / pos_size * 1000) if pos_size > 0 else 0 + profit_factor = results.profit_factor if hasattr(results, 'profit_factor') else 0 + max_drawdown = abs(results.max_drawdown) if hasattr(results, 'max_drawdown') else 0 + sharpe = results.sharpe_ratio if hasattr(results, 'sharpe_ratio') else 0 + + return (config_id, trades, win_rate, total_pnl, pnl_per_1k, + profit_factor, max_drawdown, sharpe, params) + + except Exception as e: + print(f"Error testing config {config_id}: {e}") + return (config_id, 0, 0, 0, 0, 0, 0, 0, params) + +def process_chunk(chunk_spec_path: str): + """Process parameter chunk specified in JSON file""" + + # Load chunk specification + with open(chunk_spec_path, 'r') as f: + spec = json.load(f) + + chunk_id = spec['chunk_id'] + chunk_start = spec['chunk_start'] + chunk_end = spec['chunk_end'] + grid = spec['grid'] + num_workers = spec['num_workers'] + + print(f"🎯 Processing chunk: {chunk_id}") + print(f"πŸ“Š Range: {chunk_start:,} to {chunk_end:,} ({chunk_end - chunk_start:,} combinations)") + print(f"βš™οΈ Workers: {num_workers} cores") + print() + + # Load data (same as comprehensive_sweep.py) + data_path = Path(__file__).parent.parent / 'data' / 'solusdt_5m.csv' + print(f"πŸ“ˆ Loading data from {data_path}...") + data_slice = load_csv(str(data_path)) + print(f"βœ… Loaded {len(data_slice.data):,} rows") + print() + + # Generate ALL parameter combinations (same order as comprehensive_sweep.py) + param_lists = [ + grid['flip_thresholds'], + grid['ma_gaps'], + grid['adx_mins'], + grid['long_pos_maxs'], + grid['short_pos_mins'], + grid['cooldowns'], + grid['position_sizes'], + grid['tp1_multipliers'], + grid['tp2_multipliers'], + grid['sl_multipliers'], + grid['tp1_close_percents'], + grid['trailing_multipliers'], + grid['vol_mins'], + grid['max_bars_list'], + ] + + print("πŸ”’ Generating parameter combinations...") + all_combos = list(itertools.product(*param_lists)) + total_combos = len(all_combos) + print(f"βœ… Generated {total_combos:,} total combinations") + + # Extract chunk slice + chunk_combos = all_combos[chunk_start:chunk_end] + print(f"βœ‚οΈ Extracted chunk slice: {len(chunk_combos):,} combinations") + print() + + # Prepare arguments for test_config + args_list = [ + (chunk_start + i, combo, data_slice) + for i, combo in enumerate(chunk_combos) + ] + + # Run multiprocessing sweep (same as comprehensive_sweep.py) + print(f"πŸš€ Starting sweep with {num_workers} workers...") + print() + + results = [] + completed = 0 + best_pnl = float('-inf') + best_config = None + + with mp.Pool(processes=num_workers) as pool: + for result in pool.imap_unordered(test_config, args_list, chunksize=10): + results.append(result) + completed += 1 + + # Track best + if result[4] > best_pnl: # pnl_per_1k + best_pnl = result[4] + best_config = result + + # Progress every 100 configs + if completed % 100 == 0: + pct = (completed / len(chunk_combos)) * 100 + print(f"⏳ Progress: {completed:,}/{len(chunk_combos):,} ({pct:.1f}%) - " + f"Best so far: ${best_pnl:.2f}/1k") + + print() + print(f"βœ… Chunk {chunk_id} complete!") + print(f"πŸ“Š Tested {len(results):,} configurations") + print(f"πŸ† Best P&L: ${best_pnl:.2f} per $1k") + print() + + # Sort by 
profitability + results.sort(key=lambda x: x[4], reverse=True) + + # Save results to CSV (same format as comprehensive_sweep.py) + output_file = Path(__file__).parent.parent / f'chunk_{chunk_id}_results.csv' + + print(f"πŸ’Ύ Saving results to {output_file}...") + + with open(output_file, 'w', newline='') as f: + writer = csv.writer(f) + + # Header + writer.writerow([ + 'rank', 'trades', 'win_rate', 'total_pnl', 'pnl_per_1k', + 'profit_factor', 'max_drawdown', 'sharpe_ratio', + 'flip_threshold', 'ma_gap', 'adx_min', 'long_pos_max', 'short_pos_min', + 'cooldown', 'position_size', 'tp1_mult', 'tp2_mult', 'sl_mult', + 'tp1_close_pct', 'trailing_mult', 'vol_min', 'max_bars' + ]) + + # Write all results + for rank, result in enumerate(results, 1): + config_id, trades, win_rate, total_pnl, pnl_per_1k, \ + profit_factor, max_drawdown, sharpe, params = result + + writer.writerow([ + rank, trades, f'{win_rate:.4f}', f'{total_pnl:.2f}', f'{pnl_per_1k:.2f}', + f'{profit_factor:.3f}', f'{max_drawdown:.2f}', f'{sharpe:.3f}', + *params + ]) + + print(f"βœ… Results saved!") + print() + + # Print top 10 + print("πŸ† Top 10 configurations:") + print() + for i, result in enumerate(results[:10], 1): + config_id, trades, win_rate, total_pnl, pnl_per_1k, \ + profit_factor, max_drawdown, sharpe, params = result + + print(f"{i:2d}. ${pnl_per_1k:7.2f}/1k | " + f"{trades:4d} trades | {win_rate*100:5.1f}% WR | " + f"PF {profit_factor:.2f} | DD {max_drawdown:.1f}%") + + print() + print(f"βœ… Chunk {chunk_id} processing complete!") + + return output_file + +def main(): + """Worker entry point""" + if len(sys.argv) < 2: + print("Usage: python3 distributed_worker.py ") + sys.exit(1) + + chunk_spec_path = sys.argv[1] + + if not Path(chunk_spec_path).exists(): + print(f"Error: Chunk spec file not found: {chunk_spec_path}") + sys.exit(1) + + print("=" * 80) + print("πŸ”§ DISTRIBUTED WORKER") + print("=" * 80) + print() + + start_time = datetime.now() + + output_file = process_chunk(chunk_spec_path) + + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + + print() + print("=" * 80) + print(f"⏱️ Total time: {duration:.1f} seconds ({duration/60:.1f} minutes)") + print(f"πŸ“„ Results: {output_file}") + print("=" * 80) + +if __name__ == '__main__': + main() diff --git a/cluster/distributed_worker_bd_clean.py b/cluster/distributed_worker_bd_clean.py new file mode 100644 index 0000000..98ef88d --- /dev/null +++ b/cluster/distributed_worker_bd_clean.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 +""" +Distributed Worker for Comprehensive Sweep + +Runs on EPYC server (bd-host01), executes parameter sweep chunk using existing +comprehensive_sweep.py architecture (simulator.py + MoneyLineInputs). 
+ +Integration with Existing System: +- Uses same simulator.py, indicators, data_loader +- Works with existing .venv Python environment +- Outputs same CSV format as comprehensive_sweep.py +- Can run standalone or as part of distributed cluster + +Usage: + python3 distributed_worker.py /path/to/chunk_spec.json +""" + +import sys +import json +import itertools +import multiprocessing as mp +from pathlib import Path +from datetime import datetime +import csv + +# Import from bd-host01 directory structure +# Script runs from /home/backtest_dual/backtest/backtester/scripts/ +# simulator.py imports use 'backtester.indicators.money_line' format +# So we need to add /home/backtest_dual/backtest/ to sys.path (3 parents up) +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +from backtester.simulator import simulate_money_line, MoneyLineInputs, TradeConfig +from backtester.data_loader import load_csv + +def test_config(args): + """Test single parameter configuration (matches comprehensive_sweep.py signature)""" + config_id, params, data_slice = args + + # Unpack parameters (14-dimensional grid) + flip_thresh, ma_gap, adx_min, long_pos, short_pos, cooldown, \ + pos_size, tp1_mult, tp2_mult, sl_mult, tp1_close, trail_mult, \ + vol_min, max_bars = params + + # Create MoneyLineInputs + inputs = MoneyLineInputs( + flip_threshold_percent=flip_thresh, + ma_gap_threshold=ma_gap, + momentum_min_adx=adx_min, + momentum_long_max_pos=long_pos, + momentum_short_min_pos=short_pos, + cooldown_bars=cooldown, + momentum_spacing=3, # Fixed (not in grid) + momentum_cooldown=2, # Fixed (not in grid) + ) + + # Create TradeConfig + config = TradeConfig( + position_size=pos_size, + atr_multiplier_tp1=tp1_mult, + atr_multiplier_tp2=tp2_mult, + atr_multiplier_sl=sl_mult, + take_profit_1_size_percent=tp1_close, + trailing_atr_multiplier=trail_mult, + max_bars_per_trade=max_bars, + ) + + # Quality filter (matches comprehensive_sweep.py) + quality_filter = { + 'min_adx': 15, + 'min_volume_ratio': vol_min, + } + + # Run simulation + try: + results = simulate_money_line( + data_slice.data, + data_slice.symbol, + inputs, + config, + quality_filter + ) + + # Extract metrics + trades = len(results.trades) + win_rate = results.win_rate if trades > 0 else 0 + total_pnl = results.total_pnl + pnl_per_1k = (total_pnl / pos_size * 1000) if pos_size > 0 else 0 + profit_factor = results.profit_factor if hasattr(results, 'profit_factor') else 0 + max_drawdown = abs(results.max_drawdown) if hasattr(results, 'max_drawdown') else 0 + sharpe = results.sharpe_ratio if hasattr(results, 'sharpe_ratio') else 0 + + return (config_id, trades, win_rate, total_pnl, pnl_per_1k, + profit_factor, max_drawdown, sharpe, params) + + except Exception as e: + print(f"Error testing config {config_id}: {e}") + return (config_id, 0, 0, 0, 0, 0, 0, 0, params) + +def process_chunk(chunk_spec_path: str): + """Process parameter chunk specified in JSON file""" + + # Load chunk specification + with open(chunk_spec_path, 'r') as f: + spec = json.load(f) + + chunk_id = spec['chunk_id'] + chunk_start = spec['chunk_start'] + chunk_end = spec['chunk_end'] + grid = spec['grid'] + num_workers = spec['num_workers'] + + # Limit to 70% of available cores (user request) + max_workers = max(1, int(num_workers * 0.7)) + + print(f"🎯 Processing chunk: {chunk_id}") + print(f"πŸ“Š Range: {chunk_start:,} to {chunk_end:,} ({chunk_end - chunk_start:,} combinations)") + print(f"βš™οΈ Workers: {max_workers} cores (70% of {num_workers} available)") + print() + + # Load data 
+    data_path = Path(__file__).parent.parent / 'data' / 'solusdt_5m.csv'
+    print(f"πŸ“ˆ Loading data from {data_path}...")
+    # bd-host01's load_csv requires symbol and timeframe arguments
+    data_slice = load_csv(data_path, 'solusdt', '5m')
+    print(f"βœ… Loaded {len(data_slice.data):,} rows")
+    print()
+
+    # Generate ALL parameter combinations (same order as comprehensive_sweep.py)
+    param_lists = [
+        grid['flip_thresholds'],
+        grid['ma_gaps'],
+        grid['adx_mins'],
+        grid['long_pos_maxs'],
+        grid['short_pos_mins'],
+        grid['cooldowns'],
+        grid['position_sizes'],
+        grid['tp1_multipliers'],
+        grid['tp2_multipliers'],
+        grid['sl_multipliers'],
+        grid['tp1_close_percents'],
+        grid['trailing_multipliers'],
+        grid['vol_mins'],
+        grid['max_bars_list'],
+    ]
+
+    print("πŸ”’ Generating parameter combinations...")
+    all_combos = list(itertools.product(*param_lists))
+    total_combos = len(all_combos)
+    print(f"βœ… Generated {total_combos:,} total combinations")
+
+    # Extract chunk slice
+    chunk_combos = all_combos[chunk_start:chunk_end]
+    print(f"βœ‚οΈ Extracted chunk slice: {len(chunk_combos):,} combinations")
+    print()
+
+    # Prepare arguments for test_config
+    args_list = [
+        (chunk_start + i, combo, data_slice)
+        for i, combo in enumerate(chunk_combos)
+    ]
+
+    # Run multiprocessing sweep (same as comprehensive_sweep.py)
+    print(f"πŸš€ Starting sweep with {max_workers} workers...")
+    print()
+
+    results = []
+    completed = 0
+    best_pnl = float('-inf')
+    best_config = None
+
+    with mp.Pool(processes=max_workers) as pool:
+        for result in pool.imap_unordered(test_config, args_list, chunksize=10):
+            results.append(result)
+            completed += 1
+
+            # Track best
+            if result[4] > best_pnl:  # pnl_per_1k
+                best_pnl = result[4]
+                best_config = result
+
+            # Progress every 100 configs
+            if completed % 100 == 0:
+                pct = (completed / len(chunk_combos)) * 100
+                print(f"⏳ Progress: {completed:,}/{len(chunk_combos):,} ({pct:.1f}%) - "
+                      f"Best so far: ${best_pnl:.2f}/1k")
+
+    print()
+    print(f"βœ… Chunk {chunk_id} complete!")
+    print(f"πŸ“Š Tested {len(results):,} configurations")
+    print(f"πŸ† Best P&L: ${best_pnl:.2f} per $1k")
+    print()
+
+    # Sort by profitability
+    results.sort(key=lambda x: x[4], reverse=True)
+
+    # Save results to CSV (same format as comprehensive_sweep.py)
+    output_file = Path(__file__).parent.parent / f'chunk_{chunk_id}_results.csv'
+
+    print(f"πŸ’Ύ Saving results to {output_file}...")
+
+    with open(output_file, 'w', newline='') as f:
+        writer = csv.writer(f)
+
+        # Header
+        writer.writerow([
+            'rank', 'trades', 'win_rate', 'total_pnl', 'pnl_per_1k',
+            'profit_factor', 'max_drawdown', 'sharpe_ratio',
+            'flip_threshold', 'ma_gap', 'adx_min', 'long_pos_max', 'short_pos_min',
+            'cooldown', 'position_size', 'tp1_mult', 'tp2_mult', 'sl_mult',
+            'tp1_close_pct', 'trailing_mult', 'vol_min', 'max_bars'
+        ])
+
+        # Write all results
+        for rank, result in enumerate(results, 1):
+            config_id, trades, win_rate, total_pnl, pnl_per_1k, \
+                profit_factor, max_drawdown, sharpe, params = result
+
+            writer.writerow([
+                rank, trades, f'{win_rate:.4f}', f'{total_pnl:.2f}', f'{pnl_per_1k:.2f}',
+                f'{profit_factor:.3f}', f'{max_drawdown:.2f}', f'{sharpe:.3f}',
+                *params
+            ])
+
+    print(f"βœ… Results saved!")
+    print()
+
+    # Print top 10
+    print("πŸ† Top 10 configurations:")
+    print()
+    for i, result in enumerate(results[:10], 1):
+        config_id, trades, win_rate, total_pnl, pnl_per_1k, \
+            profit_factor, max_drawdown, sharpe, params = result
+
+        print(f"{i:2d}. ${pnl_per_1k:7.2f}/1k | "
+              f"{trades:4d} trades | {win_rate*100:5.1f}% WR | "
+              f"PF {profit_factor:.2f} | DD {max_drawdown:.1f}%")
+
+    print()
+    print(f"βœ… Chunk {chunk_id} processing complete!")
+
+    return output_file
+
+def main():
+    """Worker entry point"""
+    if len(sys.argv) < 2:
+        print("Usage: python3 distributed_worker.py <chunk_spec.json>")
+        sys.exit(1)
+
+    chunk_spec_path = sys.argv[1]
+
+    if not Path(chunk_spec_path).exists():
+        print(f"Error: Chunk spec file not found: {chunk_spec_path}")
+        sys.exit(1)
+
+    print("=" * 80)
+    print("πŸ”§ DISTRIBUTED WORKER")
+    print("=" * 80)
+    print()
+
+    start_time = datetime.now()
+
+    output_file = process_chunk(chunk_spec_path)
+
+    end_time = datetime.now()
+    duration = (end_time - start_time).total_seconds()
+
+    print()
+    print("=" * 80)
+    print(f"⏱️ Total time: {duration:.1f} seconds ({duration/60:.1f} minutes)")
+    print(f"πŸ“„ Results: {output_file}")
+    print("=" * 80)
+
+if __name__ == '__main__':
+    main()
diff --git a/cluster/monitor_bd_host01.sh b/cluster/monitor_bd_host01.sh
new file mode 100755
index 0000000..d4e708f
--- /dev/null
+++ b/cluster/monitor_bd_host01.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+# Monitor bd-host01 worker progress
+
+echo "=================================="
+echo "BD-HOST01 WORKER MONITOR"
+echo "=================================="
+echo
+
+echo "=== CPU Usage ==="
+ssh root@10.10.254.106 "ssh root@10.20.254.100 'top -bn1 | grep \"Cpu(s)\"'"
+echo
+
+echo "=== Load Average ==="
+ssh root@10.10.254.106 "ssh root@10.20.254.100 'uptime'"
+echo
+
+echo "=== Worker Processes ==="
+WORKER_COUNT=$(ssh root@10.10.254.106 "ssh root@10.20.254.100 'ps aux | grep distributed_worker | grep -v grep | wc -l'")
+echo "Active workers: $WORKER_COUNT"
+echo
+
+echo "=== Output Files ==="
+ssh root@10.10.254.106 "ssh root@10.20.254.100 'ls -lh /home/backtest_dual/backtest/chunk_*_results.csv 2>/dev/null || echo \"Still processing - no results file yet\"'"
+echo
+
+echo "=== Latest Log Lines ==="
+ssh root@10.10.254.106 "ssh root@10.20.254.100 'tail -10 /tmp/v9_chunk_000000.log'"
+echo
+
+if [ "$WORKER_COUNT" -eq 0 ]; then
+    echo "⚠️ Worker finished or crashed!"
+    echo "Check full log: ssh root@10.10.254.106 \"ssh root@10.20.254.100 'cat /tmp/v9_chunk_000000.log'\""
+else
+    echo "βœ… Worker is running - processing 10,000 parameter combinations"
+    echo "   This will take 10-30 minutes depending on complexity"
+fi
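+
+# Example worker launch that this monitor watches (illustrative only; the chunk spec
+# path is a placeholder, other paths follow the comments in distributed_worker_bd_clean.py
+# and the log polled above - adjust to your deployment):
+#   ssh root@10.10.254.106 "ssh root@10.20.254.100 \
+#     'cd /home/backtest_dual/backtest/backtester/scripts && \
+#      nohup python3 distributed_worker.py /tmp/chunk_spec_000000.json \
+#        > /tmp/v9_chunk_000000.log 2>&1 &'"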