CRITICAL FIX (Nov 30, 2025):
- Dashboard showed 'idle' despite 22+ worker processes running
- Root cause: SSH-based worker detection timing out
- Solution: Check database for running chunks FIRST
Changes:
1. app/api/cluster/status/route.ts:
- Query exploration database before SSH detection
- If running chunks exist, mark workers 'active' even if SSH fails
- Override worker status: 'offline' → 'active' when chunks running
- Log: '✅ Cluster status: ACTIVE (database shows running chunks)'
- Database is source of truth, SSH only for supplementary metrics
2. app/cluster/page.tsx:
- Stop button ALREADY EXISTS (conditionally shown)
- Shows Start when status='idle', Stop when status='active'
- No code changes needed - fixed by status detection
Result:
- Dashboard now shows 'ACTIVE' with 2 workers (correct)
- Workers show 'active' status (was 'offline')
- Stop button automatically visible when cluster active
- System resilient to SSH timeouts/network issues
Verified:
- Container restarted: Nov 30 21:18 UTC
- API tested: Returns status='active', activeWorkers=2
- Logs confirm: Database-first logic working
- Workers confirmed running: 22+ processes on worker1, workers on worker2
257 lines
9.2 KiB
Python
Executable File
257 lines
9.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Parameter sweep for v9 Money Line with RSI Divergence filter.
|
|
|
|
Tests same parameter grid as vanilla v9 but adds RSI divergence filtering
|
|
to all trades. Compares if divergence improves results across all parameter
|
|
combinations or just the baseline.
|
|
"""
|
|
|
|
import sys
|
|
import pandas as pd
|
|
import numpy as np
|
|
from multiprocessing import Pool, cpu_count
|
|
from datetime import datetime
|
|
import argparse
|
|
|
|
# Add project root to path
|
|
sys.path.insert(0, '/home/icke/traderv4')
|
|
|
|
from backtester.data_loader import load_csv, DataSlice
|
|
from backtester.simulator import simulate_money_line, TradeConfig
|
|
from backtester.indicators.money_line import MoneyLineInputs
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
def calculate_rsi(series, period=14):
|
|
"""Calculate RSI indicator."""
|
|
delta = series.diff()
|
|
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
|
|
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
|
|
rs = gain / loss
|
|
rsi = 100 - (100 / (1 + rs))
|
|
return rsi
|
|
|
|
|
|
def detect_rsi_divergence(df, trade, lookback=20):
|
|
"""
|
|
Detect RSI divergence for a trade.
|
|
|
|
Returns True if divergence detected, False otherwise.
|
|
|
|
Bullish divergence (LONG): Price makes lower low, RSI makes higher low
|
|
Bearish divergence (SHORT): Price makes higher high, RSI makes lower high
|
|
"""
|
|
entry_idx = df.index.get_loc(trade.entry_time)
|
|
|
|
# Need enough history
|
|
if entry_idx < lookback:
|
|
return True # Keep trades at start (not enough data to filter)
|
|
|
|
lookback_data = df.iloc[entry_idx-lookback:entry_idx+1]
|
|
|
|
if trade.direction == 'long':
|
|
# Bullish divergence: price low more recent than RSI low
|
|
price_min_idx = lookback_data['close'].idxmin()
|
|
price_min_loc = lookback_data.index.get_loc(price_min_idx)
|
|
rsi_min_idx = lookback_data['rsi'].idxmin()
|
|
rsi_min_loc = lookback_data.index.get_loc(rsi_min_idx)
|
|
|
|
return price_min_loc > rsi_min_loc
|
|
|
|
elif trade.direction == 'short':
|
|
# Bearish divergence: price high more recent than RSI high
|
|
price_max_idx = lookback_data['close'].idxmax()
|
|
price_max_loc = lookback_data.index.get_loc(price_max_idx)
|
|
rsi_max_idx = lookback_data['rsi'].idxmax()
|
|
rsi_max_loc = lookback_data.index.get_loc(rsi_max_idx)
|
|
|
|
return price_max_loc > rsi_max_loc
|
|
|
|
return False
|
|
|
|
|
|
def test_params(args):
|
|
"""Test a single parameter combination with RSI divergence filter."""
|
|
params, df = args
|
|
|
|
try:
|
|
# Create MoneyLineInputs with correct field names
|
|
inputs = MoneyLineInputs(
|
|
flip_threshold_percent=params['flip_threshold'],
|
|
ma_gap_threshold=params['ma_gap'],
|
|
momentum_min_adx=params['momentum_adx'],
|
|
momentum_long_max_pos=params['momentum_long_pos'],
|
|
momentum_short_min_pos=params['momentum_short_pos'],
|
|
cooldown_bars=params['cooldown_bars'],
|
|
momentum_spacing=params['momentum_spacing'],
|
|
momentum_cooldown=params['momentum_cooldown']
|
|
)
|
|
|
|
# Run simulation
|
|
trade_config = TradeConfig(position_size=1000.0, max_bars_per_trade=2880)
|
|
result = simulate_money_line(df, 'SOLUSDT', inputs=inputs, config=trade_config)
|
|
all_trades = result.trades
|
|
|
|
# Filter for RSI divergence
|
|
trades = [t for t in all_trades if detect_rsi_divergence(df, t)]
|
|
|
|
if not trades:
|
|
return None
|
|
|
|
total_pnl = sum(t.realized_pnl for t in trades)
|
|
wins = sum(1 for t in trades if t.realized_pnl > 0)
|
|
losses = sum(1 for t in trades if t.realized_pnl < 0)
|
|
win_rate = (wins / len(trades) * 100) if trades else 0
|
|
|
|
avg_win = np.mean([t.realized_pnl for t in trades if t.realized_pnl > 0]) if wins > 0 else 0
|
|
avg_loss = np.mean([t.realized_pnl for t in trades if t.realized_pnl < 0]) if losses > 0 else 0
|
|
profit_factor = abs(avg_win * wins / (avg_loss * losses)) if (avg_loss != 0 and losses > 0) else 0
|
|
|
|
max_dd = 0
|
|
peak = 0
|
|
cumulative = 0
|
|
for trade in trades:
|
|
cumulative += trade.realized_pnl
|
|
if cumulative > peak:
|
|
peak = cumulative
|
|
dd = peak - cumulative
|
|
if dd > max_dd:
|
|
max_dd = dd
|
|
|
|
return {
|
|
'flip_threshold': params['flip_threshold'],
|
|
'ma_gap': params['ma_gap'],
|
|
'momentum_adx': params['momentum_adx'],
|
|
'momentum_long_pos': params['momentum_long_pos'],
|
|
'momentum_short_pos': params['momentum_short_pos'],
|
|
'cooldown_bars': params['cooldown_bars'],
|
|
'momentum_spacing': params['momentum_spacing'],
|
|
'momentum_cooldown': params['momentum_cooldown'],
|
|
'total_pnl': total_pnl,
|
|
'num_trades': len(trades),
|
|
'win_rate': win_rate,
|
|
'profit_factor': profit_factor,
|
|
'max_drawdown': max_dd,
|
|
'avg_win': avg_win,
|
|
'avg_loss': avg_loss
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Error testing params {params}: {e}")
|
|
return None
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Run v9 parameter sweep with RSI divergence filter')
|
|
parser.add_argument('--workers', type=int, default=None, help='Number of worker processes')
|
|
parser.add_argument('--top', type=int, default=None, help='Only save top N results')
|
|
args = parser.parse_args()
|
|
|
|
start_time = datetime.now().timestamp()
|
|
workers = args.workers if args.workers else max(1, cpu_count() - 2)
|
|
|
|
print(f"v9 + RSI DIVERGENCE Parameter Sweep")
|
|
print(f"Workers: {workers}")
|
|
print(f"Started: {datetime.now()}")
|
|
print()
|
|
|
|
# Load data
|
|
print("Loading data...")
|
|
data_slice = load_csv(Path('data/solusdt_5m.csv'), 'SOLUSDT', '5m')
|
|
df = data_slice.data
|
|
print(f"Loaded {len(df)} candles")
|
|
|
|
# Calculate RSI for divergence detection
|
|
print("Calculating RSI...")
|
|
df['rsi'] = calculate_rsi(df['close'], 14)
|
|
print()
|
|
|
|
# Parameter grid (same as vanilla v9)
|
|
param_grid = {
|
|
'flip_threshold': [0.4, 0.5, 0.6, 0.7],
|
|
'ma_gap': [0.20, 0.30, 0.40, 0.50],
|
|
'momentum_adx': [18, 21, 24, 27],
|
|
'momentum_long_pos': [60, 65, 70, 75],
|
|
'momentum_short_pos': [20, 25, 30, 35],
|
|
'cooldown_bars': [1, 2, 3, 4],
|
|
'momentum_spacing': [2, 3, 4, 5],
|
|
'momentum_cooldown': [1, 2, 3, 4]
|
|
}
|
|
|
|
# Generate all combinations
|
|
from itertools import product
|
|
keys = param_grid.keys()
|
|
values = param_grid.values()
|
|
combinations = [dict(zip(keys, v)) for v in product(*values)]
|
|
|
|
total_combos = len(combinations)
|
|
print(f"Testing {total_combos:,} parameter combinations with RSI divergence filter")
|
|
print(f"Parameter grid: {param_grid}")
|
|
print()
|
|
|
|
# Prepare arguments
|
|
test_args = [(params, df) for params in combinations]
|
|
|
|
# Run parallel tests with progress
|
|
results = []
|
|
completed = 0
|
|
|
|
with Pool(workers) as pool:
|
|
for result in pool.imap_unordered(test_params, test_args):
|
|
if result is not None:
|
|
results.append(result)
|
|
|
|
completed += 1
|
|
if completed % 100 == 0:
|
|
elapsed = (datetime.now().timestamp() - start_time) / 60
|
|
rate = completed / elapsed if elapsed > 0 else 0
|
|
remaining = (total_combos - completed) / rate if rate > 0 else 0
|
|
|
|
print(f"Progress: {completed}/{total_combos} ({(completed/total_combos*100):.1f}%) | "
|
|
f"Elapsed: {elapsed:.1f}m | Remaining: {remaining:.1f}m | Rate: {rate:.1f}/min")
|
|
|
|
print()
|
|
print(f"Completed {len(results)} valid tests")
|
|
|
|
if not results:
|
|
print("No valid results!")
|
|
return
|
|
|
|
# Sort by total PnL
|
|
results.sort(key=lambda x: x['total_pnl'], reverse=True)
|
|
|
|
# Save results
|
|
results_df = pd.DataFrame(results)
|
|
|
|
if args.top:
|
|
results_df = results_df.head(args.top)
|
|
print(f"Saving top {args.top} results...")
|
|
|
|
output_file = 'sweep_v9_rsi_divergence.csv'
|
|
results_df.to_csv(output_file, index=False)
|
|
print(f"Results saved to {output_file}")
|
|
print()
|
|
|
|
# Show top 10
|
|
print("=" * 80)
|
|
print("TOP 10 RESULTS (v9 + RSI Divergence)")
|
|
print("=" * 80)
|
|
for i, result in enumerate(results_df.head(10).to_dict('records'), 1):
|
|
print(f"\n{i}. P&L: ${result['total_pnl']:.2f} | Trades: {result['num_trades']} | WR: {result['win_rate']:.1f}% | PF: {result['profit_factor']:.3f}")
|
|
print(f" flip={result['flip_threshold']:.1f}, ma_gap={result['ma_gap']:.2f}, "
|
|
f"adx={result['momentum_adx']}, long_pos={result['momentum_long_pos']}, "
|
|
f"short_pos={result['momentum_short_pos']}")
|
|
print(f" cooldown={result['cooldown_bars']}, spacing={result['momentum_spacing']}, "
|
|
f"mom_cd={result['momentum_cooldown']}")
|
|
print(f" Max DD: ${result['max_drawdown']:.2f} | Avg Win: ${result['avg_win']:.2f} | Avg Loss: ${result['avg_loss']:.2f}")
|
|
|
|
print()
|
|
print(f"Finished: {datetime.now()}")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
start_time = datetime.now().timestamp()
|
|
main()
|