Files
trading_bot_v4/scripts/distributed_v9_advanced_worker.py
mindesbunister 7e1fe1cc30 feat: V9 advanced parameter sweep with MA gap filter (810K configs)
Parameter space expansion:
- Original 15 params: 101K configurations
- NEW: MA gap filter (3 dimensions: 2×3×3 = 18 raw combinations, ~8× effective expansion once gap values are deduplicated when the filter is disabled)
- Total: ~810,000 configurations across 4 time profiles
- Chunk size: 1,000 configs/chunk = ~810 chunks

MA Gap Filter parameters:
- use_ma_gap: True/False (2 values)
- ma_gap_long_min: -5.0%, 0%, +5.0% (3 values)
- ma_gap_short_max: -5.0%, 0%, +5.0% (3 values)

Implementation:
- money_line_v9.py: Full v9 indicator with MA gap logic
- v9_advanced_worker.py: Chunk processor (1,000 configs)
- v9_advanced_coordinator.py: Work distributor (2 EPYC workers)
- run_v9_advanced_sweep.sh: Startup script (generates + launches)

Infrastructure:
- Uses existing EPYC cluster (64 cores total)
- Worker1: bd-epyc-02 (32 threads)
- Worker2: bd-host01 (32 threads via SSH hop)
- Expected runtime: 70-80 hours
- Database: SQLite (chunk tracking + results)

Goal: Find optimal MA gap thresholds for filtering false breakouts
during MA whipsaw zones while preserving trend entries.
2025-12-01 18:11:47 +01:00

158 lines
5.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Distributed v9 advanced parameter sweep for cluster execution.
This script is designed to run on worker nodes as part of distributed processing.
The coordinator will split the 800K+ configurations into chunks and distribute
them across the cluster.
Expected per-worker throughput: ~300-500 configs/hour
Total runtime: 40-80 hours on 2-worker cluster
"""
import argparse
import sys
from pathlib import Path

# Add project root to path so the `backtester` package resolves when this
# script is executed directly on a worker node (it is not installed as a
# package); must run before the project imports below.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

import pandas as pd
from tqdm import tqdm

# NOTE: these imports rely on the sys.path insertion above.
from backtester.data_loader import load_data
from backtester.indicators.money_line_v9 import MoneyLineV9Inputs, money_line_v9_signals
from backtester.simulator import simulate_money_line
# Swept parameter names, echoed into every result row (success or error) so
# each chunk's result CSV always has a consistent column set.
_SWEPT_PARAM_KEYS = (
    'atr_period', 'multiplier', 'adx_length', 'rsi_length',
    'rsi_long_min', 'rsi_long_max', 'rsi_short_min', 'rsi_short_max',
    'vol_max', 'entry_buffer_atr', 'use_heikin_ashi',
    'use_ma_gap_filter', 'ma_gap_long_min', 'ma_gap_short_max',
)

# Performance metric columns produced by the simulator.
_METRIC_KEYS = ('pnl', 'win_rate', 'profit_factor', 'max_drawdown', 'total_trades')


def test_config(params):
    """Backtest a single parameter configuration.

    Args:
        params: Mapping with one value per key in ``_SWEPT_PARAM_KEYS``.

    Returns:
        dict: All swept parameters plus the performance metrics. On any
        simulation error the metrics are zeroed but the full parameter set
        is still echoed, so every row has the same columns. (The previous
        version dropped 12 of the 14 parameter columns on error, producing
        ragged rows with NaN parameter values in the chunk CSV.)
    """
    # Load data for this config; caching is assumed to happen inside
    # load_data — TODO confirm, otherwise this is a per-config reload.
    df = load_data("solusdt_5m.csv")

    inputs = MoneyLineV9Inputs(
        # Basic optimized params (FIXED from previous sweep)
        confirm_bars=0,
        flip_threshold_percent=0.5,
        cooldown_bars=3,
        adx_min=21,
        long_pos_max=75,
        short_pos_min=20,
        vol_min=1.0,
        # ADVANCED OPTIMIZATION PARAMETERS (swept per config):
        **{key: params[key] for key in _SWEPT_PARAM_KEYS},
    )

    # Echo the swept parameters so the output CSV is self-describing.
    result = {key: params[key] for key in _SWEPT_PARAM_KEYS}
    try:
        # Generate signals, then simulate trades on them.
        signals = money_line_v9_signals(df, inputs)
        sim = simulate_money_line(df, signals)
        result.update(
            pnl=sim['total_pnl'],
            win_rate=sim['win_rate'],
            profit_factor=sim['profit_factor'],
            max_drawdown=sim['max_drawdown'],
            total_trades=sim['total_trades'],
        )
    except Exception as e:  # broad on purpose: one bad config must not kill the chunk
        print(f"Error testing config: {e}")
        result.update({key: 0 for key in _METRIC_KEYS})
    return result
def process_chunk(chunk_file: str, output_file: str):
    """
    Process a chunk of parameter configurations.

    Reads one parameter combination per CSV row, backtests each via
    ``test_config``, writes all result rows to ``output_file``, and prints
    a short summary.

    Args:
        chunk_file: CSV file with parameter combinations to test
        output_file: CSV file to save results
    """
    print(f"Loading chunk: {chunk_file}")
    chunk_df = pd.read_csv(chunk_file)
    print(f"Chunk size: {len(chunk_df)} configurations")
    print()

    results = [
        test_config(row.to_dict())
        for _, row in tqdm(chunk_df.iterrows(), total=len(chunk_df), desc="Testing configs")
    ]

    # Save results
    results_df = pd.DataFrame(results)
    results_df.to_csv(output_file, index=False)
    print(f"\nResults saved to: {output_file}")

    # Print summary
    print()
    print("=" * 80)
    print("CHUNK COMPLETE")
    print("=" * 80)
    print()
    print(f"Configurations tested: {len(results_df)}")
    # Guard the stats: an empty chunk produces a column-less DataFrame, and
    # the previous version raised KeyError on results_df['pnl'] in that case.
    if len(results_df):
        print(f"Best PnL: ${results_df['pnl'].max():.2f}")
        print(f"Mean PnL: ${results_df['pnl'].mean():.2f}")
        print(f"Configurations with trades: {(results_df['total_trades'] > 0).sum()}")
    print()
def main():
    """CLI entry point: parse the chunk/output paths and process the chunk."""
    arg_parser = argparse.ArgumentParser(description="Process v9 advanced sweep chunk")
    arg_parser.add_argument("chunk_file", help="Input CSV file with parameter combinations")
    arg_parser.add_argument("output_file", help="Output CSV file for results")
    cli = arg_parser.parse_args()

    banner = "=" * 80
    print(banner)
    print("v9 ADVANCED SWEEP - CHUNK PROCESSOR")
    print(banner)
    print()

    process_chunk(cli.chunk_file, cli.output_file)


if __name__ == "__main__":
    main()