import math
from dataclasses import dataclass
from itertools import product
from multiprocessing import Pool
from typing import List, Optional, Tuple

import pandas as pd

# Quick breaker v1 backtest on SOLUSDT 5m CSV (backtester/data/solusdt_5m.csv)
# This mirrors the TradingView strategy defaults as closely as possible.


@dataclass
class Trade:
    """One closed round-trip position produced by ``backtest``."""

    direction: str  # 'long' or 'short'
    entry_time: pd.Timestamp  # index label of the bar the position was opened on
    exit_time: pd.Timestamp  # index label of the bar the position was closed on
    entry_price: float
    exit_price: float
    pnl: float  # profit/loss in USD for the fixed notional used by ``backtest``


@dataclass
class Result:
    """Summary of a single backtest run for one parameter set."""

    params: dict  # the parameter dict the run was executed with
    trades: List[Trade]
    total_pnl: float  # sum of all trade PnLs
    win_rate: float  # fraction (0..1) of trades with pnl > 0
    avg_pnl: float  # total_pnl / number of trades (0.0 when there are none)
    max_drawdown: float  # most negative (equity - running peak); always <= 0


def ema(series: pd.Series, length: int) -> pd.Series:
    """Return the recursive (``adjust=False``) EMA of *series* with span *length*."""
    return series.ewm(span=length, adjust=False).mean()


def atr(df: pd.DataFrame, length: int) -> pd.Series:
    """Return the true range of *df* smoothed by a span-*length* EMA.

    The true range is the row-wise maximum of ``high - low``,
    ``|high - prev_close|`` and ``|low - prev_close|``.  Requires the columns
    ``high``, ``low`` and ``close``.
    """
    # NOTE(review): smoothing uses span=length, whereas ta_rsi uses
    # alpha=1/length (Wilder).  TradingView's built-in ATR/DMI reportedly use
    # the Wilder form -- confirm which one the mirrored strategy expects.
    prev_close = df['close'].shift(1)
    tr = pd.concat([
        df['high'] - df['low'],
        (df['high'] - prev_close).abs(),
        (df['low'] - prev_close).abs()
    ], axis=1).max(axis=1)
    return tr.ewm(span=length, adjust=False).mean()


def dmi_adx(df: pd.DataFrame, length: int) -> pd.Series:
    """Return the ADX series: the EMA-smoothed DX built from +DI / -DI.

    Directional movement is taken from bar-to-bar changes in ``high`` and
    ``low``; both DI lines are normalised by the smoothed true range from
    ``atr``.
    """
    up_move = df['high'].diff()
    down_move = -df['low'].diff()
    # Only the dominant, positive move counts; everything else becomes 0.
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)
    tr = atr(df, length)
    plus_di = 100 * (plus_dm.ewm(span=length, adjust=False).mean() / tr)
    minus_di = 100 * (minus_dm.ewm(span=length, adjust=False).mean() / tr)
    dx = (plus_di - minus_di).abs() / (plus_di + minus_di).abs() * 100
    return dx.ewm(span=length, adjust=False).mean()


def prepare(df: pd.DataFrame, p: dict) -> pd.DataFrame:
    """Return a copy of *df* with every indicator column ``backtest`` reads.

    *df* must provide ``close``, ``high``, ``low`` and ``volume``.  *p* must
    provide ``emaFastLen``, ``emaSlowLen``, ``trendBuffer``, ``bbLen``,
    ``bbMult``, ``squeezeLookback``, ``squeezeThreshold``, ``releaseWindow``,
    ``releaseThreshold``, ``volLookback``, ``rsiLen``, ``adxLen`` and
    ``priceRangeLookback``.  The input frame is not modified.
    """
    df = df.copy()

    # Trend filter: EMA ordering plus a minimum percentage gap between them.
    df['ema_fast'] = ema(df['close'], p['emaFastLen'])
    df['ema_slow'] = ema(df['close'], p['emaSlowLen'])
    df['trend_gap'] = (df['ema_fast'] - df['ema_slow']).abs() / df['ema_slow'].replace(0, math.nan) * 100
    df['trend_long'] = (df['ema_fast'] > df['ema_slow']) & (df['trend_gap'] >= p['trendBuffer'])
    df['trend_short'] = (df['ema_fast'] < df['ema_slow']) & (df['trend_gap'] >= p['trendBuffer'])

    # Bollinger bands and their width as a percentage of the basis.
    basis = df['close'].rolling(p['bbLen']).mean()
    dev = p['bbMult'] * df['close'].rolling(p['bbLen']).std()
    df['bb_upper'] = basis + dev
    df['bb_lower'] = basis - dev
    df['width_pct'] = (df['bb_upper'] - df['bb_lower']) / basis.replace(0, math.nan) * 100

    # Width normalised to 0..1 inside its own rolling min/max range.
    low_w = df['width_pct'].rolling(p['squeezeLookback']).min()
    high_w = df['width_pct'].rolling(p['squeezeLookback']).max()
    denom = (high_w - low_w).replace(0, 1e-4)  # avoid 0/0 on a flat range
    df['norm_width'] = (df['width_pct'] - low_w) / denom
    df['squeeze'] = df['norm_width'] < p['squeezeThreshold']

    # 1.0 when any of the last `releaseWindow` bars (current bar included) was
    # a squeeze, else 0.0.  The window must look BACKWARD: a reversed rolling
    # window would read future bars and leak look-ahead bias into the entry
    # signals.  The column name is kept for compatibility even though it holds
    # a flag rather than a bar count.
    df['bars_since_squeeze'] = (
        df['squeeze'].astype(float).rolling(p['releaseWindow'], min_periods=1).max()
    )
    df['recent_squeeze'] = df['bars_since_squeeze'] > 0
    df['release'] = (df['norm_width'] > p['releaseThreshold']) & df['recent_squeeze']

    vol_ma = df['volume'].rolling(p['volLookback']).mean()
    df['volume_ratio'] = df['volume'] / vol_ma.replace(0, math.nan)

    df['rsi'] = ta_rsi(df['close'], p['rsiLen'])
    df['adx'] = dmi_adx(df, p['adxLen'])

    # Position of the close inside the recent high/low range, in percent.
    highest = df['high'].rolling(p['priceRangeLookback']).max()
    lowest = df['low'].rolling(p['priceRangeLookback']).min()
    range_span = (highest - lowest).replace(0, 1e-4)
    df['price_pos'] = (df['close'] - lowest) / range_span * 100
    return df


def ta_rsi(series: pd.Series, length: int) -> pd.Series:
    """Return the RSI of *series* using ``alpha = 1/length`` smoothing."""
    delta = series.diff()
    gain = delta.clip(lower=0).ewm(alpha=1/length, adjust=False).mean()
    loss = (-delta.clip(upper=0)).ewm(alpha=1/length, adjust=False).mean()
    rs = gain / loss.replace(0, 1e-10)  # guard against division by zero
    return 100 - (100 / (1 + rs))


def _exit_price(pos_dir: int, entry_price: float, high: float, low: float,
                tp_mult: float, sl_mult: float) -> Optional[float]:
    """Return the TP/SL fill price hit inside a bar, or None if neither was hit."""
    if pos_dir == 1:
        tp_price = entry_price * (1 + tp_mult)
        sl_price = entry_price * (1 - sl_mult)
        hit_tp = high >= tp_price
        hit_sl = low <= sl_price
    else:
        tp_price = entry_price * (1 - tp_mult)
        sl_price = entry_price * (1 + sl_mult)
        hit_tp = low <= tp_price
        hit_sl = high >= sl_price
    # If both levels are touched in one bar, assume worst-case (SL first).
    if hit_sl:
        return sl_price
    if hit_tp:
        return tp_price
    return None


def _make_trade(pos_dir: int, entry_time, exit_time, entry_price: float,
                exit_price: float, size_usd: float) -> Trade:
    """Build the Trade record for a position closed at *exit_price*."""
    qty = size_usd / entry_price
    return Trade(
        direction='long' if pos_dir == 1 else 'short',
        entry_time=entry_time,
        exit_time=exit_time,
        entry_price=entry_price,
        exit_price=exit_price,
        pnl=(exit_price - entry_price) * qty * pos_dir,
    )


def _max_drawdown(trades: List[Trade]) -> float:
    """Return the most negative (equity - running peak) over the trade sequence."""
    equity = 0.0
    peak = 0.0
    max_dd = 0.0
    for t in trades:
        equity += t.pnl
        peak = max(peak, equity)
        max_dd = min(max_dd, equity - peak)
    return max_dd


def backtest(df: pd.DataFrame, p: dict) -> Result:
    """Run the breakout strategy over *df* with parameters *p*.

    Flow per bar (bars whose close is NaN are skipped entirely):

    1. An open position is checked against its take-profit / stop-loss using
       the bar's high and low.
    2. While flat and outside the cooldown, a breakout bar registers a pending
       signal at its close.
    3. On the bar right after the pending bar, the signal is confirmed when
       the close moved at least ``confirmPct`` percent in the signal's
       direction (always confirmed when ``confirmPct <= 0``); the position is
       then opened at that close.  The pending signal is cleared either way.

    Because step 2 runs before step 3, a fresh breakout on the confirmation
    bar replaces the pending signal and pushes confirmation one bar further.

    Besides the keys required by ``prepare``, *p* must provide ``tpPct``,
    ``slPct``, ``confirmPct``, ``cooldownBars``, ``volSpike``, ``adxMin``,
    ``rsiLongMin``, ``rsiLongMax``, ``rsiShortMin``, ``rsiShortMax``,
    ``longPosMax`` and ``shortPosMin``.

    Every position uses a fixed 1000 USD notional.  A position still open at
    the end of the data is closed at the last bar with a valid close.
    """
    df = prepare(df, p)
    trades: List[Trade] = []

    pos_dir = 0  # 1 long, -1 short
    entry_price = 0.0
    entry_time: Optional[pd.Timestamp] = None
    last_signal_bar: Optional[int] = None

    pending_dir = 0
    pending_price = 0.0
    pending_bar: Optional[int] = None

    tp_mult = p['tpPct'] / 100.0
    sl_mult = p['slPct'] / 100.0
    confirm_mult = p['confirmPct'] / 100.0
    size_usd = 1000.0

    closes = df['close'].values
    highs = df['high'].values
    lows = df['low'].values
    index = df.index

    # Evaluate all entry filters once, vectorised, instead of doing a dozen
    # `.iloc[i]` lookups per bar.  Comparisons against NaN are False, so bars
    # with incomplete indicators never signal.
    shared_ok = (
        df['release']
        & (df['volume_ratio'] >= p['volSpike'])
        & (df['adx'] >= p['adxMin'])
    )
    long_signal = (
        shared_ok
        & df['trend_long']
        & (df['close'] > df['bb_upper'])
        & df['rsi'].between(p['rsiLongMin'], p['rsiLongMax'])
        & (df['price_pos'] <= p['longPosMax'])
    ).to_numpy()
    short_signal = (
        shared_ok
        & df['trend_short']
        & (df['close'] < df['bb_lower'])
        & df['rsi'].between(p['rsiShortMin'], p['rsiShortMax'])
        & (df['price_pos'] >= p['shortPosMin'])
    ).to_numpy()

    last_bar: Optional[int] = None  # last bar with a non-NaN close
    for i, close_val in enumerate(closes):
        if math.isnan(close_val):
            continue
        last_bar = i

        # Exit logic for existing position using intrabar high/low hit detection
        if pos_dir != 0:
            exit_price = _exit_price(pos_dir, entry_price, highs[i], lows[i], tp_mult, sl_mult)
            if exit_price is not None:
                trades.append(_make_trade(pos_dir, entry_time, index[i],
                                          entry_price, exit_price, size_usd))
                pos_dir = 0
                entry_price = 0.0
                entry_time = None

        # Signal generation (only while flat and outside the cooldown window)
        cooldown_ok = (last_signal_bar is None) or (i - last_signal_bar > p['cooldownBars'])
        if cooldown_ok and pos_dir == 0:
            if long_signal[i]:
                pending_dir, pending_price, pending_bar = 1, close_val, i
            elif short_signal[i]:
                pending_dir, pending_price, pending_bar = -1, close_val, i

        # Confirmation on the bar immediately after the signal bar
        if pending_dir != 0 and pending_bar is not None and i == pending_bar + 1:
            if p['confirmPct'] <= 0:
                pass_confirm = True
            elif pending_dir == 1:
                pass_confirm = close_val >= pending_price * (1 + confirm_mult)
            else:
                pass_confirm = close_val <= pending_price * (1 - confirm_mult)
            if pass_confirm:
                pos_dir = pending_dir
                entry_price = close_val
                entry_time = index[i]
                last_signal_bar = i
            pending_dir = 0
            pending_price = 0.0
            pending_bar = None

    # Close any open position at the last valid close.  Using the raw final
    # row would produce a NaN PnL whenever the final close is missing.
    if pos_dir != 0 and entry_time is not None and last_bar is not None:
        trades.append(_make_trade(pos_dir, entry_time, index[last_bar],
                                  entry_price, closes[last_bar], size_usd))

    total_pnl = sum(t.pnl for t in trades)
    wins = [t for t in trades if t.pnl > 0]
    win_rate = (len(wins) / len(trades)) if trades else 0.0
    avg_pnl = total_pnl / len(trades) if trades else 0.0

    return Result(
        params=p,
        trades=trades,
        total_pnl=total_pnl,
        win_rate=win_rate,
        avg_pnl=avg_pnl,
        max_drawdown=_max_drawdown(trades),
    )


def _run_single(args):
    """Unpack one ``(df, params)`` task and run ``backtest`` on it.

    Kept as a module-level function so it can be handed to ``Pool.map``.
    """
    df, p = args
    return backtest(df, p)


def run_grid(df: pd.DataFrame, processes: int = 4) -> Result:
    """Backtest every combination in the parameter grid and return the best.

    "Best" is the run with the highest ``total_pnl``; on ties the first
    combination in grid order wins.  *processes* is the number of worker
    processes used for the search.
    """
    grid = {
        'confirmPct': [0.15, 0.20, 0.25, 0.30],
        'adxMin': [12, 14, 18],
        'volSpike': [1.0, 1.2, 1.4],
        'releaseThreshold': [0.30, 0.35, 0.40],
        'rsiLongMax': [66, 68, 70],
        'shortPosMin': [10, 15, 20],
        'longPosMax': [80, 85, 90],
    }

    # Defaults for every parameter; grid values override these per combo.
    base = dict(
        emaFastLen=50,
        emaSlowLen=200,
        trendBuffer=0.10,
        bbLen=20,
        bbMult=2.0,
        squeezeLookback=120,
        squeezeThreshold=0.25,
        releaseThreshold=0.35,
        releaseWindow=30,
        confirmPct=0.25,
        volLookback=20,
        volSpike=1.2,
        adxLen=14,
        adxMin=18,
        rsiLen=14,
        rsiLongMin=48,
        rsiLongMax=68,
        rsiShortMin=32,
        rsiShortMax=60,
        priceRangeLookback=100,
        longPosMax=88,
        shortPosMin=12,
        cooldownBars=5,
        tpPct=1.2,
        slPct=0.6,
    )

    # Build all param combos
    keys = list(grid)
    combos = list(product(*(grid[k] for k in keys)))
    print(f"Testing {len(combos)} parameter combinations on {processes} cores...")

    tasks = [(df, {**base, **dict(zip(keys, combo))}) for combo in combos]

    with Pool(processes) as pool:
        results = pool.map(_run_single, tasks)

    return max(results, key=lambda r: r.total_pnl)


def main():
    """Load the SOLUSDT 5m CSV, run the grid search and print the best run."""
    df = pd.read_csv('backtester/data/solusdt_5m.csv', parse_dates=['timestamp']).set_index('timestamp')
    best = run_grid(df)
    print(f'Best total PnL: ${best.total_pnl:.2f}')
    print(f'Win rate: {best.win_rate * 100:.2f}%')
    print('Trades:', len(best.trades))
    print(f'Avg PnL: ${best.avg_pnl:.2f}')
    print(f'Max drawdown: ${best.max_drawdown:.2f}')
    print('Params:')
    for k, v in best.params.items():
        print(f' {k}: {v}')


if __name__ == '__main__':
    main()