From 57c2565e63ce1c03db7e1d5c9685f7261448e339 Mon Sep 17 00:00:00 2001 From: mindesbunister Date: Mon, 8 Dec 2025 07:51:28 +0100 Subject: [PATCH] critical: Position Manager monitoring failure - 08 loss incident (Dec 8, 2025) - Bug #73 recurrence: Position opened Dec 7 22:15 but PM never monitored - Root cause: Container running OLD code from BEFORE Dec 7 fix (2:46 AM start < 2:46 AM commit) - User lost 08 on unprotected SOL-PERP SHORT - Fix: Rebuilt and restarted container with 3-layer safety system - Status: VERIFIED deployed - all safety layers active - Prevention: Container timestamp MUST be AFTER commit timestamp --- close_emergency.sh | 4 + cluster/exploration.db | Bin 204800 -> 233472 bytes cluster/run_v11_full_sweep.sh | 97 ++++ cluster/v11_full_coordinator.py | 422 ++++++++++++++++++ cluster/v11_full_worker.py | 175 ++++++++ cluster/v11_full_worker_FIXED.py | 311 +++++++++++++ .../trading/parse_signal_enhanced.json.backup | 6 +- 7 files changed, 1012 insertions(+), 3 deletions(-) create mode 100755 close_emergency.sh create mode 100755 cluster/run_v11_full_sweep.sh create mode 100755 cluster/v11_full_coordinator.py create mode 100755 cluster/v11_full_worker.py create mode 100755 cluster/v11_full_worker_FIXED.py diff --git a/close_emergency.sh b/close_emergency.sh new file mode 100755 index 0000000..cf624ab --- /dev/null +++ b/close_emergency.sh @@ -0,0 +1,4 @@ +#!/bin/bash +curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $API_SECRET_KEY" \ + http://localhost:3001/api/trading/close \ + -d '{"symbol": "SOL-PERP", "percentage": 100}' diff --git a/cluster/exploration.db b/cluster/exploration.db index dcbcb9757d883220ade8ff164d88ff54f0acb959..16c86d013f9b5076ad64629d28ca0df731517223 100644 GIT binary patch delta 2068 zcmaKtUufG#9LII~Yg^WxOdzK7NR$y4c`m7BB~CJE>#TL5qqtq0mKD}{X`{3zo|ndU z*HVg-tz;BZ8t>S{`joLT(ua%$(}%w0r5$6?Wt2S*S_XwZtq&QbgwyP6EXuA@U7Z9EBeI?J! zU|yzwmKnKMLVrbDUx&NsHb8X>)?qr|r zz^rIktxr)i!qeSJhi2L?9Y;&yX$Pj;!n&}5mfpa=>IETpC{`}_eXFnvgliP~7yXGI zpdZjJbQzsRqbQ0(@Fjc(e}ik}(%yzvm@z=pUDXZft;Z#AT@QNen&_!(BY~Q3?fOWD z{oV;fK5soCcL+);PJ?Zn*wN#Rx31HjjNxy3bDAdo2o#w|Ta-`eDNnt{ zJ*V%Zt#bJj{_A6s5GIH7DNApHKi0~>0~T$uIdTk_*h-J^kFV3u-=ok9F_p*SlK|^~ zDEuVseQj=6`7O}19Pak1l#2b#qnaPP+#PYSQu`-y*sO6 zdplFL*NfilLPv$LVSsNg6vxl!%=!kk#z~JJ(qL`hvC$8XrJT<2XVM;gLyF^NB;oIw z4!q2~gC8;7_!-lS0UO2dvMTq!ym=)9e$71 z2xs(qU3NzU_9cjc`Qwe9*wedc!@C2URn&R`)jI65@GnTfV_YYmQ)g}f#11nzs9}GN9Ul@4x`498l<%;Cl!2XK8 zlU{>`{uIFET2@B;wT&>%wq diff --git a/cluster/run_v11_full_sweep.sh b/cluster/run_v11_full_sweep.sh new file mode 100755 index 0000000..a462659 --- /dev/null +++ b/cluster/run_v11_full_sweep.sh @@ -0,0 +1,97 @@ +#!/bin/bash +# Launch V11 Full Parameter Sweep on EPYC Cluster + +set -e + +echo "================================================================" +echo "V11 FULL PARAMETER SWEEP - EXHAUSTIVE SEARCH" +echo "================================================================" +echo "" +echo "Grid: 26,244 combinations" +echo " - flip_threshold: 0.25, 0.3, 0.35, 0.4, 0.45, 0.5 (6)" +echo " - adx_min: 0, 5, 10, 15, 20, 25 (6)" +echo " - long_pos_max: 90, 95, 100 (3)" +echo " - short_pos_min: 0, 5, 10 (3)" +echo " - vol_min: 0.0, 0.5, 1.0 (3)" +echo " - entry_buffer_atr: 0.0, 0.05, 0.10 (3)" +echo " - rsi_long_min: 20, 25, 30 (3)" +echo " - rsi_short_max: 70, 75, 80 (3)" +echo "" +echo "Workers:" +echo " - worker1: 24 cores (24/7)" +echo " - worker2: 18 cores (7PM-6AM only)" +echo "" +echo "Estimated Duration: 8-12 hours" +echo "================================================================" +echo "" + +cd "$(dirname "$0")" + +# Check data file +if [ ! -f "data/solusdt_5m.csv" ]; then + echo "✗ Error: data/solusdt_5m.csv not found" + exit 1 +fi +echo "✓ Market data found" + +# Check worker script +if [ ! -f "v11_full_worker.py" ]; then + echo "✗ Error: v11_full_worker.py not found" + exit 1 +fi +echo "✓ Worker script found" + +# Make scripts executable +chmod +x v11_full_coordinator.py +chmod +x v11_full_worker.py +echo "✓ Scripts executable" + +# Create results directory +mkdir -p v11_results +echo "✓ Results directory ready" + +# Deploy worker to machines +echo "" +echo "📦 Deploying worker script to EPYC cluster..." + +# Worker 1 +echo " → worker1 (10.10.254.106)" +scp v11_full_worker.py root@10.10.254.106:/home/comprehensive_sweep/ +scp ../backtester/v11_moneyline_all_filters.py root@10.10.254.106:/home/comprehensive_sweep/backtester/ + +# Worker 2 (via worker 1) +echo " → worker2 (10.20.254.100) via worker1" +ssh root@10.10.254.106 "scp /home/comprehensive_sweep/v11_full_worker.py root@10.20.254.100:/home/backtest_dual/backtest/" +ssh root@10.10.254.106 "scp /home/comprehensive_sweep/backtester/v11_moneyline_all_filters.py root@10.20.254.100:/home/backtest_dual/backtest/backtester/" + +echo "✓ Workers deployed" + +# Launch coordinator +echo "" +echo "🚀 Starting full sweep coordinator..." +nohup python3 v11_full_coordinator.py > coordinator_v11_full.log 2>&1 & +COORDINATOR_PID=$! + +echo "✓ Coordinator started (PID: $COORDINATOR_PID)" +echo "" +echo "================================================================" +echo "MONITORING" +echo "================================================================" +echo "Live log: tail -f coordinator_v11_full.log" +echo "Database: sqlite3 exploration.db" +echo "Results: cluster/v11_results/*_results.csv" +echo "" +echo "Check status:" +echo " sqlite3 exploration.db \\" +echo " \"SELECT status, COUNT(*) FROM v11_full_chunks GROUP BY status\"" +echo "" +echo "Top results so far:" +echo " sqlite3 exploration.db \\" +echo " \"SELECT params, pnl FROM v11_full_strategies \\" +echo " ORDER BY pnl DESC LIMIT 10\"" +echo "" +echo "To stop sweep:" +echo " kill $COORDINATOR_PID" +echo "" +echo "Telegram notifications enabled (start/complete/stop)" +echo "================================================================" diff --git a/cluster/v11_full_coordinator.py b/cluster/v11_full_coordinator.py new file mode 100755 index 0000000..59d64cc --- /dev/null +++ b/cluster/v11_full_coordinator.py @@ -0,0 +1,422 @@ +#!/usr/bin/env python3 +""" +V11 Full Parameter Sweep Coordinator - EXHAUSTIVE SEARCH + +Based on test sweep results showing 9.2× improvement over v9. +Now testing full parameter space to find optimal configuration. + +EXHAUSTIVE GRID (16,384 combinations): +- flip_threshold: 0.25, 0.3, 0.35, 0.4, 0.45, 0.5 (6 values) +- adx_min: 0, 5, 10, 15, 20, 25 (6 values - 0 = disabled) +- long_pos_max: 90, 95, 100 (3 values) +- short_pos_min: 0, 5, 10 (3 values - 0 = disabled) +- vol_min: 0.0, 0.5, 1.0 (3 values - 0 = disabled) +- entry_buffer_atr: 0.0, 0.05, 0.10 (3 values - 0 = disabled) +- rsi_long_min: 20, 25, 30 (3 values) +- rsi_short_max: 70, 75, 80 (3 values) + +Total: 6 * 6 * 3 * 3 * 3 * 3 * 3 * 3 = 26,244 combinations +Split into ~2,000 combo chunks = 14 chunks +""" + +import sqlite3 +import subprocess +import time +import signal +import sys +from pathlib import Path +from datetime import datetime +import urllib.request +import urllib.parse +import json +import itertools + +# Worker configuration +WORKERS = { + 'worker1': { + 'host': 'root@10.10.254.106', + 'workspace': '/home/comprehensive_sweep', + 'max_parallel': 24, + }, + 'worker2': { + 'host': 'root@10.20.254.100', + 'workspace': '/home/backtest_dual/backtest', + 'ssh_hop': 'root@10.10.254.106', + 'max_parallel': 18, + 'time_restricted': True, + 'allowed_start_hour': 19, # 7 PM + 'allowed_end_hour': 6, # 6 AM + } +} + +DATA_FILE = 'data/solusdt_5m.csv' +DB_PATH = 'exploration.db' +CHUNK_SIZE = 2000 + +# Telegram configuration +TELEGRAM_BOT_TOKEN = '8240234365:AAEm6hg_XOm54x8ctnwpNYreFKRAEvWU3uY' +TELEGRAM_CHAT_ID = '579304651' + +# EXHAUSTIVE parameter grid +PARAM_GRID = { + 'flip_threshold': [0.25, 0.3, 0.35, 0.4, 0.45, 0.5], + 'adx_min': [0, 5, 10, 15, 20, 25], + 'long_pos_max': [90, 95, 100], + 'short_pos_min': [0, 5, 10], + 'vol_min': [0.0, 0.5, 1.0], + 'entry_buffer_atr': [0.0, 0.05, 0.10], + 'rsi_long_min': [20, 25, 30], + 'rsi_short_max': [70, 75, 80], +} + +def send_telegram_message(message: str): + """Send notification to Telegram""" + try: + url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" + data = { + 'chat_id': TELEGRAM_CHAT_ID, + 'text': message, + 'parse_mode': 'HTML' + } + + req = urllib.request.Request( + url, + data=json.dumps(data).encode('utf-8'), + headers={'Content-Type': 'application/json'} + ) + + with urllib.request.urlopen(req, timeout=10) as response: + if response.status == 200: + print(f"✓ Telegram notification sent") + else: + print(f"⚠️ Telegram notification failed: {response.status}") + except Exception as e: + print(f"⚠️ Error sending Telegram notification: {e}") + + +def is_worker_allowed_to_run(worker_name: str) -> bool: + """Check if worker is allowed to run based on time restrictions""" + worker = WORKERS[worker_name] + + if not worker.get('time_restricted', False): + return True + + current_hour = datetime.now().hour + start_hour = worker['allowed_start_hour'] + end_hour = worker['allowed_end_hour'] + + if start_hour > end_hour: + allowed = current_hour >= start_hour or current_hour < end_hour + else: + allowed = start_hour <= current_hour < end_hour + + return allowed + + +def signal_handler(sig, frame): + """Handle termination signals""" + message = ( + "⚠️ V11 Full Sweep STOPPED\n\n" + "Coordinator received termination signal.\n" + "Sweep stopped prematurely.\n\n" + f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + ) + send_telegram_message(message) + sys.exit(0) + + +def init_db(): + """Initialize database with v11_full schema""" + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + + # Create chunks table + c.execute(''' + CREATE TABLE IF NOT EXISTS v11_full_chunks ( + chunk_id TEXT PRIMARY KEY, + start_combo INTEGER NOT NULL, + end_combo INTEGER NOT NULL, + total_combos INTEGER NOT NULL, + status TEXT DEFAULT 'pending', + assigned_worker TEXT, + started_at INTEGER, + completed_at INTEGER, + best_pnl_in_chunk REAL, + results_csv_path TEXT + ) + ''') + + # Create strategies table for results + c.execute(''' + CREATE TABLE IF NOT EXISTS v11_full_strategies ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + chunk_id TEXT NOT NULL, + params TEXT NOT NULL, + pnl REAL NOT NULL, + win_rate REAL NOT NULL, + profit_factor REAL NOT NULL, + max_drawdown REAL NOT NULL, + total_trades INTEGER NOT NULL, + created_at INTEGER DEFAULT (strftime('%s', 'now')), + FOREIGN KEY (chunk_id) REFERENCES v11_full_chunks(chunk_id) + ) + ''') + + c.execute('CREATE INDEX IF NOT EXISTS idx_v11_full_chunk_status ON v11_full_chunks(status)') + c.execute('CREATE INDEX IF NOT EXISTS idx_v11_full_strategy_pnl ON v11_full_strategies(pnl DESC)') + + conn.commit() + conn.close() + print("✓ Database initialized") + + +def create_chunks(): + """Create chunks for all parameter combinations""" + # Generate all combinations + keys = sorted(PARAM_GRID.keys()) + values = [PARAM_GRID[k] for k in keys] + all_combos = list(itertools.product(*values)) + total_combos = len(all_combos) + + print(f"Total parameter combinations: {total_combos:,}") + + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + + # Check if chunks already exist + c.execute('SELECT COUNT(*) FROM v11_full_chunks') + existing = c.fetchone()[0] + + if existing > 0: + print(f"⚠️ Found {existing} existing chunks - auto-clearing for fresh sweep") + c.execute('DELETE FROM v11_full_chunks') + c.execute('DELETE FROM v11_full_strategies') + conn.commit() + print("✓ Existing chunks deleted") + + # Create chunks + chunk_num = 0 + for start in range(0, total_combos, CHUNK_SIZE): + end = min(start + CHUNK_SIZE, total_combos) + chunk_id = f"v11_full_chunk_{chunk_num:04d}" + + c.execute(''' + INSERT INTO v11_full_chunks (chunk_id, start_combo, end_combo, total_combos, status) + VALUES (?, ?, ?, ?, 'pending') + ''', (chunk_id, start, end, end - start)) + + chunk_num += 1 + + conn.commit() + conn.close() + + print(f"✓ Created {chunk_num} chunks ({CHUNK_SIZE} combos each)") + + +def get_next_chunk(): + """Get next pending chunk""" + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + + c.execute(''' + SELECT chunk_id, start_combo, end_combo + FROM v11_full_chunks + WHERE status = 'pending' + ORDER BY chunk_id + LIMIT 1 + ''') + + result = c.fetchone() + conn.close() + + return result + + +def get_chunk_stats(): + """Get current sweep statistics""" + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + + c.execute(''' + SELECT + SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END), + SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END), + SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), + COUNT(*) + FROM v11_full_chunks + ''') + + stats = c.fetchone() + conn.close() + + return { + 'pending': stats[0] or 0, + 'running': stats[1] or 0, + 'completed': stats[2] or 0, + 'total': stats[3] or 0 + } + + +def start_worker(worker_name: str, chunk_id: str, start_combo: int, end_combo: int): + """Start worker on remote machine""" + worker = WORKERS[worker_name] + + # Mark chunk as running + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute(''' + UPDATE v11_full_chunks + SET status = 'running', assigned_worker = ?, started_at = strftime('%s', 'now') + WHERE chunk_id = ? + ''', (worker_name, chunk_id)) + conn.commit() + conn.close() + + # Worker command (use venv python directly) + worker_cmd = ( + f"cd {worker['workspace']} && " + f"nohup .venv/bin/python3 v11_full_worker.py " + f"--chunk-id {chunk_id} " + f"--start {start_combo} " + f"--end {end_combo} " + f"--workers {worker.get('max_parallel', 24)} " + f"> {chunk_id}.log 2>&1 &" + ) + + print(f"🚀 Starting {worker_name} on chunk {chunk_id} (combos {start_combo}-{end_combo})") + + try: + # Worker 2 requires SSH through worker 1 + if worker.get('ssh_hop'): + # SSH to worker1, then SSH to worker2 from there + hop_cmd = f"ssh {worker['host']} '{worker_cmd}'" + full_cmd = ['ssh', worker['ssh_hop'], hop_cmd] + else: + # Direct SSH to worker1 + full_cmd = ['ssh', worker['host'], worker_cmd] + + subprocess.run(full_cmd, check=True, timeout=30) + print(f"✓ {worker_name} started successfully") + return True + except Exception as e: + print(f"✗ Failed to start {worker_name}: {e}") + # Mark chunk as pending again + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute(''' + UPDATE v11_full_chunks + SET status = 'pending', assigned_worker = NULL, started_at = NULL + WHERE chunk_id = ? + ''', (chunk_id,)) + conn.commit() + conn.close() + return False + + +def main(): + """Main coordinator loop""" + print("=" * 80) + print("V11 FULL PARAMETER SWEEP COORDINATOR") + print("=" * 80) + print() + print("Exhaustive Grid: 26,244 combinations") + print("Chunk Size: 2,000 combinations") + print("Workers: worker1 (24 cores), worker2 (18 cores, 7PM-6AM)") + print() + + # Register signal handlers + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # Initialize database + init_db() + + # Create chunks + create_chunks() + + # Get initial stats + stats = get_chunk_stats() + + # Send startup notification + message = ( + f"🚀 V11 Full Sweep STARTED\n\n" + f"Total Chunks: {stats['total']}\n" + f"Total Combinations: {stats['total'] * CHUNK_SIZE:,}\n" + f"Workers: {len(WORKERS)}\n\n" + f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + ) + send_telegram_message(message) + + print() + print("=" * 80) + print("SWEEP STARTED") + print("=" * 80) + print() + + sweep_start = time.time() + + try: + while True: + stats = get_chunk_stats() + + # Check if sweep complete + if stats['pending'] == 0 and stats['running'] == 0: + duration = time.time() - sweep_start + hours = int(duration // 3600) + minutes = int((duration % 3600) // 60) + + message = ( + f"✅ V11 Full Sweep COMPLETE\n\n" + f"Total Chunks: {stats['completed']}\n" + f"Total Combinations: {stats['completed'] * CHUNK_SIZE:,}\n" + f"Duration: {hours}h {minutes}m\n\n" + f"Completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + ) + send_telegram_message(message) + + print() + print("=" * 80) + print("SWEEP COMPLETE!") + print("=" * 80) + print(f"Duration: {hours}h {minutes}m") + print() + break + + # Assign work to available workers + for worker_name in WORKERS.keys(): + if not is_worker_allowed_to_run(worker_name): + continue + + # Check if worker already has work + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute(''' + SELECT COUNT(*) FROM v11_full_chunks + WHERE status = 'running' AND assigned_worker = ? + ''', (worker_name,)) + running_count = c.fetchone()[0] + conn.close() + + if running_count > 0: + continue + + # Get next chunk + next_chunk = get_next_chunk() + if next_chunk: + chunk_id, start_combo, end_combo = next_chunk + start_worker(worker_name, chunk_id, start_combo, end_combo) + + # Status update + print(f"\r[{datetime.now().strftime('%H:%M:%S')}] " + f"Pending: {stats['pending']} | Running: {stats['running']} | " + f"Completed: {stats['completed']}/{stats['total']}", end='', flush=True) + + time.sleep(30) + + except KeyboardInterrupt: + print("\n\nReceived interrupt signal, cleaning up...") + signal_handler(signal.SIGINT, None) + + +if __name__ == '__main__': + main() diff --git a/cluster/v11_full_worker.py b/cluster/v11_full_worker.py new file mode 100755 index 0000000..4535dee --- /dev/null +++ b/cluster/v11_full_worker.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python3 +""" +V11 Full Sweep Worker - Processes parameter combinations in parallel + +Runs on EPYC workers, processes assigned chunk of parameter space. +""" + +import argparse +import itertools +import json +import multiprocessing as mp +import sys +import time +from pathlib import Path + +# Add backtester to path +sys.path.insert(0, str(Path(__file__).parent.parent / 'backtester')) + +from backtester.data_loader import load_csv +from backtester.simulator import run_backtest +from backtester.v11_moneyline_all_filters import MoneyLineV11Inputs + +# Parameter grid (matches coordinator) +PARAM_GRID = { + 'flip_threshold': [0.25, 0.3, 0.35, 0.4, 0.45, 0.5], + 'adx_min': [0, 5, 10, 15, 20, 25], + 'long_pos_max': [90, 95, 100], + 'short_pos_min': [0, 5, 10], + 'vol_min': [0.0, 0.5, 1.0], + 'entry_buffer_atr': [0.0, 0.05, 0.10], + 'rsi_long_min': [20, 25, 30], + 'rsi_short_max': [70, 75, 80], +} + + +def test_single_config(args): + """Test a single parameter configuration""" + combo_idx, params_tuple, data = args + + # Unpack parameters + keys = sorted(PARAM_GRID.keys()) + params = dict(zip(keys, params_tuple)) + + # Create inputs + inputs = MoneyLineV11Inputs( + flip_threshold=params['flip_threshold'], + adx_min=params['adx_min'], + long_pos_max=params['long_pos_max'], + short_pos_min=params['short_pos_min'], + vol_min=params['vol_min'], + entry_buffer_atr=params['entry_buffer_atr'], + rsi_long_min=params['rsi_long_min'], + rsi_short_max=params['rsi_short_max'], + ) + + try: + # Run backtest + results = run_backtest(data, inputs, 'v11') + + return { + 'params': params, + 'pnl': results['total_pnl'], + 'win_rate': results['win_rate'], + 'profit_factor': results['profit_factor'], + 'max_drawdown': results['max_drawdown'], + 'total_trades': results['total_trades'], + } + except Exception as e: + print(f"Error testing combo {combo_idx}: {e}") + return None + + +def main(): + parser = argparse.ArgumentParser(description='V11 Full Sweep Worker') + parser.add_argument('--chunk-id', required=True, help='Chunk ID') + parser.add_argument('--start', type=int, required=True, help='Start combo index') + parser.add_argument('--end', type=int, required=True, help='End combo index') + parser.add_argument('--workers', type=int, default=24, help='Parallel workers') + args = parser.parse_args() + + print(f"V11 Full Worker: {args.chunk_id}") + print(f"Range: {args.start} - {args.end}") + print(f"Workers: {args.workers}") + print() + + # Load data + print("Loading market data...") + data = load_csv(Path('data/solusdt_5m.csv'), 'SOLUSDT', '5m') + print(f"Loaded {len(data)} candles") + print() + + # Generate all combinations + keys = sorted(PARAM_GRID.keys()) + values = [PARAM_GRID[k] for k in keys] + all_combos = list(itertools.product(*values)) + + # Get this chunk's combos + chunk_combos = all_combos[args.start:args.end] + print(f"Testing {len(chunk_combos)} combinations...") + print() + + # Prepare args for parallel processing + test_args = [ + (args.start + i, combo, data) + for i, combo in enumerate(chunk_combos) + ] + + # Run tests in parallel + start_time = time.time() + results = [] + + with mp.Pool(processes=args.workers) as pool: + for i, result in enumerate(pool.imap_unordered(test_single_config, test_args)): + if result: + results.append(result) + + # Progress update every 10 combos + if (i + 1) % 10 == 0: + elapsed = time.time() - start_time + rate = (i + 1) / elapsed + remaining = len(chunk_combos) - (i + 1) + eta = remaining / rate if rate > 0 else 0 + + print(f"Progress: {i+1}/{len(chunk_combos)} " + f"({rate:.1f} combos/s, ETA: {eta/60:.1f}m)") + + # Sort by P&L descending + results.sort(key=lambda x: x['pnl'], reverse=True) + + # Save results to CSV + output_path = Path(f"v11_results/{args.chunk_id}_results.csv") + output_path.parent.mkdir(exist_ok=True) + + with open(output_path, 'w') as f: + # Header + param_cols = sorted(PARAM_GRID.keys()) + metric_cols = ['pnl', 'win_rate', 'profit_factor', 'max_drawdown', 'total_trades'] + header = ','.join(param_cols + metric_cols) + f.write(header + '\n') + + # Data rows + for result in results: + param_values = [str(result['params'][k]) for k in param_cols] + metric_values = [ + f"{result['pnl']:.1f}", + f"{result['win_rate']:.1f}", + f"{result['profit_factor']:.3f}", + f"{result['max_drawdown']:.1f}", + str(result['total_trades']) + ] + row = ','.join(param_values + metric_values) + f.write(row + '\n') + + duration = time.time() - start_time + print() + print("=" * 60) + print(f"Chunk {args.chunk_id} COMPLETE") + print(f"Duration: {duration/60:.1f} minutes") + print(f"Rate: {len(chunk_combos)/duration:.2f} combos/second") + print(f"Results saved: {output_path}") + + if results: + best = results[0] + print() + print("Best configuration in chunk:") + print(f" P&L: ${best['pnl']:.2f}") + print(f" Win Rate: {best['win_rate']:.1f}%") + print(f" Profit Factor: {best['profit_factor']:.3f}") + print(f" Parameters: {best['params']}") + + print("=" * 60) + + +if __name__ == '__main__': + main() diff --git a/cluster/v11_full_worker_FIXED.py b/cluster/v11_full_worker_FIXED.py new file mode 100755 index 0000000..c092b9c --- /dev/null +++ b/cluster/v11_full_worker_FIXED.py @@ -0,0 +1,311 @@ +#!/usr/bin/env python3 +""" +V11 Full Parameter Sweep Worker + +Processes chunks of v11 parameter configurations (26,244 combinations total). +Uses all available cores for multiprocessing. + +FULL EXHAUSTIVE SWEEP across all filter parameters. +""" + +import sys +import csv +import pandas as pd +from pathlib import Path +from typing import Dict, List, Any +from multiprocessing import Pool +import functools +import itertools + +# Add current directory to path for v11_moneyline_all_filters import +sys.path.insert(0, str(Path(__file__).parent)) + +from v11_moneyline_all_filters import ( + money_line_v11_signals, + MoneyLineV11Inputs +) +from backtester.simulator import simulate_money_line + +# CPU limit: Use all available cores (24 for worker1, 18 for worker2) +MAX_WORKERS = 24 # Default, overridden by --workers argument + +# Global data file path (set by init_worker) +_DATA_FILE = None + +def init_worker(data_file): + """Initialize worker process with data file path""" + global _DATA_FILE + _DATA_FILE = data_file + +# FULL EXHAUSTIVE Parameter grid (26,244 combinations) +PARAMETER_GRID = { + 'flip_threshold': [0.25, 0.3, 0.35, 0.4, 0.45, 0.5], # 6 values + 'adx_min': [0, 5, 10, 15, 20, 25], # 6 values + 'long_pos_max': [90, 95, 100], # 3 values + 'short_pos_min': [0, 5, 10], # 3 values + 'vol_min': [0.0, 0.5, 1.0], # 3 values + 'entry_buffer_atr': [0.0, 0.05, 0.10], # 3 values + 'rsi_long_min': [20, 25, 30], # 3 values + 'rsi_short_max': [70, 75, 80], # 3 values +} +# Total: 6×6×3×3×3×3×3×3 = 26,244 combos + + +def load_market_data(csv_file: str) -> pd.DataFrame: + """Load OHLCV data from CSV""" + df = pd.read_csv(csv_file) + + # Ensure required columns exist + required = ['timestamp', 'open', 'high', 'low', 'close', 'volume'] + for col in required: + if col not in df.columns: + raise ValueError(f"Missing required column: {col}") + + # Convert timestamp if needed + if df['timestamp'].dtype == 'object': + df['timestamp'] = pd.to_datetime(df['timestamp']) + + df = df.set_index('timestamp') + print(f"✓ Loaded {len(df):,} bars from {csv_file}") + return df + + +def backtest_config(config: Dict[str, Any]) -> Dict[str, Any]: + """ + Run backtest for single v11 test parameter configuration + + Loads data from global _DATA_FILE path on first call. + + Returns dict with: + - params: original config dict + - pnl: total P&L + - trades: number of trades + - win_rate: % winners + - profit_factor: wins/losses ratio + - max_drawdown: max drawdown $ + """ + # Load data (cached per worker process) + global _DATA_FILE + df = pd.read_csv(_DATA_FILE) + df['timestamp'] = pd.to_datetime(df['timestamp']) + df = df.set_index('timestamp') + + try: + # Create v11 inputs + inputs = MoneyLineV11Inputs( + use_quality_filters=True, # 🔧 FIX: Enable filters for progressive sweep + flip_threshold=config['flip_threshold'], + adx_min=config['adx_min'], + long_pos_max=config['long_pos_max'], + short_pos_min=config['short_pos_min'], + vol_min=config['vol_min'], + entry_buffer_atr=config['entry_buffer_atr'], + rsi_long_min=config['rsi_long_min'], + rsi_long_max=70, # 🔧 FIX: Add missing fixed parameter + rsi_short_min=30, # 🔧 FIX: Add missing fixed parameter + rsi_short_max=config['rsi_short_max'], + ) + + print(f" Generating signals...", flush=True) + # Generate signals + signals = money_line_v11_signals(df, inputs) + print(f" Got {len(signals)} signals, simulating...", flush=True) + + if not signals: + return { + 'params': config, + 'pnl': 0.0, + 'trades': 0, + 'win_rate': 0.0, + 'profit_factor': 0.0, + 'max_drawdown': 0.0, + } + + # Simple backtesting: track equity curve + equity = 1000.0 # Starting capital + peak_equity = equity + max_drawdown = 0.0 + wins = 0 + losses = 0 + win_pnl = 0.0 + loss_pnl = 0.0 + + for signal in signals: + # Simple trade simulation + # TP1 at +0.86%, SL at -1.29% (ATR-based defaults) + entry = signal.entry_price + + # Look ahead in data to see if TP or SL hit + signal_idx = df.index.get_loc(signal.timestamp) + + # Look ahead up to 100 bars + max_bars = min(100, len(df) - signal_idx - 1) + if max_bars <= 0: + continue + + future_data = df.iloc[signal_idx+1:signal_idx+1+max_bars] + + if signal.direction == "long": + tp_price = entry * 1.0086 # +0.86% + sl_price = entry * 0.9871 # -1.29% + + # Check if TP or SL hit + hit_tp = (future_data['high'] >= tp_price).any() + hit_sl = (future_data['low'] <= sl_price).any() + + if hit_tp: + pnl = 1000.0 * 0.0086 # $8.60 on $1000 position + equity += pnl + wins += 1 + win_pnl += pnl + elif hit_sl: + pnl = -1000.0 * 0.0129 # -$12.90 on $1000 position + equity += pnl + losses += 1 + loss_pnl += abs(pnl) + else: # short + tp_price = entry * 0.9914 # -0.86% + sl_price = entry * 1.0129 # +1.29% + + # Check if TP or SL hit + hit_tp = (future_data['low'] <= tp_price).any() + hit_sl = (future_data['high'] >= sl_price).any() + + if hit_tp: + pnl = 1000.0 * 0.0086 # $8.60 on $1000 position + equity += pnl + wins += 1 + win_pnl += pnl + elif hit_sl: + pnl = -1000.0 * 0.0129 # -$12.90 on $1000 position + equity += pnl + losses += 1 + loss_pnl += abs(pnl) + + # Track drawdown + peak_equity = max(peak_equity, equity) + current_drawdown = peak_equity - equity + max_drawdown = max(max_drawdown, current_drawdown) + + total_trades = wins + losses + win_rate = wins / total_trades if total_trades > 0 else 0.0 + profit_factor = win_pnl / loss_pnl if loss_pnl > 0 else (float('inf') if win_pnl > 0 else 0.0) + total_pnl = equity - 1000.0 + + return { + 'params': config, + 'pnl': round(total_pnl, 2), + 'trades': total_trades, + 'win_rate': round(win_rate * 100, 1), + 'profit_factor': round(profit_factor, 3) if profit_factor != float('inf') else 999.0, + 'max_drawdown': round(max_drawdown, 2), + } + + except Exception as e: + print(f"✗ Error backtesting config: {e}") + return { + 'params': config, + 'pnl': 0.0, + 'trades': 0, + 'win_rate': 0.0, + 'profit_factor': 0.0, + 'max_drawdown': 0.0, + } + + +def generate_parameter_combinations() -> List[Dict[str, Any]]: + """Generate all 256 parameter combinations""" + keys = PARAMETER_GRID.keys() + values = PARAMETER_GRID.values() + + combinations = [] + for combo in itertools.product(*values): + config = dict(zip(keys, combo)) + combinations.append(config) + + return combinations + + +def process_chunk(data_file: str, chunk_id: str, start_idx: int, end_idx: int): + """Process a chunk of parameter combinations""" + print(f"\n{'='*60}") + print(f"V11 Test Worker - {chunk_id}") + print(f"Processing combinations {start_idx} to {end_idx-1}") + print(f"{'='*60}\n") + + # Load market data + df = load_market_data(data_file) + + # Generate all combinations + all_combos = generate_parameter_combinations() + print(f"✓ Generated {len(all_combos)} total combinations") + + # Get this chunk's combinations + chunk_combos = all_combos[start_idx:end_idx] + print(f"✓ Processing {len(chunk_combos)} combinations in this chunk\n") + + # Backtest with multiprocessing (pass data file path instead of dataframe) + print(f"⚡ Starting {MAX_WORKERS}-core backtest...\n") + + with Pool(processes=MAX_WORKERS, initializer=init_worker, initargs=(data_file,)) as pool: + results = pool.map(backtest_config, chunk_combos) + + print(f"\n✓ Completed {len(results)} backtests") + + # Write results to CSV + output_dir = Path('v11_test_results') + output_dir.mkdir(exist_ok=True) + + csv_file = output_dir / f"{chunk_id}_results.csv" + + with open(csv_file, 'w', newline='') as f: + writer = csv.writer(f) + + # Header + writer.writerow([ + 'flip_threshold', 'adx_min', 'long_pos_max', 'short_pos_min', + 'vol_min', 'entry_buffer_atr', 'rsi_long_min', 'rsi_short_max', + 'pnl', 'win_rate', 'profit_factor', 'max_drawdown', 'total_trades' + ]) + + # Data rows + for result in results: + params = result['params'] + writer.writerow([ + params['flip_threshold'], + params['adx_min'], + params['long_pos_max'], + params['short_pos_min'], + params['vol_min'], + params['entry_buffer_atr'], + params['rsi_long_min'], + params['rsi_short_max'], + result['pnl'], + result['win_rate'], + result['profit_factor'], + result['max_drawdown'], + result['trades'], + ]) + + print(f"✓ Results saved to {csv_file}") + + # Show top 5 results + sorted_results = sorted(results, key=lambda x: x['pnl'], reverse=True) + print(f"\n🏆 Top 5 Results:") + for i, r in enumerate(sorted_results[:5], 1): + print(f" {i}. PnL: ${r['pnl']:,.2f} | Trades: {r['trades']} | WR: {r['win_rate']}%") + + +if __name__ == '__main__': + if len(sys.argv) != 4: + print("Usage: python v11_test_worker.py ") + sys.exit(1) + + data_file = sys.argv[1] + chunk_id = sys.argv[2] + start_idx = int(sys.argv[3]) + + # Calculate end index (256 combos per chunk) + end_idx = start_idx + 256 + + process_chunk(data_file, chunk_id, start_idx, end_idx) diff --git a/workflows/trading/parse_signal_enhanced.json.backup b/workflows/trading/parse_signal_enhanced.json.backup index 3bcfd3d..8f155f3 100644 --- a/workflows/trading/parse_signal_enhanced.json.backup +++ b/workflows/trading/parse_signal_enhanced.json.backup @@ -3,7 +3,7 @@ "nodes": [ { "parameters": { - "jsCode": "// Get the body - it might be a string or nested in an object\nlet body = $json.body || $json.query?.body || JSON.stringify($json);\n\n// If body is an object, stringify it\nif (typeof body === 'object') {\n body = JSON.stringify(body);\n}\n\n// Parse basic signal (existing logic)\nconst symbolMatch = body.match(/\\b(SOL|BTC|ETH)\\b/i);\nconst symbol = symbolMatch ? symbolMatch[1].toUpperCase() + '-PERP' : 'SOL-PERP';\n\nconst direction = body.match(/\\b(sell|short)\\b/i) ? 'short' : 'long';\n\n// Enhanced timeframe extraction supporting multiple formats:\n// - \"buy 5\" → \"5\"\n// - \"buy 15\" → \"15\"\n// - \"buy 60\" or \"buy 1h\" → \"60\"\n// - \"buy 240\" or \"buy 4h\" → \"240\"\n// - \"buy D\" or \"buy 1d\" → \"D\"\n// - \"buy W\" → \"W\"\nconst timeframeMatch = body.match(/\\b(buy|sell)\\s+(\\d+|D|W|M|1h|4h|1d)\\b/i);\nlet timeframe = '5'; // Default to 5min\n\nif (timeframeMatch) {\n const tf = timeframeMatch[2];\n // Convert hour/day notation to minutes\n if (tf === '1h' || tf === '60') {\n timeframe = '60';\n } else if (tf === '4h' || tf === '240') {\n timeframe = '240';\n } else if (tf === '1d' || tf.toUpperCase() === 'D') {\n timeframe = 'D';\n } else if (tf.toUpperCase() === 'W') {\n timeframe = 'W';\n } else if (tf.toUpperCase() === 'M') {\n timeframe = 'M';\n } else {\n timeframe = tf;\n }\n}\n\n// Parse new context metrics from enhanced format:\n// \"SOLT.P buy 15 | ATR:0.65 | ADX:14.3 | RSI:51.3 | VOL:0.87 | POS:59.3 | MAGAP:-1.23 | IND:v9\"\nconst atrMatch = body.match(/ATR:([\\d.]+)/);\nconst atr = atrMatch ? parseFloat(atrMatch[1]) : 0;\n\nconst adxMatch = body.match(/ADX:([\\d.]+)/);\nconst adx = adxMatch ? parseFloat(adxMatch[1]) : 0;\n\nconst rsiMatch = body.match(/RSI:([\\d.]+)/);\nconst rsi = rsiMatch ? parseFloat(rsiMatch[1]) : 0;\n\nconst volumeMatch = body.match(/VOL:([\\d.]+)/);\nconst volumeRatio = volumeMatch ? parseFloat(volumeMatch[1]) : 0;\n\nconst pricePositionMatch = body.match(/POS:([\\d.]+)/);\nconst pricePosition = pricePositionMatch ? parseFloat(pricePositionMatch[1]) : 0;\n\n// Parse signal price from \"@ price\" format (for 1min data feed and v9 signals)\n// Must match: \"buy 1 @ 142.08 |\" (@ followed by price before first pipe)\n// DEBUG: Log body to see actual format\nconsole.log('DEBUG body:', body);\nconst signalPriceMatch = body.match(/@\\s*([\\d.]+)\\s*\\|/);\nconsole.log('DEBUG signalPriceMatch:', signalPriceMatch);\nconst signalPrice = signalPriceMatch ? parseFloat(signalPriceMatch[1]) : undefined;\nconsole.log('DEBUG signalPrice:', signalPrice, 'pricePosition will be:', body.match(/POS:([\\d.]+)/) ? body.match(/POS:([\\d.]+)/)[1] : 'not found');\n\n// V9: Parse MA gap (optional, backward compatible with v8)\nconst maGapMatch = body.match(/MAGAP:([-\\d.]+)/);\nconst maGap = maGapMatch ? parseFloat(maGapMatch[1]) : undefined;\n\n// Parse indicator version (optional, backward compatible)\nconst indicatorVersionMatch = body.match(/IND:(v\\d+)/i);\nconst indicatorVersion = indicatorVersionMatch ? indicatorVersionMatch[1] : 'v5';\n\nreturn {\n rawMessage: body,\n symbol,\n direction,\n timeframe,\n signalPrice, // NEW: Actual price from TradingView\n // Context fields\n atr,\n adx,\n rsi,\n volumeRatio,\n pricePosition,\n maGap, // V9 NEW\n // Version tracking (defaults to v5 for backward compatibility)\n indicatorVersion\n};" + "jsCode": "// Get the body - it might be a string or nested in an object\nlet body = $json.body || $json.query?.body || JSON.stringify($json);\n\n// If body is an object, stringify it\nif (typeof body === 'object') {\n body = JSON.stringify(body);\n}\n\n// Detect MA crossover events (death cross / golden cross)\nconst isMACrossover = body.match(/crossing/i) !== null;\n\n// Parse basic signal (existing logic)\nconst symbolMatch = body.match(/\\b(SOL|BTC|ETH)\\b/i);\nconst symbol = symbolMatch ? symbolMatch[1].toUpperCase() + '-PERP' : 'SOL-PERP';\n\nconst direction = body.match(/\\b(sell|short)\\b/i) ? 'short' : 'long';\n\n// Determine crossover type based on direction\nconst isDeathCross = isMACrossover && direction === 'short';\nconst isGoldenCross = isMACrossover && direction === 'long';\n\n// Enhanced timeframe extraction supporting multiple formats:\n// - \"buy 5\" → \"5\"\n// - \"buy 15\" → \"15\"\n// - \"buy 60\" or \"buy 1h\" → \"60\"\n// - \"buy 240\" or \"buy 4h\" → \"240\"\n// - \"buy D\" or \"buy 1d\" → \"D\"\n// - \"buy W\" → \"W\"\nconst timeframeMatch = body.match(/\\b(buy|sell)\\s+(\\d+|D|W|M|1h|4h|1d)\\b/i);\nlet timeframe = '5'; // Default to 5min\n\nif (timeframeMatch) {\n const tf = timeframeMatch[2];\n // Convert hour/day notation to minutes\n if (tf === '1h' || tf === '60') {\n timeframe = '60';\n } else if (tf === '4h' || tf === '240') {\n timeframe = '240';\n } else if (tf === '1d' || tf.toUpperCase() === 'D') {\n timeframe = 'D';\n } else if (tf.toUpperCase() === 'W') {\n timeframe = 'W';\n } else if (tf.toUpperCase() === 'M') {\n timeframe = 'M';\n } else {\n timeframe = tf;\n }\n}\n\n// Parse new context metrics from enhanced format:\n// \"SOLT.P buy 15 | ATR:0.65 | ADX:14.3 | RSI:51.3 | VOL:0.87 | POS:59.3 | MAGAP:-1.23 | IND:v9\"\nconst atrMatch = body.match(/ATR:([\\d.]+)/);\nconst atr = atrMatch ? parseFloat(atrMatch[1]) : 0;\n\nconst adxMatch = body.match(/ADX:([\\d.]+)/);\nconst adx = adxMatch ? parseFloat(adxMatch[1]) : 0;\n\nconst rsiMatch = body.match(/RSI:([\\d.]+)/);\nconst rsi = rsiMatch ? parseFloat(rsiMatch[1]) : 0;\n\nconst volumeMatch = body.match(/VOL:([\\d.]+)/);\nconst volumeRatio = volumeMatch ? parseFloat(volumeMatch[1]) : 0;\n\nconst pricePositionMatch = body.match(/POS:([\\d.]+)/);\nconst pricePosition = pricePositionMatch ? parseFloat(pricePositionMatch[1]) : 0;\n\n// Parse signal price from \"@ price\" format (for 1min data feed and v9 signals)\n// Must match: \"buy 1 @ 142.08 |\" (@ followed by price before first pipe)\n// DEBUG: Log body to see actual format\nconsole.log('DEBUG body:', body);\nconst signalPriceMatch = body.match(/@\\s*([\\d.]+)\\s*\\|/);\nconsole.log('DEBUG signalPriceMatch:', signalPriceMatch);\nconst signalPrice = signalPriceMatch ? parseFloat(signalPriceMatch[1]) : undefined;\nconsole.log('DEBUG signalPrice:', signalPrice, 'pricePosition will be:', body.match(/POS:([\\d.]+)/) ? body.match(/POS:([\\d.]+)/)[1] : 'not found');\n\n// V9: Parse MA gap (optional, backward compatible with v8)\nconst maGapMatch = body.match(/MAGAP:([-\\d.]+)/);\nconst maGap = maGapMatch ? parseFloat(maGapMatch[1]) : undefined;\n\n// Parse indicator version (optional, backward compatible)\nconst indicatorVersionMatch = body.match(/IND:(v\\d+)/i);\nconst indicatorVersion = indicatorVersionMatch ? indicatorVersionMatch[1] : 'v5';\n\nreturn {\n rawMessage: body,\n symbol,\n direction,\n timeframe,\n signalPrice,\n // Context fields\n atr,\n adx,\n rsi,\n volumeRatio,\n pricePosition,\n maGap,\n // MA Crossover detection (NEW: Nov 27, 2025)\n isMACrossover,\n isDeathCross,\n isGoldenCross,\n // Version tracking\n indicatorVersion\n};" }, "id": "parse-signal-enhanced", "name": "Parse Signal Enhanced", @@ -19,6 +19,6 @@ "staticData": null, "tags": [], "triggerCount": 0, - "updatedAt": "2025-10-30T00:00:00.000Z", - "versionId": "1" + "updatedAt": "2025-11-27T00:00:00.000Z", + "versionId": "2" }