CRITICAL FIX (Nov 30, 2025):
- Dashboard showed 'idle' despite 22+ worker processes running
- Root cause: SSH-based worker detection timing out
- Solution: Check database for running chunks FIRST
Changes:
1. app/api/cluster/status/route.ts:
- Query exploration database before SSH detection
- If running chunks exist, mark workers 'active' even if SSH fails
- Override worker status: 'offline' → 'active' when chunks running
- Log: '✅ Cluster status: ACTIVE (database shows running chunks)'
- Database is source of truth, SSH only for supplementary metrics
2. app/cluster/page.tsx:
- Stop button ALREADY EXISTS (conditionally shown)
- Shows Start when status='idle', Stop when status='active'
- No code changes needed - fixed by status detection
Result:
- Dashboard now shows 'ACTIVE' with 2 workers (correct)
- Workers show 'active' status (was 'offline')
- Stop button automatically visible when cluster active
- System resilient to SSH timeouts/network issues
Verified:
- Container restarted: Nov 30 21:18 UTC
- API tested: Returns status='active', activeWorkers=2
- Logs confirm: Database-first logic working
- Workers confirmed running: 22+ processes on worker1, workers on worker2
285 lines
9.8 KiB
Python
#!/usr/bin/env python3
|
|
"""Parameter sweep utility for Money Line backtests."""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
from itertools import product
|
|
from multiprocessing import Pool
|
|
from pathlib import Path
|
|
from typing import List, Sequence
|
|
|
|
import sys
|
|
import time
|
|
|
|
# Make the repository root importable so the `backtester` package resolves
# when this script is run directly (rather than as an installed package).
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))

# Imported after the sys.path tweak above so they resolve from PROJECT_ROOT.
import pandas as pd

from backtester.data_loader import load_csv
from backtester.indicators.money_line import MoneyLineInputs
from backtester.simulator import TradeConfig, simulate_money_line


# Per-process worker state, populated once per process by _init_worker() via
# the multiprocessing Pool initializer so each combination does not need the
# dataset re-sent to the worker.
_DATA_SLICE = None
_TRADE_CONFIG = None
_GRID_KEYS: Sequence[str] = []
|
|
|
|
|
|
def parse_float_list(value: str) -> List[float]:
    """Parse a comma separated string into floats, skipping empty tokens."""
    parsed: List[float] = []
    for token in value.split(","):
        if token:
            parsed.append(float(token))
    return parsed
|
|
|
|
|
|
def parse_int_list(value: str) -> List[int]:
    """Parse a comma separated string into ints, skipping empty tokens."""
    parsed: List[int] = []
    for token in value.split(","):
        if token:
            parsed.append(int(token))
    return parsed
|
|
|
|
|
|
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the CLI parser for Money Line parameter sweeps.

    Dataset/trade options come first, then the comma separated grid options
    (registered from a small table since they share the same shape), then the
    execution options (--workers, --limit).
    """
    parser = argparse.ArgumentParser(description="Run Money Line parameter sweeps against a CSV dataset")
    parser.add_argument("--csv", type=Path, required=True, help="Path to CSV with OHLCV data")
    parser.add_argument("--symbol", required=True, help="Symbol label (e.g., SOL-PERP)")
    parser.add_argument("--timeframe", default="5", help="Timeframe label (for reporting only)")
    parser.add_argument("--start", help="Optional ISO start timestamp filter", default=None)
    parser.add_argument("--end", help="Optional ISO end timestamp filter", default=None)
    parser.add_argument("--position-size", type=float, default=8100.0, help="Notional position size per trade")
    parser.add_argument("--max-bars", type=int, default=288, help="Max bars to hold a trade (default 288 = 24h on 5m)")
    parser.add_argument("--top", type=int, default=10, help="How many results to show")
    parser.add_argument(
        "--output",
        type=Path,
        help="Optional CSV to store every combination's metrics",
    )

    # Grid options: (flag, default, help). All are plain comma separated
    # string values parsed later by run_sweep.
    grid_options = (
        ("--flip-thresholds", "0.5,0.6,0.7", "Comma separated flip threshold percentages"),
        ("--ma-gap-thresholds", "0.25,0.35,0.45", "Comma separated MA gap thresholds"),
        ("--momentum-adx", "22,24,26", "Comma separated ADX minimums for momentum dots"),
        ("--momentum-long-pos", "65,70", "Comma separated maximum price position for long momentum entries"),
        ("--momentum-short-pos", "30,35", "Comma separated minimum price position for short momentum entries"),
        ("--cooldown-bars", "2,3,4", "Comma separated cooldown bar counts between primary flips"),
        ("--momentum-spacing", "3,4,5", "Comma separated spacing (bars) between momentum signals"),
        ("--momentum-cooldown", "2,3", "Comma separated cooldown (bars) after primary signal before momentum allowed"),
    )
    for flag, default, help_text in grid_options:
        parser.add_argument(flag, default=default, help=help_text)

    parser.add_argument(
        "--workers",
        type=int,
        default=1,
        help="Number of worker processes (use >1 for multi-core sweeps)",
    )
    parser.add_argument(
        "--limit",
        type=int,
        help="Optional number of combinations to run (preview mode)",
    )
    return parser
|
|
|
|
|
|
def run_sweep(args: argparse.Namespace) -> pd.DataFrame:
    """Evaluate every parameter combination in the grid and collect metrics.

    Loads the CSV dataset once, builds the Cartesian product of all grid
    values, then evaluates each combination either serially or with a
    multiprocessing Pool (--workers > 1). Returns one row of metrics per
    combination (empty DataFrame when there is nothing to run).
    """
    data_slice = load_csv(args.csv, args.symbol, args.timeframe, start=args.start, end=args.end)
    trade_config = TradeConfig(position_size=args.position_size, max_bars_per_trade=args.max_bars)

    # Grid definition: each key is a MoneyLineInputs field, each value the
    # list of candidates parsed from the corresponding CLI option.
    grids = {
        "flip_threshold_percent": parse_float_list(args.flip_thresholds),
        "ma_gap_threshold": parse_float_list(args.ma_gap_thresholds),
        "momentum_min_adx": parse_float_list(args.momentum_adx),
        "momentum_long_max_pos": parse_float_list(args.momentum_long_pos),
        "momentum_short_min_pos": parse_float_list(args.momentum_short_pos),
        "cooldown_bars": parse_int_list(args.cooldown_bars),
        "momentum_spacing": parse_int_list(args.momentum_spacing),
        "momentum_cooldown": parse_int_list(args.momentum_cooldown),
    }

    combos = list(product(*grids.values()))
    total_combos = len(combos)
    print(f"Evaluating {total_combos} combinations...")

    # Preview mode: only run the first --limit combinations.
    if args.limit is not None and args.limit > 0 and args.limit < total_combos:
        combos = combos[: args.limit]
        print(f"Preview mode: running first {len(combos)} combos (out of {total_combos})")

    keys = list(grids.keys())
    worker_count = args.workers if args.workers and args.workers > 0 else 1
    total_to_run = len(combos)
    progress = ProgressBar(total=total_to_run)

    if total_to_run == 0:
        print("No combinations to evaluate with current configuration.")
        return pd.DataFrame()

    if worker_count <= 1:
        # Serial path: evaluate each combo in-process.
        records = []
        for idx, combo in enumerate(combos, start=1):
            records.append(_evaluate_combo(combo, keys, data_slice, trade_config))
            progress.update(idx)
    else:
        # Parallel path: the dataset and config are shipped to each worker
        # once via the Pool initializer; imap_unordered yields results as
        # they complete (result order may differ from combo order).
        print(f"Using {worker_count} worker processes")
        with Pool(
            processes=worker_count,
            initializer=_init_worker,
            initargs=(data_slice, trade_config, keys),
        ) as pool:
            records = []
            for idx, record in enumerate(pool.imap_unordered(_worker_eval, combos), start=1):
                records.append(record)
                progress.update(idx)

    progress.finish()

    elapsed = progress.elapsed
    combos_run = len(combos)
    avg_time = elapsed / combos_run if combos_run else 0
    print(
        f"Processed {combos_run} combos in {elapsed:.1f}s (avg {avg_time:.2f}s per combo)"
    )
    if combos_run < total_combos:
        # Preview mode ran a subset; extrapolate how long the full grid would take.
        projected = avg_time * total_combos
        print(
            f"Estimated full sweep ({total_combos} combos) would take ~{projected/60:.1f} minutes with current settings"
        )

    return pd.DataFrame(records)
|
|
|
|
|
|
def _init_worker(data_slice, trade_config, keys):
    """Pool initializer: stash shared state in process-level globals.

    Runs once per worker process so _worker_eval can access the dataset,
    trade config, and grid key order without re-pickling them per combo.
    """
    global _DATA_SLICE, _TRADE_CONFIG, _GRID_KEYS
    _DATA_SLICE = data_slice
    _TRADE_CONFIG = trade_config
    _GRID_KEYS = keys
|
|
|
|
|
|
def _worker_eval(combo):
    """Evaluate one combo inside a worker using the globals set by _init_worker."""
    return _evaluate_combo(combo, _GRID_KEYS, _DATA_SLICE, _TRADE_CONFIG)
|
|
|
|
|
|
def _evaluate_combo(combo, keys, data_slice, trade_config):
    """Backtest a single parameter combination and summarize its metrics.

    Returns a flat dict of the parameters plus trade count, total/average
    PnL, win rate (percent), max drawdown, and profit factor.
    """
    params = dict(zip(keys, combo))
    result = simulate_money_line(
        data_slice.data,
        data_slice.symbol,
        inputs=MoneyLineInputs(**params),
        config=trade_config,
    )
    trades = result.trades

    metrics = dict(params)
    metrics["trades"] = len(trades)
    metrics["total_pnl"] = result.total_pnl
    metrics["win_rate"] = result.win_rate * 100
    metrics["avg_pnl"] = result.average_pnl
    metrics["max_drawdown"] = result.max_drawdown
    metrics["profit_factor"] = _profit_factor(trades)
    return metrics
|
|
|
|
|
|
class ProgressBar:
    """In-place terminal progress bar with elapsed time and ETA.

    Renders a fixed-width `#`/`-` bar via carriage return overwrites on
    stdout. A non-positive total makes every update a no-op.
    """

    def __init__(self, total: int, bar_length: int = 40) -> None:
        # Clamp negative totals to 0 so update() arithmetic stays safe.
        self.total = max(total, 0)
        self.bar_length = bar_length
        self.start_time = time.time()
        self._done = False

    @staticmethod
    def _format_duration(seconds: float) -> str:
        """Format a duration as 'Xh Ym', omitting hours when zero.

        Shared by the elapsed and ETA displays (previously duplicated inline).
        """
        hours = int(seconds // 3600)
        mins = int((seconds % 3600) // 60)
        return f"{hours}h {mins}m" if hours > 0 else f"{mins}m"

    def update(self, count: int) -> None:
        """Redraw the bar for `count` completed items.

        `count` is clamped to `total`; reaching the total prints a trailing
        newline and marks the bar done. No-op when total <= 0.
        """
        if self.total <= 0:
            return
        count = min(count, self.total)
        percent = count / self.total
        filled = int(self.bar_length * percent)
        bar = "#" * filled + "-" * (self.bar_length - filled)
        elapsed = time.time() - self.start_time
        # Average seconds per item so far; 0 before the first item completes.
        rate = elapsed / count if count else 0
        remaining = rate * (self.total - count) if rate else 0

        elapsed_str = self._format_duration(elapsed)
        remaining_str = self._format_duration(remaining)

        sys.stdout.write(
            f"\r[{bar}] {percent*100:5.1f}% ({count}/{self.total}) | "
            f"Elapsed {elapsed_str:>7} | ETA {remaining_str:>7}"
        )
        sys.stdout.flush()
        if count >= self.total:
            self._done = True
            sys.stdout.write("\n")

    def finish(self) -> None:
        """Force the bar to 100% exactly once (idempotent)."""
        if not self._done and self.total > 0:
            self.update(self.total)

    @property
    def elapsed(self) -> float:
        """Seconds elapsed since the bar was created."""
        return time.time() - self.start_time
|
|
|
|
|
|
def _profit_factor(trades) -> float:
|
|
gains = [t.realized_pnl for t in trades if t.realized_pnl > 0]
|
|
losses = [-t.realized_pnl for t in trades if t.realized_pnl < 0]
|
|
total_gain = sum(gains)
|
|
total_loss = sum(losses)
|
|
if total_loss == 0:
|
|
return float("inf") if total_gain > 0 else 0.0
|
|
return total_gain / total_loss
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: run the sweep, print the top results, optionally save a CSV.

    Returns 0 as the process exit code.
    """
    parser = build_arg_parser()
    args = parser.parse_args()
    df = run_sweep(args)
    if df.empty:
        # run_sweep returns an empty, column-less DataFrame when there were no
        # combinations to evaluate; sorting it would raise KeyError on the
        # missing "total_pnl" column.
        print("No results to report.")
        return 0
    df = df.sort_values("total_pnl", ascending=False)

    top_n = df.head(args.top)
    print("\n=== TOP RESULTS ===")
    for _, row in top_n.iterrows():
        print(
            f"PnL ${row['total_pnl']:.2f} | Trades {row['trades']} | Win {row['win_rate']:.2f}% | "
            f"DD ${row['max_drawdown']:.2f} | PF {row['profit_factor']:.2f} | params {row.drop(['trades','total_pnl','win_rate','avg_pnl','max_drawdown','profit_factor']).to_dict()}"
        )

    if args.output:
        # Ensure the destination directory exists before writing.
        args.output.parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(args.output, index=False)
        print(f"\nSaved all {len(df)} results to {args.output}")

    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    raise SystemExit(main())
|