#!/usr/bin/env python3
"""
Parameter sweep for v9 Money Line with RSI Divergence filter.

Tests same parameter grid as vanilla v9 but adds RSI divergence filtering to
all trades. Compares if divergence improves results across all parameter
combinations or just the baseline.
"""

import argparse
import sys
import time
from datetime import datetime
from itertools import product
from multiprocessing import Pool, cpu_count
from pathlib import Path

import numpy as np
import pandas as pd

# Add project root to path (must happen before the backtester imports below)
sys.path.insert(0, '/home/icke/traderv4')

from backtester.data_loader import load_csv, DataSlice
from backtester.simulator import simulate_money_line, TradeConfig
from backtester.indicators.money_line import MoneyLineInputs

# Candle DataFrame shared by every task inside a worker process. It is set
# once per worker by _init_worker so it is not re-sent with each task.
_WORKER_DF = None


def calculate_rsi(series, period=14):
    """Calculate the RSI of *series* using simple rolling means.

    Gains and losses are averaged with a plain ``rolling(period).mean()``
    (not Wilder's exponential smoothing).

    Args:
        series: Price series (pandas Series).
        period: Rolling window length in bars.

    Returns:
        A Series aligned with *series*. Values are NaN until a full window
        is available, and also NaN where both the average gain and the
        average loss in the window are zero (0/0).
    """
    delta = series.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
    rs = gain / loss
    return 100 - (100 / (1 + rs))


def detect_rsi_divergence(df, trade, lookback=20):
    """
    Detect RSI divergence for a trade.

    Returns True if divergence detected (trade is kept), False otherwise.

    Intended signal:
        Bullish divergence (LONG): Price makes lower low, RSI makes higher low
        Bearish divergence (SHORT): Price makes higher high, RSI makes lower high

    Actual heuristic: inside the window of ``lookback + 1`` bars ending at the
    entry bar, the price extreme (lowest close for longs, highest close for
    shorts) must occur strictly later than the matching RSI extreme.

    Args:
        df: Candle DataFrame with 'close' and 'rsi' columns, indexed so that
            ``trade.entry_time`` can be located with ``df.index.get_loc``.
        trade: Object exposing ``entry_time`` and ``direction``
            ('long' or 'short').
        lookback: Number of bars before the entry bar to inspect.

    Returns:
        bool. Trades without enough history, or whose window has no usable
        close/RSI values, are kept (True). Unknown directions return False.
    """
    entry_idx = df.index.get_loc(trade.entry_time)

    # Need enough history
    if entry_idx < lookback:
        return True  # Keep trades at start (not enough data to filter)

    if trade.direction == 'long':
        find_extreme = np.nanargmin
    elif trade.direction == 'short':
        find_extreme = np.nanargmax
    else:
        return False

    start, stop = entry_idx - lookback, entry_idx + 1
    close = df['close'].iloc[start:stop].to_numpy(dtype=float)
    rsi = df['rsi'].iloc[start:stop].to_numpy(dtype=float)

    # An all-NaN window cannot be evaluated. Keep the trade, matching the
    # "not enough data" policy above, instead of raising: the caller's
    # exception handler would otherwise discard the whole parameter set.
    if np.isnan(close).all() or np.isnan(rsi).all():
        return True

    # nanargmin/nanargmax return the first position of the extreme while
    # ignoring NaN, which is what idxmin/idxmax + get_loc yielded before.
    return bool(find_extreme(close) > find_extreme(rsi))


def _summarize_trades(trades):
    """Compute aggregate performance metrics for a non-empty trade list."""
    pnls = [t.realized_pnl for t in trades]
    winning = [p for p in pnls if p > 0]
    losing = [p for p in pnls if p < 0]

    gross_profit = sum(winning)
    gross_loss = -sum(losing)

    if gross_loss > 0:
        profit_factor = gross_profit / gross_loss
    elif gross_profit > 0:
        # No losing trades: report infinity rather than 0, which would make
        # a flawless parameter set look like the worst one.
        profit_factor = float('inf')
    else:
        profit_factor = 0

    # Maximum drawdown of the cumulative P&L curve, in trade order.
    max_dd = 0
    peak = 0
    cumulative = 0
    for pnl in pnls:
        cumulative += pnl
        peak = max(peak, cumulative)
        max_dd = max(max_dd, peak - cumulative)

    return {
        'total_pnl': sum(pnls),
        'num_trades': len(pnls),
        'win_rate': len(winning) / len(pnls) * 100,
        'profit_factor': profit_factor,
        'max_drawdown': max_dd,
        'avg_win': gross_profit / len(winning) if winning else 0,
        'avg_loss': -gross_loss / len(losing) if losing else 0,
    }


def test_params(args):
    """Test a single parameter combination with RSI divergence filter.

    Args:
        args: Tuple ``(params, df)`` where *params* maps the sweep parameter
            names to values and *df* is the candle DataFrame (with an 'rsi'
            column).

    Returns:
        Dict with the parameter values followed by the performance metrics,
        or None when no trade survives the filter or the run raised.
    """
    params, df = args

    try:
        # Create MoneyLineInputs with correct field names
        inputs = MoneyLineInputs(
            flip_threshold_percent=params['flip_threshold'],
            ma_gap_threshold=params['ma_gap'],
            momentum_min_adx=params['momentum_adx'],
            momentum_long_max_pos=params['momentum_long_pos'],
            momentum_short_min_pos=params['momentum_short_pos'],
            cooldown_bars=params['cooldown_bars'],
            momentum_spacing=params['momentum_spacing'],
            momentum_cooldown=params['momentum_cooldown'],
        )

        # Run simulation
        trade_config = TradeConfig(position_size=1000.0, max_bars_per_trade=2880)
        result = simulate_money_line(df, 'SOLUSDT', inputs=inputs, config=trade_config)

        # Filter for RSI divergence
        trades = [t for t in result.trades if detect_rsi_divergence(df, t)]
        if not trades:
            return None

        # Key order is significant: it becomes the CSV column order.
        return {
            'flip_threshold': params['flip_threshold'],
            'ma_gap': params['ma_gap'],
            'momentum_adx': params['momentum_adx'],
            'momentum_long_pos': params['momentum_long_pos'],
            'momentum_short_pos': params['momentum_short_pos'],
            'cooldown_bars': params['cooldown_bars'],
            'momentum_spacing': params['momentum_spacing'],
            'momentum_cooldown': params['momentum_cooldown'],
            **_summarize_trades(trades),
        }

    except Exception as e:
        # Best effort: one failing combination must not abort the sweep.
        print(f"Error testing params {params}: {e}")
        return None


def _init_worker(df):
    """Pool initializer: store the candle DataFrame once per worker process."""
    global _WORKER_DF
    _WORKER_DF = df


def _test_params_in_worker(params):
    """Run test_params against the DataFrame installed by _init_worker."""
    return test_params((params, _WORKER_DF))


def main():
    """Run the full sweep in parallel, save the results CSV, print the top 10."""
    parser = argparse.ArgumentParser(description='Run v9 parameter sweep with RSI divergence filter')
    parser.add_argument('--workers', type=int, default=None, help='Number of worker processes')
    parser.add_argument('--top', type=int, default=None, help='Only save top N results')
    args = parser.parse_args()

    # Monotonic clock: progress estimates are immune to wall-clock changes.
    start_time = time.monotonic()

    workers = args.workers if args.workers else max(1, cpu_count() - 2)

    print("v9 + RSI DIVERGENCE Parameter Sweep")
    print(f"Workers: {workers}")
    print(f"Started: {datetime.now()}")
    print()

    # Load data
    print("Loading data...")
    data_slice = load_csv(Path('data/solusdt_5m.csv'), 'SOLUSDT', '5m')
    df = data_slice.data
    print(f"Loaded {len(df)} candles")

    # Calculate RSI for divergence detection
    print("Calculating RSI...")
    df['rsi'] = calculate_rsi(df['close'], 14)
    print()

    # Parameter grid (same as vanilla v9)
    param_grid = {
        'flip_threshold': [0.4, 0.5, 0.6, 0.7],
        'ma_gap': [0.20, 0.30, 0.40, 0.50],
        'momentum_adx': [18, 21, 24, 27],
        'momentum_long_pos': [60, 65, 70, 75],
        'momentum_short_pos': [20, 25, 30, 35],
        'cooldown_bars': [1, 2, 3, 4],
        'momentum_spacing': [2, 3, 4, 5],
        'momentum_cooldown': [1, 2, 3, 4],
    }

    # Generate all combinations
    keys = list(param_grid)
    combinations = [dict(zip(keys, combo)) for combo in product(*param_grid.values())]
    total_combos = len(combinations)

    print(f"Testing {total_combos:,} parameter combinations with RSI divergence filter")
    print(f"Parameter grid: {param_grid}")
    print()

    # Run parallel tests with progress. The DataFrame is handed to each
    # worker once through the initializer instead of being pickled into
    # every single task.
    results = []
    completed = 0

    with Pool(workers, initializer=_init_worker, initargs=(df,)) as pool:
        for result in pool.imap_unordered(_test_params_in_worker, combinations):
            if result is not None:
                results.append(result)

            completed += 1
            if completed % 100 == 0:
                elapsed = (time.monotonic() - start_time) / 60
                rate = completed / elapsed if elapsed > 0 else 0
                remaining = (total_combos - completed) / rate if rate > 0 else 0
                print(f"Progress: {completed}/{total_combos} ({(completed/total_combos*100):.1f}%) | "
                      f"Elapsed: {elapsed:.1f}m | Remaining: {remaining:.1f}m | Rate: {rate:.1f}/min")

    print()
    print(f"Completed {len(results)} valid tests")

    if not results:
        print("No valid results!")
        return

    # Sort by total PnL
    results.sort(key=lambda x: x['total_pnl'], reverse=True)

    # Save results
    results_df = pd.DataFrame(results)
    if args.top:
        results_df = results_df.head(args.top)
        print(f"Saving top {args.top} results...")

    output_file = 'sweep_v9_rsi_divergence.csv'
    results_df.to_csv(output_file, index=False)
    print(f"Results saved to {output_file}")
    print()

    # Show top 10
    print("=" * 80)
    print("TOP 10 RESULTS (v9 + RSI Divergence)")
    print("=" * 80)
    for i, result in enumerate(results_df.head(10).to_dict('records'), 1):
        print(f"\n{i}. P&L: ${result['total_pnl']:.2f} | Trades: {result['num_trades']} | WR: {result['win_rate']:.1f}% | PF: {result['profit_factor']:.3f}")
        print(f" flip={result['flip_threshold']:.1f}, ma_gap={result['ma_gap']:.2f}, "
              f"adx={result['momentum_adx']}, long_pos={result['momentum_long_pos']}, "
              f"short_pos={result['momentum_short_pos']}")
        print(f" cooldown={result['cooldown_bars']}, spacing={result['momentum_spacing']}, "
              f"mom_cd={result['momentum_cooldown']}")
        print(f" Max DD: ${result['max_drawdown']:.2f} | Avg Win: ${result['avg_win']:.2f} | Avg Loss: ${result['avg_loss']:.2f}")

    print()
    print(f"Finished: {datetime.now()}")


if __name__ == '__main__':
    main()