Files
trading_bot_v4/scripts/run_backtest_sweep_rsi.py
mindesbunister cc56b72df2 fix: Database-first cluster status detection + Stop button clarification
CRITICAL FIX (Nov 30, 2025):
- Dashboard showed 'idle' despite 22+ worker processes running
- Root cause: SSH-based worker detection timing out
- Solution: Check database for running chunks FIRST

Changes:
1. app/api/cluster/status/route.ts:
   - Query exploration database before SSH detection
   - If running chunks exist, mark workers 'active' even if SSH fails
   - Override worker status: 'offline' → 'active' when chunks running
   - Log: ' Cluster status: ACTIVE (database shows running chunks)'
   - Database is source of truth, SSH only for supplementary metrics

2. app/cluster/page.tsx:
   - Stop button ALREADY EXISTS (conditionally shown)
   - Shows Start when status='idle', Stop when status='active'
   - No code changes needed - fixed by status detection

Result:
- Dashboard now shows 'ACTIVE' with 2 workers (correct)
- Workers show 'active' status (was 'offline')
- Stop button automatically visible when cluster active
- System resilient to SSH timeouts/network issues

Verified:
- Container restarted: Nov 30 21:18 UTC
- API tested: Returns status='active', activeWorkers=2
- Logs confirm: Database-first logic working
- Workers confirmed running: 22+ processes on worker1, workers on worker2
2025-11-30 22:23:01 +01:00

257 lines
9.2 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Parameter sweep for v9 Money Line with RSI Divergence filter.
Tests same parameter grid as vanilla v9 but adds RSI divergence filtering
to all trades. Compares if divergence improves results across all parameter
combinations or just the baseline.
"""
import sys
import pandas as pd
import numpy as np
from multiprocessing import Pool, cpu_count
from datetime import datetime
import argparse
# Add project root to path
sys.path.insert(0, '/home/icke/traderv4')
from backtester.data_loader import load_csv, DataSlice
from backtester.simulator import simulate_money_line, TradeConfig
from backtester.indicators.money_line import MoneyLineInputs
from pathlib import Path
def calculate_rsi(series, period=14):
"""Calculate RSI indicator."""
delta = series.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def detect_rsi_divergence(df, trade, lookback=20):
"""
Detect RSI divergence for a trade.
Returns True if divergence detected, False otherwise.
Bullish divergence (LONG): Price makes lower low, RSI makes higher low
Bearish divergence (SHORT): Price makes higher high, RSI makes lower high
"""
entry_idx = df.index.get_loc(trade.entry_time)
# Need enough history
if entry_idx < lookback:
return True # Keep trades at start (not enough data to filter)
lookback_data = df.iloc[entry_idx-lookback:entry_idx+1]
if trade.direction == 'long':
# Bullish divergence: price low more recent than RSI low
price_min_idx = lookback_data['close'].idxmin()
price_min_loc = lookback_data.index.get_loc(price_min_idx)
rsi_min_idx = lookback_data['rsi'].idxmin()
rsi_min_loc = lookback_data.index.get_loc(rsi_min_idx)
return price_min_loc > rsi_min_loc
elif trade.direction == 'short':
# Bearish divergence: price high more recent than RSI high
price_max_idx = lookback_data['close'].idxmax()
price_max_loc = lookback_data.index.get_loc(price_max_idx)
rsi_max_idx = lookback_data['rsi'].idxmax()
rsi_max_loc = lookback_data.index.get_loc(rsi_max_idx)
return price_max_loc > rsi_max_loc
return False
def test_params(args):
"""Test a single parameter combination with RSI divergence filter."""
params, df = args
try:
# Create MoneyLineInputs with correct field names
inputs = MoneyLineInputs(
flip_threshold_percent=params['flip_threshold'],
ma_gap_threshold=params['ma_gap'],
momentum_min_adx=params['momentum_adx'],
momentum_long_max_pos=params['momentum_long_pos'],
momentum_short_min_pos=params['momentum_short_pos'],
cooldown_bars=params['cooldown_bars'],
momentum_spacing=params['momentum_spacing'],
momentum_cooldown=params['momentum_cooldown']
)
# Run simulation
trade_config = TradeConfig(position_size=1000.0, max_bars_per_trade=2880)
result = simulate_money_line(df, 'SOLUSDT', inputs=inputs, config=trade_config)
all_trades = result.trades
# Filter for RSI divergence
trades = [t for t in all_trades if detect_rsi_divergence(df, t)]
if not trades:
return None
total_pnl = sum(t.realized_pnl for t in trades)
wins = sum(1 for t in trades if t.realized_pnl > 0)
losses = sum(1 for t in trades if t.realized_pnl < 0)
win_rate = (wins / len(trades) * 100) if trades else 0
avg_win = np.mean([t.realized_pnl for t in trades if t.realized_pnl > 0]) if wins > 0 else 0
avg_loss = np.mean([t.realized_pnl for t in trades if t.realized_pnl < 0]) if losses > 0 else 0
profit_factor = abs(avg_win * wins / (avg_loss * losses)) if (avg_loss != 0 and losses > 0) else 0
max_dd = 0
peak = 0
cumulative = 0
for trade in trades:
cumulative += trade.realized_pnl
if cumulative > peak:
peak = cumulative
dd = peak - cumulative
if dd > max_dd:
max_dd = dd
return {
'flip_threshold': params['flip_threshold'],
'ma_gap': params['ma_gap'],
'momentum_adx': params['momentum_adx'],
'momentum_long_pos': params['momentum_long_pos'],
'momentum_short_pos': params['momentum_short_pos'],
'cooldown_bars': params['cooldown_bars'],
'momentum_spacing': params['momentum_spacing'],
'momentum_cooldown': params['momentum_cooldown'],
'total_pnl': total_pnl,
'num_trades': len(trades),
'win_rate': win_rate,
'profit_factor': profit_factor,
'max_drawdown': max_dd,
'avg_win': avg_win,
'avg_loss': avg_loss
}
except Exception as e:
print(f"Error testing params {params}: {e}")
return None
def main():
parser = argparse.ArgumentParser(description='Run v9 parameter sweep with RSI divergence filter')
parser.add_argument('--workers', type=int, default=None, help='Number of worker processes')
parser.add_argument('--top', type=int, default=None, help='Only save top N results')
args = parser.parse_args()
start_time = datetime.now().timestamp()
workers = args.workers if args.workers else max(1, cpu_count() - 2)
print(f"v9 + RSI DIVERGENCE Parameter Sweep")
print(f"Workers: {workers}")
print(f"Started: {datetime.now()}")
print()
# Load data
print("Loading data...")
data_slice = load_csv(Path('data/solusdt_5m.csv'), 'SOLUSDT', '5m')
df = data_slice.data
print(f"Loaded {len(df)} candles")
# Calculate RSI for divergence detection
print("Calculating RSI...")
df['rsi'] = calculate_rsi(df['close'], 14)
print()
# Parameter grid (same as vanilla v9)
param_grid = {
'flip_threshold': [0.4, 0.5, 0.6, 0.7],
'ma_gap': [0.20, 0.30, 0.40, 0.50],
'momentum_adx': [18, 21, 24, 27],
'momentum_long_pos': [60, 65, 70, 75],
'momentum_short_pos': [20, 25, 30, 35],
'cooldown_bars': [1, 2, 3, 4],
'momentum_spacing': [2, 3, 4, 5],
'momentum_cooldown': [1, 2, 3, 4]
}
# Generate all combinations
from itertools import product
keys = param_grid.keys()
values = param_grid.values()
combinations = [dict(zip(keys, v)) for v in product(*values)]
total_combos = len(combinations)
print(f"Testing {total_combos:,} parameter combinations with RSI divergence filter")
print(f"Parameter grid: {param_grid}")
print()
# Prepare arguments
test_args = [(params, df) for params in combinations]
# Run parallel tests with progress
results = []
completed = 0
with Pool(workers) as pool:
for result in pool.imap_unordered(test_params, test_args):
if result is not None:
results.append(result)
completed += 1
if completed % 100 == 0:
elapsed = (datetime.now().timestamp() - start_time) / 60
rate = completed / elapsed if elapsed > 0 else 0
remaining = (total_combos - completed) / rate if rate > 0 else 0
print(f"Progress: {completed}/{total_combos} ({(completed/total_combos*100):.1f}%) | "
f"Elapsed: {elapsed:.1f}m | Remaining: {remaining:.1f}m | Rate: {rate:.1f}/min")
print()
print(f"Completed {len(results)} valid tests")
if not results:
print("No valid results!")
return
# Sort by total PnL
results.sort(key=lambda x: x['total_pnl'], reverse=True)
# Save results
results_df = pd.DataFrame(results)
if args.top:
results_df = results_df.head(args.top)
print(f"Saving top {args.top} results...")
output_file = 'sweep_v9_rsi_divergence.csv'
results_df.to_csv(output_file, index=False)
print(f"Results saved to {output_file}")
print()
# Show top 10
print("=" * 80)
print("TOP 10 RESULTS (v9 + RSI Divergence)")
print("=" * 80)
for i, result in enumerate(results_df.head(10).to_dict('records'), 1):
print(f"\n{i}. P&L: ${result['total_pnl']:.2f} | Trades: {result['num_trades']} | WR: {result['win_rate']:.1f}% | PF: {result['profit_factor']:.3f}")
print(f" flip={result['flip_threshold']:.1f}, ma_gap={result['ma_gap']:.2f}, "
f"adx={result['momentum_adx']}, long_pos={result['momentum_long_pos']}, "
f"short_pos={result['momentum_short_pos']}")
print(f" cooldown={result['cooldown_bars']}, spacing={result['momentum_spacing']}, "
f"mom_cd={result['momentum_cooldown']}")
print(f" Max DD: ${result['max_drawdown']:.2f} | Avg Win: ${result['avg_win']:.2f} | Avg Loss: ${result['avg_loss']:.2f}")
print()
print(f"Finished: {datetime.now()}")
if __name__ == '__main__':
start_time = datetime.now().timestamp()
main()