diff --git a/cluster/DEPLOYMENT.md b/cluster/DEPLOYMENT.md
new file mode 100644
index 0000000..041077d
--- /dev/null
+++ b/cluster/DEPLOYMENT.md
@@ -0,0 +1,415 @@
+# šŸš€ Continuous Optimization Cluster - Deployment Guide
+
+## ⚔ Quick Deploy (5 minutes)
+
+```bash
+cd /home/icke/traderv4/cluster
+
+# 1. Set up both EPYC servers
+./setup_cluster.sh
+
+# 2. Start master controller
+python3 master.py
+
+# 3. Monitor status (separate terminal)
+watch -n 10 'python3 status.py'
+```
+
+---
+
+## šŸ“‹ Prerequisites Checklist
+
+- [x] **SSH Access:** Keys configured for both EPYC servers
+  - `root@10.10.254.106` (pve-nu-monitor01)
+  - `root@10.20.254.100` (srv-bd-host01, via monitor01)
+
+- [x] **Python 3.7+** installed on all servers
+
+- [x] **OHLCV Data:** `backtester/data/solusdt_5m.csv` (139,678 rows)
+
+- [ ] **Backtester Code:** In `/home/icke/traderv4/backtester/`
+  - `backtester_core.py`
+  - `v9_moneyline_ma_gap.py`
+  - `moneyline_core.py`
+
+---
+
+## šŸ—ļø Step-by-Step Setup
+
+### Step 1: Verify the Backtester Works Locally
+
+```bash
+cd /home/icke/traderv4/backtester
+
+# Test v9 backtest
+python3 backtester_core.py \
+  --data data/solusdt_5m.csv \
+  --indicator v9 \
+  --flip-threshold 0.6 \
+  --ma-gap 0.35 \
+  --momentum-adx 23 \
+  --output json
+```
+
+**Expected output:**
+```json
+{
+  "pnl": 192.50,
+  "trades": 569,
+  "win_rate": 60.98,
+  "profit_factor": 1.022,
+  "max_drawdown": 1360.58
+}
+```
+
+### Step 2: Deploy to the EPYC Servers
+
+```bash
+cd /home/icke/traderv4/cluster
+./setup_cluster.sh
+```
+
+**This will:**
+1. Create `/root/optimization-cluster` on both servers
+2. Install a Python venv plus pandas/numpy
+3. Copy the backtester code
+4. Copy the worker.py script
+5. Copy the OHLCV data
+6. Verify the installation
+
+**Expected output:**
+```
+šŸš€ Setting up optimization cluster...
+
+Setting up Worker 1 (root@10.10.254.106)...
+  šŸ“¦ Installing Python packages...
+  šŸ“ Copying backtester modules...
+  šŸ“„ Installing worker script...
+  šŸ“Š Copying OHLCV data...
+  āœ… Verifying installation...
+āœ… Worker 1 (pve-nu-monitor01) setup complete
+
+Setting up Worker 2 (root@10.20.254.100)...
+  šŸ“¦ Installing Python packages...
+  šŸ“ Copying backtester modules...
+  šŸ“„ Installing worker script...
+  šŸ“Š Copying OHLCV data...
+  āœ… Verifying installation...
+āœ… Worker 2 (srv-bd-host01) setup complete
+
+šŸŽ‰ Cluster setup complete!
+```
+
+### Step 3: Start the Master Controller
+
+```bash
+python3 master.py
+```
+
+**Master will:**
+1. Initialize the SQLite database (`strategies.db`)
+2. Generate the initial v9 parameter sweep (27 jobs, see the sketch below)
+3. Start the monitoring loop (60-second intervals)
+4. Assign jobs to idle workers
+5. Collect and rank results
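+
+The 27 jobs are simply the cross-product of the three parameter lists that
+master.py sweeps in `generate_v9_jobs()`. A minimal standalone sketch of that
+enumeration (illustrative, not part of the cluster code):
+
+```python
+from itertools import product
+
+flip_thresholds = [0.5, 0.6, 0.7]
+ma_gaps = [0.30, 0.35, 0.40]
+momentum_adx = [21, 23, 25]
+
+# 3 x 3 x 3 = 27 parameter combinations, all queued at HIGH priority
+jobs = [
+    {'flip_threshold': f, 'ma_gap': g, 'momentum_adx': a}
+    for f, g, a in product(flip_thresholds, ma_gaps, momentum_adx)
+]
+print(len(jobs))  # 27
+```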
+
+**Expected output:**
+```
+šŸš€ Cluster Master starting...
+šŸ“Š Workers: 2
+šŸ’¾ Database: /home/icke/traderv4/cluster/strategies.db
+šŸ“ Queue: /home/icke/traderv4/cluster/queue
+
+šŸ”§ Generating v9 parameter sweep jobs...
+āœ… Created job: v9_moneyline_1701234567890 (priority 1)
+āœ… Created job: v9_moneyline_1701234567891 (priority 1)
+...
+āœ… Created 27 v9 refinement jobs
+
+============================================================
+šŸ”„ Iteration 1 - 2025-11-29 15:30:00
+
+šŸ“¤ Assigning v9_moneyline_1701234567890.json to worker1...
+āœ… Job started on worker1
+
+šŸ“¤ Assigning v9_moneyline_1701234567891.json to worker2...
+āœ… Job started on worker2
+
+šŸ“Š Status: 25 queued | 2 running | 0 completed
+```
+
+### Step 4: Monitor Progress
+
+**Terminal 1 - Master logs:**
+```bash
+cd /home/icke/traderv4/cluster
+python3 master.py
+```
+
+**Terminal 2 - Status dashboard:**
+```bash
+watch -n 10 'python3 status.py'
+```
+
+**Terminal 3 - Queue size:**
+```bash
+watch -n 5 'ls -1 cluster/queue/*.json 2>/dev/null | wc -l'
+```
+
+---
+
+## šŸ“Š Understanding Results
+
+### Status Dashboard Output
+
+```
+======================================================================
+šŸŽÆ OPTIMIZATION CLUSTER STATUS
+======================================================================
+šŸ“… 2025-11-29 15:45:00
+
+šŸ“‹ Queue: 15 jobs waiting
+   Running: 2
+   Completed: 10
+
+šŸ† TOP 5 STRATEGIES:
+----------------------------------------------------------------------
+Rank   Strategy                        P&L/1k     Trades   WR%     PF
+----------------------------------------------------------------------
+1      v9_flip0.7_ma0.40_adx25         $215.80    587      62.3%   1.18
+2      v9_flip0.6_ma0.35_adx23         $208.40    569      61.5%   1.12
+3      v9_flip0.7_ma0.35_adx25         $205.20    601      60.8%   1.09
+4      v9_flip0.6_ma0.40_adx21         $198.70    553      61.2%   1.07
+5      v9_flip0.5_ma0.35_adx23         $192.50    569      60.9%   1.02
+
+šŸ“Š BASELINE COMPARISON:
+   v9 baseline: $192.00/1k (current production)
+   Best found:  $215.80/1k (+12.4% improvement) āœ…
+
+======================================================================
+```
+
+### Strategy Naming Convention
+
+Format: `{indicator}_{param1}_{param2}_{param3}...` (a rebuild sketch follows the list)
+
+Example: `v9_flip0.7_ma0.40_adx25`
+- `v9`: Money Line indicator
+- `flip0.7`: flip_threshold = 0.7 (70% EMA flip confirmation)
+- `ma0.40`: ma_gap = 0.40 (MA50-MA200 gap bonus threshold)
+- `adx25`: momentum_adx = 25 (ADX requirement for momentum filter)
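+
+For scripting around the database, a name can be rebuilt from a job's params.
+A small sketch (the `strategy_name` helper is hypothetical, not part of the
+repo; it only covers the v9 family):
+
+```python
+def strategy_name(params: dict) -> str:
+    """Hypothetical helper mirroring the naming convention above."""
+    return (f"v9_flip{params['flip_threshold']}"
+            f"_ma{params['ma_gap']:.2f}_adx{params['momentum_adx']}")
+
+print(strategy_name({'flip_threshold': 0.7, 'ma_gap': 0.40, 'momentum_adx': 25}))
+# -> v9_flip0.7_ma0.40_adx25
+```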
+
+---
+
+## šŸ”§ Troubleshooting
+
+### Problem: Workers not picking up jobs
+
+**Check worker health:**
+```bash
+ssh root@10.10.254.106 'pgrep -f backtester || echo IDLE'
+ssh root@10.10.254.106 'ssh root@10.20.254.100 "pgrep -f backtester || echo IDLE"'
+```
+
+**View worker logs:**
+```bash
+ssh root@10.10.254.106 'tail -f /root/optimization-cluster/logs/worker.log'
+```
+
+**Restart worker manually:**
+```bash
+ssh root@10.10.254.106 'cd /root/optimization-cluster && python3 worker.py jobs/v9_moneyline_*.json'
+```
+
+### Problem: Jobs stuck in "running" status
+
+**Reset stale jobs (>30 minutes):**
+```bash
+sqlite3 cluster/strategies.db <<'EOF'
+UPDATE jobs
+SET status = 'queued', worker_id = NULL
+WHERE status = 'running'
+  AND started_at < datetime('now', '-30 minutes');
+EOF
+```
+
+---
+
+## šŸ“ˆ Optimization Roadmap
+
+**Week 1 (v9 refinement):**
+- 27 configurations (flip_threshold Ɨ ma_gap Ɨ momentum_adx)
+- Target: Find >$200/1k P&L configuration
+
+**Week 2-3 (New indicators):**
+- Volume profile: 27 configurations
+- Order flow: 27 configurations
+- Market structure: 27 configurations
+- Target: Find >$250/1k P&L strategy
+
+**Week 4+ (Advanced):**
+- Multi-timeframe: 81 configurations
+- ML-based scoring: 100+ hyperparameter combinations
+- Target: Find >$300/1k P&L strategy
+
+---
+
+## šŸ”’ Safety & Deployment
+
+### Validation Gates
+
+Before deploying a strategy to production:
+
+1. **Trade Count:** ≄700 trades (statistical significance)
+2. **Win Rate:** within the realistic 63-68% range
+3. **Profit Factor:** ≄1.5 (solid edge)
+4. **Max Drawdown:** <20% (manageable)
+5. **Sharpe Ratio:** ≄1.0 (risk-adjusted)
+6. **Consistency:** Top 3 for 7 days straight
+
+### Manual Deployment Process
+
+```bash
+# 1. Review top strategy
+sqlite3 cluster/strategies.db "SELECT * FROM strategies WHERE name = 'v9_flip0.7_ma0.40_adx25'"
+
+# 2. Extract parameters
+#    flip_threshold: 0.7
+#    ma_gap: 0.40
+#    momentum_adx: 25
+
+# 3. Update TradingView indicator
+#    Edit moneyline_v9_ma_gap.pinescript
+#    Change parameters to winning values
+
+# 4. Update TradingView alerts
+#    Verify alerts fire with new parameters
+
+# 5. Monitor first 10 trades
+#    Ensure behavior matches backtest
+
+# 6. Full deployment after validation
+```
+
+### Auto-Deployment (Future)
+
+Once confident in the system:
+
+1. Master marks the top strategy as `status = 'staging'`
+2. User reviews via web dashboard
+3. User clicks "Deploy" button
+4. System auto-updates TradingView via API
+5. Alerts regenerated with new parameters
+6. First 10 trades monitored closely
+
+---
+
+## šŸ“ž Support
+
+- **Main docs:** `/home/icke/traderv4/.github/copilot-instructions.md`
+- **Cluster README:** `/home/icke/traderv4/cluster/README.md`
+- **Backtester docs:** `/home/icke/traderv4/backtester/README.md` (if it exists)
+
+---
+
+**Ready to deploy?**
+
+```bash
+cd /home/icke/traderv4/cluster
+./setup_cluster.sh
+```
+
+Let the machines find better strategies while you sleep! šŸš€
diff --git a/cluster/README.md b/cluster/README.md
new file mode 100644
index 0000000..d8b6171
--- /dev/null
+++ b/cluster/README.md
@@ -0,0 +1,250 @@
+# Continuous Optimization Cluster
+
+24/7 automated strategy optimization across 2 EPYC servers (64 cores total).
+
+## šŸ—ļø Architecture
+
+```
+Master (your local machine)
+    ↓ Job Queue (file-based)
+    ↓
+Worker 1: pve-nu-monitor01 (22 workers @ 70% CPU)
+Worker 2: srv-bd-host01 (22 workers @ 70% CPU)
+    ↓
+Results Database (SQLite)
+    ↓
+Top Strategies (auto-deployment ready)
+```
+
+## šŸš€ Quick Start
+
+### 1. Set Up the Cluster
+
+```bash
+cd /home/icke/traderv4/cluster
+chmod +x setup_cluster.sh
+./setup_cluster.sh
+```
+
+This will:
+- Create `/root/optimization-cluster` on both EPYC servers
+- Install Python dependencies (pandas, numpy)
+- Copy backtester code and OHLCV data
+- Install worker scripts
+
+### 2. Start the Master Controller
+
+```bash
+python3 master.py
+```
+
+Master will:
+- Generate the initial job queue (v9 parameter sweep: 27 combinations)
+- Monitor both workers every 60 seconds
+- Assign jobs to idle workers
+- Collect and rank results
+- Display top performers
+
+### 3. Monitor Progress
+
+**Terminal 1 - Master logs:**
+```bash
+cd /home/icke/traderv4/cluster
+python3 master.py
+```
+
+**Terminal 2 - Job queue:**
+```bash
+watch -n 5 'ls -1 cluster/queue/*.json 2>/dev/null | wc -l'
+```
+
+**Terminal 3 - Results:**
+```bash
+watch -n 10 'sqlite3 cluster/strategies.db "SELECT name, pnl_per_1k, trade_count, win_rate FROM strategies ORDER BY pnl_per_1k DESC LIMIT 5"'
+```
+
+## šŸ“Š Database Schema
+
+### strategies table
+- `name`: Strategy identifier (e.g., "v9_flip0.6_ma0.35_adx23")
+- `indicator_type`: Indicator family (v9_moneyline, volume_profile, etc.)
+- `params`: JSON parameter configuration
+- `pnl_per_1k`: Performance metric ($ PnL per $1k capital)
+- `trade_count`: Total trades in backtest
+- `win_rate`: Percentage of winning trades
+- `profit_factor`: Gross profit / gross loss
+- `max_drawdown`: Largest peak-to-trough decline
+- `status`: pending/running/completed/deployed
+
+### jobs table
+- `job_file`: Filename in queue directory
+- `priority`: 1 (high), 2 (medium), 3 (low)
+- `worker_id`: Which worker is processing it
+- `status`: queued/running/completed/failed
+
+## šŸŽÆ Job Priorities
+
+**Priority 1 (HIGH):** Known good strategies
+- v9 refinements (flip_threshold, ma_gap, momentum_adx)
+- Proven concepts with minor tweaks
+
+**Priority 2 (MEDIUM):** New concepts
+- Volume profile integration
+- Order flow analysis
+- Market structure detection
+
+**Priority 3 (LOW):** Experimental
+- ML-based indicators
+- Neural network predictions
+- Complex multi-timeframe logic
+
+## šŸ“ˆ Adding New Strategies
+
+### Example: Test a volume profile indicator
+
+```python
+from cluster.master import ClusterMaster
+
+master = ClusterMaster()
+
+# Add volume profile jobs
+for profile_window in [20, 50, 100]:
+    for entry_threshold in [0.6, 0.7, 0.8]:
+        params = {
+            'profile_window': profile_window,
+            'entry_threshold': entry_threshold,
+            'stop_loss_atr': 3.0
+        }
+
+        master.queue.create_job(
+            'volume_profile',
+            params,
+            priority=2  # MEDIUM priority
+        )
+```
+
+## šŸ”’ Safety Features
+
+1. **Resource Limits:** Each worker respects the 70% CPU cap
+2. **Memory Management:** 4GB per worker, prevents OOM
+3. **Disk Monitoring:** Auto-cleanup of old results when space runs low
+4. **Error Recovery:** Failed jobs automatically requeued
+5. **Manual Approval:** Top strategies wait for user deployment
+
+## šŸ† Auto-Deployment Gates
+
+A strategy must pass ALL checks before auto-deployment (a sketch of the checks follows the list):
+
+1. **Trade Count:** Minimum 700 trades (statistical significance)
+2. **Win Rate:** within the realistic 63-68% range
+3. **Profit Factor:** ≄1.5 (solid edge)
+4. **Max Drawdown:** <20% (manageable risk)
+5. **Sharpe Ratio:** ≄1.0 (risk-adjusted returns)
+6. **Consistency:** Top 3 in a rolling 7-day window
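+
+A minimal sketch of the gate check, assuming the field names of the
+`strategies` table, a percent-denominated drawdown, and a hypothetical
+`days_in_top3` field for the consistency window (not tracked in the schema
+yet):
+
+```python
+def passes_gates(s: dict) -> bool:
+    """Return True if a strategy row clears all six deployment gates."""
+    return (s['trade_count'] >= 700
+            and 63 <= s['win_rate'] <= 68
+            and s['profit_factor'] >= 1.5
+            and s['max_drawdown'] < 20
+            and s['sharpe_ratio'] >= 1.0
+            and s.get('days_in_top3', 0) >= 7)  # hypothetical consistency field
+```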
+
+## šŸ“‹ Operational Commands
+
+### View Queue Status
+```bash
+ls -lh cluster/queue/
+```
+
+### Check Worker Health
+```bash
+ssh root@10.10.254.106 'pgrep -f backtester'
+ssh root@10.10.254.106 'ssh root@10.20.254.100 "pgrep -f backtester"'
+```
+
+### View Top 10 Strategies
+```bash
+sqlite3 cluster/strategies.db <<'EOF'
+SELECT name, pnl_per_1k, trade_count, win_rate, profit_factor
+FROM strategies
+ORDER BY pnl_per_1k DESC
+LIMIT 10;
+EOF
+```
+
+### Reset Failed Jobs
+```bash
+# Requeue failed jobs (>30 min)
+sqlite3 cluster/strategies.db <<'EOF'
+UPDATE jobs
+SET status = 'queued', worker_id = NULL
+WHERE status = 'failed'
+   OR (status = 'running' AND started_at < datetime('now', '-30 minutes'));
+EOF
+```
+
+## šŸ—ŗļø Roadmap
+
+**Phase 1 (Week 1):** v9 refinement
+- 27 parameter combinations (flip_threshold Ɨ ma_gap Ɨ momentum_adx)
+- Target: >$200/1k P&L
+
+**Phase 2 (Week 2-3):** Volume integration
+- Volume profile entries
+- Order flow imbalance detection
+- Target: >$250/1k P&L
+
+**Phase 3 (Week 4+):** Advanced concepts
+- Multi-timeframe confirmation
+- Market structure analysis
+- ML-based signal quality scoring
+- Target: >$300/1k P&L
+
+## šŸ“ž Contact
+
+Questions? Check copilot-instructions.md or ask in the main project chat.
diff --git a/cluster/master.py b/cluster/master.py
new file mode 100644
index 0000000..ae6659a
--- /dev/null
+++ b/cluster/master.py
@@ -0,0 +1,371 @@
+#!/usr/bin/env python3
+"""
+Continuous Optimization Cluster - MASTER Controller
+Manages the job queue, coordinates workers, aggregates results
+"""
+
+import json
+import time
+import subprocess
+from datetime import datetime
+from pathlib import Path
+import sqlite3
+
+# Configuration
+WORKERS = {
+    'worker1': {
+        'host': 'root@10.10.254.106',
+        'cores': 22,  # 70% of 32
+        'workspace': '/root/optimization-cluster',
+    },
+    'worker2': {
+        'host': 'root@10.10.254.106',     # SSH hop (monitor01); worker2 has no direct route
+        'ssh_hop': 'root@10.20.254.100',  # actual target (srv-bd-host01)
+        'cores': 22,  # 70% of 32
+        'workspace': '/root/optimization-cluster',
+    }
+}
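+
+# NOTE: worker2 is only reachable through worker1, so its 'host' entry is the
+# hop and 'ssh_hop' is the final target; every remote command for worker2
+# therefore expands to:
+#   ssh root@10.10.254.106 ssh root@10.20.254.100 '<command>'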
+
+CLUSTER_DIR = Path(__file__).parent
+QUEUE_DIR = CLUSTER_DIR / 'queue'
+RESULTS_DIR = CLUSTER_DIR / 'results'
+DB_PATH = CLUSTER_DIR / 'strategies.db'
+
+# Job priorities
+PRIORITY_HIGH = 1    # Known good strategies (v9 refinements)
+PRIORITY_MEDIUM = 2  # New concepts (volume profiles, orderflow)
+PRIORITY_LOW = 3     # Experimental (ML, neural networks)
+
+class StrategyDatabase:
+    """SQLite database for strategy performance tracking"""
+
+    def __init__(self, db_path):
+        self.db_path = db_path
+        self.init_db()
+
+    def init_db(self):
+        """Create tables if they do not exist"""
+        conn = sqlite3.connect(self.db_path)
+        c = conn.cursor()
+
+        c.execute('''
+            CREATE TABLE IF NOT EXISTS strategies (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                name TEXT UNIQUE NOT NULL,
+                indicator_type TEXT NOT NULL,
+                params JSON NOT NULL,
+                pnl_per_1k REAL,
+                trade_count INTEGER,
+                win_rate REAL,
+                profit_factor REAL,
+                max_drawdown REAL,
+                sharpe_ratio REAL,
+                tested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                status TEXT DEFAULT 'pending',
+                notes TEXT
+            )
+        ''')
+
+        c.execute('''
+            CREATE TABLE IF NOT EXISTS backtest_results (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                strategy_id INTEGER,
+                config JSON NOT NULL,
+                pnl REAL,
+                trades INTEGER,
+                win_rate REAL,
+                completed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                FOREIGN KEY (strategy_id) REFERENCES strategies (id)
+            )
+        ''')
+
+        c.execute('''
+            CREATE TABLE IF NOT EXISTS jobs (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                job_file TEXT UNIQUE NOT NULL,
+                priority INTEGER DEFAULT 2,
+                worker_id TEXT,
+                status TEXT DEFAULT 'queued',
+                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                started_at TIMESTAMP,
+                completed_at TIMESTAMP
+            )
+        ''')
+
+        conn.commit()
+        conn.close()
+
+    def add_strategy(self, name, indicator_type, params):
+        """Add a new strategy to test"""
+        conn = sqlite3.connect(self.db_path)
+        c = conn.cursor()
+
+        try:
+            c.execute('''
+                INSERT INTO strategies (name, indicator_type, params)
+                VALUES (?, ?, ?)
+            ''', (name, indicator_type, json.dumps(params)))
+            conn.commit()
+            return c.lastrowid
+        except sqlite3.IntegrityError:
+            # Strategy already exists
+            return None
+        finally:
+            conn.close()
+
+    def get_top_strategies(self, limit=10):
+        """Get top performing strategies"""
+        conn = sqlite3.connect(self.db_path)
+        c = conn.cursor()
+
+        c.execute('''
+            SELECT name, indicator_type, pnl_per_1k, trade_count,
+                   win_rate, profit_factor, max_drawdown
+            FROM strategies
+            WHERE status = 'completed'
+            ORDER BY pnl_per_1k DESC
+            LIMIT ?
+        ''', (limit,))
+
+        results = c.fetchall()
+        conn.close()
+        return results
+
+class JobQueue:
+    """Manages the job queue and worker assignments"""
+
+    def __init__(self, queue_dir, db):
+        self.queue_dir = Path(queue_dir)
+        self.queue_dir.mkdir(exist_ok=True)
+        self.db = db
+
+    def create_job(self, indicator, params, priority=PRIORITY_MEDIUM):
+        """Create a new backtest job"""
+        job_id = f"{indicator}_{int(time.time() * 1000)}"
+        job_file = self.queue_dir / f"{job_id}.json"
+
+        job = {
+            'id': job_id,
+            'indicator': indicator,
+            'params': params,
+            'priority': priority,
+            'created_at': datetime.now().isoformat(),
+            'status': 'queued'
+        }
+
+        with open(job_file, 'w') as f:
+            json.dump(job, f, indent=2)
+
+        # Log to database
+        conn = sqlite3.connect(self.db.db_path)
+        c = conn.cursor()
+        c.execute('''
+            INSERT INTO jobs (job_file, priority, status)
+            VALUES (?, ?, 'queued')
+        ''', (job_file.name, priority))
+        conn.commit()
+        conn.close()
+
+        print(f"āœ… Created job: {job_id} (priority {priority})")
+        return job_id
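+
+    # A job file on disk looks like this (values illustrative):
+    #   {
+    #     "id": "v9_moneyline_1701234567890",
+    #     "indicator": "v9_moneyline",
+    #     "params": {"flip_threshold": 0.6, "ma_gap": 0.35, "momentum_adx": 23},
+    #     "priority": 1,
+    #     "created_at": "2025-11-29T15:30:00",
+    #     "status": "queued"
+    #   }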
+
+    def get_next_job(self):
+        """Get the highest-priority queued job (skips jobs already running)"""
+        candidates = []
+        for job_file in self.queue_dir.glob("*.json"):
+            with open(job_file) as f:
+                job = json.load(f)
+            if job.get('status') == 'queued':
+                candidates.append((job.get('priority', PRIORITY_MEDIUM), job_file))
+
+        if candidates:
+            return min(candidates)[1]
+        return None
+
+    def mark_running(self, job_file, worker_id):
+        """Mark job as running"""
+        with open(job_file, 'r') as f:
+            job = json.load(f)
+
+        job['status'] = 'running'
+        job['worker_id'] = worker_id
+        job['started_at'] = datetime.now().isoformat()
+
+        with open(job_file, 'w') as f:
+            json.dump(job, f, indent=2)
+
+        # Update database
+        conn = sqlite3.connect(self.db.db_path)
+        c = conn.cursor()
+        c.execute('''
+            UPDATE jobs
+            SET status = 'running', worker_id = ?, started_at = CURRENT_TIMESTAMP
+            WHERE job_file = ?
+        ''', (worker_id, job_file.name))
+        conn.commit()
+        conn.close()
+
+class WorkerManager:
+    """Manages worker coordination"""
+
+    def __init__(self, workers):
+        self.workers = workers
+
+    def check_worker_status(self, worker_id):
+        """Check if a worker is idle"""
+        worker = self.workers[worker_id]
+
+        # Check via SSH whether the worker has running jobs
+        cmd = f"ssh {worker['host']}"
+        if 'ssh_hop' in worker:
+            cmd = f"ssh {worker['host']} ssh {worker['ssh_hop']}"
+
+        cmd += " 'pgrep -f backtester || echo IDLE'"
+
+        try:
+            # 15s leaves headroom for the double SSH hop to worker2
+            result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=15)
+            return 'IDLE' in result.stdout
+        except Exception:
+            return False
+
+    def assign_job(self, worker_id, job_file):
+        """Assign a job to a worker"""
+        worker = self.workers[worker_id]
+
+        print(f"šŸ“¤ Assigning {job_file.name} to {worker_id}...")
+
+        # Copy the job to the directly reachable host
+        cmd = f"scp {job_file} {worker['host']}:{worker['workspace']}/jobs/"
+        rc = subprocess.run(cmd, shell=True)
+        if rc.returncode != 0:
+            print(f"āŒ Failed to copy job to {worker_id}; will retry next iteration")
+            return
+
+        if 'ssh_hop' in worker:
+            # Relay the job from the hop to the actual target
+            cmd = (f"ssh {worker['host']} scp {worker['workspace']}/jobs/{job_file.name} "
+                   f"{worker['ssh_hop']}:{worker['workspace']}/jobs/")
+            rc = subprocess.run(cmd, shell=True)
+            if rc.returncode != 0:
+                print(f"āŒ Failed to relay job to {worker_id}; will retry next iteration")
+                return
+
+        # Trigger worker execution (nohup so it survives the SSH session)
+        cmd = f"ssh {worker['host']}"
+        if 'ssh_hop' in worker:
+            cmd = f"ssh {worker['host']} ssh {worker['ssh_hop']}"
+
+        cmd += (f" 'cd {worker['workspace']} && "
+                f"nohup python3 worker.py {job_file.name} > logs/worker.log 2>&1 &'")
+        subprocess.run(cmd, shell=True)
+
+        print(f"āœ… Job started on {worker_id}")
+
+class ClusterMaster:
+    """Main cluster controller"""
+
+    def __init__(self):
+        self.db = StrategyDatabase(DB_PATH)
+        self.queue = JobQueue(QUEUE_DIR, self.db)
+        self.workers = WorkerManager(WORKERS)
+        self.results_dir = Path(RESULTS_DIR)
+        self.results_dir.mkdir(exist_ok=True)
+
+    def generate_v9_jobs(self):
+        """Generate v9 parameter sweep jobs"""
+        print("šŸ”§ Generating v9 parameter sweep jobs...")
+
+        # Refined parameter grid based on baseline results
+        flip_thresholds = [0.5, 0.6, 0.7]
+        ma_gaps = [0.30, 0.35, 0.40]
+        momentum_adx = [21, 23, 25]
+
+        job_count = 0
+        for flip in flip_thresholds:
+            for ma_gap in ma_gaps:
+                for adx in momentum_adx:
+                    params = {
+                        'flip_threshold': flip,
+                        'ma_gap': ma_gap,
+                        'momentum_adx': adx,
+                        'long_pos': 70,
+                        'short_pos': 25,
+                        'cooldown_bars': 2
+                    }
+
+                    self.queue.create_job('v9_moneyline', params, PRIORITY_HIGH)
+                    job_count += 1
+
+        print(f"āœ… Created {job_count} v9 refinement jobs")
+
+    def run_forever(self):
+        """Main control loop - runs 24/7"""
+        print("šŸš€ Cluster Master starting...")
+        print(f"šŸ“Š Workers: {len(WORKERS)}")
+        print(f"šŸ’¾ Database: {DB_PATH}")
+        print(f"šŸ“ Queue: {QUEUE_DIR}")
+
+        # Generate initial jobs if the queue is empty
+        if not list(QUEUE_DIR.glob("*.json")):
+            self.generate_v9_jobs()
+
+        iteration = 0
+        while True:
+            iteration += 1
+            print(f"\n{'='*60}")
+            print(f"šŸ”„ Iteration {iteration} - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+
+            # Check for completed results
+            self.collect_results()
+
+            # Assign jobs to idle workers
+            for worker_id in WORKERS:
+                if self.workers.check_worker_status(worker_id):
+                    job = self.queue.get_next_job()
+                    if job:
+                        self.queue.mark_running(job, worker_id)
+                        self.workers.assign_job(worker_id, job)
+
+            # Show status
+            self.show_status()
+
+            # Sleep before the next iteration
+            time.sleep(60)  # Check every minute
+
+    def collect_results(self):
+        """Collect completed results from workers"""
+        for result_file in self.results_dir.glob("*.json"):
+            try:
+                with open(result_file, 'r') as f:
+                    result = json.load(f)
+
+                # Store in database
+                self.store_result(result)
+
+                # Drop the master-side queue copy so the job is not re-assigned
+                queue_copy = QUEUE_DIR / f"{result['job_id']}.json"
+                if queue_copy.exists():
+                    queue_copy.unlink()
+
+                # Archive the result
+                archive_dir = self.results_dir / 'archive'
+                archive_dir.mkdir(exist_ok=True)
+                result_file.rename(archive_dir / result_file.name)
+
+                print(f"āœ… Processed result: {result['job_id']}")
+            except Exception as e:
+                print(f"āŒ Error processing {result_file}: {e}")
+
+    def store_result(self, result):
+        """Store a backtest result and close out its job record"""
+        job_file = f"{result['job_id']}.json"
+        conn = sqlite3.connect(self.db.db_path)
+        c = conn.cursor()
+        try:
+            if 'error' in result:
+                c.execute('''
+                    UPDATE jobs SET status = 'failed', completed_at = CURRENT_TIMESTAMP
+                    WHERE job_file = ?
+                ''', (job_file,))
+            else:
+                p = result['params']
+                # Name follows the documented convention, e.g. v9_flip0.7_ma0.40_adx25
+                # (v9-specific for now; other indicator families need their own scheme)
+                name = (f"v9_flip{p['flip_threshold']}"
+                        f"_ma{p['ma_gap']:.2f}_adx{p['momentum_adx']}")
+
+                # Backtests run on $1k capital, so pnl maps directly onto pnl_per_1k.
+                # Upsert keyed on the UNIQUE name column (requires SQLite 3.24+).
+                c.execute('''
+                    INSERT INTO strategies
+                        (name, indicator_type, params, pnl_per_1k, trade_count,
+                         win_rate, profit_factor, max_drawdown, sharpe_ratio, status)
+                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'completed')
+                    ON CONFLICT(name) DO UPDATE SET
+                        pnl_per_1k = excluded.pnl_per_1k,
+                        trade_count = excluded.trade_count,
+                        win_rate = excluded.win_rate,
+                        profit_factor = excluded.profit_factor,
+                        max_drawdown = excluded.max_drawdown,
+                        sharpe_ratio = excluded.sharpe_ratio,
+                        status = 'completed'
+                ''', (name, result['indicator'], json.dumps(p), result['pnl'],
+                      result['trades'], result['win_rate'], result['profit_factor'],
+                      result['max_drawdown'], result.get('sharpe_ratio', 0)))
+
+                c.execute('''
+                    UPDATE jobs SET status = 'completed', completed_at = CURRENT_TIMESTAMP
+                    WHERE job_file = ?
+                ''', (job_file,))
+            conn.commit()
+        finally:
+            conn.close()
+
+    def show_status(self):
+        """Show current cluster status"""
+        conn = sqlite3.connect(self.db.db_path)
+        c = conn.cursor()
+
+        # Count via the jobs table; the queue dir also holds running jobs
+        c.execute("SELECT COUNT(*) FROM jobs WHERE status = 'queued'")
+        queued = c.fetchone()[0]
+
+        c.execute("SELECT COUNT(*) FROM jobs WHERE status = 'running'")
+        running = c.fetchone()[0]
+
+        c.execute("SELECT COUNT(*) FROM jobs WHERE status = 'completed'")
+        completed = c.fetchone()[0]
+
+        conn.close()
+
+        print(f"šŸ“Š Status: {queued} queued | {running} running | {completed} completed")
+
+        # Show top strategies
+        top = self.db.get_top_strategies(3)
+        if top:
+            print("\nšŸ† Top 3 Strategies:")
+            for i, strat in enumerate(top, 1):
+                print(f"  {i}. {strat[0]}: ${strat[2]:.2f}/1k ({strat[3]} trades, {strat[4]:.1f}% WR)")
+
+if __name__ == '__main__':
+    master = ClusterMaster()
+    master.run_forever()
diff --git a/cluster/setup_cluster.sh b/cluster/setup_cluster.sh
new file mode 100755
index 0000000..7788702
--- /dev/null
+++ b/cluster/setup_cluster.sh
@@ -0,0 +1,99 @@
+#!/bin/bash
+# Set up the optimization cluster on both EPYC servers
+
+set -e
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+
+echo "šŸš€ Setting up optimization cluster..."
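+
+# NOTE: worker2 (srv-bd-host01) is only reachable through worker1
+# (pve-nu-monitor01), so every file bound for worker2 is staged on worker1
+# first and then relayed from there (see the VIA_HOP branches below).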
+
+# Configuration
+WORKER1_HOST="root@10.10.254.106"
+WORKER2_HOP="$WORKER1_HOST"
+WORKER2_HOST="root@10.20.254.100"
+WORKSPACE="/root/optimization-cluster"
+
+# Colors
+GREEN='\033[0;32m'
+BLUE='\033[0;34m'
+NC='\033[0m'
+
+setup_worker() {
+    local HOST=$1
+    local NAME=$2
+    local VIA_HOP=${3:-}
+
+    echo -e "\n${BLUE}Setting up $NAME ($HOST)...${NC}"
+
+    # Build SSH commands (two-hop when the target is only reachable via a hop)
+    SSH_CMD="ssh $HOST"
+    HOP_CMD=""
+    if [ -n "$VIA_HOP" ]; then
+        SSH_CMD="ssh $VIA_HOP ssh $HOST"
+        HOP_CMD="ssh $VIA_HOP"
+    fi
+
+    # Create workspace
+    $SSH_CMD "mkdir -p $WORKSPACE/{jobs,results,data,backtester,logs}"
+
+    # Install Python dependencies
+    echo "  šŸ“¦ Installing Python packages..."
+    $SSH_CMD "cd $WORKSPACE && python3 -m venv .venv"
+    $SSH_CMD "cd $WORKSPACE && .venv/bin/pip install pandas numpy"
+
+    # Copy backtester code
+    echo "  šŸ“ Copying backtester modules..."
+    if [ -n "$VIA_HOP" ]; then
+        # Two-hop transfer: local -> hop -> target (the relay runs ON the hop)
+        scp -r "$SCRIPT_DIR/../backtester/"* "$VIA_HOP:$WORKSPACE/backtester/" > /dev/null 2>&1
+        $HOP_CMD "rsync -a $WORKSPACE/backtester/ $HOST:$WORKSPACE/backtester/"
+    else
+        scp -r "$SCRIPT_DIR/../backtester/"* "$HOST:$WORKSPACE/backtester/" > /dev/null 2>&1
+    fi
+
+    # Copy worker script
+    echo "  šŸ“„ Installing worker script..."
+    if [ -n "$VIA_HOP" ]; then
+        scp "$SCRIPT_DIR/worker.py" "$VIA_HOP:$WORKSPACE/" > /dev/null 2>&1
+        $HOP_CMD "scp $WORKSPACE/worker.py $HOST:$WORKSPACE/"
+    else
+        scp "$SCRIPT_DIR/worker.py" "$HOST:$WORKSPACE/" > /dev/null 2>&1
+    fi
+
+    $SSH_CMD "chmod +x $WORKSPACE/worker.py"
+
+    # Copy data file
+    echo "  šŸ“Š Copying OHLCV data..."
+    if [ -f "$SCRIPT_DIR/../backtester/data/solusdt_5m.csv" ]; then
+        if [ -n "$VIA_HOP" ]; then
+            scp "$SCRIPT_DIR/../backtester/data/solusdt_5m.csv" "$VIA_HOP:$WORKSPACE/data/" > /dev/null 2>&1
+            $HOP_CMD "scp $WORKSPACE/data/solusdt_5m.csv $HOST:$WORKSPACE/data/"
+        else
+            scp "$SCRIPT_DIR/../backtester/data/solusdt_5m.csv" "$HOST:$WORKSPACE/data/" > /dev/null 2>&1
+        fi
+    else
+        echo "  āš ļø  Warning: solusdt_5m.csv not found, download manually"
+    fi
+
+    # Verify setup
+    echo "  āœ… Verifying installation..."
+    $SSH_CMD "cd $WORKSPACE && ls -lah"
+
+    echo -e "${GREEN}āœ… $NAME setup complete${NC}"
+}
+
+# Set up Worker 1 (direct connection)
+setup_worker "$WORKER1_HOST" "Worker 1 (pve-nu-monitor01)"
+
+# Set up Worker 2 (via the Worker 1 hop)
+setup_worker "$WORKER2_HOST" "Worker 2 (srv-bd-host01)" "$WORKER2_HOP"
+
+echo -e "\n${GREEN}šŸŽ‰ Cluster setup complete!${NC}"
+echo ""
+echo "Next steps:"
+echo "  1. Start master controller:"
+echo "     cd $SCRIPT_DIR && python3 master.py"
+echo ""
+echo "  2. Monitor cluster status:"
+echo "     watch -n 5 'ls -1 cluster/queue/*.json 2>/dev/null | wc -l'"
+echo ""
+echo "  3. View results:"
+echo "     sqlite3 cluster/strategies.db 'SELECT * FROM strategies ORDER BY pnl_per_1k DESC LIMIT 10'"
diff --git a/cluster/status.py b/cluster/status.py
new file mode 100644
index 0000000..ca6b0fa
--- /dev/null
+++ b/cluster/status.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python3
+"""
+Quick cluster status dashboard
+"""
+
+import sqlite3
+from pathlib import Path
+from datetime import datetime
+
+CLUSTER_DIR = Path(__file__).parent
+DB_PATH = CLUSTER_DIR / 'strategies.db'
+QUEUE_DIR = CLUSTER_DIR / 'queue'
+
+BASELINE_PNL = 192.00  # current production v9, $/1k
+
+def show_status():
+    """Display cluster status"""
+
+    print("="*70)
+    print("šŸŽÆ OPTIMIZATION CLUSTER STATUS")
+    print("="*70)
+    print(f"šŸ“… {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
+
+    # Database status (if it exists)
+    if DB_PATH.exists():
+        conn = sqlite3.connect(DB_PATH)
+        c = conn.cursor()
+
+        # Job stats (counted via the DB; the queue dir also holds running jobs)
+        c.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status")
+        job_stats = dict(c.fetchall())
+
+        print(f"šŸ“‹ Queue: {job_stats.get('queued', 0)} jobs waiting")
+        print(f"   Running: {job_stats.get('running', 0)}")
+        print(f"   Completed: {job_stats.get('completed', 0)}")
+
+        # Top strategies
+        c.execute("""
+            SELECT name, pnl_per_1k, trade_count, win_rate, profit_factor
+            FROM strategies
+            WHERE status = 'completed' AND pnl_per_1k IS NOT NULL
+            ORDER BY pnl_per_1k DESC
+            LIMIT 5
+        """)
+
+        top_strats = c.fetchall()
+
+        if top_strats:
+            print("\nšŸ† TOP 5 STRATEGIES:")
+            print("-" * 70)
+            print(f"{'Rank':<6} {'Strategy':<30} {'P&L/1k':<12} {'Trades':<8} {'WR%':<8} {'PF':<6}")
+            print("-" * 70)
+
+            for i, (name, pnl, trades, wr, pf) in enumerate(top_strats, 1):
+                print(f"{i:<6} {name[:30]:<30} ${pnl:<11.2f} {trades:<8} {wr:<7.1f}% {pf:<6.2f}")
+        else:
+            print("\nā³ No completed strategies yet...")
+
+        # Current baseline
+        print("\nšŸ“Š BASELINE COMPARISON:")
+        print(f"   v9 baseline: ${BASELINE_PNL:.2f}/1k (current production)")
+
+        if top_strats:
+            best_pnl = top_strats[0][1]
+            improvement = ((best_pnl - BASELINE_PNL) / BASELINE_PNL) * 100
+
+            if improvement > 0:
+                print(f"   Best found:  ${best_pnl:.2f}/1k ({improvement:+.1f}% improvement) āœ…")
+            else:
+                print(f"   Best found:  ${best_pnl:.2f}/1k ({improvement:+.1f}%)")
+
+        conn.close()
+    else:
+        queued_jobs = list(QUEUE_DIR.glob("*.json")) if QUEUE_DIR.exists() else []
+        print(f"šŸ“‹ Queue: {len(queued_jobs)} jobs waiting")
+        print("\nāš ļø  Database not initialized. Run setup_cluster.sh first.")
+
+    print("\n" + "="*70)
+
+if __name__ == '__main__':
+    try:
+        show_status()
+    except KeyboardInterrupt:
+        print("\n")
diff --git a/cluster/worker.py b/cluster/worker.py
new file mode 100644
index 0000000..2d77a09
--- /dev/null
+++ b/cluster/worker.py
@@ -0,0 +1,164 @@
+#!/usr/bin/env python3
+"""
+Continuous Optimization Cluster - WORKER
+Executes backtesting jobs assigned by the master
+"""
+
+import json
+import sys
+import time
+import subprocess
+from datetime import datetime
+from pathlib import Path
+import os
+
+# Configuration
+WORKSPACE = Path('/root/optimization-cluster')
+JOBS_DIR = WORKSPACE / 'jobs'
+RESULTS_DIR = WORKSPACE / 'results'
+DATA_DIR = WORKSPACE / 'data'
+BACKTESTER_DIR = WORKSPACE / 'backtester'
+
+# Worker settings (advisory caps; enforced by how many workers are launched)
+MAX_PARALLEL = 22       # 70% of 32 cores
+MEMORY_PER_JOB = '4G'   # 4GB per worker
+
+class BacktestWorker:
+    """Executes individual backtest jobs"""
+
+    def __init__(self, job_file):
+        # Accept either a bare filename or a path like jobs/foo.json
+        self.job_file = Path(job_file).name
+        self.job = self.load_job()
+        self.result = None
+
+    def load_job(self):
+        """Load the job specification"""
+        with open(JOBS_DIR / self.job_file, 'r') as f:
+            return json.load(f)
+
+    def execute(self):
+        """Run the backtest"""
+        print(f"šŸ”¬ Starting backtest: {self.job['id']}")
+        print(f"   Indicator: {self.job['indicator']}")
+        print(f"   Params: {json.dumps(self.job['params'], indent=2)}")
+
+        start_time = time.time()
+
+        try:
+            # Determine which backtester to use
+            if self.job['indicator'] == 'v9_moneyline':
+                result = self.run_v9_backtest()
+            else:
+                raise ValueError(f"Unknown indicator: {self.job['indicator']}")
+
+            duration = time.time() - start_time
+
+            # Prepare the result
+            self.result = {
+                'job_id': self.job['id'],
+                'indicator': self.job['indicator'],
+                'params': self.job['params'],
+                'pnl': result['pnl'],
+                'trades': result['trades'],
+                'win_rate': result['win_rate'],
+                'profit_factor': result['profit_factor'],
+                'max_drawdown': result['max_drawdown'],
+                'sharpe_ratio': result.get('sharpe_ratio', 0),
+                'duration_seconds': duration,
+                'completed_at': datetime.now().isoformat()
+            }
+
+            # Save the result
+            self.save_result()
+
+            print(f"āœ… Backtest complete: ${result['pnl']:.2f} PnL, {result['trades']} trades")
+            print(f"   Duration: {duration:.1f}s")
+
+            return True
+
+        except Exception as e:
+            print(f"āŒ Backtest failed: {e}")
+
+            # Save an error result so the master can mark the job failed
+            self.result = {
+                'job_id': self.job['id'],
+                'error': str(e),
+                'completed_at': datetime.now().isoformat()
+            }
+            self.save_result()
+
+            return False
+
+    def run_v9_backtest(self):
+        """Execute a v9 moneyline backtest"""
+        params = self.job['params']
+
+        # Build command
+        cmd = [
+            'python3',
+            str(BACKTESTER_DIR / 'backtester_core.py'),
+            '--data', str(DATA_DIR / 'solusdt_5m.csv'),
+            '--indicator', 'v9',
+            '--flip-threshold', str(params['flip_threshold']),
+            '--ma-gap', str(params['ma_gap']),
+            '--momentum-adx', str(params['momentum_adx']),
+            '--long-pos', str(params.get('long_pos', 70)),
+            '--short-pos', str(params.get('short_pos', 25)),
+            '--cooldown', str(params.get('cooldown_bars', 2)),
+            '--output', 'json'
+        ]
+
+        # Execute
+        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
+
+        if result.returncode != 0:
+            raise RuntimeError(f"Backtest failed: {result.stderr}")
+
+        # Parse the JSON result
+        return json.loads(result.stdout)
+
+    def save_result(self):
+        """Save the result for the master to collect"""
+        result_file = RESULTS_DIR / f"{self.job['id']}_result.json"
+
+        with open(result_file, 'w') as f:
+            json.dump(self.result, f, indent=2)
+
+        # Copy to master (via rsync)
+        master_host = os.environ.get('MASTER_HOST', 'icke@95.216.52.28')
+        master_dir = f"{master_host}:/home/icke/traderv4/cluster/results/"
+
+        rc = subprocess.run(['rsync', '-a', str(result_file), master_dir])
+        if rc.returncode != 0:
+            print(f"āš ļø  Could not push result to master (rsync exit {rc.returncode}); "
+                  f"it remains in {RESULTS_DIR}")
+
+        # Clean up the job file
+        (JOBS_DIR / self.job_file).unlink()
+
+def setup_workspace():
+    """Initialize the worker workspace"""
+    WORKSPACE.mkdir(exist_ok=True)
+    JOBS_DIR.mkdir(exist_ok=True)
+    RESULTS_DIR.mkdir(exist_ok=True)
+    DATA_DIR.mkdir(exist_ok=True)
+    (WORKSPACE / 'logs').mkdir(exist_ok=True)
+
+    print(f"āœ… Workspace ready: {WORKSPACE}")
+
+def main():
+    """Main worker entry point"""
+    if len(sys.argv) < 2:
+        print("Usage: worker.py <job_file.json>")
+        sys.exit(1)
+
+    job_file = sys.argv[1]
+
+    # Setup
+    setup_workspace()
+
+    # Execute the job
+    worker = BacktestWorker(job_file)
+    success = worker.execute()
+
+    sys.exit(0 if success else 1)
+
+if __name__ == '__main__':
+    main()
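+
+# Example invocation (normally triggered by master.py over SSH):
+#   cd /root/optimization-cluster && python3 worker.py v9_moneyline_1701234567890.json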