diff --git a/close_emergency.sh b/close_emergency.sh
new file mode 100755
index 0000000..cf624ab
--- /dev/null
+++ b/close_emergency.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
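+# Emergency close: asks the local trading API to close 100% of the open SOL-PERP
+# position. API_SECRET_KEY must be set in the environment before running.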
+curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $API_SECRET_KEY" \
+ http://localhost:3001/api/trading/close \
+ -d '{"symbol": "SOL-PERP", "percentage": 100}'
diff --git a/cluster/exploration.db b/cluster/exploration.db
index dcbcb97..16c86d0 100644
Binary files a/cluster/exploration.db and b/cluster/exploration.db differ
diff --git a/cluster/run_v11_full_sweep.sh b/cluster/run_v11_full_sweep.sh
new file mode 100755
index 0000000..a462659
--- /dev/null
+++ b/cluster/run_v11_full_sweep.sh
@@ -0,0 +1,97 @@
+#!/bin/bash
+# Launch V11 Full Parameter Sweep on EPYC Cluster
+
+set -e
+
+echo "================================================================"
+echo "V11 FULL PARAMETER SWEEP - EXHAUSTIVE SEARCH"
+echo "================================================================"
+echo ""
+echo "Grid: 26,244 combinations"
+echo " - flip_threshold: 0.25, 0.3, 0.35, 0.4, 0.45, 0.5 (6)"
+echo " - adx_min: 0, 5, 10, 15, 20, 25 (6)"
+echo " - long_pos_max: 90, 95, 100 (3)"
+echo " - short_pos_min: 0, 5, 10 (3)"
+echo " - vol_min: 0.0, 0.5, 1.0 (3)"
+echo " - entry_buffer_atr: 0.0, 0.05, 0.10 (3)"
+echo " - rsi_long_min: 20, 25, 30 (3)"
+echo " - rsi_short_max: 70, 75, 80 (3)"
+echo ""
+echo "Workers:"
+echo " - worker1: 24 cores (24/7)"
+echo " - worker2: 18 cores (7PM-6AM only)"
+echo ""
+echo "Estimated Duration: 8-12 hours"
+echo "================================================================"
+echo ""
+
+cd "$(dirname "$0")"
+
+# Check data file
+if [ ! -f "data/solusdt_5m.csv" ]; then
+ echo "✗ Error: data/solusdt_5m.csv not found"
+ exit 1
+fi
+echo "✓ Market data found"
+
+# Check worker script
+if [ ! -f "v11_full_worker.py" ]; then
+ echo "✗ Error: v11_full_worker.py not found"
+ exit 1
+fi
+echo "✓ Worker script found"
+
+# Make scripts executable
+chmod +x v11_full_coordinator.py
+chmod +x v11_full_worker.py
+echo "✓ Scripts executable"
+
+# Create results directory
+mkdir -p v11_results
+echo "✓ Results directory ready"
+
+# Deploy worker to machines
+echo ""
+echo "📦 Deploying worker script to EPYC cluster..."
+
+# Worker 1
+echo " → worker1 (10.10.254.106)"
+scp v11_full_worker.py root@10.10.254.106:/home/comprehensive_sweep/
+scp ../backtester/v11_moneyline_all_filters.py root@10.10.254.106:/home/comprehensive_sweep/backtester/
+
+# Worker 2 (via worker 1)
+echo " → worker2 (10.20.254.100) via worker1"
+ssh root@10.10.254.106 "scp /home/comprehensive_sweep/v11_full_worker.py root@10.20.254.100:/home/backtest_dual/backtest/"
+ssh root@10.10.254.106 "scp /home/comprehensive_sweep/backtester/v11_moneyline_all_filters.py root@10.20.254.100:/home/backtest_dual/backtest/backtester/"
+
+echo "✓ Workers deployed"
+
+# Launch coordinator
+echo ""
+echo "🚀 Starting full sweep coordinator..."
+nohup python3 v11_full_coordinator.py > coordinator_v11_full.log 2>&1 &
+COORDINATOR_PID=$!
+
+echo "✓ Coordinator started (PID: $COORDINATOR_PID)"
+echo ""
+echo "================================================================"
+echo "MONITORING"
+echo "================================================================"
+echo "Live log: tail -f coordinator_v11_full.log"
+echo "Database: sqlite3 exploration.db"
+echo "Results: cluster/v11_results/*_results.csv"
+echo ""
+echo "Check status:"
+echo " sqlite3 exploration.db \\"
+echo " \"SELECT status, COUNT(*) FROM v11_full_chunks GROUP BY status\""
+echo ""
+echo "Top results so far:"
+echo " sqlite3 exploration.db \\"
+echo " \"SELECT params, pnl FROM v11_full_strategies \\"
+echo " ORDER BY pnl DESC LIMIT 10\""
+echo ""
+echo "To stop sweep:"
+echo " kill $COORDINATOR_PID"
+echo ""
+echo "Telegram notifications enabled (start/complete/stop)"
+echo "================================================================"
diff --git a/cluster/v11_full_coordinator.py b/cluster/v11_full_coordinator.py
new file mode 100755
index 0000000..59d64cc
--- /dev/null
+++ b/cluster/v11_full_coordinator.py
@@ -0,0 +1,422 @@
+#!/usr/bin/env python3
+"""
+V11 Full Parameter Sweep Coordinator - EXHAUSTIVE SEARCH
+
+Based on test sweep results showing 9.2× improvement over v9.
+Now testing full parameter space to find optimal configuration.
+
+EXHAUSTIVE GRID (26,244 combinations):
+- flip_threshold: 0.25, 0.3, 0.35, 0.4, 0.45, 0.5 (6 values)
+- adx_min: 0, 5, 10, 15, 20, 25 (6 values - 0 = disabled)
+- long_pos_max: 90, 95, 100 (3 values)
+- short_pos_min: 0, 5, 10 (3 values - 0 = disabled)
+- vol_min: 0.0, 0.5, 1.0 (3 values - 0 = disabled)
+- entry_buffer_atr: 0.0, 0.05, 0.10 (3 values - 0 = disabled)
+- rsi_long_min: 20, 25, 30 (3 values)
+- rsi_short_max: 70, 75, 80 (3 values)
+
+Total: 6 * 6 * 3 * 3 * 3 * 3 * 3 * 3 = 26,244 combinations
+Split into ~2,000 combo chunks = 14 chunks
+"""
+
+import sqlite3
+import subprocess
+import time
+import signal
+import sys
+from pathlib import Path
+from datetime import datetime
+import urllib.request
+import urllib.parse
+import json
+import itertools
+
+# Worker configuration
+WORKERS = {
+ 'worker1': {
+ 'host': 'root@10.10.254.106',
+ 'workspace': '/home/comprehensive_sweep',
+ 'max_parallel': 24,
+ },
+ 'worker2': {
+ 'host': 'root@10.20.254.100',
+ 'workspace': '/home/backtest_dual/backtest',
+ 'ssh_hop': 'root@10.10.254.106',
+ 'max_parallel': 18,
+ 'time_restricted': True,
+ 'allowed_start_hour': 19, # 7 PM
+ 'allowed_end_hour': 6, # 6 AM
+ }
+}
+
+DATA_FILE = 'data/solusdt_5m.csv'
+DB_PATH = 'exploration.db'
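+# 26,244 combos / 2,000 per chunk -> 14 chunks (13 full chunks plus one of 244)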
+CHUNK_SIZE = 2000
+
+# Telegram configuration
+TELEGRAM_BOT_TOKEN = '8240234365:AAEm6hg_XOm54x8ctnwpNYreFKRAEvWU3uY'
+TELEGRAM_CHAT_ID = '579304651'
+
+# EXHAUSTIVE parameter grid
+PARAM_GRID = {
+ 'flip_threshold': [0.25, 0.3, 0.35, 0.4, 0.45, 0.5],
+ 'adx_min': [0, 5, 10, 15, 20, 25],
+ 'long_pos_max': [90, 95, 100],
+ 'short_pos_min': [0, 5, 10],
+ 'vol_min': [0.0, 0.5, 1.0],
+ 'entry_buffer_atr': [0.0, 0.05, 0.10],
+ 'rsi_long_min': [20, 25, 30],
+ 'rsi_short_max': [70, 75, 80],
+}
+
+def send_telegram_message(message: str):
+ """Send notification to Telegram"""
+ try:
+ url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
+ data = {
+ 'chat_id': TELEGRAM_CHAT_ID,
+ 'text': message,
+ 'parse_mode': 'HTML'
+ }
+
+ req = urllib.request.Request(
+ url,
+ data=json.dumps(data).encode('utf-8'),
+ headers={'Content-Type': 'application/json'}
+ )
+
+ with urllib.request.urlopen(req, timeout=10) as response:
+ if response.status == 200:
+ print(f"✓ Telegram notification sent")
+ else:
+ print(f"⚠️ Telegram notification failed: {response.status}")
+ except Exception as e:
+ print(f"⚠️ Error sending Telegram notification: {e}")
+
+
+def is_worker_allowed_to_run(worker_name: str) -> bool:
+ """Check if worker is allowed to run based on time restrictions"""
+ worker = WORKERS[worker_name]
+
+ if not worker.get('time_restricted', False):
+ return True
+
+ current_hour = datetime.now().hour
+ start_hour = worker['allowed_start_hour']
+ end_hour = worker['allowed_end_hour']
+
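+    # The allowed window may wrap past midnight (e.g. 19:00 -> 06:00): in that case the
+    # worker may run if the current hour is after the start OR before the end.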
+ if start_hour > end_hour:
+ allowed = current_hour >= start_hour or current_hour < end_hour
+ else:
+ allowed = start_hour <= current_hour < end_hour
+
+ return allowed
+
+
+def signal_handler(sig, frame):
+ """Handle termination signals"""
+ message = (
+ "⚠️ V11 Full Sweep STOPPED\n\n"
+ "Coordinator received termination signal.\n"
+ "Sweep stopped prematurely.\n\n"
+ f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+ )
+ send_telegram_message(message)
+ sys.exit(0)
+
+
+def init_db():
+ """Initialize database with v11_full schema"""
+ conn = sqlite3.connect(DB_PATH)
+ c = conn.cursor()
+
+ # Create chunks table
+ c.execute('''
+ CREATE TABLE IF NOT EXISTS v11_full_chunks (
+ chunk_id TEXT PRIMARY KEY,
+ start_combo INTEGER NOT NULL,
+ end_combo INTEGER NOT NULL,
+ total_combos INTEGER NOT NULL,
+ status TEXT DEFAULT 'pending',
+ assigned_worker TEXT,
+ started_at INTEGER,
+ completed_at INTEGER,
+ best_pnl_in_chunk REAL,
+ results_csv_path TEXT
+ )
+ ''')
+
+ # Create strategies table for results
+ c.execute('''
+ CREATE TABLE IF NOT EXISTS v11_full_strategies (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ chunk_id TEXT NOT NULL,
+ params TEXT NOT NULL,
+ pnl REAL NOT NULL,
+ win_rate REAL NOT NULL,
+ profit_factor REAL NOT NULL,
+ max_drawdown REAL NOT NULL,
+ total_trades INTEGER NOT NULL,
+ created_at INTEGER DEFAULT (strftime('%s', 'now')),
+ FOREIGN KEY (chunk_id) REFERENCES v11_full_chunks(chunk_id)
+ )
+ ''')
+
+ c.execute('CREATE INDEX IF NOT EXISTS idx_v11_full_chunk_status ON v11_full_chunks(status)')
+ c.execute('CREATE INDEX IF NOT EXISTS idx_v11_full_strategy_pnl ON v11_full_strategies(pnl DESC)')
+
+ conn.commit()
+ conn.close()
+ print("✓ Database initialized")
+
+
+def create_chunks():
+ """Create chunks for all parameter combinations"""
+ # Generate all combinations
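+    # Ordering here (sorted keys + itertools.product) must match the worker exactly,
+    # since chunks are addressed only by start/end index into this combo list.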
+ keys = sorted(PARAM_GRID.keys())
+ values = [PARAM_GRID[k] for k in keys]
+ all_combos = list(itertools.product(*values))
+ total_combos = len(all_combos)
+
+ print(f"Total parameter combinations: {total_combos:,}")
+
+ conn = sqlite3.connect(DB_PATH)
+ c = conn.cursor()
+
+ # Check if chunks already exist
+ c.execute('SELECT COUNT(*) FROM v11_full_chunks')
+ existing = c.fetchone()[0]
+
+ if existing > 0:
+ print(f"⚠️ Found {existing} existing chunks - auto-clearing for fresh sweep")
+ c.execute('DELETE FROM v11_full_chunks')
+ c.execute('DELETE FROM v11_full_strategies')
+ conn.commit()
+ print("✓ Existing chunks deleted")
+
+ # Create chunks
+ chunk_num = 0
+ for start in range(0, total_combos, CHUNK_SIZE):
+ end = min(start + CHUNK_SIZE, total_combos)
+ chunk_id = f"v11_full_chunk_{chunk_num:04d}"
+
+ c.execute('''
+ INSERT INTO v11_full_chunks (chunk_id, start_combo, end_combo, total_combos, status)
+ VALUES (?, ?, ?, ?, 'pending')
+ ''', (chunk_id, start, end, end - start))
+
+ chunk_num += 1
+
+ conn.commit()
+ conn.close()
+
+    print(f"✓ Created {chunk_num} chunks (up to {CHUNK_SIZE} combos each)")
+
+
+def get_next_chunk():
+ """Get next pending chunk"""
+ conn = sqlite3.connect(DB_PATH)
+ c = conn.cursor()
+
+ c.execute('''
+ SELECT chunk_id, start_combo, end_combo
+ FROM v11_full_chunks
+ WHERE status = 'pending'
+ ORDER BY chunk_id
+ LIMIT 1
+ ''')
+
+ result = c.fetchone()
+ conn.close()
+
+ return result
+
+
+def get_chunk_stats():
+ """Get current sweep statistics"""
+ conn = sqlite3.connect(DB_PATH)
+ c = conn.cursor()
+
+ c.execute('''
+ SELECT
+ SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END),
+ SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END),
+ SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END),
+ COUNT(*)
+ FROM v11_full_chunks
+ ''')
+
+ stats = c.fetchone()
+ conn.close()
+
+ return {
+ 'pending': stats[0] or 0,
+ 'running': stats[1] or 0,
+ 'completed': stats[2] or 0,
+ 'total': stats[3] or 0
+ }
+
+
+def start_worker(worker_name: str, chunk_id: str, start_combo: int, end_combo: int):
+ """Start worker on remote machine"""
+ worker = WORKERS[worker_name]
+
+ # Mark chunk as running
+ conn = sqlite3.connect(DB_PATH)
+ c = conn.cursor()
+ c.execute('''
+ UPDATE v11_full_chunks
+ SET status = 'running', assigned_worker = ?, started_at = strftime('%s', 'now')
+ WHERE chunk_id = ?
+ ''', (worker_name, chunk_id))
+ conn.commit()
+ conn.close()
+
+ # Worker command (use venv python directly)
+ worker_cmd = (
+ f"cd {worker['workspace']} && "
+ f"nohup .venv/bin/python3 v11_full_worker.py "
+ f"--chunk-id {chunk_id} "
+ f"--start {start_combo} "
+ f"--end {end_combo} "
+ f"--workers {worker.get('max_parallel', 24)} "
+ f"> {chunk_id}.log 2>&1 &"
+ )
+
+ print(f"🚀 Starting {worker_name} on chunk {chunk_id} (combos {start_combo}-{end_combo})")
+
+ try:
+ # Worker 2 requires SSH through worker 1
+ if worker.get('ssh_hop'):
+ # SSH to worker1, then SSH to worker2 from there
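+            # worker_cmd contains no single quotes, so wrapping it in '...' for the inner ssh is safe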
+ hop_cmd = f"ssh {worker['host']} '{worker_cmd}'"
+ full_cmd = ['ssh', worker['ssh_hop'], hop_cmd]
+ else:
+ # Direct SSH to worker1
+ full_cmd = ['ssh', worker['host'], worker_cmd]
+
+ subprocess.run(full_cmd, check=True, timeout=30)
+ print(f"✓ {worker_name} started successfully")
+ return True
+ except Exception as e:
+ print(f"✗ Failed to start {worker_name}: {e}")
+ # Mark chunk as pending again
+ conn = sqlite3.connect(DB_PATH)
+ c = conn.cursor()
+ c.execute('''
+ UPDATE v11_full_chunks
+ SET status = 'pending', assigned_worker = NULL, started_at = NULL
+ WHERE chunk_id = ?
+ ''', (chunk_id,))
+ conn.commit()
+ conn.close()
+ return False
+
+
+def main():
+ """Main coordinator loop"""
+ print("=" * 80)
+ print("V11 FULL PARAMETER SWEEP COORDINATOR")
+ print("=" * 80)
+ print()
+ print("Exhaustive Grid: 26,244 combinations")
+ print("Chunk Size: 2,000 combinations")
+ print("Workers: worker1 (24 cores), worker2 (18 cores, 7PM-6AM)")
+ print()
+
+ # Register signal handlers
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ # Initialize database
+ init_db()
+
+ # Create chunks
+ create_chunks()
+
+ # Get initial stats
+ stats = get_chunk_stats()
+
+ # Send startup notification
+ message = (
+ f"🚀 V11 Full Sweep STARTED\n\n"
+ f"Total Chunks: {stats['total']}\n"
+        f"Total Combinations: ~{stats['total'] * CHUNK_SIZE:,}\n"
+ f"Workers: {len(WORKERS)}\n\n"
+ f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+ )
+ send_telegram_message(message)
+
+ print()
+ print("=" * 80)
+ print("SWEEP STARTED")
+ print("=" * 80)
+ print()
+
+ sweep_start = time.time()
+
+ try:
+ while True:
+ stats = get_chunk_stats()
+
+ # Check if sweep complete
+ if stats['pending'] == 0 and stats['running'] == 0:
+ duration = time.time() - sweep_start
+ hours = int(duration // 3600)
+ minutes = int((duration % 3600) // 60)
+
+ message = (
+ f"✅ V11 Full Sweep COMPLETE\n\n"
+ f"Total Chunks: {stats['completed']}\n"
+                    f"Total Combinations: ~{stats['completed'] * CHUNK_SIZE:,}\n"
+ f"Duration: {hours}h {minutes}m\n\n"
+ f"Completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+ )
+ send_telegram_message(message)
+
+ print()
+ print("=" * 80)
+ print("SWEEP COMPLETE!")
+ print("=" * 80)
+ print(f"Duration: {hours}h {minutes}m")
+ print()
+ break
+
+ # Assign work to available workers
+ for worker_name in WORKERS.keys():
+ if not is_worker_allowed_to_run(worker_name):
+ continue
+
+ # Check if worker already has work
+ conn = sqlite3.connect(DB_PATH)
+ c = conn.cursor()
+ c.execute('''
+ SELECT COUNT(*) FROM v11_full_chunks
+ WHERE status = 'running' AND assigned_worker = ?
+ ''', (worker_name,))
+ running_count = c.fetchone()[0]
+ conn.close()
+
+ if running_count > 0:
+ continue
+
+ # Get next chunk
+ next_chunk = get_next_chunk()
+ if next_chunk:
+ chunk_id, start_combo, end_combo = next_chunk
+ start_worker(worker_name, chunk_id, start_combo, end_combo)
+
+ # Status update
+ print(f"\r[{datetime.now().strftime('%H:%M:%S')}] "
+ f"Pending: {stats['pending']} | Running: {stats['running']} | "
+ f"Completed: {stats['completed']}/{stats['total']}", end='', flush=True)
+
+ time.sleep(30)
+
+ except KeyboardInterrupt:
+ print("\n\nReceived interrupt signal, cleaning up...")
+ signal_handler(signal.SIGINT, None)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/cluster/v11_full_worker.py b/cluster/v11_full_worker.py
new file mode 100755
index 0000000..4535dee
--- /dev/null
+++ b/cluster/v11_full_worker.py
@@ -0,0 +1,175 @@
+#!/usr/bin/env python3
+"""
+V11 Full Sweep Worker - Processes parameter combinations in parallel
+
+Runs on EPYC workers, processes assigned chunk of parameter space.
+"""
+
+import argparse
+import itertools
+import json
+import multiprocessing as mp
+import sys
+import time
+from pathlib import Path
+
+# Add repo root to path so the backtester package can be imported
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+from backtester.data_loader import load_csv
+from backtester.simulator import run_backtest
+from backtester.v11_moneyline_all_filters import MoneyLineV11Inputs
+
+# Parameter grid (matches coordinator)
+PARAM_GRID = {
+ 'flip_threshold': [0.25, 0.3, 0.35, 0.4, 0.45, 0.5],
+ 'adx_min': [0, 5, 10, 15, 20, 25],
+ 'long_pos_max': [90, 95, 100],
+ 'short_pos_min': [0, 5, 10],
+ 'vol_min': [0.0, 0.5, 1.0],
+ 'entry_buffer_atr': [0.0, 0.05, 0.10],
+ 'rsi_long_min': [20, 25, 30],
+ 'rsi_short_max': [70, 75, 80],
+}
+
+
+def test_single_config(args):
+ """Test a single parameter configuration"""
+ combo_idx, params_tuple, data = args
+
+ # Unpack parameters
+ keys = sorted(PARAM_GRID.keys())
+ params = dict(zip(keys, params_tuple))
+
+ # Create inputs
+ inputs = MoneyLineV11Inputs(
+ flip_threshold=params['flip_threshold'],
+ adx_min=params['adx_min'],
+ long_pos_max=params['long_pos_max'],
+ short_pos_min=params['short_pos_min'],
+ vol_min=params['vol_min'],
+ entry_buffer_atr=params['entry_buffer_atr'],
+ rsi_long_min=params['rsi_long_min'],
+ rsi_short_max=params['rsi_short_max'],
+ )
+
+ try:
+ # Run backtest
+ results = run_backtest(data, inputs, 'v11')
+
+ return {
+ 'params': params,
+ 'pnl': results['total_pnl'],
+ 'win_rate': results['win_rate'],
+ 'profit_factor': results['profit_factor'],
+ 'max_drawdown': results['max_drawdown'],
+ 'total_trades': results['total_trades'],
+ }
+ except Exception as e:
+ print(f"Error testing combo {combo_idx}: {e}")
+ return None
+
+
+def main():
+ parser = argparse.ArgumentParser(description='V11 Full Sweep Worker')
+ parser.add_argument('--chunk-id', required=True, help='Chunk ID')
+ parser.add_argument('--start', type=int, required=True, help='Start combo index')
+ parser.add_argument('--end', type=int, required=True, help='End combo index')
+ parser.add_argument('--workers', type=int, default=24, help='Parallel workers')
+ args = parser.parse_args()
+
+ print(f"V11 Full Worker: {args.chunk_id}")
+ print(f"Range: {args.start} - {args.end}")
+ print(f"Workers: {args.workers}")
+ print()
+
+ # Load data
+ print("Loading market data...")
+ data = load_csv(Path('data/solusdt_5m.csv'), 'SOLUSDT', '5m')
+ print(f"Loaded {len(data)} candles")
+ print()
+
+ # Generate all combinations
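+    # Regenerate the grid in the same order as the coordinator (sorted keys +
+    # itertools.product) so the --start/--end indices refer to the same combos.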
+ keys = sorted(PARAM_GRID.keys())
+ values = [PARAM_GRID[k] for k in keys]
+ all_combos = list(itertools.product(*values))
+
+ # Get this chunk's combos
+ chunk_combos = all_combos[args.start:args.end]
+ print(f"Testing {len(chunk_combos)} combinations...")
+ print()
+
+ # Prepare args for parallel processing
+ test_args = [
+ (args.start + i, combo, data)
+ for i, combo in enumerate(chunk_combos)
+ ]
+
+ # Run tests in parallel
+ start_time = time.time()
+ results = []
+
+ with mp.Pool(processes=args.workers) as pool:
+ for i, result in enumerate(pool.imap_unordered(test_single_config, test_args)):
+ if result:
+ results.append(result)
+
+ # Progress update every 10 combos
+ if (i + 1) % 10 == 0:
+ elapsed = time.time() - start_time
+ rate = (i + 1) / elapsed
+ remaining = len(chunk_combos) - (i + 1)
+ eta = remaining / rate if rate > 0 else 0
+
+ print(f"Progress: {i+1}/{len(chunk_combos)} "
+ f"({rate:.1f} combos/s, ETA: {eta/60:.1f}m)")
+
+ # Sort by P&L descending
+ results.sort(key=lambda x: x['pnl'], reverse=True)
+
+ # Save results to CSV
+ output_path = Path(f"v11_results/{args.chunk_id}_results.csv")
+ output_path.parent.mkdir(exist_ok=True)
+
+ with open(output_path, 'w') as f:
+ # Header
+ param_cols = sorted(PARAM_GRID.keys())
+ metric_cols = ['pnl', 'win_rate', 'profit_factor', 'max_drawdown', 'total_trades']
+ header = ','.join(param_cols + metric_cols)
+ f.write(header + '\n')
+
+ # Data rows
+ for result in results:
+ param_values = [str(result['params'][k]) for k in param_cols]
+ metric_values = [
+ f"{result['pnl']:.1f}",
+ f"{result['win_rate']:.1f}",
+ f"{result['profit_factor']:.3f}",
+ f"{result['max_drawdown']:.1f}",
+ str(result['total_trades'])
+ ]
+ row = ','.join(param_values + metric_values)
+ f.write(row + '\n')
+
+ duration = time.time() - start_time
+ print()
+ print("=" * 60)
+ print(f"Chunk {args.chunk_id} COMPLETE")
+ print(f"Duration: {duration/60:.1f} minutes")
+ print(f"Rate: {len(chunk_combos)/duration:.2f} combos/second")
+ print(f"Results saved: {output_path}")
+
+ if results:
+ best = results[0]
+ print()
+ print("Best configuration in chunk:")
+ print(f" P&L: ${best['pnl']:.2f}")
+ print(f" Win Rate: {best['win_rate']:.1f}%")
+ print(f" Profit Factor: {best['profit_factor']:.3f}")
+ print(f" Parameters: {best['params']}")
+
+ print("=" * 60)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/cluster/v11_full_worker_FIXED.py b/cluster/v11_full_worker_FIXED.py
new file mode 100755
index 0000000..c092b9c
--- /dev/null
+++ b/cluster/v11_full_worker_FIXED.py
@@ -0,0 +1,311 @@
+#!/usr/bin/env python3
+"""
+V11 Full Parameter Sweep Worker
+
+Processes chunks of v11 parameter configurations (26,244 combinations total).
+Uses all available cores for multiprocessing.
+
+FULL EXHAUSTIVE SWEEP across all filter parameters.
+"""
+
+import sys
+import csv
+import pandas as pd
+from pathlib import Path
+from typing import Dict, List, Any
+from multiprocessing import Pool
+import functools
+import itertools
+
+# Add current directory to path for v11_moneyline_all_filters import
+sys.path.insert(0, str(Path(__file__).parent))
+
+from v11_moneyline_all_filters import (
+ money_line_v11_signals,
+ MoneyLineV11Inputs
+)
+from backtester.simulator import simulate_money_line
+
+# CPU limit: Use all available cores (24 for worker1, 18 for worker2)
+MAX_WORKERS = 24  # Default pool size; lower to 18 when running on worker2
+
+# Global data file path (set by init_worker) and per-process DataFrame cache
+_DATA_FILE = None
+_DATA_CACHE = None
+
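+# Passing only the file path through the Pool initializer avoids pickling the full
+# DataFrame for every task; each worker process loads the CSV once and caches it.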
+def init_worker(data_file):
+ """Initialize worker process with data file path"""
+ global _DATA_FILE
+ _DATA_FILE = data_file
+
+# FULL EXHAUSTIVE Parameter grid (26,244 combinations)
+PARAMETER_GRID = {
+ 'flip_threshold': [0.25, 0.3, 0.35, 0.4, 0.45, 0.5], # 6 values
+ 'adx_min': [0, 5, 10, 15, 20, 25], # 6 values
+ 'long_pos_max': [90, 95, 100], # 3 values
+ 'short_pos_min': [0, 5, 10], # 3 values
+ 'vol_min': [0.0, 0.5, 1.0], # 3 values
+ 'entry_buffer_atr': [0.0, 0.05, 0.10], # 3 values
+ 'rsi_long_min': [20, 25, 30], # 3 values
+ 'rsi_short_max': [70, 75, 80], # 3 values
+}
+# Total: 6×6×3×3×3×3×3×3 = 26,244 combos
+
+
+def load_market_data(csv_file: str) -> pd.DataFrame:
+ """Load OHLCV data from CSV"""
+ df = pd.read_csv(csv_file)
+
+ # Ensure required columns exist
+ required = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
+ for col in required:
+ if col not in df.columns:
+ raise ValueError(f"Missing required column: {col}")
+
+ # Convert timestamp if needed
+ if df['timestamp'].dtype == 'object':
+ df['timestamp'] = pd.to_datetime(df['timestamp'])
+
+ df = df.set_index('timestamp')
+ print(f"✓ Loaded {len(df):,} bars from {csv_file}")
+ return df
+
+
+def backtest_config(config: Dict[str, Any]) -> Dict[str, Any]:
+ """
+    Run backtest for a single v11 parameter configuration.
+
+ Loads data from global _DATA_FILE path on first call.
+
+ Returns dict with:
+ - params: original config dict
+ - pnl: total P&L
+ - trades: number of trades
+ - win_rate: % winners
+ - profit_factor: wins/losses ratio
+ - max_drawdown: max drawdown $
+ """
+    # Load data on first call, then reuse the per-process cached DataFrame
+    global _DATA_FILE, _DATA_CACHE
+    if _DATA_CACHE is None:
+        df = pd.read_csv(_DATA_FILE)
+        df['timestamp'] = pd.to_datetime(df['timestamp'])
+        _DATA_CACHE = df.set_index('timestamp')
+    df = _DATA_CACHE
+
+ try:
+ # Create v11 inputs
+ inputs = MoneyLineV11Inputs(
+ use_quality_filters=True, # 🔧 FIX: Enable filters for progressive sweep
+ flip_threshold=config['flip_threshold'],
+ adx_min=config['adx_min'],
+ long_pos_max=config['long_pos_max'],
+ short_pos_min=config['short_pos_min'],
+ vol_min=config['vol_min'],
+ entry_buffer_atr=config['entry_buffer_atr'],
+ rsi_long_min=config['rsi_long_min'],
+ rsi_long_max=70, # 🔧 FIX: Add missing fixed parameter
+ rsi_short_min=30, # 🔧 FIX: Add missing fixed parameter
+ rsi_short_max=config['rsi_short_max'],
+ )
+
+ print(f" Generating signals...", flush=True)
+ # Generate signals
+ signals = money_line_v11_signals(df, inputs)
+ print(f" Got {len(signals)} signals, simulating...", flush=True)
+
+ if not signals:
+ return {
+ 'params': config,
+ 'pnl': 0.0,
+ 'trades': 0,
+ 'win_rate': 0.0,
+ 'profit_factor': 0.0,
+ 'max_drawdown': 0.0,
+ }
+
+ # Simple backtesting: track equity curve
+ equity = 1000.0 # Starting capital
+ peak_equity = equity
+ max_drawdown = 0.0
+ wins = 0
+ losses = 0
+ win_pnl = 0.0
+ loss_pnl = 0.0
+
+ for signal in signals:
+ # Simple trade simulation
+ # TP1 at +0.86%, SL at -1.29% (ATR-based defaults)
+ entry = signal.entry_price
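+            # NOTE: TP is checked before SL below; if both levels are touched within the
+            # 100-bar window the trade counts as a win, which slightly flatters results.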
+
+ # Look ahead in data to see if TP or SL hit
+ signal_idx = df.index.get_loc(signal.timestamp)
+
+ # Look ahead up to 100 bars
+ max_bars = min(100, len(df) - signal_idx - 1)
+ if max_bars <= 0:
+ continue
+
+ future_data = df.iloc[signal_idx+1:signal_idx+1+max_bars]
+
+ if signal.direction == "long":
+ tp_price = entry * 1.0086 # +0.86%
+ sl_price = entry * 0.9871 # -1.29%
+
+ # Check if TP or SL hit
+ hit_tp = (future_data['high'] >= tp_price).any()
+ hit_sl = (future_data['low'] <= sl_price).any()
+
+ if hit_tp:
+ pnl = 1000.0 * 0.0086 # $8.60 on $1000 position
+ equity += pnl
+ wins += 1
+ win_pnl += pnl
+ elif hit_sl:
+ pnl = -1000.0 * 0.0129 # -$12.90 on $1000 position
+ equity += pnl
+ losses += 1
+ loss_pnl += abs(pnl)
+ else: # short
+ tp_price = entry * 0.9914 # -0.86%
+ sl_price = entry * 1.0129 # +1.29%
+
+ # Check if TP or SL hit
+ hit_tp = (future_data['low'] <= tp_price).any()
+ hit_sl = (future_data['high'] >= sl_price).any()
+
+ if hit_tp:
+ pnl = 1000.0 * 0.0086 # $8.60 on $1000 position
+ equity += pnl
+ wins += 1
+ win_pnl += pnl
+ elif hit_sl:
+ pnl = -1000.0 * 0.0129 # -$12.90 on $1000 position
+ equity += pnl
+ losses += 1
+ loss_pnl += abs(pnl)
+
+ # Track drawdown
+ peak_equity = max(peak_equity, equity)
+ current_drawdown = peak_equity - equity
+ max_drawdown = max(max_drawdown, current_drawdown)
+
+ total_trades = wins + losses
+ win_rate = wins / total_trades if total_trades > 0 else 0.0
+ profit_factor = win_pnl / loss_pnl if loss_pnl > 0 else (float('inf') if win_pnl > 0 else 0.0)
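+        # profit_factor = gross win $ / gross loss $; inf (no losing trades) is capped at 999.0 below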
+ total_pnl = equity - 1000.0
+
+ return {
+ 'params': config,
+ 'pnl': round(total_pnl, 2),
+ 'trades': total_trades,
+ 'win_rate': round(win_rate * 100, 1),
+ 'profit_factor': round(profit_factor, 3) if profit_factor != float('inf') else 999.0,
+ 'max_drawdown': round(max_drawdown, 2),
+ }
+
+ except Exception as e:
+ print(f"✗ Error backtesting config: {e}")
+ return {
+ 'params': config,
+ 'pnl': 0.0,
+ 'trades': 0,
+ 'win_rate': 0.0,
+ 'profit_factor': 0.0,
+ 'max_drawdown': 0.0,
+ }
+
+
+def generate_parameter_combinations() -> List[Dict[str, Any]]:
+    """Generate all 26,244 parameter combinations from PARAMETER_GRID"""
+ keys = PARAMETER_GRID.keys()
+ values = PARAMETER_GRID.values()
+
+ combinations = []
+ for combo in itertools.product(*values):
+ config = dict(zip(keys, combo))
+ combinations.append(config)
+
+ return combinations
+
+
+def process_chunk(data_file: str, chunk_id: str, start_idx: int, end_idx: int):
+ """Process a chunk of parameter combinations"""
+ print(f"\n{'='*60}")
+    print(f"V11 Full Worker - {chunk_id}")
+ print(f"Processing combinations {start_idx} to {end_idx-1}")
+ print(f"{'='*60}\n")
+
+ # Load market data
+ df = load_market_data(data_file)
+
+ # Generate all combinations
+ all_combos = generate_parameter_combinations()
+ print(f"✓ Generated {len(all_combos)} total combinations")
+
+ # Get this chunk's combinations
+ chunk_combos = all_combos[start_idx:end_idx]
+ print(f"✓ Processing {len(chunk_combos)} combinations in this chunk\n")
+
+ # Backtest with multiprocessing (pass data file path instead of dataframe)
+ print(f"⚡ Starting {MAX_WORKERS}-core backtest...\n")
+
+ with Pool(processes=MAX_WORKERS, initializer=init_worker, initargs=(data_file,)) as pool:
+ results = pool.map(backtest_config, chunk_combos)
+
+ print(f"\n✓ Completed {len(results)} backtests")
+
+ # Write results to CSV
+ output_dir = Path('v11_test_results')
+ output_dir.mkdir(exist_ok=True)
+
+ csv_file = output_dir / f"{chunk_id}_results.csv"
+
+ with open(csv_file, 'w', newline='') as f:
+ writer = csv.writer(f)
+
+ # Header
+ writer.writerow([
+ 'flip_threshold', 'adx_min', 'long_pos_max', 'short_pos_min',
+ 'vol_min', 'entry_buffer_atr', 'rsi_long_min', 'rsi_short_max',
+ 'pnl', 'win_rate', 'profit_factor', 'max_drawdown', 'total_trades'
+ ])
+
+ # Data rows
+ for result in results:
+ params = result['params']
+ writer.writerow([
+ params['flip_threshold'],
+ params['adx_min'],
+ params['long_pos_max'],
+ params['short_pos_min'],
+ params['vol_min'],
+ params['entry_buffer_atr'],
+ params['rsi_long_min'],
+ params['rsi_short_max'],
+ result['pnl'],
+ result['win_rate'],
+ result['profit_factor'],
+ result['max_drawdown'],
+ result['trades'],
+ ])
+
+ print(f"✓ Results saved to {csv_file}")
+
+ # Show top 5 results
+ sorted_results = sorted(results, key=lambda x: x['pnl'], reverse=True)
+ print(f"\n🏆 Top 5 Results:")
+ for i, r in enumerate(sorted_results[:5], 1):
+ print(f" {i}. PnL: ${r['pnl']:,.2f} | Trades: {r['trades']} | WR: {r['win_rate']}%")
+
+
+if __name__ == '__main__':
+ if len(sys.argv) != 4:
+        print("Usage: python v11_full_worker_FIXED.py <data_file> <chunk_id> <start_idx>")
+ sys.exit(1)
+
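+    # Example invocation (illustrative): python3 v11_full_worker_FIXED.py data/solusdt_5m.csv v11_full_chunk_0000 0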
+ data_file = sys.argv[1]
+ chunk_id = sys.argv[2]
+ start_idx = int(sys.argv[3])
+
+ # Calculate end index (256 combos per chunk)
+ end_idx = start_idx + 256
+
+ process_chunk(data_file, chunk_id, start_idx, end_idx)
diff --git a/workflows/trading/parse_signal_enhanced.json.backup b/workflows/trading/parse_signal_enhanced.json.backup
index 3bcfd3d..8f155f3 100644
--- a/workflows/trading/parse_signal_enhanced.json.backup
+++ b/workflows/trading/parse_signal_enhanced.json.backup
@@ -3,7 +3,7 @@
"nodes": [
{
"parameters": {
- "jsCode": "// Get the body - it might be a string or nested in an object\nlet body = $json.body || $json.query?.body || JSON.stringify($json);\n\n// If body is an object, stringify it\nif (typeof body === 'object') {\n body = JSON.stringify(body);\n}\n\n// Parse basic signal (existing logic)\nconst symbolMatch = body.match(/\\b(SOL|BTC|ETH)\\b/i);\nconst symbol = symbolMatch ? symbolMatch[1].toUpperCase() + '-PERP' : 'SOL-PERP';\n\nconst direction = body.match(/\\b(sell|short)\\b/i) ? 'short' : 'long';\n\n// Enhanced timeframe extraction supporting multiple formats:\n// - \"buy 5\" → \"5\"\n// - \"buy 15\" → \"15\"\n// - \"buy 60\" or \"buy 1h\" → \"60\"\n// - \"buy 240\" or \"buy 4h\" → \"240\"\n// - \"buy D\" or \"buy 1d\" → \"D\"\n// - \"buy W\" → \"W\"\nconst timeframeMatch = body.match(/\\b(buy|sell)\\s+(\\d+|D|W|M|1h|4h|1d)\\b/i);\nlet timeframe = '5'; // Default to 5min\n\nif (timeframeMatch) {\n const tf = timeframeMatch[2];\n // Convert hour/day notation to minutes\n if (tf === '1h' || tf === '60') {\n timeframe = '60';\n } else if (tf === '4h' || tf === '240') {\n timeframe = '240';\n } else if (tf === '1d' || tf.toUpperCase() === 'D') {\n timeframe = 'D';\n } else if (tf.toUpperCase() === 'W') {\n timeframe = 'W';\n } else if (tf.toUpperCase() === 'M') {\n timeframe = 'M';\n } else {\n timeframe = tf;\n }\n}\n\n// Parse new context metrics from enhanced format:\n// \"SOLT.P buy 15 | ATR:0.65 | ADX:14.3 | RSI:51.3 | VOL:0.87 | POS:59.3 | MAGAP:-1.23 | IND:v9\"\nconst atrMatch = body.match(/ATR:([\\d.]+)/);\nconst atr = atrMatch ? parseFloat(atrMatch[1]) : 0;\n\nconst adxMatch = body.match(/ADX:([\\d.]+)/);\nconst adx = adxMatch ? parseFloat(adxMatch[1]) : 0;\n\nconst rsiMatch = body.match(/RSI:([\\d.]+)/);\nconst rsi = rsiMatch ? parseFloat(rsiMatch[1]) : 0;\n\nconst volumeMatch = body.match(/VOL:([\\d.]+)/);\nconst volumeRatio = volumeMatch ? parseFloat(volumeMatch[1]) : 0;\n\nconst pricePositionMatch = body.match(/POS:([\\d.]+)/);\nconst pricePosition = pricePositionMatch ? parseFloat(pricePositionMatch[1]) : 0;\n\n// Parse signal price from \"@ price\" format (for 1min data feed and v9 signals)\n// Must match: \"buy 1 @ 142.08 |\" (@ followed by price before first pipe)\n// DEBUG: Log body to see actual format\nconsole.log('DEBUG body:', body);\nconst signalPriceMatch = body.match(/@\\s*([\\d.]+)\\s*\\|/);\nconsole.log('DEBUG signalPriceMatch:', signalPriceMatch);\nconst signalPrice = signalPriceMatch ? parseFloat(signalPriceMatch[1]) : undefined;\nconsole.log('DEBUG signalPrice:', signalPrice, 'pricePosition will be:', body.match(/POS:([\\d.]+)/) ? body.match(/POS:([\\d.]+)/)[1] : 'not found');\n\n// V9: Parse MA gap (optional, backward compatible with v8)\nconst maGapMatch = body.match(/MAGAP:([-\\d.]+)/);\nconst maGap = maGapMatch ? parseFloat(maGapMatch[1]) : undefined;\n\n// Parse indicator version (optional, backward compatible)\nconst indicatorVersionMatch = body.match(/IND:(v\\d+)/i);\nconst indicatorVersion = indicatorVersionMatch ? indicatorVersionMatch[1] : 'v5';\n\nreturn {\n rawMessage: body,\n symbol,\n direction,\n timeframe,\n signalPrice, // NEW: Actual price from TradingView\n // Context fields\n atr,\n adx,\n rsi,\n volumeRatio,\n pricePosition,\n maGap, // V9 NEW\n // Version tracking (defaults to v5 for backward compatibility)\n indicatorVersion\n};"
+ "jsCode": "// Get the body - it might be a string or nested in an object\nlet body = $json.body || $json.query?.body || JSON.stringify($json);\n\n// If body is an object, stringify it\nif (typeof body === 'object') {\n body = JSON.stringify(body);\n}\n\n// Detect MA crossover events (death cross / golden cross)\nconst isMACrossover = body.match(/crossing/i) !== null;\n\n// Parse basic signal (existing logic)\nconst symbolMatch = body.match(/\\b(SOL|BTC|ETH)\\b/i);\nconst symbol = symbolMatch ? symbolMatch[1].toUpperCase() + '-PERP' : 'SOL-PERP';\n\nconst direction = body.match(/\\b(sell|short)\\b/i) ? 'short' : 'long';\n\n// Determine crossover type based on direction\nconst isDeathCross = isMACrossover && direction === 'short';\nconst isGoldenCross = isMACrossover && direction === 'long';\n\n// Enhanced timeframe extraction supporting multiple formats:\n// - \"buy 5\" → \"5\"\n// - \"buy 15\" → \"15\"\n// - \"buy 60\" or \"buy 1h\" → \"60\"\n// - \"buy 240\" or \"buy 4h\" → \"240\"\n// - \"buy D\" or \"buy 1d\" → \"D\"\n// - \"buy W\" → \"W\"\nconst timeframeMatch = body.match(/\\b(buy|sell)\\s+(\\d+|D|W|M|1h|4h|1d)\\b/i);\nlet timeframe = '5'; // Default to 5min\n\nif (timeframeMatch) {\n const tf = timeframeMatch[2];\n // Convert hour/day notation to minutes\n if (tf === '1h' || tf === '60') {\n timeframe = '60';\n } else if (tf === '4h' || tf === '240') {\n timeframe = '240';\n } else if (tf === '1d' || tf.toUpperCase() === 'D') {\n timeframe = 'D';\n } else if (tf.toUpperCase() === 'W') {\n timeframe = 'W';\n } else if (tf.toUpperCase() === 'M') {\n timeframe = 'M';\n } else {\n timeframe = tf;\n }\n}\n\n// Parse new context metrics from enhanced format:\n// \"SOLT.P buy 15 | ATR:0.65 | ADX:14.3 | RSI:51.3 | VOL:0.87 | POS:59.3 | MAGAP:-1.23 | IND:v9\"\nconst atrMatch = body.match(/ATR:([\\d.]+)/);\nconst atr = atrMatch ? parseFloat(atrMatch[1]) : 0;\n\nconst adxMatch = body.match(/ADX:([\\d.]+)/);\nconst adx = adxMatch ? parseFloat(adxMatch[1]) : 0;\n\nconst rsiMatch = body.match(/RSI:([\\d.]+)/);\nconst rsi = rsiMatch ? parseFloat(rsiMatch[1]) : 0;\n\nconst volumeMatch = body.match(/VOL:([\\d.]+)/);\nconst volumeRatio = volumeMatch ? parseFloat(volumeMatch[1]) : 0;\n\nconst pricePositionMatch = body.match(/POS:([\\d.]+)/);\nconst pricePosition = pricePositionMatch ? parseFloat(pricePositionMatch[1]) : 0;\n\n// Parse signal price from \"@ price\" format (for 1min data feed and v9 signals)\n// Must match: \"buy 1 @ 142.08 |\" (@ followed by price before first pipe)\n// DEBUG: Log body to see actual format\nconsole.log('DEBUG body:', body);\nconst signalPriceMatch = body.match(/@\\s*([\\d.]+)\\s*\\|/);\nconsole.log('DEBUG signalPriceMatch:', signalPriceMatch);\nconst signalPrice = signalPriceMatch ? parseFloat(signalPriceMatch[1]) : undefined;\nconsole.log('DEBUG signalPrice:', signalPrice, 'pricePosition will be:', body.match(/POS:([\\d.]+)/) ? body.match(/POS:([\\d.]+)/)[1] : 'not found');\n\n// V9: Parse MA gap (optional, backward compatible with v8)\nconst maGapMatch = body.match(/MAGAP:([-\\d.]+)/);\nconst maGap = maGapMatch ? parseFloat(maGapMatch[1]) : undefined;\n\n// Parse indicator version (optional, backward compatible)\nconst indicatorVersionMatch = body.match(/IND:(v\\d+)/i);\nconst indicatorVersion = indicatorVersionMatch ? indicatorVersionMatch[1] : 'v5';\n\nreturn {\n rawMessage: body,\n symbol,\n direction,\n timeframe,\n signalPrice,\n // Context fields\n atr,\n adx,\n rsi,\n volumeRatio,\n pricePosition,\n maGap,\n // MA Crossover detection (NEW: Nov 27, 2025)\n isMACrossover,\n isDeathCross,\n isGoldenCross,\n // Version tracking\n indicatorVersion\n};"
},
"id": "parse-signal-enhanced",
"name": "Parse Signal Enhanced",
@@ -19,6 +19,6 @@
"staticData": null,
"tags": [],
"triggerCount": 0,
- "updatedAt": "2025-10-30T00:00:00.000Z",
- "versionId": "1"
+ "updatedAt": "2025-11-27T00:00:00.000Z",
+ "versionId": "2"
}