from __future__ import annotations from dataclasses import dataclass from typing import Optional try: # Python 3.8+ has Literal in typing, otherwise fall back to typing_extensions from typing import Literal except ImportError: # pragma: no cover - compatibility path for Python 3.7 from typing_extensions import Literal import numpy as np import pandas as pd from backtester.math_utils import calculate_adx, calculate_atr, rma Direction = Literal["long", "short"] @dataclass class MoneyLineInputs: atr_length: int = 14 adx_length: int = 14 rsi_length: int = 14 ma_fast_length: int = 50 ma_slow_length: int = 200 ma_gap_threshold: float = 0.35 flip_threshold_percent: float = 0.6 cooldown_bars: int = 3 momentum_spacing: int = 4 momentum_cooldown: int = 3 momentum_min_adx: float = 23.0 momentum_min_volume_ratio: float = 1.0 momentum_long_max_pos: float = 70.0 momentum_short_min_pos: float = 30.0 @dataclass class MoneyLineSignal: timestamp: pd.Timestamp direction: Direction entry_price: float adx: float atr: float rsi: float volume_ratio: float price_position: float signal_type: Literal["primary", "momentum"] def ema(series: pd.Series, length: int) -> pd.Series: return series.ewm(span=length, adjust=False).mean() def rolling_volume_ratio(volume: pd.Series, length: int = 20) -> pd.Series: avg = volume.rolling(length).mean() return volume / avg def price_position(high: pd.Series, low: pd.Series, close: pd.Series, length: int = 100) -> pd.Series: highest = high.rolling(length).max() lowest = low.rolling(length).min() return 100.0 * (close - lowest) / (highest - lowest) def money_line_signals(df: pd.DataFrame, inputs: Optional[MoneyLineInputs] = None) -> list[MoneyLineSignal]: if inputs is None: inputs = MoneyLineInputs() data = df.copy() data = data.sort_index() data["ema_fast"] = ema(data["close"], inputs.ma_fast_length) data["ema_slow"] = ema(data["close"], inputs.ma_slow_length) data["rsi"] = rsi(data["close"], inputs.rsi_length) data["atr"] = calculate_atr(data, inputs.atr_length) data["adx"] = calculate_adx(data, inputs.adx_length) data["volume_ratio"] = rolling_volume_ratio(data["volume"]) data["price_position"] = price_position(data["high"], data["low"], data["close"]) ma_gap = 100.0 * (data["ema_fast"] - data["ema_slow"]) / data["close"] ma_gap_score = np.tanh(ma_gap / inputs.ma_gap_threshold) signals: list[MoneyLineSignal] = [] last_direction: Optional[Direction] = None cooldown_remaining = 0 momentum_cooldown = 0 for idx in range(1, len(data)): row = data.iloc[idx] prev = data.iloc[idx - 1] close = row.close fast = row.ema_fast slow = row.ema_slow gap_score = ma_gap_score.iloc[idx] flip_up = prev.close <= prev.ema_fast and close > fast flip_down = prev.close >= prev.ema_fast and close < fast direction: Optional[Direction] = None if flip_up and gap_score > inputs.flip_threshold_percent / 100.0: direction = "long" elif flip_down and gap_score < -inputs.flip_threshold_percent / 100.0: direction = "short" if direction and cooldown_remaining == 0: signals.append( MoneyLineSignal( timestamp=row.name, direction=direction, entry_price=float(close), adx=float(row.adx), atr=float(row.atr), rsi=float(row.rsi), volume_ratio=float(row.volume_ratio), price_position=float(row.price_position), signal_type="primary", ) ) last_direction = direction cooldown_remaining = inputs.cooldown_bars momentum_cooldown = inputs.momentum_cooldown else: cooldown_remaining = max(0, cooldown_remaining - 1) momentum_cooldown = max(0, momentum_cooldown - 1) if ( last_direction and momentum_cooldown == 0 and row.adx >= inputs.momentum_min_adx and row.volume_ratio >= inputs.momentum_min_volume_ratio ): pos = row.price_position if last_direction == "long" and pos <= inputs.momentum_long_max_pos: signals.append( MoneyLineSignal( timestamp=row.name, direction="long", entry_price=float(close), adx=float(row.adx), atr=float(row.atr), rsi=float(row.rsi), volume_ratio=float(row.volume_ratio), price_position=float(row.price_position), signal_type="momentum", ) ) momentum_cooldown = inputs.momentum_spacing elif last_direction == "short" and pos >= inputs.momentum_short_min_pos: signals.append( MoneyLineSignal( timestamp=row.name, direction="short", entry_price=float(close), adx=float(row.adx), atr=float(row.atr), rsi=float(row.rsi), volume_ratio=float(row.volume_ratio), price_position=float(row.price_position), signal_type="momentum", ) ) momentum_cooldown = inputs.momentum_spacing return signals def rsi(series: pd.Series, length: int) -> pd.Series: delta = series.diff() gain = np.where(delta > 0, delta, 0.0) loss = np.where(delta < 0, -delta, 0.0) avg_gain = rma(pd.Series(gain), length) avg_loss = rma(pd.Series(loss), length) rs = avg_gain / avg_loss.replace(0, np.nan) rsi_series = 100 - (100 / (1 + rs)) return rsi_series.fillna(50.0)