Files
trading_bot_v4/scripts/breaker_backtest.py
mindesbunister ba1fe4433e feat: Indicator score bypass - v11.2 sends SCORE:100 to bypass bot quality scoring
Changes:
- moneyline_v11_2_indicator.pinescript: Alert format now includes SCORE:100
- parse_signal_enhanced.json: Added indicatorScore parsing (SCORE:X regex)
- execute/route.ts: Added hasIndicatorScore bypass (score >= 90 bypasses quality check)
- Money_Machine.json: Both Execute Trade nodes now pass indicatorScore to API

Rationale: v11.2 indicator filters already optimized (2.544 PF, +51.80% return).
Bot-side quality scoring was blocking proven profitable signals (e.g., quality 75).
Now indicator passes SCORE:100, bot respects it and executes immediately.

This completes the signal chain:
Indicator (SCORE:100) → n8n parser (indicatorScore) → workflow → bot endpoint (bypass)
2025-12-26 11:40:12 +01:00

321 lines
11 KiB
Python

import math
import pandas as pd
from dataclasses import dataclass
from typing import List, Optional, Tuple
# Quick breaker v1 backtest on SOLUSDT 5m CSV (backtester/data/solusdt_5m.csv)
# This mirrors the TradingView strategy defaults as closely as possible.
@dataclass
class Trade:
    """One closed position produced by the backtest loop."""

    # 'long' or 'short', as written by backtest().
    direction: str
    # Index label of the bar on which the position was opened.
    entry_time: pd.Timestamp
    # Index label of the bar on which the position was closed.
    exit_time: pd.Timestamp
    # Fill price used for the entry.
    entry_price: float
    # Fill price used for the exit.
    exit_price: float
    # Profit or loss of the trade; backtest() sizes positions by a fixed
    # USD notional, so this is a dollar amount.
    pnl: float
@dataclass
class Result:
    """Summary of a single backtest run for one parameter set."""

    # The parameter dict the run was executed with.
    params: dict
    # Every closed trade, in the order the trades were closed.
    trades: List[Trade]
    # Sum of pnl over all trades.
    total_pnl: float
    # Fraction (0..1) of trades with pnl > 0; 0.0 when there are no trades.
    win_rate: float
    # total_pnl divided by the trade count; 0.0 when there are no trades.
    avg_pnl: float
    # Largest drop of cumulative pnl below its running peak, stored as a
    # value <= 0.
    max_drawdown: float
def ema(series: pd.Series, length: int) -> pd.Series:
    """Return the recursive (``adjust=False``) EMA of ``series`` with span ``length``."""
    window = series.ewm(span=length, adjust=False)
    return window.mean()
def atr(df: pd.DataFrame, length: int) -> pd.Series:
    """Return the smoothed true range of ``df``.

    True range is the largest of high-low, |high - previous close| and
    |low - previous close|. On the first row the previous close is missing,
    so only high-low contributes (the row-wise max skips NaN).

    The smoothing is a recursive EMA with ``span=length``.
    NOTE(review): ta_rsi in this file smooths with alpha=1/length instead;
    confirm which of the two matches the TradingView script.
    """
    last_close = df['close'].shift(1)
    candidates = pd.DataFrame({
        'bar_range': df['high'] - df['low'],
        'gap_up': (df['high'] - last_close).abs(),
        'gap_down': (df['low'] - last_close).abs(),
    })
    true_range = candidates.max(axis=1)
    return true_range.ewm(span=length, adjust=False).mean()
def dmi_adx(df: pd.DataFrame, length: int) -> pd.Series:
    """Return an ADX-style trend-strength series computed from ``df``.

    Directional movement is taken from bar-to-bar changes in high and low,
    normalised by ``atr(df, length)`` and smoothed with a recursive EMA of
    span ``length``; the directional index is then smoothed the same way.
    Rows where both directional indicators are zero yield NaN in the
    unsmoothed index (0/0).
    """
    def smooth(values: pd.Series) -> pd.Series:
        return values.ewm(span=length, adjust=False).mean()

    rise = df['high'].diff()
    fall = -df['low'].diff()

    # A move only counts when it is positive and larger than the opposite one.
    rise_dominates = (rise > fall) & (rise > 0)
    fall_dominates = (fall > rise) & (fall > 0)
    plus_dm = rise.where(rise_dominates, 0.0)
    minus_dm = fall.where(fall_dominates, 0.0)

    true_range = atr(df, length)
    plus_di = 100 * (smooth(plus_dm) / true_range)
    minus_di = 100 * (smooth(minus_dm) / true_range)

    spread = (plus_di - minus_di).abs()
    total = (plus_di + minus_di).abs()
    dx = spread / total * 100
    return smooth(dx)
def prepare(df: pd.DataFrame, p: dict) -> pd.DataFrame:
    """Return a copy of ``df`` extended with the indicator columns ``backtest`` reads.

    Args:
        df: Candle data with ``high``, ``low``, ``close`` and ``volume`` columns.
        p: Strategy parameters. Keys read here: ``emaFastLen``, ``emaSlowLen``,
            ``trendBuffer``, ``bbLen``, ``bbMult``, ``squeezeLookback``,
            ``squeezeThreshold``, ``releaseWindow``, ``releaseThreshold``,
            ``volLookback``, ``rsiLen``, ``adxLen``, ``priceRangeLookback``.

    Returns:
        A new DataFrame; the input frame is not modified.
    """
    df = df.copy()

    # Trend filter: fast EMA vs slow EMA, with a minimum percentage gap.
    df['ema_fast'] = ema(df['close'], p['emaFastLen'])
    df['ema_slow'] = ema(df['close'], p['emaSlowLen'])
    df['trend_gap'] = (df['ema_fast'] - df['ema_slow']).abs() / df['ema_slow'].replace(0, math.nan) * 100
    df['trend_long'] = (df['ema_fast'] > df['ema_slow']) & (df['trend_gap'] >= p['trendBuffer'])
    df['trend_short'] = (df['ema_fast'] < df['ema_slow']) & (df['trend_gap'] >= p['trendBuffer'])

    # Bollinger bands and their width as a percentage of the basis.
    basis = df['close'].rolling(p['bbLen']).mean()
    dev = p['bbMult'] * df['close'].rolling(p['bbLen']).std()
    df['bb_upper'] = basis + dev
    df['bb_lower'] = basis - dev
    df['width_pct'] = (df['bb_upper'] - df['bb_lower']) / basis.replace(0, math.nan) * 100

    # Band width normalised to 0..1 within its own rolling min/max range.
    low_w = df['width_pct'].rolling(p['squeezeLookback']).min()
    high_w = df['width_pct'].rolling(p['squeezeLookback']).max()
    denom = (high_w - low_w).replace(0, 1e-4)  # avoid 0/0 when the width is flat
    df['norm_width'] = (df['width_pct'] - low_w) / denom
    df['squeeze'] = df['norm_width'] < p['squeezeThreshold']

    # Recent-squeeze flag: 1.0 if this bar or any of the previous
    # releaseWindow - 1 bars was in a squeeze, else 0.0.
    # The window must trail the current bar. Rolling over the reversed series
    # (as this code did before) makes each bar look at the bars that FOLLOW
    # it, which leaks future data into the entry signal.
    # The column name is kept for compatibility; it holds a flag, not a count.
    df['bars_since_squeeze'] = (
        df['squeeze'].astype(float).rolling(p['releaseWindow'], min_periods=1).max()
    )
    df['recent_squeeze'] = df['bars_since_squeeze'] > 0
    df['release'] = (df['norm_width'] > p['releaseThreshold']) & df['recent_squeeze']

    # Volume relative to its rolling mean.
    vol_ma = df['volume'].rolling(p['volLookback']).mean()
    df['volume_ratio'] = df['volume'] / vol_ma.replace(0, math.nan)

    df['rsi'] = ta_rsi(df['close'], p['rsiLen'])
    df['adx'] = dmi_adx(df, p['adxLen'])

    # Position of the close inside the recent high/low range, in percent.
    highest = df['high'].rolling(p['priceRangeLookback']).max()
    lowest = df['low'].rolling(p['priceRangeLookback']).min()
    range_span = (highest - lowest).replace(0, 1e-4)
    df['price_pos'] = (df['close'] - lowest) / range_span * 100
    return df
def ta_rsi(series: pd.Series, length: int) -> pd.Series:
    """Return the RSI of ``series`` using alpha=1/length exponential smoothing.

    Average gains and average losses are smoothed separately; a zero average
    loss is replaced by 1e-10 so the ratio stays finite. The first element is
    NaN because the first difference is undefined.
    """
    change = series.diff()
    smoothing = {'alpha': 1 / length, 'adjust': False}

    avg_up = change.clip(lower=0).ewm(**smoothing).mean()
    avg_down = (-change.clip(upper=0)).ewm(**smoothing).mean()

    strength = avg_up / avg_down.replace(0, 1e-10)
    return 100 - (100 / (1 + strength))
def _bar_exit_price(pos_dir, entry_price, high, low, tp_mult, sl_mult):
    """Return the TP or SL price touched within one bar, or None if neither was."""
    if pos_dir == 1:
        tp_price = entry_price * (1 + tp_mult)
        sl_price = entry_price * (1 - sl_mult)
        hit_tp = high >= tp_price
        hit_sl = low <= sl_price
    else:
        tp_price = entry_price * (1 - tp_mult)
        sl_price = entry_price * (1 + sl_mult)
        hit_tp = low <= tp_price
        hit_sl = high >= sl_price
    # If both levels lie inside the bar the intrabar order is unknown, so the
    # stop is assumed to fill first (worst case).
    if hit_sl:
        return sl_price
    if hit_tp:
        return tp_price
    return None


def _closed_trade(pos_dir, entry_time, exit_time, entry_price, exit_price, size_usd):
    """Build the Trade record for a position of ``size_usd`` notional."""
    qty = size_usd / entry_price
    return Trade(
        direction='long' if pos_dir == 1 else 'short',
        entry_time=entry_time,
        exit_time=exit_time,
        entry_price=entry_price,
        exit_price=exit_price,
        pnl=(exit_price - entry_price) * qty * pos_dir,
    )


def _max_drawdown(trades):
    """Return the deepest drop (<= 0) of cumulative pnl below its running peak."""
    equity = 0.0
    peak = 0.0
    max_dd = 0.0
    for t in trades:
        equity += t.pnl
        peak = max(peak, equity)
        max_dd = min(max_dd, equity - peak)
    return max_dd


def backtest(df: pd.DataFrame, p: dict) -> Result:
    """Simulate the breakout strategy bar by bar and summarise the trades.

    Flow per bar (bars whose close is NaN are skipped entirely):
      1. If a position is open, check whether the bar's high/low touched the
         take-profit or stop-loss level and close the position if so.
      2. If flat and out of cooldown, a breakout on this bar arms a pending
         signal at this bar's close.
      3. A pending signal armed on the previous bar is confirmed (entry at
         this bar's close) or dropped.
    A position still open after the last bar is closed at the last close.

    Args:
        df: Candle data accepted by ``prepare``.
        p: Strategy parameters. Besides the keys ``prepare`` reads, this
            function reads ``tpPct``, ``slPct``, ``confirmPct``,
            ``cooldownBars``, ``volSpike``, ``adxMin``, ``rsiLongMin``,
            ``rsiLongMax``, ``rsiShortMin``, ``rsiShortMax``, ``longPosMax``
            and ``shortPosMin``.

    Returns:
        A ``Result`` with the trade list and aggregate statistics. Every
        position uses a fixed notional of 1000 USD.
    """
    df = prepare(df, p)
    trades: List[Trade] = []

    tp_mult = p['tpPct'] / 100.0
    sl_mult = p['slPct'] / 100.0
    confirm_pct = p['confirmPct']
    cooldown_bars = p['cooldownBars']
    size_usd = 1000.0

    index = df.index
    closes = df['close'].values
    highs = df['high'].values
    lows = df['low'].values

    # Entry conditions do not depend on position state, so they are computed
    # once for every bar instead of through per-bar .iloc lookups in the loop.
    # Comparisons against NaN are False, same as the scalar comparisons.
    rsi = df['rsi']
    shared_ok = (
        df['release']
        & (df['volume_ratio'] >= p['volSpike'])
        & (df['adx'] >= p['adxMin'])
    )
    long_signal = (
        shared_ok
        & df['trend_long']
        & (df['close'] > df['bb_upper'])
        & (rsi >= p['rsiLongMin'])
        & (rsi <= p['rsiLongMax'])
        & (df['price_pos'] <= p['longPosMax'])
    ).to_numpy(dtype=bool)
    short_signal = (
        shared_ok
        & df['trend_short']
        & (df['close'] < df['bb_lower'])
        & (rsi >= p['rsiShortMin'])
        & (rsi <= p['rsiShortMax'])
        & (df['price_pos'] >= p['shortPosMin'])
    ).to_numpy(dtype=bool)

    pos_dir = 0  # 1 long, -1 short, 0 flat
    entry_price = 0.0
    entry_time: Optional[pd.Timestamp] = None
    last_signal_bar: Optional[int] = None
    pending_dir = 0
    pending_price = 0.0
    pending_bar: Optional[int] = None

    for i, close_val in enumerate(closes):
        if math.isnan(close_val):
            continue

        # 1. Exit check. It runs before any entry, so a position opened at
        #    this bar's close is first tested against the next bar.
        if pos_dir != 0:
            exit_price = _bar_exit_price(pos_dir, entry_price, highs[i], lows[i], tp_mult, sl_mult)
            if exit_price is not None:
                trades.append(_closed_trade(pos_dir, entry_time, index[i], entry_price, exit_price, size_usd))
                pos_dir = 0
                entry_price = 0.0
                entry_time = None

        # 2. Arm a pending signal (this replaces any signal armed earlier).
        cooldown_ok = (last_signal_bar is None) or (i - last_signal_bar > cooldown_bars)
        if cooldown_ok and pos_dir == 0:
            if long_signal[i]:
                pending_dir, pending_price, pending_bar = 1, close_val, i
            elif short_signal[i]:
                pending_dir, pending_price, pending_bar = -1, close_val, i

        # 3. Confirm or drop the signal armed on the previous bar.
        if pending_dir != 0 and pending_bar is not None and i == pending_bar + 1:
            if pending_dir == 1:
                confirmed = (confirm_pct <= 0) or (close_val >= pending_price * (1 + confirm_pct / 100.0))
            else:
                confirmed = (confirm_pct <= 0) or (close_val <= pending_price * (1 - confirm_pct / 100.0))
            if confirmed:
                pos_dir = pending_dir
                entry_price = close_val
                entry_time = index[i]
                last_signal_bar = i
            pending_dir = 0
            pending_price = 0.0
            pending_bar = None

    # Close any open position at the last close.
    if pos_dir != 0 and entry_time is not None:
        trades.append(_closed_trade(pos_dir, entry_time, index[-1], entry_price, closes[-1], size_usd))

    total_pnl = sum(t.pnl for t in trades)
    wins = [t for t in trades if t.pnl > 0]
    win_rate = (len(wins) / len(trades)) if trades else 0.0
    avg_pnl = total_pnl / len(trades) if trades else 0.0

    return Result(
        params=p,
        trades=trades,
        total_pnl=total_pnl,
        win_rate=win_rate,
        avg_pnl=avg_pnl,
        max_drawdown=_max_drawdown(trades),
    )
from multiprocessing import Pool
from itertools import product
def _run_single(args):
    """Run one backtest from a ``(df, params)`` tuple.

    Takes a single argument so it can be handed to ``Pool.map``.
    """
    frame, params = args
    return backtest(frame, params)
def run_grid(df: pd.DataFrame, workers: int = 4) -> Result:
    """Backtest every combination of the parameter grid and return the best run.

    Each grid combination overrides the corresponding keys of the base
    parameter set; all other parameters keep their base values. "Best" means
    the highest ``total_pnl``; on ties the first combination in grid order wins.

    Args:
        df: Candle data passed unchanged to every backtest.
        workers: Number of worker processes in the pool. Defaults to 4, the
            value that used to be hard-coded.

    Returns:
        The ``Result`` of the winning parameter combination.
    """
    grid = {
        'confirmPct': [0.15, 0.20, 0.25, 0.30],
        'adxMin': [12, 14, 18],
        'volSpike': [1.0, 1.2, 1.4],
        'releaseThreshold': [0.30, 0.35, 0.40],
        'rsiLongMax': [66, 68, 70],
        'shortPosMin': [10, 15, 20],
        'longPosMax': [80, 85, 90],
    }
    base = dict(
        emaFastLen=50,
        emaSlowLen=200,
        trendBuffer=0.10,
        bbLen=20,
        bbMult=2.0,
        squeezeLookback=120,
        squeezeThreshold=0.25,
        releaseThreshold=0.35,
        releaseWindow=30,
        confirmPct=0.25,
        volLookback=20,
        volSpike=1.2,
        adxLen=14,
        adxMin=18,
        rsiLen=14,
        rsiLongMin=48,
        rsiLongMax=68,
        rsiShortMin=32,
        rsiShortMax=60,
        priceRangeLookback=100,
        longPosMax=88,
        shortPosMin=12,
        cooldownBars=5,
        tpPct=1.2,
        slPct=0.6,
    )

    # Build all param combos.
    keys = list(grid)
    combos = list(product(*(grid[k] for k in keys)))
    print(f"Testing {len(combos)} parameter combinations on {workers} cores...")

    # Every grid key already exists in base, so the merged dict keeps base's
    # key order and only the values change.
    tasks = [(df, {**base, **dict(zip(keys, combo))}) for combo in combos]

    with Pool(workers) as pool:
        results = pool.map(_run_single, tasks)

    return max(results, key=lambda r: r.total_pnl)
def main():
    """Load the SOLUSDT 5m candles, run the parameter grid and print the best run."""
    candles = pd.read_csv('backtester/data/solusdt_5m.csv', parse_dates=['timestamp'])
    candles = candles.set_index('timestamp')

    winner = run_grid(candles)

    print('Best total PnL: $%.2f' % winner.total_pnl)
    print('Win rate: %.2f%%' % (winner.win_rate * 100))  # win_rate is a 0..1 fraction
    print('Trades:', len(winner.trades))
    print('Avg PnL: $%.2f' % winner.avg_pnl)
    print('Max drawdown: $%.2f' % winner.max_drawdown)
    print('Params:')
    for k, v in winner.params.items():
        print(f' {k}: {v}')


# Run the grid search only when executed as a script. The guard also keeps
# multiprocessing workers that re-import this module from starting it again.
if __name__ == '__main__':
    main()