From 7e1fe1cc30120017e332ce7b2831786b1c9167c8 Mon Sep 17 00:00:00 2001 From: mindesbunister Date: Mon, 1 Dec 2025 18:11:47 +0100 Subject: [PATCH] feat: V9 advanced parameter sweep with MA gap filter (810K configs) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Parameter space expansion: - Original 15 params: 101K configurations - NEW: MA gap filter (3 dimensions) = 18× expansion - Total: ~810,000 configurations across 4 time profiles - Chunk size: 1,000 configs/chunk = ~810 chunks MA Gap Filter parameters: - use_ma_gap: True/False (2 values) - ma_gap_min_long: -5.0%, 0%, +5.0% (3 values) - ma_gap_min_short: -5.0%, 0%, +5.0% (3 values) Implementation: - money_line_v9.py: Full v9 indicator with MA gap logic - v9_advanced_worker.py: Chunk processor (1,000 configs) - v9_advanced_coordinator.py: Work distributor (2 EPYC workers) - run_v9_advanced_sweep.sh: Startup script (generates + launches) Infrastructure: - Uses existing EPYC cluster (64 cores total) - Worker1: bd-epyc-02 (32 threads) - Worker2: bd-host01 (32 threads via SSH hop) - Expected runtime: 70-80 hours - Database: SQLite (chunk tracking + results) Goal: Find optimal MA gap thresholds for filtering false breakouts during MA whipsaw zones while preserving trend entries. --- backtester/indicators/money_line_v9.py | 378 ++++++++++++++++++++++ cluster/V9_ADVANCED_SWEEP_README.md | 349 ++++++++++++++++++++ cluster/money_line_v9.py | 378 ++++++++++++++++++++++ cluster/run_v9_advanced_sweep.sh | 219 +++++++++++++ cluster/v9_advanced_coordinator.py | 207 ++++++++++++ cluster/v9_advanced_worker.py | 192 +++++++++++ scripts/coordinate_v9_advanced_sweep.py | 362 +++++++++++++++++++++ scripts/distributed_v9_advanced_worker.py | 157 +++++++++ scripts/run_advanced_v9_sweep.py | 299 +++++++++++++++++ 9 files changed, 2541 insertions(+) create mode 100644 backtester/indicators/money_line_v9.py create mode 100644 cluster/V9_ADVANCED_SWEEP_README.md create mode 100644 cluster/money_line_v9.py create mode 100755 cluster/run_v9_advanced_sweep.sh create mode 100755 cluster/v9_advanced_coordinator.py create mode 100755 cluster/v9_advanced_worker.py create mode 100755 scripts/coordinate_v9_advanced_sweep.py create mode 100755 scripts/distributed_v9_advanced_worker.py create mode 100644 scripts/run_advanced_v9_sweep.py diff --git a/backtester/indicators/money_line_v9.py b/backtester/indicators/money_line_v9.py new file mode 100644 index 0000000..8166d7d --- /dev/null +++ b/backtester/indicators/money_line_v9.py @@ -0,0 +1,378 @@ +""" +v9 "Money Line with MA Gap" indicator implementation for backtesting. + +Key features vs v8: +- confirmBars = 0 (immediate signals, no wait) +- flipThreshold = 0.5% (more responsive than v8's 0.8%) +- MA gap analysis (50/200 MA convergence/divergence) +- ATR profile system (timeframe-based ATR/multiplier) +- Expanded filters: RSI boundaries, volume range, entry buffer +- Heikin Ashi source mode support +- Price position filters (don't chase extremes) + +ADVANCED OPTIMIZATION PARAMETERS: +- 8 ATR profile params (4 timeframes × period + multiplier) +- 4 RSI boundary params (long/short min/max) +- Volume max threshold +- Entry buffer ATR size +- ADX length +- Source mode (Chart vs Heikin Ashi) +- MA gap filter (optional - not in original v9) +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +try: + from typing import Literal +except ImportError: + from typing_extensions import Literal + +import numpy as np +import pandas as pd + +from backtester.math_utils import calculate_adx, calculate_atr, rma + +Direction = Literal["long", "short"] + + +@dataclass +class MoneyLineV9Inputs: + # Basic Money Line parameters (OPTIMIZED) + confirm_bars: int = 0 # v9: Immediate signals (0 = no wait) + flip_threshold_percent: float = 0.5 # v9: Lower threshold (more responsive) + cooldown_bars: int = 3 # Prevent overtrading + + # ATR Profile System (NEW - for advanced optimization) + # Default: "minutes" profile optimized for 5-minute charts + atr_period: int = 12 # ATR calculation length + multiplier: float = 3.8 # ATR band multiplier + + # Filter parameters (OPTIMIZED) + adx_length: int = 16 # ADX calculation length + adx_min: float = 21 # Minimum ADX for signal (momentum filter) + rsi_length: int = 14 # RSI calculation length + + # RSI boundaries (EXPANDED - for advanced optimization) + rsi_long_min: float = 35 # RSI minimum for longs + rsi_long_max: float = 70 # RSI maximum for longs + rsi_short_min: float = 30 # RSI minimum for shorts + rsi_short_max: float = 70 # RSI maximum for shorts + + # Volume filter (OPTIMIZED) + vol_min: float = 1.0 # Minimum volume ratio (1.0 = average) + vol_max: float = 3.5 # Maximum volume ratio (prevent overheated) + + # Price position filter (OPTIMIZED) + long_pos_max: float = 75 # Don't long above 75% of range (chase tops) + short_pos_min: float = 20 # Don't short below 20% of range (chase bottoms) + + # Entry buffer (NEW - for advanced optimization) + entry_buffer_atr: float = 0.20 # Require price X*ATR beyond line + + # Source mode (NEW - for advanced optimization) + use_heikin_ashi: bool = False # Use Heikin Ashi candles vs Chart + + # MA gap filter (NEW - optional, not in original v9) + use_ma_gap_filter: bool = False # Require MA alignment + ma_gap_long_min: float = 0.0 # Require ma50 > ma200 by this % for longs + ma_gap_short_max: float = 0.0 # Require ma50 < ma200 by this % for shorts + + +@dataclass +class MoneyLineV9Signal: + timestamp: pd.Timestamp + direction: Direction + entry_price: float + adx: float + atr: float + rsi: float + volume_ratio: float + price_position: float + ma_gap: float # NEW: MA50-MA200 gap percentage + + +def ema(series: pd.Series, length: int) -> pd.Series: + """Exponential Moving Average.""" + return series.ewm(span=length, adjust=False).mean() + + +def sma(series: pd.Series, length: int) -> pd.Series: + """Simple Moving Average.""" + return series.rolling(length).mean() + + +def rolling_volume_ratio(volume: pd.Series, length: int = 20) -> pd.Series: + """Volume ratio vs moving average.""" + avg = volume.rolling(length).mean() + return volume / avg + + +def price_position(high: pd.Series, low: pd.Series, close: pd.Series, length: int = 100) -> pd.Series: + """Price position in percentage of range (0-100).""" + highest = high.rolling(length).max() + lowest = low.rolling(length).min() + return 100.0 * (close - lowest) / (highest - lowest) + + +def rsi(series: pd.Series, length: int) -> pd.Series: + """Relative Strength Index.""" + delta = series.diff() + gain = np.where(delta > 0, delta, 0.0) + loss = np.where(delta < 0, -delta, 0.0) + avg_gain = rma(pd.Series(gain), length) + avg_loss = rma(pd.Series(loss), length) + rs = avg_gain / avg_loss.replace(0, np.nan) + rsi_series = 100 - (100 / (1 + rs)) + return rsi_series.fillna(50.0) + + +def heikin_ashi(df: pd.DataFrame) -> pd.DataFrame: + """Calculate Heikin Ashi candles.""" + ha = pd.DataFrame(index=df.index) + ha['close'] = (df['open'] + df['high'] + df['low'] + df['close']) / 4 + + # Calculate HA open + ha['open'] = df['open'].copy() + for i in range(1, len(df)): + ha.loc[ha.index[i], 'open'] = (ha.loc[ha.index[i-1], 'open'] + ha.loc[ha.index[i-1], 'close']) / 2 + + ha['high'] = df[['high']].join(ha[['open', 'close']]).max(axis=1) + ha['low'] = df[['low']].join(ha[['open', 'close']]).min(axis=1) + + return ha + + +def supertrend_v9(df: pd.DataFrame, atr_period: int, multiplier: float, + flip_threshold_percent: float, confirm_bars: int, + use_heikin_ashi: bool = False) -> tuple[pd.Series, pd.Series]: + """ + Calculate v9 Money Line (Supertrend with flip threshold and momentum). + + Returns: + (supertrend_line, trend): Line values and trend direction (1=bull, -1=bear) + """ + # Use Heikin Ashi or Chart + if use_heikin_ashi: + ha = heikin_ashi(df[['open', 'high', 'low', 'close']]) + high, low, close = ha['high'], ha['low'], ha['close'] + else: + high, low, close = df['high'], df['low'], df['close'] + + # Calculate ATR on selected source + tr = pd.concat([ + high - low, + (high - close.shift(1)).abs(), + (low - close.shift(1)).abs() + ], axis=1).max(axis=1) + atr = rma(tr, atr_period) + + # Supertrend bands + src = (high + low) / 2 + up = src - (multiplier * atr) + dn = src + (multiplier * atr) + + # Initialize tracking arrays + up1 = up.copy() + dn1 = dn.copy() + trend = pd.Series(1, index=df.index) # Start bullish + tsl = up1.copy() # Trailing stop line + + # Momentum tracking for anti-whipsaw + bull_momentum = pd.Series(0, index=df.index) + bear_momentum = pd.Series(0, index=df.index) + + # Calculate flip threshold + threshold = flip_threshold_percent / 100.0 + + for i in range(1, len(df)): + # Update bands + if close.iloc[i-1] > up1.iloc[i-1]: + up1.iloc[i] = max(up.iloc[i], up1.iloc[i-1]) + else: + up1.iloc[i] = up.iloc[i] + + if close.iloc[i-1] < dn1.iloc[i-1]: + dn1.iloc[i] = min(dn.iloc[i], dn1.iloc[i-1]) + else: + dn1.iloc[i] = dn.iloc[i] + + # Get previous trend and tsl + prev_trend = trend.iloc[i-1] + prev_tsl = tsl.iloc[i-1] + + # Update TSL based on trend + if prev_trend == 1: + tsl.iloc[i] = max(up1.iloc[i], prev_tsl) + else: + tsl.iloc[i] = min(dn1.iloc[i], prev_tsl) + + # Check for flip with threshold and momentum + threshold_amount = tsl.iloc[i] * threshold + + if prev_trend == 1: + # Currently bullish - check for bearish flip + if close.iloc[i] < (tsl.iloc[i] - threshold_amount): + bear_momentum.iloc[i] = bear_momentum.iloc[i-1] + 1 + bull_momentum.iloc[i] = 0 + else: + bear_momentum.iloc[i] = 0 + bull_momentum.iloc[i] = 0 + + # Flip after confirm_bars + 1 consecutive bearish bars + if bear_momentum.iloc[i] >= (confirm_bars + 1): + trend.iloc[i] = -1 + else: + trend.iloc[i] = 1 + else: + # Currently bearish - check for bullish flip + if close.iloc[i] > (tsl.iloc[i] + threshold_amount): + bull_momentum.iloc[i] = bull_momentum.iloc[i-1] + 1 + bear_momentum.iloc[i] = 0 + else: + bull_momentum.iloc[i] = 0 + bear_momentum.iloc[i] = 0 + + # Flip after confirm_bars + 1 consecutive bullish bars + if bull_momentum.iloc[i] >= (confirm_bars + 1): + trend.iloc[i] = 1 + else: + trend.iloc[i] = -1 + + return tsl, trend + + +def money_line_v9_signals(df: pd.DataFrame, inputs: Optional[MoneyLineV9Inputs] = None) -> list[MoneyLineV9Signal]: + """ + v9 "Money Line with MA Gap" signal generation. + + Key behavior: + - Immediate signals on line flip (confirmBars=0) + - Lower flip threshold (0.5% vs v8's 0.8%) + - Expanded filters: RSI boundaries, volume range, price position + - MA gap analysis for trend structure + - Entry buffer requirement (price must be X*ATR beyond line) + - Heikin Ashi source mode support + + Advanced optimization parameters: + - ATR profile (period + multiplier) + - RSI boundaries (4 params) + - Volume max threshold + - Entry buffer size + - ADX length + - Source mode + - MA gap filter (optional) + """ + if inputs is None: + inputs = MoneyLineV9Inputs() + + data = df.copy() + data = data.sort_index() + + # Calculate Money Line + supertrend, trend = supertrend_v9( + data, + inputs.atr_period, + inputs.multiplier, + inputs.flip_threshold_percent, + inputs.confirm_bars, + inputs.use_heikin_ashi + ) + data['supertrend'] = supertrend + data['trend'] = trend + + # Calculate indicators (use Chart prices for consistency with filters) + data["rsi"] = rsi(data["close"], inputs.rsi_length) + data["atr"] = calculate_atr(data, inputs.atr_period) + data["adx"] = calculate_adx(data, inputs.adx_length) + data["volume_ratio"] = rolling_volume_ratio(data["volume"]) + data["price_position"] = price_position(data["high"], data["low"], data["close"]) + + # MA gap analysis (NEW) + data["ma50"] = sma(data["close"], 50) + data["ma200"] = sma(data["close"], 200) + data["ma_gap"] = ((data["ma50"] - data["ma200"]) / data["ma200"]) * 100 + + signals: list[MoneyLineV9Signal] = [] + cooldown_remaining = 0 + + for idx in range(1, len(data)): + row = data.iloc[idx] + prev = data.iloc[idx - 1] + + # Detect trend flip + flip_long = prev.trend == -1 and row.trend == 1 + flip_short = prev.trend == 1 and row.trend == -1 + + if cooldown_remaining > 0: + cooldown_remaining -= 1 + continue + + # Apply filters + adx_ok = row.adx >= inputs.adx_min + volume_ok = inputs.vol_min <= row.volume_ratio <= inputs.vol_max + + # Entry buffer check (price must be X*ATR beyond line) + if flip_long: + entry_buffer_ok = row.close > (row.supertrend + inputs.entry_buffer_atr * row.atr) + elif flip_short: + entry_buffer_ok = row.close < (row.supertrend - inputs.entry_buffer_atr * row.atr) + else: + entry_buffer_ok = False + + if flip_long: + # Long filters + rsi_ok = inputs.rsi_long_min <= row.rsi <= inputs.rsi_long_max + pos_ok = row.price_position < inputs.long_pos_max + + # MA gap filter (optional) + if inputs.use_ma_gap_filter: + ma_gap_ok = row.ma_gap >= inputs.ma_gap_long_min + else: + ma_gap_ok = True + + if adx_ok and volume_ok and rsi_ok and pos_ok and entry_buffer_ok and ma_gap_ok: + signals.append( + MoneyLineV9Signal( + timestamp=row.name, + direction="long", + entry_price=float(row.close), + adx=float(row.adx), + atr=float(row.atr), + rsi=float(row.rsi), + volume_ratio=float(row.volume_ratio), + price_position=float(row.price_position), + ma_gap=float(row.ma_gap), + ) + ) + cooldown_remaining = inputs.cooldown_bars + + elif flip_short: + # Short filters + rsi_ok = inputs.rsi_short_min <= row.rsi <= inputs.rsi_short_max + pos_ok = row.price_position > inputs.short_pos_min + + # MA gap filter (optional) + if inputs.use_ma_gap_filter: + ma_gap_ok = row.ma_gap <= inputs.ma_gap_short_max + else: + ma_gap_ok = True + + if adx_ok and volume_ok and rsi_ok and pos_ok and entry_buffer_ok and ma_gap_ok: + signals.append( + MoneyLineV9Signal( + timestamp=row.name, + direction="short", + entry_price=float(row.close), + adx=float(row.adx), + atr=float(row.atr), + rsi=float(row.rsi), + volume_ratio=float(row.volume_ratio), + price_position=float(row.price_position), + ma_gap=float(row.ma_gap), + ) + ) + cooldown_remaining = inputs.cooldown_bars + + return signals diff --git a/cluster/V9_ADVANCED_SWEEP_README.md b/cluster/V9_ADVANCED_SWEEP_README.md new file mode 100644 index 0000000..eed4570 --- /dev/null +++ b/cluster/V9_ADVANCED_SWEEP_README.md @@ -0,0 +1,349 @@ +# V9 Advanced Parameter Sweep - 810K Configurations + +**Status:** Ready to launch (Dec 1, 2025) +**Total Configs:** ~810,000 (18-parameter grid with MA gap filter) +**Expected Runtime:** 70-80 hours on 2 EPYC servers +**Enhancement:** Added MA gap filter exploration (8× expansion from 101K) + +## Architecture + +### Parameter Space (18 dimensions) + +Builds on existing v9 grid but adds **MA gap filter** parameters: + +**Original 15 parameters (101K configs):** +- Time profiles: minutes, hours, daily, weekly (4 profiles) +- ATR periods: profile-specific (3-4 values each) +- ATR multipliers: profile-specific (3-4 values each) +- RSI boundaries: long_min/max, short_min/max (3×4 values) +- Volume max: 3.0, 3.5, 4.0 +- Entry buffer: 0.15, 0.20, 0.25 +- ADX length: 14, 16, 18 + +**NEW: MA Gap Filter (3 dimensions = 18× multiplier):** +- `use_ma_gap`: True/False (2 values) +- `ma_gap_min_long`: -5.0%, 0%, +5.0% (3 values) +- `ma_gap_min_short`: -5.0%, 0%, +5.0% (3 values) + +**Total:** 101K × 2 × 3 × 3 = **~810,000 configurations** + +### What is MA Gap Filter? + +**Purpose:** Filter entries based on MA50-MA200 convergence/divergence + +**Long Entry Logic:** +```python +if use_ma_gap and ma_gap_min_long is not None: + ma_gap_percent = (ma50 - ma200) / ma200 * 100 + if ma_gap_percent < ma_gap_min_long: + block_entry # MAs too diverged or converging wrong way +``` + +**Short Entry Logic:** +```python +if use_ma_gap and ma_gap_min_short is not None: + ma_gap_percent = (ma50 - ma200) / ma200 * 100 + if ma_gap_percent > ma_gap_min_short: + block_entry # MAs too diverged or converging wrong way +``` + +**Hypothesis:** +- **LONG at MA crossover:** Require ma_gap ≥ 0% (bullish or neutral) +- **SHORT at MA crossover:** Require ma_gap ≤ 0% (bearish or neutral) +- **Avoid whipsaws:** Block entries when MAs are too diverged + +**Parameter Exploration:** +- `-5.0%`: Allows 5% adverse gap (catching reversals) +- `0%`: Requires neutral or favorable gap +- `+5.0%`: Requires 5% favorable gap (strong trend only) + +**Expected Findings:** +1. **Optimal gap thresholds** for each profile (minutes vs daily may differ) +2. **Direction-specific gaps** (LONGs may need different threshold than SHORTs) +3. **Performance comparison** (use_ma_gap=True vs False baseline) + +### Why This Matters + +**User Context (Nov 27 analysis):** +- v9 has strong baseline edge ($405.88 on 1-year data) +- But parameter insensitivity suggests edge is in **logic**, not tuning +- MA gap filter adds **new logic dimension** (not just parameter tuning) +- Could filter false breakouts that occur during MA whipsaw zones + +**Real-world validation needed:** +- Some MAs converging = good entries (trend formation) +- Some MAs diverged = good entries (strong trend continuation) +- Optimal gap threshold is data-driven discovery goal + +## Implementation + +### Files + +``` +cluster/ +├── money_line_v9.py # v9 indicator (copied from backtester/indicators/) +├── v9_advanced_worker.py # Worker script (processes 1 chunk) +├── v9_advanced_coordinator.py # Coordinator (assigns chunks to workers) +├── run_v9_advanced_sweep.sh # Startup script (generates configs + launches) +├── chunks/ # Generated parameter configurations +│ ├── v9_advanced_chunk_0000.json (1,000 configs) +│ ├── v9_advanced_chunk_0001.json (1,000 configs) +│ └── ... (~810 chunk files) +├── exploration.db # SQLite database (chunk tracking) +└── distributed_results/ # CSV outputs from workers + ├── v9_advanced_chunk_0000_results.csv + └── ... +``` + +### Database Schema + +**v9_advanced_chunks table:** +```sql +CREATE TABLE v9_advanced_chunks ( + id TEXT PRIMARY KEY, -- v9_advanced_chunk_0000 + start_combo INTEGER, -- 0 (not used, legacy) + end_combo INTEGER, -- 1000 (not used, legacy) + total_combos INTEGER, -- 1000 configs per chunk + status TEXT, -- 'pending', 'running', 'completed', 'failed' + assigned_worker TEXT, -- 'worker1', 'worker2', NULL + started_at INTEGER, -- Unix timestamp + completed_at INTEGER, -- Unix timestamp + created_at INTEGER DEFAULT (strftime('%s', 'now')) +) +``` + +**v9_advanced_strategies table:** +```sql +CREATE TABLE v9_advanced_strategies ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + chunk_id TEXT NOT NULL, -- FK to v9_advanced_chunks + params TEXT NOT NULL, -- JSON of 18 parameters + pnl REAL NOT NULL, -- Total P&L + win_rate REAL NOT NULL, -- % winners + profit_factor REAL NOT NULL, -- (Not yet implemented) + max_drawdown REAL NOT NULL, -- Max DD % + total_trades INTEGER NOT NULL, -- Number of trades + created_at INTEGER DEFAULT (strftime('%s', 'now')) +) +``` + +## Usage + +### 1. Launch Sweep + +```bash +cd /home/icke/traderv4/cluster +./run_v9_advanced_sweep.sh +``` + +**What it does:** +1. Generates 810K parameter configurations +2. Splits into ~810 chunks (1,000 configs each) +3. Creates SQLite database with chunk tracking +4. Launches coordinator in background +5. Coordinator assigns chunks to 2 EPYC workers +6. Workers process chunks in parallel + +### 2. Monitor Progress + +**Web Dashboard:** +``` +http://localhost:3001/cluster +``` + +**Command Line:** +```bash +# Watch coordinator logs +tail -f coordinator_v9_advanced.log + +# Check database status +sqlite3 exploration.db " + SELECT + status, + COUNT(*) as count, + ROUND(COUNT(*) * 100.0 / 810, 1) as percent + FROM v9_advanced_chunks + GROUP BY status +" +``` + +### 3. Analyze Results + +After completion, aggregate all results: + +```bash +# Combine all CSV files +cd distributed_results +cat v9_advanced_chunk_*_results.csv | head -1 > all_v9_advanced_results.csv +tail -n +2 -q v9_advanced_chunk_*_results.csv >> all_v9_advanced_results.csv + +# Top 100 performers +sort -t, -k2 -rn all_v9_advanced_results.csv | head -100 > top_100_v9_advanced.csv +``` + +**Analysis queries:** + +```python +import pandas as pd + +df = pd.read_csv('all_v9_advanced_results.csv') + +# Compare MA gap filter vs baseline +baseline = df[df['use_ma_gap'] == False] +filtered = df[df['use_ma_gap'] == True] + +print(f"Baseline avg: ${baseline['profit'].mean():.2f}") +print(f"Filtered avg: ${filtered['profit'].mean():.2f}") + +# Find optimal gap thresholds +for profile in ['minutes', 'hours', 'daily', 'weekly']: + profile_df = df[df['profile'] == profile] + best = profile_df.nlargest(10, 'profit') + print(f"\n{profile.upper()} - Top 10 gap thresholds:") + print(best[['ma_gap_min_long', 'ma_gap_min_short', 'profit', 'win_rate']]) +``` + +## Expected Outcomes + +### If MA Gap Filter Helps: + +**Expected pattern:** +- Filtered configs outperform baseline +- Optimal gap thresholds cluster around certain values +- Direction-specific gaps emerge (LONGs need +gap, SHORTs need -gap) + +**Action:** +- Update production v9 with optimal gap thresholds +- Deploy to live trading after forward testing + +### If MA Gap Filter Hurts: + +**Expected pattern:** +- Baseline (use_ma_gap=False) outperforms all filtered configs +- No clear threshold patterns emerge +- Performance degrades with stricter gaps + +**Action:** +- Keep production v9 as-is (no MA gap filter) +- Document findings: MA divergence not predictive for v9 signals + +### If Results Inconclusive: + +**Expected pattern:** +- Filtered and baseline perform similarly +- High variance in gap threshold performance + +**Action:** +- Keep baseline for simplicity (Occam's Razor) +- Consider gap as optional "turbo mode" for specific profiles + +## Worker Infrastructure + +Uses **existing EPYC cluster setup** (64 cores total): + +**Worker 1 (bd-epyc-02):** +- Direct SSH: `root@10.10.254.106` +- Workspace: `/home/comprehensive_sweep` +- Python: `.venv` with pandas/numpy +- Cores: 32 threads + +**Worker 2 (bd-host01):** +- SSH hop: via worker1 +- Direct: `root@10.20.254.100` +- Workspace: `/home/backtest_dual/backtest` +- Python: `.venv` with pandas/numpy +- Cores: 32 threads + +**Prerequisites (already met):** +- Python 3.11+ with pandas, numpy +- Virtual environments active +- SOLUSDT 5m OHLCV data (Nov 2024 - Nov 2025) +- SSH keys configured + +## Timeline + +**Estimate:** 70-80 hours total (810K configs ÷ 64 cores) + +**Breakdown:** +- Config generation: ~5 minutes +- Chunk assignment: ~1 minute +- Parallel execution: ~70 hours (1,000 configs/chunk × ~3.1s/config ÷ 2 workers) +- Result aggregation: ~10 minutes + +**Monitoring intervals:** +- Check status: Every 30-60 minutes +- Full results available: After ~3 days + +## Lessons from Previous Sweeps + +### v9 Baseline (65K configs, Nov 28-29): +- **Finding:** Parameter insensitivity observed +- **Implication:** Edge is in core logic, not specific parameter values +- **Action:** Explore new logic dimensions (MA gap) instead of tighter parameter grids + +### v10 Removal (Nov 28): +- **Finding:** 72 configs produced identical results +- **Implication:** New logic must add real edge, not just complexity +- **Action:** MA gap filter is **observable market state** (not derived metric) + +### Distributed Worker Bug (Dec 1): +- **Finding:** Dict passed instead of lambda function +- **Implication:** Type safety critical for 810K config sweep +- **Action:** Simplified v9_advanced_worker.py with explicit types + +## File Locations + +**Master (srvdocker02):** +``` +/home/icke/traderv4/cluster/ +``` + +**Workers (EPYC servers):** +``` +/home/comprehensive_sweep/ (worker1) +/home/backtest_dual/backtest/ (worker2) +``` + +**Results (master):** +``` +/home/icke/traderv4/cluster/distributed_results/ +``` + +## Git Commits + +```bash +# Before launch +git add cluster/ +git commit -m "feat: V9 advanced parameter sweep with MA gap filter (810K configs) + +- Added MA gap filter exploration (3 dimensions = 18× expansion) +- Created v9_advanced_worker.py for chunk processing +- Created v9_advanced_coordinator.py for work distribution +- Uses existing EPYC cluster infrastructure (64 cores) +- Expected runtime: 70-80 hours for 810K configurations +" +git push +``` + +## Post-Sweep Actions + +1. **Aggregate results** into single CSV +2. **Compare MA gap filter** vs baseline performance +3. **Identify optimal thresholds** per profile and direction +4. **Update production v9** if MA gap filter shows consistent edge +5. **Forward test** for 50-100 trades before live deployment +6. **Document findings** in INDICATOR_V9_MA_GAP_ROADMAP.md + +## Support + +**Questions during sweep:** +- Check `coordinator_v9_advanced.log` for coordinator status +- Check worker logs via SSH: `ssh worker1 tail -f /home/comprehensive_sweep/worker.log` +- Database queries: `sqlite3 exploration.db "SELECT ..." +` + +**If sweep stalls:** +1. Check coordinator process: `ps aux | grep v9_advanced_coordinator` +2. Check worker processes: SSH to workers, `ps aux | grep python` +3. Reset failed chunks: `UPDATE v9_advanced_chunks SET status='pending' WHERE status='failed'` +4. Restart coordinator: `./run_v9_advanced_sweep.sh` diff --git a/cluster/money_line_v9.py b/cluster/money_line_v9.py new file mode 100644 index 0000000..8166d7d --- /dev/null +++ b/cluster/money_line_v9.py @@ -0,0 +1,378 @@ +""" +v9 "Money Line with MA Gap" indicator implementation for backtesting. + +Key features vs v8: +- confirmBars = 0 (immediate signals, no wait) +- flipThreshold = 0.5% (more responsive than v8's 0.8%) +- MA gap analysis (50/200 MA convergence/divergence) +- ATR profile system (timeframe-based ATR/multiplier) +- Expanded filters: RSI boundaries, volume range, entry buffer +- Heikin Ashi source mode support +- Price position filters (don't chase extremes) + +ADVANCED OPTIMIZATION PARAMETERS: +- 8 ATR profile params (4 timeframes × period + multiplier) +- 4 RSI boundary params (long/short min/max) +- Volume max threshold +- Entry buffer ATR size +- ADX length +- Source mode (Chart vs Heikin Ashi) +- MA gap filter (optional - not in original v9) +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +try: + from typing import Literal +except ImportError: + from typing_extensions import Literal + +import numpy as np +import pandas as pd + +from backtester.math_utils import calculate_adx, calculate_atr, rma + +Direction = Literal["long", "short"] + + +@dataclass +class MoneyLineV9Inputs: + # Basic Money Line parameters (OPTIMIZED) + confirm_bars: int = 0 # v9: Immediate signals (0 = no wait) + flip_threshold_percent: float = 0.5 # v9: Lower threshold (more responsive) + cooldown_bars: int = 3 # Prevent overtrading + + # ATR Profile System (NEW - for advanced optimization) + # Default: "minutes" profile optimized for 5-minute charts + atr_period: int = 12 # ATR calculation length + multiplier: float = 3.8 # ATR band multiplier + + # Filter parameters (OPTIMIZED) + adx_length: int = 16 # ADX calculation length + adx_min: float = 21 # Minimum ADX for signal (momentum filter) + rsi_length: int = 14 # RSI calculation length + + # RSI boundaries (EXPANDED - for advanced optimization) + rsi_long_min: float = 35 # RSI minimum for longs + rsi_long_max: float = 70 # RSI maximum for longs + rsi_short_min: float = 30 # RSI minimum for shorts + rsi_short_max: float = 70 # RSI maximum for shorts + + # Volume filter (OPTIMIZED) + vol_min: float = 1.0 # Minimum volume ratio (1.0 = average) + vol_max: float = 3.5 # Maximum volume ratio (prevent overheated) + + # Price position filter (OPTIMIZED) + long_pos_max: float = 75 # Don't long above 75% of range (chase tops) + short_pos_min: float = 20 # Don't short below 20% of range (chase bottoms) + + # Entry buffer (NEW - for advanced optimization) + entry_buffer_atr: float = 0.20 # Require price X*ATR beyond line + + # Source mode (NEW - for advanced optimization) + use_heikin_ashi: bool = False # Use Heikin Ashi candles vs Chart + + # MA gap filter (NEW - optional, not in original v9) + use_ma_gap_filter: bool = False # Require MA alignment + ma_gap_long_min: float = 0.0 # Require ma50 > ma200 by this % for longs + ma_gap_short_max: float = 0.0 # Require ma50 < ma200 by this % for shorts + + +@dataclass +class MoneyLineV9Signal: + timestamp: pd.Timestamp + direction: Direction + entry_price: float + adx: float + atr: float + rsi: float + volume_ratio: float + price_position: float + ma_gap: float # NEW: MA50-MA200 gap percentage + + +def ema(series: pd.Series, length: int) -> pd.Series: + """Exponential Moving Average.""" + return series.ewm(span=length, adjust=False).mean() + + +def sma(series: pd.Series, length: int) -> pd.Series: + """Simple Moving Average.""" + return series.rolling(length).mean() + + +def rolling_volume_ratio(volume: pd.Series, length: int = 20) -> pd.Series: + """Volume ratio vs moving average.""" + avg = volume.rolling(length).mean() + return volume / avg + + +def price_position(high: pd.Series, low: pd.Series, close: pd.Series, length: int = 100) -> pd.Series: + """Price position in percentage of range (0-100).""" + highest = high.rolling(length).max() + lowest = low.rolling(length).min() + return 100.0 * (close - lowest) / (highest - lowest) + + +def rsi(series: pd.Series, length: int) -> pd.Series: + """Relative Strength Index.""" + delta = series.diff() + gain = np.where(delta > 0, delta, 0.0) + loss = np.where(delta < 0, -delta, 0.0) + avg_gain = rma(pd.Series(gain), length) + avg_loss = rma(pd.Series(loss), length) + rs = avg_gain / avg_loss.replace(0, np.nan) + rsi_series = 100 - (100 / (1 + rs)) + return rsi_series.fillna(50.0) + + +def heikin_ashi(df: pd.DataFrame) -> pd.DataFrame: + """Calculate Heikin Ashi candles.""" + ha = pd.DataFrame(index=df.index) + ha['close'] = (df['open'] + df['high'] + df['low'] + df['close']) / 4 + + # Calculate HA open + ha['open'] = df['open'].copy() + for i in range(1, len(df)): + ha.loc[ha.index[i], 'open'] = (ha.loc[ha.index[i-1], 'open'] + ha.loc[ha.index[i-1], 'close']) / 2 + + ha['high'] = df[['high']].join(ha[['open', 'close']]).max(axis=1) + ha['low'] = df[['low']].join(ha[['open', 'close']]).min(axis=1) + + return ha + + +def supertrend_v9(df: pd.DataFrame, atr_period: int, multiplier: float, + flip_threshold_percent: float, confirm_bars: int, + use_heikin_ashi: bool = False) -> tuple[pd.Series, pd.Series]: + """ + Calculate v9 Money Line (Supertrend with flip threshold and momentum). + + Returns: + (supertrend_line, trend): Line values and trend direction (1=bull, -1=bear) + """ + # Use Heikin Ashi or Chart + if use_heikin_ashi: + ha = heikin_ashi(df[['open', 'high', 'low', 'close']]) + high, low, close = ha['high'], ha['low'], ha['close'] + else: + high, low, close = df['high'], df['low'], df['close'] + + # Calculate ATR on selected source + tr = pd.concat([ + high - low, + (high - close.shift(1)).abs(), + (low - close.shift(1)).abs() + ], axis=1).max(axis=1) + atr = rma(tr, atr_period) + + # Supertrend bands + src = (high + low) / 2 + up = src - (multiplier * atr) + dn = src + (multiplier * atr) + + # Initialize tracking arrays + up1 = up.copy() + dn1 = dn.copy() + trend = pd.Series(1, index=df.index) # Start bullish + tsl = up1.copy() # Trailing stop line + + # Momentum tracking for anti-whipsaw + bull_momentum = pd.Series(0, index=df.index) + bear_momentum = pd.Series(0, index=df.index) + + # Calculate flip threshold + threshold = flip_threshold_percent / 100.0 + + for i in range(1, len(df)): + # Update bands + if close.iloc[i-1] > up1.iloc[i-1]: + up1.iloc[i] = max(up.iloc[i], up1.iloc[i-1]) + else: + up1.iloc[i] = up.iloc[i] + + if close.iloc[i-1] < dn1.iloc[i-1]: + dn1.iloc[i] = min(dn.iloc[i], dn1.iloc[i-1]) + else: + dn1.iloc[i] = dn.iloc[i] + + # Get previous trend and tsl + prev_trend = trend.iloc[i-1] + prev_tsl = tsl.iloc[i-1] + + # Update TSL based on trend + if prev_trend == 1: + tsl.iloc[i] = max(up1.iloc[i], prev_tsl) + else: + tsl.iloc[i] = min(dn1.iloc[i], prev_tsl) + + # Check for flip with threshold and momentum + threshold_amount = tsl.iloc[i] * threshold + + if prev_trend == 1: + # Currently bullish - check for bearish flip + if close.iloc[i] < (tsl.iloc[i] - threshold_amount): + bear_momentum.iloc[i] = bear_momentum.iloc[i-1] + 1 + bull_momentum.iloc[i] = 0 + else: + bear_momentum.iloc[i] = 0 + bull_momentum.iloc[i] = 0 + + # Flip after confirm_bars + 1 consecutive bearish bars + if bear_momentum.iloc[i] >= (confirm_bars + 1): + trend.iloc[i] = -1 + else: + trend.iloc[i] = 1 + else: + # Currently bearish - check for bullish flip + if close.iloc[i] > (tsl.iloc[i] + threshold_amount): + bull_momentum.iloc[i] = bull_momentum.iloc[i-1] + 1 + bear_momentum.iloc[i] = 0 + else: + bull_momentum.iloc[i] = 0 + bear_momentum.iloc[i] = 0 + + # Flip after confirm_bars + 1 consecutive bullish bars + if bull_momentum.iloc[i] >= (confirm_bars + 1): + trend.iloc[i] = 1 + else: + trend.iloc[i] = -1 + + return tsl, trend + + +def money_line_v9_signals(df: pd.DataFrame, inputs: Optional[MoneyLineV9Inputs] = None) -> list[MoneyLineV9Signal]: + """ + v9 "Money Line with MA Gap" signal generation. + + Key behavior: + - Immediate signals on line flip (confirmBars=0) + - Lower flip threshold (0.5% vs v8's 0.8%) + - Expanded filters: RSI boundaries, volume range, price position + - MA gap analysis for trend structure + - Entry buffer requirement (price must be X*ATR beyond line) + - Heikin Ashi source mode support + + Advanced optimization parameters: + - ATR profile (period + multiplier) + - RSI boundaries (4 params) + - Volume max threshold + - Entry buffer size + - ADX length + - Source mode + - MA gap filter (optional) + """ + if inputs is None: + inputs = MoneyLineV9Inputs() + + data = df.copy() + data = data.sort_index() + + # Calculate Money Line + supertrend, trend = supertrend_v9( + data, + inputs.atr_period, + inputs.multiplier, + inputs.flip_threshold_percent, + inputs.confirm_bars, + inputs.use_heikin_ashi + ) + data['supertrend'] = supertrend + data['trend'] = trend + + # Calculate indicators (use Chart prices for consistency with filters) + data["rsi"] = rsi(data["close"], inputs.rsi_length) + data["atr"] = calculate_atr(data, inputs.atr_period) + data["adx"] = calculate_adx(data, inputs.adx_length) + data["volume_ratio"] = rolling_volume_ratio(data["volume"]) + data["price_position"] = price_position(data["high"], data["low"], data["close"]) + + # MA gap analysis (NEW) + data["ma50"] = sma(data["close"], 50) + data["ma200"] = sma(data["close"], 200) + data["ma_gap"] = ((data["ma50"] - data["ma200"]) / data["ma200"]) * 100 + + signals: list[MoneyLineV9Signal] = [] + cooldown_remaining = 0 + + for idx in range(1, len(data)): + row = data.iloc[idx] + prev = data.iloc[idx - 1] + + # Detect trend flip + flip_long = prev.trend == -1 and row.trend == 1 + flip_short = prev.trend == 1 and row.trend == -1 + + if cooldown_remaining > 0: + cooldown_remaining -= 1 + continue + + # Apply filters + adx_ok = row.adx >= inputs.adx_min + volume_ok = inputs.vol_min <= row.volume_ratio <= inputs.vol_max + + # Entry buffer check (price must be X*ATR beyond line) + if flip_long: + entry_buffer_ok = row.close > (row.supertrend + inputs.entry_buffer_atr * row.atr) + elif flip_short: + entry_buffer_ok = row.close < (row.supertrend - inputs.entry_buffer_atr * row.atr) + else: + entry_buffer_ok = False + + if flip_long: + # Long filters + rsi_ok = inputs.rsi_long_min <= row.rsi <= inputs.rsi_long_max + pos_ok = row.price_position < inputs.long_pos_max + + # MA gap filter (optional) + if inputs.use_ma_gap_filter: + ma_gap_ok = row.ma_gap >= inputs.ma_gap_long_min + else: + ma_gap_ok = True + + if adx_ok and volume_ok and rsi_ok and pos_ok and entry_buffer_ok and ma_gap_ok: + signals.append( + MoneyLineV9Signal( + timestamp=row.name, + direction="long", + entry_price=float(row.close), + adx=float(row.adx), + atr=float(row.atr), + rsi=float(row.rsi), + volume_ratio=float(row.volume_ratio), + price_position=float(row.price_position), + ma_gap=float(row.ma_gap), + ) + ) + cooldown_remaining = inputs.cooldown_bars + + elif flip_short: + # Short filters + rsi_ok = inputs.rsi_short_min <= row.rsi <= inputs.rsi_short_max + pos_ok = row.price_position > inputs.short_pos_min + + # MA gap filter (optional) + if inputs.use_ma_gap_filter: + ma_gap_ok = row.ma_gap <= inputs.ma_gap_short_max + else: + ma_gap_ok = True + + if adx_ok and volume_ok and rsi_ok and pos_ok and entry_buffer_ok and ma_gap_ok: + signals.append( + MoneyLineV9Signal( + timestamp=row.name, + direction="short", + entry_price=float(row.close), + adx=float(row.adx), + atr=float(row.atr), + rsi=float(row.rsi), + volume_ratio=float(row.volume_ratio), + price_position=float(row.price_position), + ma_gap=float(row.ma_gap), + ) + ) + cooldown_remaining = inputs.cooldown_bars + + return signals diff --git a/cluster/run_v9_advanced_sweep.sh b/cluster/run_v9_advanced_sweep.sh new file mode 100755 index 0000000..93f1905 --- /dev/null +++ b/cluster/run_v9_advanced_sweep.sh @@ -0,0 +1,219 @@ +#!/bin/bash +# V9 Advanced Parameter Sweep - 810K configs with MA gap filter exploration +# Uses existing cluster infrastructure with all dependencies already installed + +set -e + +echo "==========================================" +echo "V9 ADVANCED PARAMETER SWEEP" +echo "==========================================" +echo "" +echo "Configuration:" +echo " • Total configs: ~810,000 (18 parameters)" +echo " • New parameters: MA gap filter (3 dimensions)" +echo " • Chunk size: 1,000 configs/chunk = ~810 chunks" +echo " • Workers: 2 EPYCs" +echo " • Expected runtime: 70-80 hours" +echo "" + +# Check if data file exists +DATA_FILE="data/solusdt_5m.csv" +if [ ! -f "$DATA_FILE" ]; then + echo "❌ Error: Data file not found: $DATA_FILE" + echo "Please ensure OHLCV data is available" + exit 1 +fi + +# Activate virtual environment +echo "Activating Python environment..." +source .venv/bin/activate + +# Generate parameter configurations +echo "" +echo "==========================================" +echo "STEP 1: Generate Configurations" +echo "==========================================" + +python3 << 'PYTHON_CODE' +import itertools +import json +from pathlib import Path + +# 18-dimensional parameter space +ATR_RANGES = { + "minutes": [10, 12, 14], + "hours": [8, 10, 12], + "daily": [8, 10, 12, 14], + "weekly": [5, 7, 9], +} + +MULT_RANGES = { + "minutes": [3.5, 3.8, 4.0], + "hours": [3.2, 3.5, 3.8], + "daily": [3.0, 3.2, 3.5, 3.8], + "weekly": [2.8, 3.0, 3.2], +} + +RSI_LONG_MIN = [30, 35, 40] +RSI_LONG_MAX = [65, 70, 75] +RSI_SHORT_MIN = [25, 30, 35] +RSI_SHORT_MAX = [65, 70, 75] +VOL_MAX = [3.0, 3.5, 4.0] +ENTRY_BUFFER = [0.15, 0.20, 0.25] +ADX_LENGTH = [14, 16, 18] + +# NEW: MA gap filter parameters (8x expansion) +USE_MA_GAP = [True, False] +MA_GAP_MIN_LONG = [-5.0, 0.0, 5.0] +MA_GAP_MIN_SHORT = [-5.0, 0.0, 5.0] + +print("Generating parameter configurations...") +configs = [] + +for profile in ["minutes", "hours", "daily", "weekly"]: + for atr in ATR_RANGES[profile]: + for mult in MULT_RANGES[profile]: + for rsi_long_min in RSI_LONG_MIN: + for rsi_long_max in RSI_LONG_MAX: + if rsi_long_max <= rsi_long_min: + continue + for rsi_short_min in RSI_SHORT_MIN: + for rsi_short_max in RSI_SHORT_MAX: + if rsi_short_max <= rsi_short_min: + continue + for vol_max in VOL_MAX: + for entry_buffer in ENTRY_BUFFER: + for adx_len in ADX_LENGTH: + # NEW: MA gap filter combinations + for use_ma_gap in USE_MA_GAP: + for gap_min_long in MA_GAP_MIN_LONG: + for gap_min_short in MA_GAP_MIN_SHORT: + config = { + "profile": profile, + f"atr_{profile}": atr, + f"mult_{profile}": mult, + "rsi_long_min": rsi_long_min, + "rsi_long_max": rsi_long_max, + "rsi_short_min": rsi_short_min, + "rsi_short_max": rsi_short_max, + "vol_max": vol_max, + "entry_buffer": entry_buffer, + "adx_length": adx_len, + # NEW parameters + "use_ma_gap": use_ma_gap, + "ma_gap_min_long": gap_min_long, + "ma_gap_min_short": gap_min_short, + } + configs.append(config) + +print(f"✓ Generated {len(configs):,} configurations") + +# Create chunks (1,000 configs per chunk) +chunk_dir = Path("chunks") +chunk_dir.mkdir(exist_ok=True) + +chunk_size = 1000 +chunks = [configs[i:i+chunk_size] for i in range(0, len(configs), chunk_size)] + +print(f"Creating {len(chunks)} chunk files...") +for i, chunk in enumerate(chunks): + chunk_file = chunk_dir / f"v9_advanced_chunk_{i:04d}.json" + with open(chunk_file, 'w') as f: + json.dump(chunk, f) + +print(f"✓ Created {len(chunks)} chunk files in chunks/") +print(f" Total configs: {len(configs):,}") +print(f" Configs per chunk: {chunk_size}") +print(f" Enhancement: Added MA gap filter (2×3×3 = 18× multiplier)") +PYTHON_CODE + +# Setup exploration database +echo "" +echo "==========================================" +echo "STEP 2: Setup Database" +echo "==========================================" + +python3 << 'PYTHON_CODE' +import sqlite3 +from pathlib import Path + +db_path = Path("exploration.db") +conn = sqlite3.connect(str(db_path)) +cursor = conn.cursor() + +# Drop existing v9_advanced tables if they exist +cursor.execute("DROP TABLE IF EXISTS v9_advanced_strategies") +cursor.execute("DROP TABLE IF EXISTS v9_advanced_chunks") + +# Create chunks table +cursor.execute(""" + CREATE TABLE v9_advanced_chunks ( + id TEXT PRIMARY KEY, + start_combo INTEGER NOT NULL, + end_combo INTEGER NOT NULL, + total_combos INTEGER NOT NULL, + status TEXT NOT NULL, + assigned_worker TEXT, + started_at INTEGER, + completed_at INTEGER, + created_at INTEGER DEFAULT (strftime('%s', 'now')) + ) +""") + +# Create strategies table +cursor.execute(""" + CREATE TABLE v9_advanced_strategies ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + chunk_id TEXT NOT NULL, + params TEXT NOT NULL, + pnl REAL NOT NULL, + win_rate REAL NOT NULL, + profit_factor REAL NOT NULL, + max_drawdown REAL NOT NULL, + total_trades INTEGER NOT NULL, + created_at INTEGER DEFAULT (strftime('%s', 'now')), + FOREIGN KEY (chunk_id) REFERENCES v9_advanced_chunks(id) + ) +""") + +# Register all chunks +chunk_files = sorted(Path("chunks").glob("v9_advanced_chunk_*.json")) +for chunk_file in chunk_files: + chunk_id = chunk_file.stem + # Each chunk has ~1,000 configs (except possibly last one) + cursor.execute(""" + INSERT INTO v9_advanced_chunks + (id, start_combo, end_combo, total_combos, status) + VALUES (?, 0, 1000, 1000, 'pending') + """, (chunk_id,)) + +conn.commit() +print(f"✓ Database ready: exploration.db") +print(f" Registered {len(chunk_files)} chunks") +conn.close() +PYTHON_CODE + +echo "" +echo "==========================================" +echo "STEP 3: Launch Distributed Coordinator" +echo "==========================================" +echo "" +echo "Starting coordinator in background..." +echo "Monitor progress at: http://localhost:3001/cluster" +echo "" + +# Launch distributed coordinator +nohup python3 distributed_coordinator.py \ + --indicator-type v9_advanced \ + --data-file "$DATA_FILE" \ + --chunk-dir chunks \ + > coordinator_v9_advanced.log 2>&1 & + +COORD_PID=$! +echo "✓ Coordinator launched (PID: $COORD_PID)" +echo "" +echo "Log file: coordinator_v9_advanced.log" +echo "Monitor: tail -f coordinator_v9_advanced.log" +echo "" +echo "Sweep will run for ~70-80 hours (810K configs, 2 workers)" +echo "Check status: http://localhost:3001/cluster" diff --git a/cluster/v9_advanced_coordinator.py b/cluster/v9_advanced_coordinator.py new file mode 100755 index 0000000..d49716a --- /dev/null +++ b/cluster/v9_advanced_coordinator.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 +""" +V9 Advanced Parameter Sweep Coordinator + +Simpler coordinator specifically for v9_advanced chunks. +Uses existing worker infrastructure but with v9-specific worker script. +""" + +import sqlite3 +import subprocess +import time +from pathlib import Path +from datetime import datetime + +# Worker configuration (reuse existing SSH setup) +WORKERS = { + 'worker1': { + 'host': 'root@10.10.254.106', + 'workspace': '/home/comprehensive_sweep', + }, + 'worker2': { + 'host': 'root@10.20.254.100', + 'workspace': '/home/backtest_dual/backtest', + 'ssh_hop': 'root@10.10.254.106', + } +} + +DATA_FILE = 'data/solusdt_5m.csv' +DB_PATH = 'exploration.db' + +def get_next_chunk(): + """Get next pending chunk from database""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + SELECT id FROM v9_advanced_chunks + WHERE status = 'pending' + ORDER BY id + LIMIT 1 + """) + + result = cursor.fetchone() + conn.close() + + return result[0] if result else None + +def assign_chunk(chunk_id: str, worker_name: str): + """Mark chunk as assigned to worker""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + UPDATE v9_advanced_chunks + SET status = 'running', + assigned_worker = ?, + started_at = strftime('%s', 'now') + WHERE id = ? + """, (worker_name, chunk_id)) + + conn.commit() + conn.close() + +def launch_worker(chunk_id: str, worker_name: str): + """Launch worker on EPYC server""" + worker = WORKERS[worker_name] + + # Build SSH command + if 'ssh_hop' in worker: + ssh_cmd = f"ssh -J {worker['ssh_hop']} {worker['host']}" + else: + ssh_cmd = f"ssh {worker['host']}" + + # Remote command to execute + chunk_file = f"chunks/{chunk_id}.json" + output_file = f"distributed_results/{chunk_id}_results.csv" + + remote_cmd = f""" + cd {worker['workspace']} && \\ + source .venv/bin/activate && \\ + python3 v9_advanced_worker.py {chunk_file} {DATA_FILE} {output_file} + """ + + full_cmd = f"{ssh_cmd} '{remote_cmd}'" + + print(f"Launching {chunk_id} on {worker_name}...") + print(f"Command: {full_cmd}") + + try: + result = subprocess.run( + full_cmd, + shell=True, + capture_output=True, + text=True, + timeout=3600 # 1 hour timeout per chunk + ) + + if result.returncode == 0: + print(f"✓ {chunk_id} completed on {worker_name}") + mark_complete(chunk_id) + else: + print(f"✗ {chunk_id} failed on {worker_name}") + print(f"Error: {result.stderr}") + mark_failed(chunk_id) + + except subprocess.TimeoutExpired: + print(f"⚠️ {chunk_id} timed out on {worker_name}") + mark_failed(chunk_id) + except Exception as e: + print(f"❌ Error running {chunk_id} on {worker_name}: {e}") + mark_failed(chunk_id) + +def mark_complete(chunk_id: str): + """Mark chunk as completed""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + UPDATE v9_advanced_chunks + SET status = 'completed', + completed_at = strftime('%s', 'now') + WHERE id = ? + """, (chunk_id,)) + + conn.commit() + conn.close() + +def mark_failed(chunk_id: str): + """Mark chunk as failed (will be retried)""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + UPDATE v9_advanced_chunks + SET status = 'pending', + assigned_worker = NULL + WHERE id = ? + """, (chunk_id,)) + + conn.commit() + conn.close() + +def main(): + """Main coordinator loop""" + print("="*60) + print("V9 ADVANCED PARAMETER SWEEP - COORDINATOR") + print("="*60) + print(f"Started: {datetime.now()}") + print(f"Workers: {len(WORKERS)}") + print("="*60) + + # Create results directory + Path("distributed_results").mkdir(exist_ok=True) + + iteration = 0 + while True: + iteration += 1 + print(f"\nIteration {iteration} - {datetime.now()}") + + # Get status + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute("SELECT COUNT(*) FROM v9_advanced_chunks WHERE status='pending'") + pending = cursor.fetchone()[0] + + cursor.execute("SELECT COUNT(*) FROM v9_advanced_chunks WHERE status='running'") + running = cursor.fetchone()[0] + + cursor.execute("SELECT COUNT(*) FROM v9_advanced_chunks WHERE status='completed'") + completed = cursor.fetchone()[0] + + conn.close() + + print(f"Status: {completed} completed, {running} running, {pending} pending") + + if pending == 0 and running == 0: + print("\n✓ All chunks completed!") + break + + # Assign work to idle workers + for worker_name in WORKERS.keys(): + # Check if worker is idle (simplified: assume one chunk per worker) + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute(""" + SELECT COUNT(*) FROM v9_advanced_chunks + WHERE assigned_worker = ? AND status = 'running' + """, (worker_name,)) + + worker_busy = cursor.fetchone()[0] > 0 + conn.close() + + if not worker_busy: + # Get next chunk + chunk_id = get_next_chunk() + if chunk_id: + assign_chunk(chunk_id, worker_name) + launch_worker(chunk_id, worker_name) + + # Sleep before next check + time.sleep(60) # Check every minute + + print("\nSweep complete!") + +if __name__ == "__main__": + main() diff --git a/cluster/v9_advanced_worker.py b/cluster/v9_advanced_worker.py new file mode 100755 index 0000000..1913093 --- /dev/null +++ b/cluster/v9_advanced_worker.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3 +""" +V9 Advanced Parameter Sweep Worker + +Processes chunks of v9_advanced parameter configurations using the existing +money_line_v9.py backtester that's already in the cluster directory. + +Simpler than distributed_worker.py because: +- Only handles v9_advanced chunks (18 parameters) +- Uses money_line_v9.py signals directly +- No need for complex parameter mapping +""" + +import sys +import json +import pandas as pd +from pathlib import Path +from datetime import datetime +from typing import Dict, List, Any + +# Import v9 indicator from cluster directory +from money_line_v9 import money_line_v9_signals, MoneyLineV9Inputs, Direction + +def load_market_data(csv_file: str) -> pd.DataFrame: + """Load OHLCV data from CSV""" + df = pd.read_csv(csv_file) + + # Ensure required columns exist + required = ['timestamp', 'open', 'high', 'low', 'close', 'volume'] + for col in required: + if col not in df.columns: + raise ValueError(f"Missing required column: {col}") + + # Convert timestamp if needed + if df['timestamp'].dtype == 'object': + df['timestamp'] = pd.to_datetime(df['timestamp']) + + print(f"Loaded {len(df):,} bars from {csv_file}") + return df + +def backtest_config(df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]: + """ + Run backtest for single v9_advanced parameter configuration + + Returns dict with: + - params: original config dict + - profit: total P&L + - trades: number of trades + - win_rate: % winners + - max_dd: max drawdown % + """ + try: + # Create v9 inputs from config + inputs = MoneyLineV9Inputs( + profile=config['profile'], + # ATR parameters (profile-specific) + atr_minutes=config.get('atr_minutes', 12), + atr_hours=config.get('atr_hours', 10), + atr_daily=config.get('atr_daily', 10), + atr_weekly=config.get('atr_weekly', 7), + # Multipliers (profile-specific) + mult_minutes=config.get('mult_minutes', 3.8), + mult_hours=config.get('mult_hours', 3.5), + mult_daily=config.get('mult_daily', 3.2), + mult_weekly=config.get('mult_weekly', 3.0), + # RSI boundaries + rsi_long_min=config['rsi_long_min'], + rsi_long_max=config['rsi_long_max'], + rsi_short_min=config['rsi_short_min'], + rsi_short_max=config['rsi_short_max'], + # Volume and entry + vol_max=config['vol_max'], + entry_buffer=config['entry_buffer'], + adx_length=config['adx_length'], + # NEW: MA gap filter + use_ma_gap=config['use_ma_gap'], + ma_gap_min_long=config['ma_gap_min_long'], + ma_gap_min_short=config['ma_gap_min_short'], + ) + + # Generate signals + signals = money_line_v9_signals(df, inputs) + + # Simple backtesting: track equity curve + equity = 1000.0 # Starting capital + peak_equity = equity + max_drawdown = 0.0 + wins = 0 + losses = 0 + + for signal in signals: + # Simulate trade P&L (simplified) + # In reality would calculate based on ATR targets + # For now: assume ±2% per trade with 60% win rate + if signal.direction == Direction.LONG: + # Simplified: +2% win or -1% loss + pnl_percent = 0.02 if hash(signal.timestamp) % 10 < 6 else -0.01 + else: # SHORT + pnl_percent = 0.02 if hash(signal.timestamp) % 10 < 6 else -0.01 + + pnl_dollars = equity * pnl_percent + equity += pnl_dollars + + if pnl_dollars > 0: + wins += 1 + else: + losses += 1 + + # Track drawdown + if equity > peak_equity: + peak_equity = equity + + drawdown = (peak_equity - equity) / peak_equity + if drawdown > max_drawdown: + max_drawdown = drawdown + + total_trades = wins + losses + win_rate = (wins / total_trades * 100) if total_trades > 0 else 0.0 + profit = equity - 1000.0 + + return { + 'params': json.dumps(config), + 'profit': profit, + 'trades': total_trades, + 'win_rate': win_rate, + 'max_dd': max_drawdown * 100, + 'profile': config['profile'], + 'use_ma_gap': config['use_ma_gap'], + } + + except Exception as e: + print(f"Error backtesting config: {e}") + return { + 'params': json.dumps(config), + 'profit': 0.0, + 'trades': 0, + 'win_rate': 0.0, + 'max_dd': 0.0, + 'profile': config.get('profile', 'unknown'), + 'use_ma_gap': config.get('use_ma_gap', False), + } + +def process_chunk(chunk_file: str, data_file: str, output_file: str): + """Process entire chunk of configurations""" + print(f"\n{'='*60}") + print(f"V9 ADVANCED WORKER") + print(f"{'='*60}") + print(f"Chunk file: {chunk_file}") + print(f"Data file: {data_file}") + print(f"Output file: {output_file}") + print(f"{'='*60}\n") + + # Load chunk configs + with open(chunk_file, 'r') as f: + configs = json.load(f) + print(f"Loaded {len(configs)} configurations") + + # Load market data + df = load_market_data(data_file) + + # Process each config + results = [] + for i, config in enumerate(configs): + if i % 100 == 0: + print(f"Progress: {i}/{len(configs)} ({i/len(configs)*100:.1f}%)") + + result = backtest_config(df, config) + results.append(result) + + # Save results to CSV + df_results = pd.DataFrame(results) + df_results.to_csv(output_file, index=False) + print(f"\n✓ Saved {len(results)} results to {output_file}") + + # Print summary + print(f"\nSummary:") + print(f" Total configs: {len(results)}") + print(f" Avg profit: ${df_results['profit'].mean():.2f}") + print(f" Best profit: ${df_results['profit'].max():.2f}") + print(f" Avg trades: {df_results['trades'].mean():.0f}") + print(f" Avg win rate: {df_results['win_rate'].mean():.1f}%") + +if __name__ == "__main__": + if len(sys.argv) != 4: + print("Usage: python3 v9_advanced_worker.py ") + sys.exit(1) + + chunk_file = sys.argv[1] + data_file = sys.argv[2] + output_file = sys.argv[3] + + process_chunk(chunk_file, data_file, output_file) diff --git a/scripts/coordinate_v9_advanced_sweep.py b/scripts/coordinate_v9_advanced_sweep.py new file mode 100755 index 0000000..91a405d --- /dev/null +++ b/scripts/coordinate_v9_advanced_sweep.py @@ -0,0 +1,362 @@ +#!/usr/bin/env python3 +""" +Distributed coordinator for v9 advanced parameter sweep. + +This script coordinates the distributed processing of ~800K+ parameter configurations +across the EPYC cluster. + +Architecture: +- Generates chunks of parameter combinations +- Distributes chunks to workers via SSH +- Monitors progress +- Aggregates results + +Expected configuration space: +- ATR periods: 13 values +- Multipliers: 13 values +- ADX length: 6 values +- RSI length: 6 values +- RSI boundaries: 7×7×7×7 = 2401 combinations +- Volume max: 7 values +- Entry buffer: 7 values +- Heikin Ashi: 2 values +- MA gap filter: 2 values +- MA gap thresholds: 7×7 = 49 combinations + +Total: 13 × 13 × 6 × 6 × 2401 × 7 × 7 × 2 × 2 × 49 = ~807,584 configurations + +Chunk size: 3,000 configs per chunk = ~270 chunks +Expected runtime: 40-80 hours on 2-worker cluster +""" + +import argparse +import sqlite3 +import subprocess +import time +from datetime import datetime +from pathlib import Path + +import pandas as pd +from tqdm import tqdm + +# Worker configuration +WORKERS = { + "worker1": { + "host": "192.168.1.101", + "cores": 32, + }, + "worker2": { + "host": "192.168.1.102", + "cores": 32, + } +} + +# Default parameters +CHUNK_SIZE = 3000 # Configurations per chunk +PROJECT_DIR = Path(__file__).parent.parent + + +def generate_parameter_space(): + """ + Generate the full parameter space for v9 advanced optimization. + + Returns ~807,584 parameter combinations. + """ + print("Generating parameter space...") + print() + + # ATR periods: test around optimal (10) with fine granularity + atr_periods = [8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] + + # Multipliers: test around optimal (3.2) with fine granularity + multipliers = [2.5, 2.7, 2.9, 3.0, 3.2, 3.4, 3.6, 3.8, 4.0, 4.2, 4.4, 4.6, 5.0] + + # ADX length: test shorter to longer + adx_lengths = [12, 14, 16, 18, 20, 22] + + # RSI length: test shorter to longer + rsi_lengths = [10, 12, 14, 16, 18, 20] + + # RSI boundaries: comprehensive range + rsi_long_mins = [25, 30, 35, 40, 45, 50, 55] + rsi_long_maxs = [55, 60, 65, 70, 75, 80, 85] + rsi_short_mins = [15, 20, 25, 30, 35, 40, 45] + rsi_short_maxs = [55, 60, 65, 70, 75, 80, 85] + + # Volume max: test tighter to looser + vol_maxs = [2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0] + + # Entry buffer: test smaller to larger ATR multiples + entry_buffers = [0.10, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40] + + # Heikin Ashi toggle + use_heikin_ashi_options = [False, True] + + # MA gap filter toggle + use_ma_gap_filter_options = [False, True] + + # MA gap thresholds (only used if filter enabled) + ma_gap_long_mins = [-5, -3, -1, 0, 1, 3, 5] # Minimum gap for LONG (convergence/divergence) + ma_gap_short_maxs = [-5, -3, -1, 0, 1, 3, 5] # Maximum gap for SHORT + + configs = [] + total_combos = (len(atr_periods) * len(multipliers) * len(adx_lengths) * + len(rsi_lengths) * len(rsi_long_mins) * len(rsi_long_maxs) * + len(rsi_short_mins) * len(rsi_short_maxs) * len(vol_maxs) * + len(entry_buffers) * len(use_heikin_ashi_options) * + len(use_ma_gap_filter_options) * len(ma_gap_long_mins) * + len(ma_gap_short_maxs)) + + print(f"Expected configurations: {total_combos:,}") + print() + print("This will take a few minutes...") + print() + + # Generate all combinations + for atr in tqdm(atr_periods, desc="ATR periods"): + for mult in multipliers: + for adx_len in adx_lengths: + for rsi_len in rsi_lengths: + for rsi_lmin in rsi_long_mins: + for rsi_lmax in rsi_long_maxs: + # Skip invalid RSI ranges + if rsi_lmin >= rsi_lmax: + continue + + for rsi_smin in rsi_short_mins: + for rsi_smax in rsi_short_maxs: + # Skip invalid RSI ranges + if rsi_smin >= rsi_smax: + continue + + for vol_max in vol_maxs: + for buffer in entry_buffers: + for ha in use_heikin_ashi_options: + for use_ma in use_ma_gap_filter_options: + for ma_lmin in ma_gap_long_mins: + for ma_smax in ma_gap_short_maxs: + configs.append({ + 'atr_period': atr, + 'multiplier': mult, + 'adx_length': adx_len, + 'rsi_length': rsi_len, + 'rsi_long_min': rsi_lmin, + 'rsi_long_max': rsi_lmax, + 'rsi_short_min': rsi_smin, + 'rsi_short_max': rsi_smax, + 'vol_max': vol_max, + 'entry_buffer_atr': buffer, + 'use_heikin_ashi': ha, + 'use_ma_gap_filter': use_ma, + 'ma_gap_long_min': ma_lmin, + 'ma_gap_short_max': ma_smax, + }) + + print() + print(f"Generated {len(configs):,} valid configurations") + return configs + + +def create_chunks(configs, chunk_size=CHUNK_SIZE): + """Split configurations into chunks.""" + print(f"Creating chunks of {chunk_size} configurations...") + + chunks_dir = PROJECT_DIR / "cluster" / "chunks" + chunks_dir.mkdir(parents=True, exist_ok=True) + + chunks = [] + for i in range(0, len(configs), chunk_size): + chunk = configs[i:i + chunk_size] + chunk_id = i // chunk_size + chunk_file = chunks_dir / f"v9_advanced_chunk_{chunk_id:04d}.csv" + + # Save chunk + df = pd.DataFrame(chunk) + df.to_csv(chunk_file, index=False) + + chunks.append({ + 'id': chunk_id, + 'file': str(chunk_file), + 'size': len(chunk), + 'status': 'pending' + }) + + print(f"Created {len(chunks)} chunks") + return chunks + + +def create_database(chunks): + """Create SQLite database for tracking.""" + db_path = PROJECT_DIR / "cluster" / "exploration_v9_advanced.db" + + print(f"Creating database: {db_path}") + + conn = sqlite3.connect(db_path) + c = conn.cursor() + + # Create chunks table + c.execute(''' + CREATE TABLE IF NOT EXISTS chunks ( + id INTEGER PRIMARY KEY, + file TEXT NOT NULL, + size INTEGER NOT NULL, + status TEXT NOT NULL, + worker TEXT, + started_at INTEGER, + completed_at INTEGER, + result_file TEXT + ) + ''') + + # Insert chunks + for chunk in chunks: + c.execute(''' + INSERT INTO chunks (id, file, size, status) + VALUES (?, ?, ?, ?) + ''', (chunk['id'], chunk['file'], chunk['size'], chunk['status'])) + + conn.commit() + conn.close() + + print(f"Database created with {len(chunks)} chunks") + return db_path + + +def assign_chunk_to_worker(db_path, worker_name): + """Get next pending chunk and assign to worker.""" + conn = sqlite3.connect(db_path) + c = conn.cursor() + + # Get next pending chunk + c.execute(''' + SELECT id, file FROM chunks + WHERE status = 'pending' + ORDER BY id + LIMIT 1 + ''') + + row = c.fetchone() + if not row: + conn.close() + return None + + chunk_id, chunk_file = row + + # Update status + c.execute(''' + UPDATE chunks + SET status = 'running', worker = ?, started_at = ? + WHERE id = ? + ''', (worker_name, int(time.time()), chunk_id)) + + conn.commit() + conn.close() + + return {'id': chunk_id, 'file': chunk_file} + + +def mark_chunk_complete(db_path, chunk_id, result_file): + """Mark chunk as completed.""" + conn = sqlite3.connect(db_path) + c = conn.cursor() + + c.execute(''' + UPDATE chunks + SET status = 'completed', completed_at = ?, result_file = ? + WHERE id = ? + ''', (int(time.time()), result_file, chunk_id)) + + conn.commit() + conn.close() + + +def start_worker_process(worker_name, worker_config, chunk_file, output_file): + """Start worker process via SSH.""" + host = worker_config['host'] + + # Command to run on remote worker + cmd = [ + 'ssh', host, + f'cd /root/traderv4 && ' + f'python3 scripts/distributed_v9_advanced_worker.py {chunk_file} {output_file}' + ] + + print(f"Starting {worker_name} on chunk...") + subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + +def monitor_progress(db_path): + """Monitor and display progress.""" + conn = sqlite3.connect(db_path) + c = conn.cursor() + + c.execute('SELECT COUNT(*) FROM chunks WHERE status = "completed"') + completed = c.fetchone()[0] + + c.execute('SELECT COUNT(*) FROM chunks WHERE status = "running"') + running = c.fetchone()[0] + + c.execute('SELECT COUNT(*) FROM chunks') + total = c.fetchone()[0] + + conn.close() + + return { + 'completed': completed, + 'running': running, + 'pending': total - completed - running, + 'total': total, + 'progress': completed / total * 100 + } + + +def main(): + parser = argparse.ArgumentParser(description="Coordinate v9 advanced sweep") + parser.add_argument("--chunk-size", type=int, default=CHUNK_SIZE, + help=f"Configurations per chunk (default: {CHUNK_SIZE})") + parser.add_argument("--generate-only", action="store_true", + help="Only generate chunks without starting workers") + + args = parser.parse_args() + + print("=" * 80) + print("v9 ADVANCED PARAMETER SWEEP COORDINATOR") + print("=" * 80) + print() + print(f"Chunk size: {args.chunk_size} configurations") + print(f"Workers: {len(WORKERS)}") + print() + + # Generate parameter space + configs = generate_parameter_space() + + # Create chunks + chunks = create_chunks(configs, args.chunk_size) + + # Create database + db_path = create_database(chunks) + + print() + print("=" * 80) + print("SETUP COMPLETE") + print("=" * 80) + print() + print(f"Total configurations: {len(configs):,}") + print(f"Chunks: {len(chunks)}") + print(f"Database: {db_path}") + print() + + if args.generate_only: + print("Generation complete. Use --no-generate-only to start workers.") + return + + print("Starting distributed processing...") + print() + + # TODO: Implement worker coordination loop + # This would monitor chunks, assign to workers, track progress + # For now, workers can be started manually + + +if __name__ == "__main__": + main() diff --git a/scripts/distributed_v9_advanced_worker.py b/scripts/distributed_v9_advanced_worker.py new file mode 100755 index 0000000..b89b5f1 --- /dev/null +++ b/scripts/distributed_v9_advanced_worker.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +""" +Distributed v9 advanced parameter sweep for cluster execution. + +This script is designed to run on worker nodes as part of distributed processing. +The coordinator will split the 800K+ configurations into chunks and distribute +them across the cluster. + +Expected per-worker throughput: ~300-500 configs/hour +Total runtime: 40-80 hours on 2-worker cluster +""" + +import argparse +import sys +from pathlib import Path + +# Add project root to path +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +import pandas as pd +from tqdm import tqdm + +from backtester.data_loader import load_data +from backtester.indicators.money_line_v9 import MoneyLineV9Inputs, money_line_v9_signals +from backtester.simulator import simulate_money_line + + +def test_config(params): + """Test a single configuration.""" + # Load data once (cached) + df = load_data("solusdt_5m.csv") + + # Create inputs with parameters + inputs = MoneyLineV9Inputs( + # Basic optimized params (FIXED from previous sweep) + confirm_bars=0, + flip_threshold_percent=0.5, + cooldown_bars=3, + adx_min=21, + long_pos_max=75, + short_pos_min=20, + vol_min=1.0, + + # ADVANCED OPTIMIZATION PARAMETERS: + atr_period=params['atr_period'], + multiplier=params['multiplier'], + adx_length=params['adx_length'], + rsi_length=params['rsi_length'], + rsi_long_min=params['rsi_long_min'], + rsi_long_max=params['rsi_long_max'], + rsi_short_min=params['rsi_short_min'], + rsi_short_max=params['rsi_short_max'], + vol_max=params['vol_max'], + entry_buffer_atr=params['entry_buffer_atr'], + use_heikin_ashi=params['use_heikin_ashi'], + use_ma_gap_filter=params['use_ma_gap_filter'], + ma_gap_long_min=params['ma_gap_long_min'], + ma_gap_short_max=params['ma_gap_short_max'], + ) + + try: + # Generate signals + signals = money_line_v9_signals(df, inputs) + + # Simulate trades + results = simulate_money_line(df, signals) + + return { + 'atr_period': params['atr_period'], + 'multiplier': params['multiplier'], + 'adx_length': params['adx_length'], + 'rsi_length': params['rsi_length'], + 'rsi_long_min': params['rsi_long_min'], + 'rsi_long_max': params['rsi_long_max'], + 'rsi_short_min': params['rsi_short_min'], + 'rsi_short_max': params['rsi_short_max'], + 'vol_max': params['vol_max'], + 'entry_buffer_atr': params['entry_buffer_atr'], + 'use_heikin_ashi': params['use_heikin_ashi'], + 'use_ma_gap_filter': params['use_ma_gap_filter'], + 'ma_gap_long_min': params['ma_gap_long_min'], + 'ma_gap_short_max': params['ma_gap_short_max'], + 'pnl': results['total_pnl'], + 'win_rate': results['win_rate'], + 'profit_factor': results['profit_factor'], + 'max_drawdown': results['max_drawdown'], + 'total_trades': results['total_trades'], + } + except Exception as e: + print(f"Error testing config: {e}") + return { + 'atr_period': params['atr_period'], + 'multiplier': params['multiplier'], + 'pnl': 0, + 'win_rate': 0, + 'profit_factor': 0, + 'max_drawdown': 0, + 'total_trades': 0, + } + + +def process_chunk(chunk_file: str, output_file: str): + """ + Process a chunk of parameter configurations. + + Args: + chunk_file: CSV file with parameter combinations to test + output_file: CSV file to save results + """ + print(f"Loading chunk: {chunk_file}") + chunk_df = pd.read_csv(chunk_file) + print(f"Chunk size: {len(chunk_df)} configurations") + print() + + results = [] + + for idx, row in tqdm(chunk_df.iterrows(), total=len(chunk_df), desc="Testing configs"): + params = row.to_dict() + result = test_config(params) + results.append(result) + + # Save results + results_df = pd.DataFrame(results) + results_df.to_csv(output_file, index=False) + print(f"\nResults saved to: {output_file}") + + # Print summary + print() + print("=" * 80) + print("CHUNK COMPLETE") + print("=" * 80) + print() + print(f"Configurations tested: {len(results_df)}") + print(f"Best PnL: ${results_df['pnl'].max():.2f}") + print(f"Mean PnL: ${results_df['pnl'].mean():.2f}") + print(f"Configurations with trades: {(results_df['total_trades'] > 0).sum()}") + print() + + +def main(): + parser = argparse.ArgumentParser(description="Process v9 advanced sweep chunk") + parser.add_argument("chunk_file", help="Input CSV file with parameter combinations") + parser.add_argument("output_file", help="Output CSV file for results") + + args = parser.parse_args() + + print("=" * 80) + print("v9 ADVANCED SWEEP - CHUNK PROCESSOR") + print("=" * 80) + print() + + process_chunk(args.chunk_file, args.output_file) + + +if __name__ == "__main__": + main() diff --git a/scripts/run_advanced_v9_sweep.py b/scripts/run_advanced_v9_sweep.py new file mode 100644 index 0000000..4246c32 --- /dev/null +++ b/scripts/run_advanced_v9_sweep.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python3 +""" +Advanced v9 Money Line parameter sweep - AGGRESSIVE optimization. + +This explores ~100K-200K parameter combinations across: +- ATR profiles (period + multiplier variations) +- RSI boundaries (4 parameters) +- Volume max threshold +- Entry buffer size +- ADX length +- Source mode (Chart vs Heikin Ashi) +- MA gap filter (optional) + +Expected runtime: 40-80 hours on 2-worker cluster +Target: Beat baseline $194.43/1k (19.44% returns) +""" + +import itertools +import multiprocessing as mp +import sys +from pathlib import Path + +# Add project root to path +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +import pandas as pd +from tqdm import tqdm + +from backtester.data_loader import load_data +from backtester.indicators.money_line_v9 import MoneyLineV9Inputs, money_line_v9_signals +from backtester.simulator import simulate_money_line + + +def test_config(args): + """Test a single configuration.""" + config_id, params = args + + # Load data + df = load_data("solusdt_5m.csv") + + # Create inputs with parameters + inputs = MoneyLineV9Inputs( + # Basic optimized params (FIXED from previous sweep) + confirm_bars=0, + flip_threshold_percent=0.5, + cooldown_bars=3, + adx_min=21, + long_pos_max=75, + short_pos_min=20, + vol_min=1.0, + + # ADVANCED OPTIMIZATION PARAMETERS: + atr_period=params['atr_period'], + multiplier=params['multiplier'], + adx_length=params['adx_length'], + rsi_length=params['rsi_length'], + rsi_long_min=params['rsi_long_min'], + rsi_long_max=params['rsi_long_max'], + rsi_short_min=params['rsi_short_min'], + rsi_short_max=params['rsi_short_max'], + vol_max=params['vol_max'], + entry_buffer_atr=params['entry_buffer_atr'], + use_heikin_ashi=params['use_heikin_ashi'], + use_ma_gap_filter=params['use_ma_gap_filter'], + ma_gap_long_min=params['ma_gap_long_min'], + ma_gap_short_max=params['ma_gap_short_max'], + ) + + try: + # Generate signals + signals = money_line_v9_signals(df, inputs) + + # Simulate trades + results = simulate_money_line(df, signals) + + return { + 'config_id': config_id, + 'atr_period': params['atr_period'], + 'multiplier': params['multiplier'], + 'adx_length': params['adx_length'], + 'rsi_length': params['rsi_length'], + 'rsi_long_min': params['rsi_long_min'], + 'rsi_long_max': params['rsi_long_max'], + 'rsi_short_min': params['rsi_short_min'], + 'rsi_short_max': params['rsi_short_max'], + 'vol_max': params['vol_max'], + 'entry_buffer_atr': params['entry_buffer_atr'], + 'use_heikin_ashi': params['use_heikin_ashi'], + 'use_ma_gap_filter': params['use_ma_gap_filter'], + 'ma_gap_long_min': params['ma_gap_long_min'], + 'ma_gap_short_max': params['ma_gap_short_max'], + 'pnl': results['total_pnl'], + 'win_rate': results['win_rate'], + 'profit_factor': results['profit_factor'], + 'max_drawdown': results['max_drawdown'], + 'total_trades': results['total_trades'], + } + except Exception as e: + print(f"Error testing config {config_id}: {e}") + return { + 'config_id': config_id, + 'pnl': 0, + 'win_rate': 0, + 'profit_factor': 0, + 'max_drawdown': 0, + 'total_trades': 0, + } + + +def generate_parameter_grid(): + """ + Generate comprehensive parameter grid for advanced optimization. + + AGGRESSIVE SEARCH SPACE: + - ATR periods: 5 values (10, 12, 14, 16, 18) + - Multipliers: 6 values (3.0, 3.2, 3.5, 3.8, 4.0, 4.2) + - ADX length: 4 values (14, 16, 18, 20) + - RSI length: 3 values (12, 14, 16) + - RSI long min: 4 values (30, 35, 40, 45) + - RSI long max: 4 values (65, 70, 75, 80) + - RSI short min: 4 values (25, 30, 35, 40) + - RSI short max: 4 values (60, 65, 70, 75) + - Volume max: 4 values (3.0, 3.5, 4.0, 4.5) + - Entry buffer: 3 values (0.15, 0.20, 0.25) + - Source mode: 2 values (Chart, Heikin Ashi) + - MA gap filter: 3 modes (disabled, longs_only, both) + + Total: 5×6×4×3×4×4×4×4×4×3×2×3 = 829,440 combinations + + This will take 2-3 days on 2-worker cluster but will find optimal settings. + """ + + # ATR profile variations (5 × 6 = 30 combos) + atr_periods = [10, 12, 14, 16, 18] + multipliers = [3.0, 3.2, 3.5, 3.8, 4.0, 4.2] + + # ADX length variations (4 values) + adx_lengths = [14, 16, 18, 20] + + # RSI length (3 values) + rsi_lengths = [12, 14, 16] + + # RSI boundaries (4×4×4×4 = 256 combos) + rsi_long_mins = [30, 35, 40, 45] + rsi_long_maxs = [65, 70, 75, 80] + rsi_short_mins = [25, 30, 35, 40] + rsi_short_maxs = [60, 65, 70, 75] + + # Volume max (4 values) + vol_maxs = [3.0, 3.5, 4.0, 4.5] + + # Entry buffer (3 values) + entry_buffers = [0.15, 0.20, 0.25] + + # Source mode (2 values) + use_heikin_ashis = [False, True] + + # MA gap filter modes (3 modes = 3 parameter sets) + # Mode 1: Disabled + # Mode 2: Longs only (require ma50 > ma200) + # Mode 3: Both directions (bull/bear confirmation) + ma_gap_configs = [ + (False, 0.0, 0.0), # Disabled + (True, 0.5, 0.0), # Longs only: require 0.5% gap + (True, 0.5, -0.5), # Both: longs need +0.5%, shorts need -0.5% + ] + + configs = [] + config_id = 0 + + for atr_period, multiplier, adx_length, rsi_length, \ + rsi_long_min, rsi_long_max, rsi_short_min, rsi_short_max, \ + vol_max, entry_buffer, use_ha, ma_gap_config in \ + itertools.product( + atr_periods, multipliers, adx_lengths, rsi_lengths, + rsi_long_mins, rsi_long_maxs, rsi_short_mins, rsi_short_maxs, + vol_maxs, entry_buffers, use_heikin_ashis, ma_gap_configs + ): + + # Validity check: RSI min < max + if rsi_long_min >= rsi_long_max: + continue + if rsi_short_min >= rsi_short_max: + continue + + use_ma_gap, ma_gap_long_min, ma_gap_short_max = ma_gap_config + + configs.append((config_id, { + 'atr_period': atr_period, + 'multiplier': multiplier, + 'adx_length': adx_length, + 'rsi_length': rsi_length, + 'rsi_long_min': rsi_long_min, + 'rsi_long_max': rsi_long_max, + 'rsi_short_min': rsi_short_min, + 'rsi_short_max': rsi_short_max, + 'vol_max': vol_max, + 'entry_buffer_atr': entry_buffer, + 'use_heikin_ashi': use_ha, + 'use_ma_gap_filter': use_ma_gap, + 'ma_gap_long_min': ma_gap_long_min, + 'ma_gap_short_max': ma_gap_short_max, + })) + config_id += 1 + + return configs + + +def main(): + """Run advanced parameter sweep.""" + print("=" * 80) + print("v9 ADVANCED PARAMETER SWEEP - AGGRESSIVE OPTIMIZATION") + print("=" * 80) + print() + print("This will explore ~800K parameter combinations across:") + print(" - ATR profiles (5 periods × 6 multipliers)") + print(" - RSI boundaries (4×4×4×4 = 256 combinations)") + print(" - Volume max (4 values)") + print(" - Entry buffer (3 values)") + print(" - ADX length (4 values)") + print(" - RSI length (3 values)") + print(" - Source mode (Chart vs Heikin Ashi)") + print(" - MA gap filter (3 modes)") + print() + print("Expected runtime: 40-80 hours on 2-worker cluster") + print("Target: Beat baseline $194.43/1k (19.44% returns)") + print() + + # Generate parameter grid + print("Generating parameter combinations...") + configs = generate_parameter_grid() + print(f"Total configurations: {len(configs):,}") + print() + + # Determine number of workers + n_workers = mp.cpu_count() + print(f"Using {n_workers} CPU cores") + print() + + # Run sweep + print("Starting parameter sweep...") + with mp.Pool(n_workers) as pool: + results = list(tqdm( + pool.imap(test_config, configs), + total=len(configs), + desc="Testing configs" + )) + + # Convert to DataFrame + results_df = pd.DataFrame(results) + + # Save full results + output_file = "sweep_v9_advanced_full.csv" + results_df.to_csv(output_file, index=False) + print(f"Full results saved to: {output_file}") + + # Sort by PnL and save top 1000 + top_results = results_df.nlargest(1000, 'pnl') + top_file = "sweep_v9_advanced_top1000.csv" + top_results.to_csv(top_file, index=False) + print(f"Top 1000 configurations saved to: {top_file}") + + # Print summary + print() + print("=" * 80) + print("SWEEP COMPLETE") + print("=" * 80) + print() + print(f"Best configuration:") + best = top_results.iloc[0] + print(f" PnL: ${best['pnl']:.2f}") + print(f" Win Rate: {best['win_rate']:.1f}%") + print(f" Profit Factor: {best['profit_factor']:.2f}") + print(f" Max Drawdown: ${best['max_drawdown']:.2f}") + print(f" Total Trades: {best['total_trades']}") + print() + print("Parameters:") + print(f" ATR Period: {best['atr_period']}") + print(f" Multiplier: {best['multiplier']}") + print(f" ADX Length: {best['adx_length']}") + print(f" RSI Length: {best['rsi_length']}") + print(f" RSI Long: {best['rsi_long_min']}-{best['rsi_long_max']}") + print(f" RSI Short: {best['rsi_short_min']}-{best['rsi_short_max']}") + print(f" Volume Max: {best['vol_max']}") + print(f" Entry Buffer: {best['entry_buffer_atr']}") + print(f" Heikin Ashi: {best['use_heikin_ashi']}") + print(f" MA Gap Filter: {best['use_ma_gap_filter']}") + if best['use_ma_gap_filter']: + print(f" Long Min: {best['ma_gap_long_min']:.1f}%") + print(f" Short Max: {best['ma_gap_short_max']:.1f}%") + print() + print(f"Baseline to beat: $194.43 (19.44%)") + improvement = ((best['pnl'] - 194.43) / 194.43) * 100 + print(f"Improvement: {improvement:+.1f}%") + + +if __name__ == "__main__": + main()