Files
trading_bot_v4/cluster/v11_full_worker.py
mindesbunister 57c2565e63 critical: Position Manager monitoring failure - 08 loss incident (Dec 8, 2025)
- Bug #73 recurrence: Position opened Dec 7 22:15 but PM never monitored
- Root cause: Container running OLD code from BEFORE Dec 7 fix (2:46 AM start < 2:46 AM commit)
- User lost 08 on unprotected SOL-PERP SHORT
- Fix: Rebuilt and restarted container with 3-layer safety system
- Status: VERIFIED deployed - all safety layers active
- Prevention: Container timestamp MUST be AFTER commit timestamp
2025-12-08 07:51:28 +01:00

176 lines
5.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
V11 Full Sweep Worker - Processes parameter combinations in parallel
Runs on EPYC workers, processes assigned chunk of parameter space.
"""
import argparse
import itertools
import json
import multiprocessing as mp
import sys
import time
from pathlib import Path
# Add backtester to path
sys.path.insert(0, str(Path(__file__).parent.parent / 'backtester'))
from backtester.data_loader import load_csv
from backtester.simulator import run_backtest
from backtester.v11_moneyline_all_filters import MoneyLineV11Inputs
# Parameter grid (matches coordinator).
#
# Each key maps to the list of candidate values swept for that parameter.
# The full sweep is the Cartesian product of these lists
# (6 * 6 * 3 * 3 * 3 * 3 * 3 * 3 = 26,244 combinations). Consumers iterate
# the keys in sorted order, so the order of entries here does not affect how
# combinations are indexed.
PARAM_GRID = {
    'flip_threshold': [0.25, 0.3, 0.35, 0.4, 0.45, 0.5],
    'adx_min': [0, 5, 10, 15, 20, 25],
    'long_pos_max': [90, 95, 100],
    'short_pos_min': [0, 5, 10],
    'vol_min': [0.0, 0.5, 1.0],
    'entry_buffer_atr': [0.0, 0.05, 0.10],
    'rsi_long_min': [20, 25, 30],
    'rsi_short_max': [70, 75, 80],
}
def test_single_config(args):
    """Backtest one parameter combination and summarise its metrics.

    Args:
        args: A ``(combo_idx, params_tuple, data)`` triple. ``combo_idx`` is
            only used in the error message. ``params_tuple`` holds one value
            per ``PARAM_GRID`` key, paired with the keys in sorted order.
            ``data`` is handed to ``run_backtest`` unchanged.

    Returns:
        A dict with the parameter mapping under ``'params'`` plus the
        ``pnl``, ``win_rate``, ``profit_factor``, ``max_drawdown`` and
        ``total_trades`` metrics, or ``None`` if the backtest (or reading
        its result) raised.
    """
    combo_idx, params_tuple, data = args

    # Rebuild the name -> value mapping; the tuple is positional over the
    # sorted grid keys.
    params = {
        name: value
        for name, value in zip(sorted(PARAM_GRID.keys()), params_tuple)
    }

    # Only these fields are forwarded to the strategy inputs.
    input_fields = (
        'flip_threshold',
        'adx_min',
        'long_pos_max',
        'short_pos_min',
        'vol_min',
        'entry_buffer_atr',
        'rsi_long_min',
        'rsi_short_max',
    )
    inputs = MoneyLineV11Inputs(
        **{field: params[field] for field in input_fields}
    )

    # Output key -> key in the dict returned by run_backtest.
    metric_sources = (
        ('pnl', 'total_pnl'),
        ('win_rate', 'win_rate'),
        ('profit_factor', 'profit_factor'),
        ('max_drawdown', 'max_drawdown'),
        ('total_trades', 'total_trades'),
    )

    try:
        outcome = run_backtest(data, inputs, 'v11')
        summary = {'params': params}
        for out_key, src_key in metric_sources:
            summary[out_key] = outcome[src_key]
        return summary
    except Exception as exc:
        # Best effort: report the failing combo and let the sweep continue.
        print(f"Error testing combo {combo_idx}: {exc}")
        return None
# Market data shared with pool workers; populated once per worker process by
# _init_worker so it is not re-serialised for every task.
_WORKER_DATA = None


def _init_worker(data):
    """Pool initializer: keep the market data in a per-process global."""
    global _WORKER_DATA
    _WORKER_DATA = data


def _run_indexed_combo(task):
    """Pool task: attach the per-process market data and test one combo."""
    combo_idx, combo = task
    return test_single_config((combo_idx, combo, _WORKER_DATA))


def main():
    """Run the backtest sweep for one chunk of the parameter grid.

    Command-line arguments:
        --chunk-id  Label used in log output and in the results file name.
        --start     Index of the first combination (inclusive).
        --end       Index one past the last combination (slice semantics).
        --workers   Number of pool processes (default 24).

    Loads ``data/solusdt_5m.csv``, evaluates every combination in
    ``[start, end)`` of the sorted-key Cartesian product of ``PARAM_GRID``
    in a process pool, and writes the successful results, sorted by P&L
    descending, to ``v11_results/<chunk-id>_results.csv``. Combinations for
    which ``test_single_config`` returns a falsy value are omitted from the
    CSV and counted as failed in the summary.
    """
    parser = argparse.ArgumentParser(description='V11 Full Sweep Worker')
    parser.add_argument('--chunk-id', required=True, help='Chunk ID')
    parser.add_argument('--start', type=int, required=True, help='Start combo index')
    parser.add_argument('--end', type=int, required=True, help='End combo index')
    parser.add_argument('--workers', type=int, default=24, help='Parallel workers')
    args = parser.parse_args()

    print(f"V11 Full Worker: {args.chunk_id}")
    print(f"Range: {args.start} - {args.end}")
    print(f"Workers: {args.workers}")
    print()

    # Load data
    print("Loading market data...")
    data = load_csv(Path('data/solusdt_5m.csv'), 'SOLUSDT', '5m')
    print(f"Loaded {len(data)} candles")
    print()

    # Generate all combinations in sorted-key order so that a combo index
    # refers to the same combination in every process using this grid.
    keys = sorted(PARAM_GRID)
    values = [PARAM_GRID[k] for k in keys]
    all_combos = list(itertools.product(*values))

    # Get this chunk's combos
    chunk_combos = all_combos[args.start:args.end]
    total = len(chunk_combos)
    print(f"Testing {total} combinations...")
    print()

    # Tasks carry only the index and the parameter tuple; the (large) market
    # data is delivered once per worker through the pool initializer.
    tasks = [
        (args.start + offset, combo)
        for offset, combo in enumerate(chunk_combos)
    ]

    # Monotonic clock: elapsed time must not be affected by system clock
    # adjustments during a long sweep.
    start_time = time.monotonic()
    results = []
    failed = 0
    with mp.Pool(
        processes=args.workers,
        initializer=_init_worker,
        initargs=(data,),
    ) as pool:
        completed = pool.imap_unordered(_run_indexed_combo, tasks)
        for done, result in enumerate(completed, start=1):
            if result:
                results.append(result)
            else:
                failed += 1

            # Progress update every 10 combos
            if done % 10 == 0:
                elapsed = time.monotonic() - start_time
                rate = done / elapsed if elapsed > 0 else 0.0
                eta = (total - done) / rate if rate > 0 else 0
                print(f"Progress: {done}/{total} "
                      f"({rate:.1f} combos/s, ETA: {eta/60:.1f}m)")

    # Sort by P&L descending
    results.sort(key=lambda r: r['pnl'], reverse=True)

    # Save results to CSV
    output_path = Path(f"v11_results/{args.chunk_id}_results.csv")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    param_cols = keys
    metric_cols = ['pnl', 'win_rate', 'profit_factor', 'max_drawdown', 'total_trades']
    with open(output_path, 'w') as f:
        # Header
        f.write(','.join(param_cols + metric_cols) + '\n')
        # Data rows
        for result in results:
            param_values = [str(result['params'][k]) for k in param_cols]
            metric_values = [
                f"{result['pnl']:.1f}",
                f"{result['win_rate']:.1f}",
                f"{result['profit_factor']:.3f}",
                f"{result['max_drawdown']:.1f}",
                str(result['total_trades']),
            ]
            f.write(','.join(param_values + metric_values) + '\n')

    duration = time.monotonic() - start_time
    overall_rate = total / duration if duration > 0 else 0.0

    print()
    print("=" * 60)
    print(f"Chunk {args.chunk_id} COMPLETE")
    print(f"Duration: {duration/60:.1f} minutes")
    print(f"Rate: {overall_rate:.2f} combos/second")
    print(f"Results saved: {output_path}")
    if failed:
        # Make dropped combinations visible instead of silently shrinking
        # the results file.
        print(f"Failed combos: {failed} (omitted from results)")
    if results:
        best = results[0]
        print()
        print("Best configuration in chunk:")
        print(f" P&L: ${best['pnl']:.2f}")
        print(f" Win Rate: {best['win_rate']:.1f}%")
        print(f" Profit Factor: {best['profit_factor']:.3f}")
        print(f" Parameters: {best['params']}")
    print("=" * 60)
# Script entry point: run the sweep only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()