Files
trading_bot_v4/backtester/indicators/money_line.py
mindesbunister cc56b72df2 fix: Database-first cluster status detection + Stop button clarification
CRITICAL FIX (Nov 30, 2025):
- Dashboard showed 'idle' despite 22+ worker processes running
- Root cause: SSH-based worker detection timing out
- Solution: Check database for running chunks FIRST

Changes:
1. app/api/cluster/status/route.ts:
   - Query exploration database before SSH detection
   - If running chunks exist, mark workers 'active' even if SSH fails
   - Override worker status: 'offline' → 'active' when chunks running
   - Log: ' Cluster status: ACTIVE (database shows running chunks)'
   - Database is source of truth, SSH only for supplementary metrics

2. app/cluster/page.tsx:
   - Stop button ALREADY EXISTS (conditionally shown)
   - Shows Start when status='idle', Stop when status='active'
   - No code changes needed - fixed by status detection

Result:
- Dashboard now shows 'ACTIVE' with 2 workers (correct)
- Workers show 'active' status (was 'offline')
- Stop button automatically visible when cluster active
- System resilient to SSH timeouts/network issues

Verified:
- Container restarted: Nov 30 21:18 UTC
- API tested: Returns status='active', activeWorkers=2
- Logs confirm: Database-first logic working
- Workers confirmed running: 22+ processes on worker1, workers on worker2
2025-11-30 22:23:01 +01:00

182 lines
6.3 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
try: # Python 3.8+ has Literal in typing, otherwise fall back to typing_extensions
from typing import Literal
except ImportError: # pragma: no cover - compatibility path for Python 3.7
from typing_extensions import Literal
import numpy as np
import pandas as pd
from backtester.math_utils import calculate_adx, calculate_atr, rma
# Side of a trade signal; the only two values used in this module.
Direction = Literal["long", "short"]
@dataclass
class MoneyLineInputs:
    """Tunable parameters read by ``money_line_signals``.

    Indicator lengths:
        atr_length / adx_length / rsi_length: lengths handed to
            ``calculate_atr``, ``calculate_adx`` and ``rsi``.
        ma_fast_length / ma_slow_length: spans of the fast and slow EMA of
            the close.

    Primary (flip) signals:
        ma_gap_threshold: divisor applied to the percent gap between the two
            EMAs before it is squashed with ``tanh``.
        flip_threshold_percent: divided by 100 and compared against the
            squashed gap score when the close crosses the fast EMA.
        cooldown_bars: value the primary cooldown counter is reset to after
            a primary signal.

    Momentum (follow-up) signals:
        momentum_spacing: value the momentum cooldown counter is reset to
            after a momentum signal.
        momentum_cooldown: value the momentum cooldown counter is reset to
            after a primary signal.
        momentum_min_adx: lowest ADX at which a momentum signal may fire.
        momentum_min_volume_ratio: lowest volume ratio at which a momentum
            signal may fire.
        momentum_long_max_pos: long momentum signals require the price
            position to be at or below this value.
        momentum_short_min_pos: short momentum signals require the price
            position to be at or above this value.
    """

    # --- indicator lengths ---
    atr_length: int = 14
    adx_length: int = 14
    rsi_length: int = 14
    ma_fast_length: int = 50
    ma_slow_length: int = 200

    # --- primary signal gating ---
    ma_gap_threshold: float = 0.35
    flip_threshold_percent: float = 0.6
    cooldown_bars: int = 3

    # --- momentum signal gating ---
    momentum_spacing: int = 4
    momentum_cooldown: int = 3
    momentum_min_adx: float = 23.0
    momentum_min_volume_ratio: float = 1.0
    momentum_long_max_pos: float = 70.0
    momentum_short_min_pos: float = 30.0
@dataclass
class MoneyLineSignal:
    """One entry signal together with the indicator readings of its bar.

    ``signal_type`` is ``"primary"`` for a signal raised by a fast-EMA cross
    and ``"momentum"`` for a follow-up signal in the last primary direction.
    """

    # Index label of the bar that produced the signal.
    timestamp: pd.Timestamp
    direction: Direction
    # Close of the signal bar.
    entry_price: float

    # Indicator values copied from the signal bar.
    adx: float
    atr: float
    rsi: float
    volume_ratio: float
    price_position: float

    signal_type: Literal["primary", "momentum"]
def ema(series: pd.Series, length: int) -> pd.Series:
    """Return the exponential moving average of ``series``.

    Uses a pandas exponentially weighted window with ``span=length`` and
    ``adjust=False`` (the recursive form).
    """
    window = series.ewm(span=length, adjust=False)
    return window.mean()
def rolling_volume_ratio(volume: pd.Series, length: int = 20) -> pd.Series:
    """Return each bar's volume divided by its rolling mean volume.

    The mean is taken over the trailing ``length`` bars (current bar
    included), so the first ``length - 1`` results are NaN.
    """
    trailing_mean = volume.rolling(length).mean()
    return volume.div(trailing_mean)
def price_position(high: pd.Series, low: pd.Series, close: pd.Series, length: int = 100) -> pd.Series:
    """Locate the close inside the trailing high/low range, scaled to 0-100.

    0 means the close equals the lowest low of the last ``length`` bars and
    100 means it equals the highest high.  The first ``length - 1`` results
    are NaN because the rolling window is not yet full.
    """
    ceiling = high.rolling(length).max()
    floor = low.rolling(length).min()
    span = ceiling - floor
    # Multiply before dividing, matching the original operation order.
    return (100.0 * (close - floor)) / span
def _signal_from_row(row: pd.Series, direction: Direction, signal_type: str) -> MoneyLineSignal:
    """Snapshot the indicator columns of one bar into a ``MoneyLineSignal``."""
    return MoneyLineSignal(
        timestamp=row.name,
        direction=direction,
        entry_price=float(row.close),
        adx=float(row.adx),
        atr=float(row.atr),
        rsi=float(row.rsi),
        volume_ratio=float(row.volume_ratio),
        price_position=float(row.price_position),
        signal_type=signal_type,
    )


def money_line_signals(df: pd.DataFrame, inputs: Optional[MoneyLineInputs] = None) -> list[MoneyLineSignal]:
    """Scan ``df`` bar by bar and return the Money Line entry signals.

    ``df`` is copied and sorted by index; it must provide ``close``, ``high``,
    ``low`` and ``volume`` columns (the full frame is also handed to
    ``calculate_atr`` / ``calculate_adx``).  The caller's frame is not
    modified.

    Two kinds of signal are produced, in bar order:

    * ``"primary"`` - the close crosses the fast EMA and the tanh-squashed
      fast/slow EMA gap exceeds ``flip_threshold_percent / 100`` in the same
      direction, provided the primary cooldown has run out.
    * ``"momentum"`` - a follow-up in the direction of the last primary
      signal when the momentum cooldown has run out and ADX, volume ratio
      and price position pass the ``momentum_*`` limits.

    Args:
        df: price/volume bars indexed by the value used as signal timestamp.
        inputs: parameters; defaults to ``MoneyLineInputs()`` when ``None``.

    Returns:
        The signals found, oldest first.  Empty when ``df`` has no bars past
        the warmup period.
    """
    cfg = MoneyLineInputs() if inputs is None else inputs

    frame = df.copy().sort_index()
    frame["ema_fast"] = ema(frame["close"], cfg.ma_fast_length)
    frame["ema_slow"] = ema(frame["close"], cfg.ma_slow_length)
    frame["rsi"] = rsi(frame["close"], cfg.rsi_length)
    frame["atr"] = calculate_atr(frame, cfg.atr_length)
    frame["adx"] = calculate_adx(frame, cfg.adx_length)
    frame["volume_ratio"] = rolling_volume_ratio(frame["volume"])
    frame["price_position"] = price_position(frame["high"], frame["low"], frame["close"])

    # Fast/slow EMA gap as a percent of price, squashed into (-1, 1).
    gap_percent = 100.0 * (frame["ema_fast"] - frame["ema_slow"]) / frame["close"]
    gap_scores = np.tanh(gap_percent / cfg.ma_gap_threshold)
    flip_threshold = cfg.flip_threshold_percent / 100.0

    found: list[MoneyLineSignal] = []
    last_direction: Optional[Direction] = None
    primary_wait = 0
    momentum_wait = 0

    # CRITICAL FIX: skip the warmup period so no signal is built from
    # indicators that have not settled yet.  EMA(200) needs 200 bars; ADX,
    # volume ratio and price position need warmup as well.
    warmup_bars = 200
    first_bar = max(1, warmup_bars)

    for idx in range(first_bar, len(frame)):
        bar = frame.iloc[idx]
        prior = frame.iloc[idx - 1]
        score = gap_scores.iloc[idx]

        crossed_up = prior.close <= prior.ema_fast and bar.close > bar.ema_fast
        crossed_down = prior.close >= prior.ema_fast and bar.close < bar.ema_fast

        direction: Optional[Direction] = None
        if crossed_up and score > flip_threshold:
            direction = "long"
        elif crossed_down and score < -flip_threshold:
            direction = "short"

        if direction and primary_wait == 0:
            found.append(_signal_from_row(bar, direction, "primary"))
            last_direction = direction
            primary_wait = cfg.cooldown_bars
            momentum_wait = cfg.momentum_cooldown
        else:
            # No primary signal on this bar: both cooldowns tick down.
            primary_wait = max(0, primary_wait - 1)
            momentum_wait = max(0, momentum_wait - 1)

        # Comparisons are kept in their ">=" form on purpose: a NaN indicator
        # makes them False and therefore blocks the momentum signal.
        momentum_armed = (
            last_direction
            and momentum_wait == 0
            and bar.adx >= cfg.momentum_min_adx
            and bar.volume_ratio >= cfg.momentum_min_volume_ratio
        )
        if not momentum_armed:
            continue

        # last_direction is "long" or "short" here (it is truthy).
        position = bar.price_position
        if last_direction == "long":
            in_zone = position <= cfg.momentum_long_max_pos
        else:
            in_zone = position >= cfg.momentum_short_min_pos

        if in_zone:
            found.append(_signal_from_row(bar, last_direction, "momentum"))
            momentum_wait = cfg.momentum_spacing

    return found
def rsi(series: pd.Series, length: int) -> pd.Series:
    """Return the Relative Strength Index of ``series``.

    Gains and losses are the positive and negative parts of the one-bar
    difference, each smoothed with ``rma`` over ``length``.  Wherever the
    result is undefined (for example a zero average loss) the neutral value
    50 is reported.

    The result is indexed like ``series``, so it aligns correctly when it is
    assigned to a column of the frame ``series`` came from.

    Args:
        series: values to measure, typically closing prices.
        length: smoothing length passed to ``rma``.

    Returns:
        RSI values in the range 0-100, with NaN replaced by 50.
    """
    delta = series.diff()
    # np.where yields a bare ndarray.  Re-attach the caller's index: a Series
    # built without it gets a fresh RangeIndex, and assigning such a result
    # to a frame with a different index (e.g. a DatetimeIndex) aligns on
    # labels and leaves the whole column NaN.
    gain = pd.Series(np.where(delta > 0, delta, 0.0), index=series.index)
    loss = pd.Series(np.where(delta < 0, -delta, 0.0), index=series.index)
    # NOTE(review): assumes rma keeps the index of its input - confirm.
    avg_gain = rma(gain, length)
    avg_loss = rma(loss, length)
    # A zero average loss would divide by zero; make it NaN so the fillna
    # below maps it to 50.
    rs = avg_gain / avg_loss.replace(0, np.nan)
    rsi_series = 100 - (100 / (1 + rs))
    return rsi_series.fillna(50.0)