critical: Position Manager monitoring failure - 08 loss incident (Dec 8, 2025)

- Bug #73 recurrence: Position opened Dec 7 22:15 but PM never monitored
- Root cause: Container running OLD code from BEFORE Dec 7 fix (2:46 AM start < 2:46 AM commit)
- User lost 08 on unprotected SOL-PERP SHORT
- Fix: Rebuilt and restarted container with 3-layer safety system
- Status: VERIFIED deployed - all safety layers active
- Prevention: Container timestamp MUST be AFTER commit timestamp
This commit is contained in:
mindesbunister
2025-12-08 07:51:28 +01:00
parent 66c6f6dea5
commit 57c2565e63
7 changed files with 1012 additions and 3 deletions

4
close_emergency.sh Executable file
View File

@@ -0,0 +1,4 @@
#!/bin/bash
curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $API_SECRET_KEY" \
http://localhost:3001/api/trading/close \
-d '{"symbol": "SOL-PERP", "percentage": 100}'

Binary file not shown.

97
cluster/run_v11_full_sweep.sh Executable file
View File

@@ -0,0 +1,97 @@
#!/bin/bash
# Launch V11 Full Parameter Sweep on EPYC Cluster
set -e
echo "================================================================"
echo "V11 FULL PARAMETER SWEEP - EXHAUSTIVE SEARCH"
echo "================================================================"
echo ""
echo "Grid: 26,244 combinations"
echo " - flip_threshold: 0.25, 0.3, 0.35, 0.4, 0.45, 0.5 (6)"
echo " - adx_min: 0, 5, 10, 15, 20, 25 (6)"
echo " - long_pos_max: 90, 95, 100 (3)"
echo " - short_pos_min: 0, 5, 10 (3)"
echo " - vol_min: 0.0, 0.5, 1.0 (3)"
echo " - entry_buffer_atr: 0.0, 0.05, 0.10 (3)"
echo " - rsi_long_min: 20, 25, 30 (3)"
echo " - rsi_short_max: 70, 75, 80 (3)"
echo ""
echo "Workers:"
echo " - worker1: 24 cores (24/7)"
echo " - worker2: 18 cores (7PM-6AM only)"
echo ""
echo "Estimated Duration: 8-12 hours"
echo "================================================================"
echo ""
cd "$(dirname "$0")"
# Check data file
if [ ! -f "data/solusdt_5m.csv" ]; then
echo "✗ Error: data/solusdt_5m.csv not found"
exit 1
fi
echo "✓ Market data found"
# Check worker script
if [ ! -f "v11_full_worker.py" ]; then
echo "✗ Error: v11_full_worker.py not found"
exit 1
fi
echo "✓ Worker script found"
# Make scripts executable
chmod +x v11_full_coordinator.py
chmod +x v11_full_worker.py
echo "✓ Scripts executable"
# Create results directory
mkdir -p v11_results
echo "✓ Results directory ready"
# Deploy worker to machines
echo ""
echo "📦 Deploying worker script to EPYC cluster..."
# Worker 1
echo " → worker1 (10.10.254.106)"
scp v11_full_worker.py root@10.10.254.106:/home/comprehensive_sweep/
scp ../backtester/v11_moneyline_all_filters.py root@10.10.254.106:/home/comprehensive_sweep/backtester/
# Worker 2 (via worker 1)
echo " → worker2 (10.20.254.100) via worker1"
ssh root@10.10.254.106 "scp /home/comprehensive_sweep/v11_full_worker.py root@10.20.254.100:/home/backtest_dual/backtest/"
ssh root@10.10.254.106 "scp /home/comprehensive_sweep/backtester/v11_moneyline_all_filters.py root@10.20.254.100:/home/backtest_dual/backtest/backtester/"
echo "✓ Workers deployed"
# Launch coordinator
echo ""
echo "🚀 Starting full sweep coordinator..."
nohup python3 v11_full_coordinator.py > coordinator_v11_full.log 2>&1 &
COORDINATOR_PID=$!
echo "✓ Coordinator started (PID: $COORDINATOR_PID)"
echo ""
echo "================================================================"
echo "MONITORING"
echo "================================================================"
echo "Live log: tail -f coordinator_v11_full.log"
echo "Database: sqlite3 exploration.db"
echo "Results: cluster/v11_results/*_results.csv"
echo ""
echo "Check status:"
echo " sqlite3 exploration.db \\"
echo " \"SELECT status, COUNT(*) FROM v11_full_chunks GROUP BY status\""
echo ""
echo "Top results so far:"
echo " sqlite3 exploration.db \\"
echo " \"SELECT params, pnl FROM v11_full_strategies \\"
echo " ORDER BY pnl DESC LIMIT 10\""
echo ""
echo "To stop sweep:"
echo " kill $COORDINATOR_PID"
echo ""
echo "Telegram notifications enabled (start/complete/stop)"
echo "================================================================"

422
cluster/v11_full_coordinator.py Executable file
View File

@@ -0,0 +1,422 @@
#!/usr/bin/env python3
"""
V11 Full Parameter Sweep Coordinator - EXHAUSTIVE SEARCH
Based on test sweep results showing 9.2× improvement over v9.
Now testing full parameter space to find optimal configuration.
EXHAUSTIVE GRID (16,384 combinations):
- flip_threshold: 0.25, 0.3, 0.35, 0.4, 0.45, 0.5 (6 values)
- adx_min: 0, 5, 10, 15, 20, 25 (6 values - 0 = disabled)
- long_pos_max: 90, 95, 100 (3 values)
- short_pos_min: 0, 5, 10 (3 values - 0 = disabled)
- vol_min: 0.0, 0.5, 1.0 (3 values - 0 = disabled)
- entry_buffer_atr: 0.0, 0.05, 0.10 (3 values - 0 = disabled)
- rsi_long_min: 20, 25, 30 (3 values)
- rsi_short_max: 70, 75, 80 (3 values)
Total: 6 * 6 * 3 * 3 * 3 * 3 * 3 * 3 = 26,244 combinations
Split into ~2,000 combo chunks = 14 chunks
"""
import sqlite3
import subprocess
import time
import signal
import sys
from pathlib import Path
from datetime import datetime
import urllib.request
import urllib.parse
import json
import itertools
# Worker configuration
WORKERS = {
'worker1': {
'host': 'root@10.10.254.106',
'workspace': '/home/comprehensive_sweep',
'max_parallel': 24,
},
'worker2': {
'host': 'root@10.20.254.100',
'workspace': '/home/backtest_dual/backtest',
'ssh_hop': 'root@10.10.254.106',
'max_parallel': 18,
'time_restricted': True,
'allowed_start_hour': 19, # 7 PM
'allowed_end_hour': 6, # 6 AM
}
}
DATA_FILE = 'data/solusdt_5m.csv'
DB_PATH = 'exploration.db'
CHUNK_SIZE = 2000
# Telegram configuration
TELEGRAM_BOT_TOKEN = '8240234365:AAEm6hg_XOm54x8ctnwpNYreFKRAEvWU3uY'
TELEGRAM_CHAT_ID = '579304651'
# EXHAUSTIVE parameter grid
PARAM_GRID = {
'flip_threshold': [0.25, 0.3, 0.35, 0.4, 0.45, 0.5],
'adx_min': [0, 5, 10, 15, 20, 25],
'long_pos_max': [90, 95, 100],
'short_pos_min': [0, 5, 10],
'vol_min': [0.0, 0.5, 1.0],
'entry_buffer_atr': [0.0, 0.05, 0.10],
'rsi_long_min': [20, 25, 30],
'rsi_short_max': [70, 75, 80],
}
def send_telegram_message(message: str):
"""Send notification to Telegram"""
try:
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
data = {
'chat_id': TELEGRAM_CHAT_ID,
'text': message,
'parse_mode': 'HTML'
}
req = urllib.request.Request(
url,
data=json.dumps(data).encode('utf-8'),
headers={'Content-Type': 'application/json'}
)
with urllib.request.urlopen(req, timeout=10) as response:
if response.status == 200:
print(f"✓ Telegram notification sent")
else:
print(f"⚠️ Telegram notification failed: {response.status}")
except Exception as e:
print(f"⚠️ Error sending Telegram notification: {e}")
def is_worker_allowed_to_run(worker_name: str) -> bool:
"""Check if worker is allowed to run based on time restrictions"""
worker = WORKERS[worker_name]
if not worker.get('time_restricted', False):
return True
current_hour = datetime.now().hour
start_hour = worker['allowed_start_hour']
end_hour = worker['allowed_end_hour']
if start_hour > end_hour:
allowed = current_hour >= start_hour or current_hour < end_hour
else:
allowed = start_hour <= current_hour < end_hour
return allowed
def signal_handler(sig, frame):
"""Handle termination signals"""
message = (
"⚠️ <b>V11 Full Sweep STOPPED</b>\n\n"
"Coordinator received termination signal.\n"
"Sweep stopped prematurely.\n\n"
f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
send_telegram_message(message)
sys.exit(0)
def init_db():
"""Initialize database with v11_full schema"""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# Create chunks table
c.execute('''
CREATE TABLE IF NOT EXISTS v11_full_chunks (
chunk_id TEXT PRIMARY KEY,
start_combo INTEGER NOT NULL,
end_combo INTEGER NOT NULL,
total_combos INTEGER NOT NULL,
status TEXT DEFAULT 'pending',
assigned_worker TEXT,
started_at INTEGER,
completed_at INTEGER,
best_pnl_in_chunk REAL,
results_csv_path TEXT
)
''')
# Create strategies table for results
c.execute('''
CREATE TABLE IF NOT EXISTS v11_full_strategies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chunk_id TEXT NOT NULL,
params TEXT NOT NULL,
pnl REAL NOT NULL,
win_rate REAL NOT NULL,
profit_factor REAL NOT NULL,
max_drawdown REAL NOT NULL,
total_trades INTEGER NOT NULL,
created_at INTEGER DEFAULT (strftime('%s', 'now')),
FOREIGN KEY (chunk_id) REFERENCES v11_full_chunks(chunk_id)
)
''')
c.execute('CREATE INDEX IF NOT EXISTS idx_v11_full_chunk_status ON v11_full_chunks(status)')
c.execute('CREATE INDEX IF NOT EXISTS idx_v11_full_strategy_pnl ON v11_full_strategies(pnl DESC)')
conn.commit()
conn.close()
print("✓ Database initialized")
def create_chunks():
"""Create chunks for all parameter combinations"""
# Generate all combinations
keys = sorted(PARAM_GRID.keys())
values = [PARAM_GRID[k] for k in keys]
all_combos = list(itertools.product(*values))
total_combos = len(all_combos)
print(f"Total parameter combinations: {total_combos:,}")
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# Check if chunks already exist
c.execute('SELECT COUNT(*) FROM v11_full_chunks')
existing = c.fetchone()[0]
if existing > 0:
print(f"⚠️ Found {existing} existing chunks - auto-clearing for fresh sweep")
c.execute('DELETE FROM v11_full_chunks')
c.execute('DELETE FROM v11_full_strategies')
conn.commit()
print("✓ Existing chunks deleted")
# Create chunks
chunk_num = 0
for start in range(0, total_combos, CHUNK_SIZE):
end = min(start + CHUNK_SIZE, total_combos)
chunk_id = f"v11_full_chunk_{chunk_num:04d}"
c.execute('''
INSERT INTO v11_full_chunks (chunk_id, start_combo, end_combo, total_combos, status)
VALUES (?, ?, ?, ?, 'pending')
''', (chunk_id, start, end, end - start))
chunk_num += 1
conn.commit()
conn.close()
print(f"✓ Created {chunk_num} chunks ({CHUNK_SIZE} combos each)")
def get_next_chunk():
"""Get next pending chunk"""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''
SELECT chunk_id, start_combo, end_combo
FROM v11_full_chunks
WHERE status = 'pending'
ORDER BY chunk_id
LIMIT 1
''')
result = c.fetchone()
conn.close()
return result
def get_chunk_stats():
"""Get current sweep statistics"""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''
SELECT
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END),
SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END),
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END),
COUNT(*)
FROM v11_full_chunks
''')
stats = c.fetchone()
conn.close()
return {
'pending': stats[0] or 0,
'running': stats[1] or 0,
'completed': stats[2] or 0,
'total': stats[3] or 0
}
def start_worker(worker_name: str, chunk_id: str, start_combo: int, end_combo: int):
"""Start worker on remote machine"""
worker = WORKERS[worker_name]
# Mark chunk as running
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''
UPDATE v11_full_chunks
SET status = 'running', assigned_worker = ?, started_at = strftime('%s', 'now')
WHERE chunk_id = ?
''', (worker_name, chunk_id))
conn.commit()
conn.close()
# Worker command (use venv python directly)
worker_cmd = (
f"cd {worker['workspace']} && "
f"nohup .venv/bin/python3 v11_full_worker.py "
f"--chunk-id {chunk_id} "
f"--start {start_combo} "
f"--end {end_combo} "
f"--workers {worker.get('max_parallel', 24)} "
f"> {chunk_id}.log 2>&1 &"
)
print(f"🚀 Starting {worker_name} on chunk {chunk_id} (combos {start_combo}-{end_combo})")
try:
# Worker 2 requires SSH through worker 1
if worker.get('ssh_hop'):
# SSH to worker1, then SSH to worker2 from there
hop_cmd = f"ssh {worker['host']} '{worker_cmd}'"
full_cmd = ['ssh', worker['ssh_hop'], hop_cmd]
else:
# Direct SSH to worker1
full_cmd = ['ssh', worker['host'], worker_cmd]
subprocess.run(full_cmd, check=True, timeout=30)
print(f"{worker_name} started successfully")
return True
except Exception as e:
print(f"✗ Failed to start {worker_name}: {e}")
# Mark chunk as pending again
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''
UPDATE v11_full_chunks
SET status = 'pending', assigned_worker = NULL, started_at = NULL
WHERE chunk_id = ?
''', (chunk_id,))
conn.commit()
conn.close()
return False
def main():
"""Main coordinator loop"""
print("=" * 80)
print("V11 FULL PARAMETER SWEEP COORDINATOR")
print("=" * 80)
print()
print("Exhaustive Grid: 26,244 combinations")
print("Chunk Size: 2,000 combinations")
print("Workers: worker1 (24 cores), worker2 (18 cores, 7PM-6AM)")
print()
# Register signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Initialize database
init_db()
# Create chunks
create_chunks()
# Get initial stats
stats = get_chunk_stats()
# Send startup notification
message = (
f"🚀 <b>V11 Full Sweep STARTED</b>\n\n"
f"Total Chunks: {stats['total']}\n"
f"Total Combinations: {stats['total'] * CHUNK_SIZE:,}\n"
f"Workers: {len(WORKERS)}\n\n"
f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
send_telegram_message(message)
print()
print("=" * 80)
print("SWEEP STARTED")
print("=" * 80)
print()
sweep_start = time.time()
try:
while True:
stats = get_chunk_stats()
# Check if sweep complete
if stats['pending'] == 0 and stats['running'] == 0:
duration = time.time() - sweep_start
hours = int(duration // 3600)
minutes = int((duration % 3600) // 60)
message = (
f"✅ <b>V11 Full Sweep COMPLETE</b>\n\n"
f"Total Chunks: {stats['completed']}\n"
f"Total Combinations: {stats['completed'] * CHUNK_SIZE:,}\n"
f"Duration: {hours}h {minutes}m\n\n"
f"Completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
send_telegram_message(message)
print()
print("=" * 80)
print("SWEEP COMPLETE!")
print("=" * 80)
print(f"Duration: {hours}h {minutes}m")
print()
break
# Assign work to available workers
for worker_name in WORKERS.keys():
if not is_worker_allowed_to_run(worker_name):
continue
# Check if worker already has work
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''
SELECT COUNT(*) FROM v11_full_chunks
WHERE status = 'running' AND assigned_worker = ?
''', (worker_name,))
running_count = c.fetchone()[0]
conn.close()
if running_count > 0:
continue
# Get next chunk
next_chunk = get_next_chunk()
if next_chunk:
chunk_id, start_combo, end_combo = next_chunk
start_worker(worker_name, chunk_id, start_combo, end_combo)
# Status update
print(f"\r[{datetime.now().strftime('%H:%M:%S')}] "
f"Pending: {stats['pending']} | Running: {stats['running']} | "
f"Completed: {stats['completed']}/{stats['total']}", end='', flush=True)
time.sleep(30)
except KeyboardInterrupt:
print("\n\nReceived interrupt signal, cleaning up...")
signal_handler(signal.SIGINT, None)
if __name__ == '__main__':
main()

175
cluster/v11_full_worker.py Executable file
View File

@@ -0,0 +1,175 @@
#!/usr/bin/env python3
"""
V11 Full Sweep Worker - Processes parameter combinations in parallel
Runs on EPYC workers, processes assigned chunk of parameter space.
"""
import argparse
import itertools
import json
import multiprocessing as mp
import sys
import time
from pathlib import Path
# Add backtester to path
sys.path.insert(0, str(Path(__file__).parent.parent / 'backtester'))
from backtester.data_loader import load_csv
from backtester.simulator import run_backtest
from backtester.v11_moneyline_all_filters import MoneyLineV11Inputs
# Parameter grid (matches coordinator)
PARAM_GRID = {
'flip_threshold': [0.25, 0.3, 0.35, 0.4, 0.45, 0.5],
'adx_min': [0, 5, 10, 15, 20, 25],
'long_pos_max': [90, 95, 100],
'short_pos_min': [0, 5, 10],
'vol_min': [0.0, 0.5, 1.0],
'entry_buffer_atr': [0.0, 0.05, 0.10],
'rsi_long_min': [20, 25, 30],
'rsi_short_max': [70, 75, 80],
}
def test_single_config(args):
"""Test a single parameter configuration"""
combo_idx, params_tuple, data = args
# Unpack parameters
keys = sorted(PARAM_GRID.keys())
params = dict(zip(keys, params_tuple))
# Create inputs
inputs = MoneyLineV11Inputs(
flip_threshold=params['flip_threshold'],
adx_min=params['adx_min'],
long_pos_max=params['long_pos_max'],
short_pos_min=params['short_pos_min'],
vol_min=params['vol_min'],
entry_buffer_atr=params['entry_buffer_atr'],
rsi_long_min=params['rsi_long_min'],
rsi_short_max=params['rsi_short_max'],
)
try:
# Run backtest
results = run_backtest(data, inputs, 'v11')
return {
'params': params,
'pnl': results['total_pnl'],
'win_rate': results['win_rate'],
'profit_factor': results['profit_factor'],
'max_drawdown': results['max_drawdown'],
'total_trades': results['total_trades'],
}
except Exception as e:
print(f"Error testing combo {combo_idx}: {e}")
return None
def main():
parser = argparse.ArgumentParser(description='V11 Full Sweep Worker')
parser.add_argument('--chunk-id', required=True, help='Chunk ID')
parser.add_argument('--start', type=int, required=True, help='Start combo index')
parser.add_argument('--end', type=int, required=True, help='End combo index')
parser.add_argument('--workers', type=int, default=24, help='Parallel workers')
args = parser.parse_args()
print(f"V11 Full Worker: {args.chunk_id}")
print(f"Range: {args.start} - {args.end}")
print(f"Workers: {args.workers}")
print()
# Load data
print("Loading market data...")
data = load_csv(Path('data/solusdt_5m.csv'), 'SOLUSDT', '5m')
print(f"Loaded {len(data)} candles")
print()
# Generate all combinations
keys = sorted(PARAM_GRID.keys())
values = [PARAM_GRID[k] for k in keys]
all_combos = list(itertools.product(*values))
# Get this chunk's combos
chunk_combos = all_combos[args.start:args.end]
print(f"Testing {len(chunk_combos)} combinations...")
print()
# Prepare args for parallel processing
test_args = [
(args.start + i, combo, data)
for i, combo in enumerate(chunk_combos)
]
# Run tests in parallel
start_time = time.time()
results = []
with mp.Pool(processes=args.workers) as pool:
for i, result in enumerate(pool.imap_unordered(test_single_config, test_args)):
if result:
results.append(result)
# Progress update every 10 combos
if (i + 1) % 10 == 0:
elapsed = time.time() - start_time
rate = (i + 1) / elapsed
remaining = len(chunk_combos) - (i + 1)
eta = remaining / rate if rate > 0 else 0
print(f"Progress: {i+1}/{len(chunk_combos)} "
f"({rate:.1f} combos/s, ETA: {eta/60:.1f}m)")
# Sort by P&L descending
results.sort(key=lambda x: x['pnl'], reverse=True)
# Save results to CSV
output_path = Path(f"v11_results/{args.chunk_id}_results.csv")
output_path.parent.mkdir(exist_ok=True)
with open(output_path, 'w') as f:
# Header
param_cols = sorted(PARAM_GRID.keys())
metric_cols = ['pnl', 'win_rate', 'profit_factor', 'max_drawdown', 'total_trades']
header = ','.join(param_cols + metric_cols)
f.write(header + '\n')
# Data rows
for result in results:
param_values = [str(result['params'][k]) for k in param_cols]
metric_values = [
f"{result['pnl']:.1f}",
f"{result['win_rate']:.1f}",
f"{result['profit_factor']:.3f}",
f"{result['max_drawdown']:.1f}",
str(result['total_trades'])
]
row = ','.join(param_values + metric_values)
f.write(row + '\n')
duration = time.time() - start_time
print()
print("=" * 60)
print(f"Chunk {args.chunk_id} COMPLETE")
print(f"Duration: {duration/60:.1f} minutes")
print(f"Rate: {len(chunk_combos)/duration:.2f} combos/second")
print(f"Results saved: {output_path}")
if results:
best = results[0]
print()
print("Best configuration in chunk:")
print(f" P&L: ${best['pnl']:.2f}")
print(f" Win Rate: {best['win_rate']:.1f}%")
print(f" Profit Factor: {best['profit_factor']:.3f}")
print(f" Parameters: {best['params']}")
print("=" * 60)
if __name__ == '__main__':
main()

311
cluster/v11_full_worker_FIXED.py Executable file
View File

@@ -0,0 +1,311 @@
#!/usr/bin/env python3
"""
V11 Full Parameter Sweep Worker
Processes chunks of v11 parameter configurations (26,244 combinations total).
Uses all available cores for multiprocessing.
FULL EXHAUSTIVE SWEEP across all filter parameters.
"""
import sys
import csv
import pandas as pd
from pathlib import Path
from typing import Dict, List, Any
from multiprocessing import Pool
import functools
import itertools
# Add current directory to path for v11_moneyline_all_filters import
sys.path.insert(0, str(Path(__file__).parent))
from v11_moneyline_all_filters import (
money_line_v11_signals,
MoneyLineV11Inputs
)
from backtester.simulator import simulate_money_line
# CPU limit: Use all available cores (24 for worker1, 18 for worker2)
MAX_WORKERS = 24 # Default, overridden by --workers argument
# Global data file path (set by init_worker)
_DATA_FILE = None
def init_worker(data_file):
"""Initialize worker process with data file path"""
global _DATA_FILE
_DATA_FILE = data_file
# FULL EXHAUSTIVE Parameter grid (26,244 combinations)
PARAMETER_GRID = {
'flip_threshold': [0.25, 0.3, 0.35, 0.4, 0.45, 0.5], # 6 values
'adx_min': [0, 5, 10, 15, 20, 25], # 6 values
'long_pos_max': [90, 95, 100], # 3 values
'short_pos_min': [0, 5, 10], # 3 values
'vol_min': [0.0, 0.5, 1.0], # 3 values
'entry_buffer_atr': [0.0, 0.05, 0.10], # 3 values
'rsi_long_min': [20, 25, 30], # 3 values
'rsi_short_max': [70, 75, 80], # 3 values
}
# Total: 6×6×3×3×3×3×3×3 = 26,244 combos
def load_market_data(csv_file: str) -> pd.DataFrame:
"""Load OHLCV data from CSV"""
df = pd.read_csv(csv_file)
# Ensure required columns exist
required = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
for col in required:
if col not in df.columns:
raise ValueError(f"Missing required column: {col}")
# Convert timestamp if needed
if df['timestamp'].dtype == 'object':
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.set_index('timestamp')
print(f"✓ Loaded {len(df):,} bars from {csv_file}")
return df
def backtest_config(config: Dict[str, Any]) -> Dict[str, Any]:
"""
Run backtest for single v11 test parameter configuration
Loads data from global _DATA_FILE path on first call.
Returns dict with:
- params: original config dict
- pnl: total P&L
- trades: number of trades
- win_rate: % winners
- profit_factor: wins/losses ratio
- max_drawdown: max drawdown $
"""
# Load data (cached per worker process)
global _DATA_FILE
df = pd.read_csv(_DATA_FILE)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.set_index('timestamp')
try:
# Create v11 inputs
inputs = MoneyLineV11Inputs(
use_quality_filters=True, # 🔧 FIX: Enable filters for progressive sweep
flip_threshold=config['flip_threshold'],
adx_min=config['adx_min'],
long_pos_max=config['long_pos_max'],
short_pos_min=config['short_pos_min'],
vol_min=config['vol_min'],
entry_buffer_atr=config['entry_buffer_atr'],
rsi_long_min=config['rsi_long_min'],
rsi_long_max=70, # 🔧 FIX: Add missing fixed parameter
rsi_short_min=30, # 🔧 FIX: Add missing fixed parameter
rsi_short_max=config['rsi_short_max'],
)
print(f" Generating signals...", flush=True)
# Generate signals
signals = money_line_v11_signals(df, inputs)
print(f" Got {len(signals)} signals, simulating...", flush=True)
if not signals:
return {
'params': config,
'pnl': 0.0,
'trades': 0,
'win_rate': 0.0,
'profit_factor': 0.0,
'max_drawdown': 0.0,
}
# Simple backtesting: track equity curve
equity = 1000.0 # Starting capital
peak_equity = equity
max_drawdown = 0.0
wins = 0
losses = 0
win_pnl = 0.0
loss_pnl = 0.0
for signal in signals:
# Simple trade simulation
# TP1 at +0.86%, SL at -1.29% (ATR-based defaults)
entry = signal.entry_price
# Look ahead in data to see if TP or SL hit
signal_idx = df.index.get_loc(signal.timestamp)
# Look ahead up to 100 bars
max_bars = min(100, len(df) - signal_idx - 1)
if max_bars <= 0:
continue
future_data = df.iloc[signal_idx+1:signal_idx+1+max_bars]
if signal.direction == "long":
tp_price = entry * 1.0086 # +0.86%
sl_price = entry * 0.9871 # -1.29%
# Check if TP or SL hit
hit_tp = (future_data['high'] >= tp_price).any()
hit_sl = (future_data['low'] <= sl_price).any()
if hit_tp:
pnl = 1000.0 * 0.0086 # $8.60 on $1000 position
equity += pnl
wins += 1
win_pnl += pnl
elif hit_sl:
pnl = -1000.0 * 0.0129 # -$12.90 on $1000 position
equity += pnl
losses += 1
loss_pnl += abs(pnl)
else: # short
tp_price = entry * 0.9914 # -0.86%
sl_price = entry * 1.0129 # +1.29%
# Check if TP or SL hit
hit_tp = (future_data['low'] <= tp_price).any()
hit_sl = (future_data['high'] >= sl_price).any()
if hit_tp:
pnl = 1000.0 * 0.0086 # $8.60 on $1000 position
equity += pnl
wins += 1
win_pnl += pnl
elif hit_sl:
pnl = -1000.0 * 0.0129 # -$12.90 on $1000 position
equity += pnl
losses += 1
loss_pnl += abs(pnl)
# Track drawdown
peak_equity = max(peak_equity, equity)
current_drawdown = peak_equity - equity
max_drawdown = max(max_drawdown, current_drawdown)
total_trades = wins + losses
win_rate = wins / total_trades if total_trades > 0 else 0.0
profit_factor = win_pnl / loss_pnl if loss_pnl > 0 else (float('inf') if win_pnl > 0 else 0.0)
total_pnl = equity - 1000.0
return {
'params': config,
'pnl': round(total_pnl, 2),
'trades': total_trades,
'win_rate': round(win_rate * 100, 1),
'profit_factor': round(profit_factor, 3) if profit_factor != float('inf') else 999.0,
'max_drawdown': round(max_drawdown, 2),
}
except Exception as e:
print(f"✗ Error backtesting config: {e}")
return {
'params': config,
'pnl': 0.0,
'trades': 0,
'win_rate': 0.0,
'profit_factor': 0.0,
'max_drawdown': 0.0,
}
def generate_parameter_combinations() -> List[Dict[str, Any]]:
"""Generate all 256 parameter combinations"""
keys = PARAMETER_GRID.keys()
values = PARAMETER_GRID.values()
combinations = []
for combo in itertools.product(*values):
config = dict(zip(keys, combo))
combinations.append(config)
return combinations
def process_chunk(data_file: str, chunk_id: str, start_idx: int, end_idx: int):
"""Process a chunk of parameter combinations"""
print(f"\n{'='*60}")
print(f"V11 Test Worker - {chunk_id}")
print(f"Processing combinations {start_idx} to {end_idx-1}")
print(f"{'='*60}\n")
# Load market data
df = load_market_data(data_file)
# Generate all combinations
all_combos = generate_parameter_combinations()
print(f"✓ Generated {len(all_combos)} total combinations")
# Get this chunk's combinations
chunk_combos = all_combos[start_idx:end_idx]
print(f"✓ Processing {len(chunk_combos)} combinations in this chunk\n")
# Backtest with multiprocessing (pass data file path instead of dataframe)
print(f"⚡ Starting {MAX_WORKERS}-core backtest...\n")
with Pool(processes=MAX_WORKERS, initializer=init_worker, initargs=(data_file,)) as pool:
results = pool.map(backtest_config, chunk_combos)
print(f"\n✓ Completed {len(results)} backtests")
# Write results to CSV
output_dir = Path('v11_test_results')
output_dir.mkdir(exist_ok=True)
csv_file = output_dir / f"{chunk_id}_results.csv"
with open(csv_file, 'w', newline='') as f:
writer = csv.writer(f)
# Header
writer.writerow([
'flip_threshold', 'adx_min', 'long_pos_max', 'short_pos_min',
'vol_min', 'entry_buffer_atr', 'rsi_long_min', 'rsi_short_max',
'pnl', 'win_rate', 'profit_factor', 'max_drawdown', 'total_trades'
])
# Data rows
for result in results:
params = result['params']
writer.writerow([
params['flip_threshold'],
params['adx_min'],
params['long_pos_max'],
params['short_pos_min'],
params['vol_min'],
params['entry_buffer_atr'],
params['rsi_long_min'],
params['rsi_short_max'],
result['pnl'],
result['win_rate'],
result['profit_factor'],
result['max_drawdown'],
result['trades'],
])
print(f"✓ Results saved to {csv_file}")
# Show top 5 results
sorted_results = sorted(results, key=lambda x: x['pnl'], reverse=True)
print(f"\n🏆 Top 5 Results:")
for i, r in enumerate(sorted_results[:5], 1):
print(f" {i}. PnL: ${r['pnl']:,.2f} | Trades: {r['trades']} | WR: {r['win_rate']}%")
if __name__ == '__main__':
if len(sys.argv) != 4:
print("Usage: python v11_test_worker.py <data_file> <chunk_id> <start_idx>")
sys.exit(1)
data_file = sys.argv[1]
chunk_id = sys.argv[2]
start_idx = int(sys.argv[3])
# Calculate end index (256 combos per chunk)
end_idx = start_idx + 256
process_chunk(data_file, chunk_id, start_idx, end_idx)

View File

@@ -3,7 +3,7 @@
"nodes": [
{
"parameters": {
"jsCode": "// Get the body - it might be a string or nested in an object\nlet body = $json.body || $json.query?.body || JSON.stringify($json);\n\n// If body is an object, stringify it\nif (typeof body === 'object') {\n body = JSON.stringify(body);\n}\n\n// Parse basic signal (existing logic)\nconst symbolMatch = body.match(/\\b(SOL|BTC|ETH)\\b/i);\nconst symbol = symbolMatch ? symbolMatch[1].toUpperCase() + '-PERP' : 'SOL-PERP';\n\nconst direction = body.match(/\\b(sell|short)\\b/i) ? 'short' : 'long';\n\n// Enhanced timeframe extraction supporting multiple formats:\n// - \"buy 5\" → \"5\"\n// - \"buy 15\" → \"15\"\n// - \"buy 60\" or \"buy 1h\" → \"60\"\n// - \"buy 240\" or \"buy 4h\" → \"240\"\n// - \"buy D\" or \"buy 1d\" → \"D\"\n// - \"buy W\" → \"W\"\nconst timeframeMatch = body.match(/\\b(buy|sell)\\s+(\\d+|D|W|M|1h|4h|1d)\\b/i);\nlet timeframe = '5'; // Default to 5min\n\nif (timeframeMatch) {\n const tf = timeframeMatch[2];\n // Convert hour/day notation to minutes\n if (tf === '1h' || tf === '60') {\n timeframe = '60';\n } else if (tf === '4h' || tf === '240') {\n timeframe = '240';\n } else if (tf === '1d' || tf.toUpperCase() === 'D') {\n timeframe = 'D';\n } else if (tf.toUpperCase() === 'W') {\n timeframe = 'W';\n } else if (tf.toUpperCase() === 'M') {\n timeframe = 'M';\n } else {\n timeframe = tf;\n }\n}\n\n// Parse new context metrics from enhanced format:\n// \"SOLT.P buy 15 | ATR:0.65 | ADX:14.3 | RSI:51.3 | VOL:0.87 | POS:59.3 | MAGAP:-1.23 | IND:v9\"\nconst atrMatch = body.match(/ATR:([\\d.]+)/);\nconst atr = atrMatch ? parseFloat(atrMatch[1]) : 0;\n\nconst adxMatch = body.match(/ADX:([\\d.]+)/);\nconst adx = adxMatch ? parseFloat(adxMatch[1]) : 0;\n\nconst rsiMatch = body.match(/RSI:([\\d.]+)/);\nconst rsi = rsiMatch ? parseFloat(rsiMatch[1]) : 0;\n\nconst volumeMatch = body.match(/VOL:([\\d.]+)/);\nconst volumeRatio = volumeMatch ? parseFloat(volumeMatch[1]) : 0;\n\nconst pricePositionMatch = body.match(/POS:([\\d.]+)/);\nconst pricePosition = pricePositionMatch ? parseFloat(pricePositionMatch[1]) : 0;\n\n// Parse signal price from \"@ price\" format (for 1min data feed and v9 signals)\n// Must match: \"buy 1 @ 142.08 |\" (@ followed by price before first pipe)\n// DEBUG: Log body to see actual format\nconsole.log('DEBUG body:', body);\nconst signalPriceMatch = body.match(/@\\s*([\\d.]+)\\s*\\|/);\nconsole.log('DEBUG signalPriceMatch:', signalPriceMatch);\nconst signalPrice = signalPriceMatch ? parseFloat(signalPriceMatch[1]) : undefined;\nconsole.log('DEBUG signalPrice:', signalPrice, 'pricePosition will be:', body.match(/POS:([\\d.]+)/) ? body.match(/POS:([\\d.]+)/)[1] : 'not found');\n\n// V9: Parse MA gap (optional, backward compatible with v8)\nconst maGapMatch = body.match(/MAGAP:([-\\d.]+)/);\nconst maGap = maGapMatch ? parseFloat(maGapMatch[1]) : undefined;\n\n// Parse indicator version (optional, backward compatible)\nconst indicatorVersionMatch = body.match(/IND:(v\\d+)/i);\nconst indicatorVersion = indicatorVersionMatch ? indicatorVersionMatch[1] : 'v5';\n\nreturn {\n rawMessage: body,\n symbol,\n direction,\n timeframe,\n signalPrice, // NEW: Actual price from TradingView\n // Context fields\n atr,\n adx,\n rsi,\n volumeRatio,\n pricePosition,\n maGap, // V9 NEW\n // Version tracking (defaults to v5 for backward compatibility)\n indicatorVersion\n};"
"jsCode": "// Get the body - it might be a string or nested in an object\nlet body = $json.body || $json.query?.body || JSON.stringify($json);\n\n// If body is an object, stringify it\nif (typeof body === 'object') {\n body = JSON.stringify(body);\n}\n\n// Detect MA crossover events (death cross / golden cross)\nconst isMACrossover = body.match(/crossing/i) !== null;\n\n// Parse basic signal (existing logic)\nconst symbolMatch = body.match(/\\b(SOL|BTC|ETH)\\b/i);\nconst symbol = symbolMatch ? symbolMatch[1].toUpperCase() + '-PERP' : 'SOL-PERP';\n\nconst direction = body.match(/\\b(sell|short)\\b/i) ? 'short' : 'long';\n\n// Determine crossover type based on direction\nconst isDeathCross = isMACrossover && direction === 'short';\nconst isGoldenCross = isMACrossover && direction === 'long';\n\n// Enhanced timeframe extraction supporting multiple formats:\n// - \"buy 5\" → \"5\"\n// - \"buy 15\" → \"15\"\n// - \"buy 60\" or \"buy 1h\" → \"60\"\n// - \"buy 240\" or \"buy 4h\" → \"240\"\n// - \"buy D\" or \"buy 1d\" → \"D\"\n// - \"buy W\" → \"W\"\nconst timeframeMatch = body.match(/\\b(buy|sell)\\s+(\\d+|D|W|M|1h|4h|1d)\\b/i);\nlet timeframe = '5'; // Default to 5min\n\nif (timeframeMatch) {\n const tf = timeframeMatch[2];\n // Convert hour/day notation to minutes\n if (tf === '1h' || tf === '60') {\n timeframe = '60';\n } else if (tf === '4h' || tf === '240') {\n timeframe = '240';\n } else if (tf === '1d' || tf.toUpperCase() === 'D') {\n timeframe = 'D';\n } else if (tf.toUpperCase() === 'W') {\n timeframe = 'W';\n } else if (tf.toUpperCase() === 'M') {\n timeframe = 'M';\n } else {\n timeframe = tf;\n }\n}\n\n// Parse new context metrics from enhanced format:\n// \"SOLT.P buy 15 | ATR:0.65 | ADX:14.3 | RSI:51.3 | VOL:0.87 | POS:59.3 | MAGAP:-1.23 | IND:v9\"\nconst atrMatch = body.match(/ATR:([\\d.]+)/);\nconst atr = atrMatch ? parseFloat(atrMatch[1]) : 0;\n\nconst adxMatch = body.match(/ADX:([\\d.]+)/);\nconst adx = adxMatch ? parseFloat(adxMatch[1]) : 0;\n\nconst rsiMatch = body.match(/RSI:([\\d.]+)/);\nconst rsi = rsiMatch ? parseFloat(rsiMatch[1]) : 0;\n\nconst volumeMatch = body.match(/VOL:([\\d.]+)/);\nconst volumeRatio = volumeMatch ? parseFloat(volumeMatch[1]) : 0;\n\nconst pricePositionMatch = body.match(/POS:([\\d.]+)/);\nconst pricePosition = pricePositionMatch ? parseFloat(pricePositionMatch[1]) : 0;\n\n// Parse signal price from \"@ price\" format (for 1min data feed and v9 signals)\n// Must match: \"buy 1 @ 142.08 |\" (@ followed by price before first pipe)\n// DEBUG: Log body to see actual format\nconsole.log('DEBUG body:', body);\nconst signalPriceMatch = body.match(/@\\s*([\\d.]+)\\s*\\|/);\nconsole.log('DEBUG signalPriceMatch:', signalPriceMatch);\nconst signalPrice = signalPriceMatch ? parseFloat(signalPriceMatch[1]) : undefined;\nconsole.log('DEBUG signalPrice:', signalPrice, 'pricePosition will be:', body.match(/POS:([\\d.]+)/) ? body.match(/POS:([\\d.]+)/)[1] : 'not found');\n\n// V9: Parse MA gap (optional, backward compatible with v8)\nconst maGapMatch = body.match(/MAGAP:([-\\d.]+)/);\nconst maGap = maGapMatch ? parseFloat(maGapMatch[1]) : undefined;\n\n// Parse indicator version (optional, backward compatible)\nconst indicatorVersionMatch = body.match(/IND:(v\\d+)/i);\nconst indicatorVersion = indicatorVersionMatch ? indicatorVersionMatch[1] : 'v5';\n\nreturn {\n rawMessage: body,\n symbol,\n direction,\n timeframe,\n signalPrice,\n // Context fields\n atr,\n adx,\n rsi,\n volumeRatio,\n pricePosition,\n maGap,\n // MA Crossover detection (NEW: Nov 27, 2025)\n isMACrossover,\n isDeathCross,\n isGoldenCross,\n // Version tracking\n indicatorVersion\n};"
},
"id": "parse-signal-enhanced",
"name": "Parse Signal Enhanced",
@@ -19,6 +19,6 @@
"staticData": null,
"tags": [],
"triggerCount": 0,
"updatedAt": "2025-10-30T00:00:00.000Z",
"versionId": "1"
"updatedAt": "2025-11-27T00:00:00.000Z",
"versionId": "2"
}