Files
trading_bot_v4/cluster/v11_full_worker_FIXED.py
mindesbunister b6d4a8f157 fix: Add Position Manager health monitoring system
CRITICAL FIXES FOR $1,000 LOSS BUG (Dec 8, 2025):

**Bug #1: Position Manager Never Actually Monitors**
- System logged 'Trade added' but never started monitoring
- isMonitoring stayed false despite having active trades
- Result: No TP/SL monitoring, no protection, uncontrolled losses

**Bug #2: Silent SL Placement Failures**
- placeExitOrders() returned SUCCESS but only 2/3 orders placed
- Missing SL order left $2,003 position completely unprotected
- No error logs, no indication anything was wrong

**Bug #3: Orphan Detection Cancelled Active Orders**
- Old orphaned position detection triggered on NEW position
- Cancelled TP/SL orders while leaving position open
- User opened trade WITH protection, system REMOVED protection

**SOLUTION: Health Monitoring System**

New file: lib/health/position-manager-health.ts
- Runs every 30 seconds to detect critical failures
- Checks: DB open trades vs PM monitoring status
- Checks: PM has trades but monitoring is OFF
- Checks: Missing SL/TP orders on open positions
- Checks: DB vs Drift position count mismatch
- Logs: CRITICAL alerts when bugs detected

Integration: lib/startup/init-position-manager.ts
- Health monitor starts automatically on server startup
- Runs alongside other critical services
- Provides continuous verification Position Manager works

Test: tests/integration/position-manager/monitoring-verification.test.ts
- Validates startMonitoring() actually calls priceMonitor.start()
- Validates isMonitoring flag set correctly
- Validates price updates trigger trade checks
- Validates monitoring stops when no trades remain

**Why This Matters:**
User lost $1,000+ because Position Manager said 'working' but wasn't.
This health system detects that failure within 30 seconds and alerts.

**Next Steps:**
1. Rebuild Docker container
2. Verify health monitor starts
3. Manually test: open position, wait 30s, check health logs
4. If issues found: Health monitor will alert immediately

This prevents the $1,000 loss bug from ever happening again.
2025-12-08 15:43:54 +01:00

316 lines
11 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
V11 Full Parameter Sweep Worker
Processes chunks of v11 parameter configurations (26,244 combinations total).
Runs the backtests in a multiprocessing pool sized by --workers (default 24).
FULL EXHAUSTIVE SWEEP across all filter parameters.
"""
import sys
import csv
import pandas as pd
from pathlib import Path
from typing import Dict, List, Any
from multiprocessing import Pool
import functools
import itertools
# Add current directory to path for v11_moneyline_all_filters import
sys.path.insert(0, str(Path(__file__).parent))
from v11_moneyline_all_filters import (
money_line_v11_signals,
MoneyLineV11Inputs
)
from backtester.simulator import simulate_money_line
# Size of the multiprocessing pool used by process_chunk(). The cluster hosts
# differ (24 cores on worker1, 18 on worker2), so this value is only a fallback.
MAX_WORKERS = 24 # Default, overridden by --workers argument
# Path of the market-data CSV read by pool workers. Stays None until
# init_worker() stores the real path inside each worker process.
_DATA_FILE = None
def init_worker(data_file):
    """Pool initializer: remember the market-data CSV path for this process.

    Stores ``data_file`` in the module-level ``_DATA_FILE`` so that
    ``backtest_config`` can locate the data without receiving it per task.
    """
    # Rebind the module global through the namespace dict.
    globals()['_DATA_FILE'] = data_file
# FULL EXHAUSTIVE parameter grid (26,244 combinations).
# Each key is one sweep dimension; backtest_config() passes the selected value
# to the MoneyLineV11Inputs keyword of the same name. The key order also fixes
# the order in which generate_parameter_combinations() enumerates the combos.
PARAMETER_GRID = {
    'flip_threshold': [0.25, 0.3, 0.35, 0.4, 0.45, 0.5], # 6 values
    'adx_min': [0, 5, 10, 15, 20, 25], # 6 values
    'long_pos_max': [90, 95, 100], # 3 values
    'short_pos_min': [0, 5, 10], # 3 values
    'vol_min': [0.0, 0.5, 1.0], # 3 values
    'entry_buffer_atr': [0.0, 0.05, 0.10], # 3 values
    'rsi_long_min': [20, 25, 30], # 3 values
    'rsi_short_max': [70, 75, 80], # 3 values
}
# Total: 6×6×3×3×3×3×3×3 = 26,244 combos
def load_market_data(csv_file: str) -> pd.DataFrame:
    """Load OHLCV bars from a CSV file into a timestamp-indexed DataFrame.

    Args:
        csv_file: Path to a CSV containing at least the columns
            ``timestamp``, ``open``, ``high``, ``low``, ``close``, ``volume``.

    Returns:
        The parsed frame with ``timestamp`` converted to datetimes and used
        as the index.

    Raises:
        ValueError: If a required column is absent from the file.
    """
    df = pd.read_csv(csv_file)

    # Ensure required columns exist
    required = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
    for col in required:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

    # Parse whenever the column is not already datetime-typed. The previous
    # `dtype == 'object'` test skipped parsing for string columns that pandas
    # reports with a dedicated string dtype, and it disagreed with
    # backtest_config(), which always calls pd.to_datetime on this column.
    if not pd.api.types.is_datetime64_any_dtype(df['timestamp']):
        df['timestamp'] = pd.to_datetime(df['timestamp'])
    df = df.set_index('timestamp')

    print(f"✓ Loaded {len(df):,} bars from {csv_file}")
    return df
# Fixed trade model of the sweep: every signal is a $1000 position that
# starts from $1000 of equity and is watched for at most 100 bars.
_START_EQUITY = 1000.0
_WIN_PNL = 1000.0 * 0.0086   # $8.60 on $1000 position (TP at 0.86%)
_LOSS_PNL = 1000.0 * 0.0129  # $12.90 on $1000 position (SL at 1.29%)
_MAX_LOOKAHEAD_BARS = 100


@functools.lru_cache(maxsize=1)
def _load_worker_data(data_file):
    """Parse the OHLCV CSV once per process and keep the parsed frame."""
    df = pd.read_csv(data_file)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    return df.set_index('timestamp')


def _empty_result(config):
    """Return the all-zero metrics row used for 'no signals' and failures."""
    return {
        'params': config,
        'pnl': 0.0,
        'trades': 0,
        'win_rate': 0.0,
        'profit_factor': 0.0,
        'max_drawdown': 0.0,
    }


def _first_hit(mask):
    """Return the position of the first True in a boolean array, or None."""
    return int(mask.argmax()) if mask.any() else None


def backtest_config(config: Dict[str, Any]) -> Dict[str, Any]:
    """Run the simplified TP/SL backtest for one v11 parameter configuration.

    The market data is read from the module-level ``_DATA_FILE`` path and
    parsed only once per worker process; each call works on its own copy.

    Every signal is simulated as a fixed $1000 position with a take-profit
    0.86% and a stop-loss 1.29% away from the entry price, looking ahead at
    most 100 bars. The outcome is decided by whichever level is touched
    FIRST. Previously a take-profit anywhere in the window counted as a win
    even when the stop-loss had been hit on an earlier bar, which inflated
    results. When both levels are touched on the same bar the intrabar order
    is unknown, so the trade is conservatively counted as a loss. Signals
    that reach neither level within the window are not counted as trades.

    Args:
        config: One combination from the parameter grid. The keys
            ``flip_threshold``, ``adx_min``, ``long_pos_max``,
            ``short_pos_min``, ``vol_min``, ``entry_buffer_atr``,
            ``rsi_long_min`` and ``rsi_short_max`` are read.

    Returns:
        Dict with:
        - params: original config dict
        - pnl: total P&L in $, rounded to 2 decimals
        - trades: number of trades that hit TP or SL
        - win_rate: % winners, rounded to 1 decimal
        - profit_factor: wins/losses ratio (999.0 when there are no losses)
        - max_drawdown: max peak-to-trough equity drop in $
        If no signals are generated, or any exception occurs while
        generating or simulating them, all metrics are zero.
    """
    # Work on a copy so that anything the signal generator does to the frame
    # cannot leak into the next configuration handled by this process.
    df = _load_worker_data(_DATA_FILE).copy()
    try:
        # Create v11 inputs
        inputs = MoneyLineV11Inputs(
            use_quality_filters=True,  # 🔧 FIX: Enable filters for progressive sweep
            flip_threshold=config['flip_threshold'],
            adx_min=config['adx_min'],
            long_pos_max=config['long_pos_max'],
            short_pos_min=config['short_pos_min'],
            vol_min=config['vol_min'],
            entry_buffer_atr=config['entry_buffer_atr'],
            rsi_long_min=config['rsi_long_min'],
            rsi_long_max=70,  # 🔧 FIX: Add missing fixed parameter
            rsi_short_min=30,  # 🔧 FIX: Add missing fixed parameter
            rsi_short_max=config['rsi_short_max'],
        )
        print(" Generating signals...", flush=True)
        signals = money_line_v11_signals(df, inputs)
        print(f" Got {len(signals)} signals, simulating...", flush=True)
        if not signals:
            return _empty_result(config)

        # Pull the price columns out once; the loop below only slices them.
        highs = df['high'].to_numpy()
        lows = df['low'].to_numpy()
        n_bars = len(df)

        equity = _START_EQUITY
        peak_equity = equity
        max_drawdown = 0.0
        wins = 0
        losses = 0
        win_pnl = 0.0
        loss_pnl = 0.0

        for signal in signals:
            entry = signal.entry_price
            signal_idx = df.index.get_loc(signal.timestamp)

            # Look ahead up to 100 bars, starting on the bar after the signal.
            max_bars = min(_MAX_LOOKAHEAD_BARS, n_bars - signal_idx - 1)
            if max_bars <= 0:
                continue
            window = slice(signal_idx + 1, signal_idx + 1 + max_bars)

            if signal.direction == "long":
                tp_mask = highs[window] >= entry * 1.0086  # +0.86%
                sl_mask = lows[window] <= entry * 0.9871   # -1.29%
            else:  # short
                tp_mask = lows[window] <= entry * 0.9914   # -0.86%
                sl_mask = highs[window] >= entry * 1.0129  # +1.29%

            tp_at = _first_hit(tp_mask)
            sl_at = _first_hit(sl_mask)

            # TP only wins when it is reached strictly before the SL.
            if tp_at is not None and (sl_at is None or tp_at < sl_at):
                equity += _WIN_PNL
                wins += 1
                win_pnl += _WIN_PNL
            elif sl_at is not None:
                equity -= _LOSS_PNL
                losses += 1
                loss_pnl += _LOSS_PNL

            # Track drawdown
            peak_equity = max(peak_equity, equity)
            max_drawdown = max(max_drawdown, peak_equity - equity)

        total_trades = wins + losses
        win_rate = wins / total_trades if total_trades > 0 else 0.0
        if loss_pnl > 0:
            profit_factor = win_pnl / loss_pnl
        else:
            profit_factor = float('inf') if win_pnl > 0 else 0.0
        total_pnl = equity - _START_EQUITY

        return {
            'params': config,
            'pnl': round(total_pnl, 2),
            'trades': total_trades,
            'win_rate': round(win_rate * 100, 1),
            # inf is reported as 999.0 so the value stays a plain number
            'profit_factor': round(profit_factor, 3) if profit_factor != float('inf') else 999.0,
            'max_drawdown': round(max_drawdown, 2),
        }
    except Exception as e:
        # Best effort: report the failure and return a zero row, presumably so
        # that a single bad configuration does not abort the whole pool.map().
        print(f"✗ Error backtesting config: {e}")
        return _empty_result(config)
def generate_parameter_combinations(
    grid: "Dict[str, List[Any]] | None" = None,
) -> List[Dict[str, Any]]:
    """Expand a parameter grid into the list of every value combination.

    Args:
        grid: Mapping of parameter name to its candidate values. Defaults to
            the module-level ``PARAMETER_GRID``
            (6x6x3x3x3x3x3x3 = 26,244 combinations).

    Returns:
        One dict per combination, keyed by parameter name. Combinations are
        in ``itertools.product`` order (the last key varies fastest), so a
        given index always refers to the same configuration for a given grid.
    """
    if grid is None:
        grid = PARAMETER_GRID
    names = list(grid)
    return [dict(zip(names, combo)) for combo in itertools.product(*grid.values())]
def process_chunk(data_file: str, chunk_id: str, start_idx: int, end_idx: int):
    """Backtest one slice of the parameter grid and persist the results.

    Prints a banner, loads the market data once in the parent, runs
    ``backtest_config`` over combinations ``[start_idx, end_idx)`` in a pool
    of ``MAX_WORKERS`` processes, writes the metrics to
    ``v11_results/<chunk_id>_results.csv`` and prints the five best
    configurations by PnL.
    """
    print(f"\n{'='*60}")
    print(f"V11 Test Worker - {chunk_id}")
    print(f"Processing combinations {start_idx} to {end_idx-1}")
    print(f"{'='*60}\n")

    # Called for its column validation and "Loaded" log line only; the pool
    # workers receive the file path and read the data themselves.
    load_market_data(data_file)

    every_combo = generate_parameter_combinations()
    print(f"✓ Generated {len(every_combo)} total combinations")

    selected = every_combo[start_idx:end_idx]
    print(f"✓ Processing {len(selected)} combinations in this chunk\n")

    # Fan the configurations out; each worker learns the data path through
    # the pool initializer rather than receiving a dataframe.
    print(f"⚡ Starting {MAX_WORKERS}-core backtest...\n")
    with Pool(processes=MAX_WORKERS, initializer=init_worker, initargs=(data_file,)) as pool:
        results = pool.map(backtest_config, selected)
    print(f"\n✓ Completed {len(results)} backtests")

    # CSV layout: the eight swept parameters followed by the metrics. The
    # trade count is stored under the column name 'total_trades'.
    param_columns = (
        'flip_threshold', 'adx_min', 'long_pos_max', 'short_pos_min',
        'vol_min', 'entry_buffer_atr', 'rsi_long_min', 'rsi_short_max',
    )
    metric_keys = ('pnl', 'win_rate', 'profit_factor', 'max_drawdown', 'trades')
    header = [
        *param_columns,
        'pnl', 'win_rate', 'profit_factor', 'max_drawdown', 'total_trades',
    ]

    out_dir = Path('v11_results')
    out_dir.mkdir(exist_ok=True)
    out_path = out_dir / f"{chunk_id}_results.csv"
    with open(out_path, 'w', newline='') as handle:
        out = csv.writer(handle)
        out.writerow(header)
        out.writerows(
            [row['params'][name] for name in param_columns]
            + [row[key] for key in metric_keys]
            for row in results
        )
    print(f"✓ Results saved to {out_path}")

    # Show top 5 results
    ranked = sorted(results, key=lambda row: row['pnl'], reverse=True)
    print(f"\n🏆 Top 5 Results:")
    for i, r in enumerate(ranked[:5], 1):
        print(f" {i}. PnL: ${r['pnl']:,.2f} | Trades: {r['trades']} | WR: {r['win_rate']}%")
if __name__ == '__main__':
    # Command-line entry point: backtest combinations [--start, --end) of the
    # parameter grid and write them to v11_results/<chunk-id>_results.csv.
    import argparse

    parser = argparse.ArgumentParser(description='V11 Full Sweep Worker')
    parser.add_argument('--chunk-id', required=True, help='Chunk ID')
    parser.add_argument('--start', type=int, required=True, help='Start combo index')
    parser.add_argument('--end', type=int, required=True, help='End combo index')
    # Default comes from the module constant so the two cannot drift apart.
    parser.add_argument('--workers', type=int, default=MAX_WORKERS, help='Number of parallel workers')
    # Optional override; the default is the path that used to be hard-coded.
    parser.add_argument('--data-file', default='data/solusdt_5m.csv',
                        help='OHLCV CSV to backtest against')
    args = parser.parse_args()

    # Fail fast on arguments that would otherwise misbehave silently: the
    # chunk is taken with a list slice, so negative indices would select
    # combinations counted from the END of the grid.
    if args.workers < 1:
        parser.error('--workers must be at least 1')
    if args.start < 0 or args.end < 0:
        parser.error('--start and --end must be non-negative')
    if args.end < args.start:
        parser.error('--end must not be smaller than --start')

    # Update MAX_WORKERS from argument (module-level rebinding, read later by
    # process_chunk when it creates the pool)
    MAX_WORKERS = args.workers
    process_chunk(args.data_file, args.chunk_id, args.start, args.end)