#!/usr/bin/env python3 """Parameter sweep utility for Money Line backtests.""" from __future__ import annotations import argparse from itertools import product from multiprocessing import Pool from pathlib import Path from typing import List, Sequence import sys import time PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.append(str(PROJECT_ROOT)) import pandas as pd from backtester.data_loader import load_csv from backtester.indicators.money_line import MoneyLineInputs from backtester.simulator import TradeConfig, simulate_money_line _DATA_SLICE = None _TRADE_CONFIG = None _GRID_KEYS: Sequence[str] = [] def parse_float_list(value: str) -> List[float]: return [float(item) for item in value.split(",") if item] def parse_int_list(value: str) -> List[int]: return [int(item) for item in value.split(",") if item] def build_arg_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Run Money Line parameter sweeps against a CSV dataset") parser.add_argument("--csv", type=Path, required=True, help="Path to CSV with OHLCV data") parser.add_argument("--symbol", required=True, help="Symbol label (e.g., SOL-PERP)") parser.add_argument("--timeframe", default="5", help="Timeframe label (for reporting only)") parser.add_argument("--start", help="Optional ISO start timestamp filter", default=None) parser.add_argument("--end", help="Optional ISO end timestamp filter", default=None) parser.add_argument("--position-size", type=float, default=8100.0, help="Notional position size per trade") parser.add_argument("--max-bars", type=int, default=288, help="Max bars to hold a trade (default 288 = 24h on 5m)") parser.add_argument("--top", type=int, default=10, help="How many results to show") parser.add_argument( "--output", type=Path, help="Optional CSV to store every combination's metrics", ) parser.add_argument( "--flip-thresholds", default="0.5,0.6,0.7", help="Comma separated flip threshold percentages", ) parser.add_argument( "--ma-gap-thresholds", default="0.25,0.35,0.45", help="Comma separated MA gap thresholds", ) parser.add_argument( "--momentum-adx", default="22,24,26", help="Comma separated ADX minimums for momentum dots", ) parser.add_argument( "--momentum-long-pos", default="65,70", help="Comma separated maximum price position for long momentum entries", ) parser.add_argument( "--momentum-short-pos", default="30,35", help="Comma separated minimum price position for short momentum entries", ) parser.add_argument( "--cooldown-bars", default="2,3,4", help="Comma separated cooldown bar counts between primary flips", ) parser.add_argument( "--momentum-spacing", default="3,4,5", help="Comma separated spacing (bars) between momentum signals", ) parser.add_argument( "--momentum-cooldown", default="2,3", help="Comma separated cooldown (bars) after primary signal before momentum allowed", ) parser.add_argument( "--workers", type=int, default=1, help="Number of worker processes (use >1 for multi-core sweeps)", ) parser.add_argument( "--limit", type=int, help="Optional number of combinations to run (preview mode)", ) return parser def run_sweep(args: argparse.Namespace) -> pd.DataFrame: data_slice = load_csv(args.csv, args.symbol, args.timeframe, start=args.start, end=args.end) trade_config = TradeConfig(position_size=args.position_size, max_bars_per_trade=args.max_bars) grids = { "flip_threshold_percent": parse_float_list(args.flip_thresholds), "ma_gap_threshold": parse_float_list(args.ma_gap_thresholds), "momentum_min_adx": parse_float_list(args.momentum_adx), "momentum_long_max_pos": parse_float_list(args.momentum_long_pos), "momentum_short_min_pos": parse_float_list(args.momentum_short_pos), "cooldown_bars": parse_int_list(args.cooldown_bars), "momentum_spacing": parse_int_list(args.momentum_spacing), "momentum_cooldown": parse_int_list(args.momentum_cooldown), } combos = list(product(*grids.values())) total_combos = len(combos) print(f"Evaluating {total_combos} combinations...") if args.limit is not None and args.limit > 0 and args.limit < total_combos: combos = combos[: args.limit] print(f"Preview mode: running first {len(combos)} combos (out of {total_combos})") keys = list(grids.keys()) worker_count = args.workers if args.workers and args.workers > 0 else 1 total_to_run = len(combos) progress = ProgressBar(total=total_to_run) if total_to_run == 0: print("No combinations to evaluate with current configuration.") return pd.DataFrame() if worker_count <= 1: records = [] for idx, combo in enumerate(combos, start=1): records.append(_evaluate_combo(combo, keys, data_slice, trade_config)) progress.update(idx) else: print(f"Using {worker_count} worker processes") with Pool( processes=worker_count, initializer=_init_worker, initargs=(data_slice, trade_config, keys), ) as pool: records = [] for idx, record in enumerate(pool.imap_unordered(_worker_eval, combos), start=1): records.append(record) progress.update(idx) progress.finish() elapsed = progress.elapsed combos_run = len(combos) avg_time = elapsed / combos_run if combos_run else 0 print( f"Processed {combos_run} combos in {elapsed:.1f}s (avg {avg_time:.2f}s per combo)" ) if combos_run < total_combos: projected = avg_time * total_combos print( f"Estimated full sweep ({total_combos} combos) would take ~{projected/60:.1f} minutes with current settings" ) return pd.DataFrame(records) def _init_worker(data_slice, trade_config, keys): global _DATA_SLICE, _TRADE_CONFIG, _GRID_KEYS _DATA_SLICE = data_slice _TRADE_CONFIG = trade_config _GRID_KEYS = keys def _worker_eval(combo): return _evaluate_combo(combo, _GRID_KEYS, _DATA_SLICE, _TRADE_CONFIG) def _evaluate_combo(combo, keys, data_slice, trade_config): params = dict(zip(keys, combo)) inputs = MoneyLineInputs(**params) result = simulate_money_line(data_slice.data, data_slice.symbol, inputs=inputs, config=trade_config) trades = result.trades pnl = result.total_pnl win_rate = result.win_rate * 100 max_dd = result.max_drawdown avg_trade = result.average_pnl profit_factor = _profit_factor(trades) return { **params, "trades": len(trades), "total_pnl": pnl, "win_rate": win_rate, "avg_pnl": avg_trade, "max_drawdown": max_dd, "profit_factor": profit_factor, } class ProgressBar: def __init__(self, total: int, bar_length: int = 40) -> None: self.total = max(total, 0) self.bar_length = bar_length self.start_time = time.time() self._done = False def update(self, count: int) -> None: if self.total <= 0: return count = min(count, self.total) percent = count / self.total filled = int(self.bar_length * percent) bar = "#" * filled + "-" * (self.bar_length - filled) elapsed = time.time() - self.start_time rate = elapsed / count if count else 0 remaining = rate * (self.total - count) if rate else 0 # Format elapsed time elapsed_hours = int(elapsed // 3600) elapsed_mins = int((elapsed % 3600) // 60) elapsed_str = f"{elapsed_hours}h {elapsed_mins}m" if elapsed_hours > 0 else f"{elapsed_mins}m" # Format remaining time remaining_hours = int(remaining // 3600) remaining_mins = int((remaining % 3600) // 60) remaining_str = f"{remaining_hours}h {remaining_mins}m" if remaining_hours > 0 else f"{remaining_mins}m" sys.stdout.write( f"\r[{bar}] {percent*100:5.1f}% ({count}/{self.total}) | " f"Elapsed {elapsed_str:>7} | ETA {remaining_str:>7}" ) sys.stdout.flush() if count >= self.total: self._done = True sys.stdout.write("\n") def finish(self) -> None: if not self._done and self.total > 0: self.update(self.total) @property def elapsed(self) -> float: return time.time() - self.start_time def _profit_factor(trades) -> float: gains = [t.realized_pnl for t in trades if t.realized_pnl > 0] losses = [-t.realized_pnl for t in trades if t.realized_pnl < 0] total_gain = sum(gains) total_loss = sum(losses) if total_loss == 0: return float("inf") if total_gain > 0 else 0.0 return total_gain / total_loss def main() -> int: parser = build_arg_parser() args = parser.parse_args() df = run_sweep(args) df = df.sort_values("total_pnl", ascending=False) top_n = df.head(args.top) print("\n=== TOP RESULTS ===") for idx, row in top_n.iterrows(): print( f"PnL ${row['total_pnl']:.2f} | Trades {row['trades']} | Win {row['win_rate']:.2f}% | " f"DD ${row['max_drawdown']:.2f} | PF {row['profit_factor']:.2f} | params {row.drop(['trades','total_pnl','win_rate','avg_pnl','max_drawdown','profit_factor']).to_dict()}" ) if args.output: args.output.parent.mkdir(parents=True, exist_ok=True) df.to_csv(args.output, index=False) print(f"\nSaved all {len(df)} results to {args.output}") return 0 if __name__ == "__main__": raise SystemExit(main())