diff --git a/backtester/__pycache__/v11_moneyline_all_filters.cpython-312.pyc b/backtester/__pycache__/v11_moneyline_all_filters.cpython-312.pyc new file mode 100644 index 0000000..86afecd Binary files /dev/null and b/backtester/__pycache__/v11_moneyline_all_filters.cpython-312.pyc differ diff --git a/backtester/v11_moneyline_all_filters.py b/backtester/v11_moneyline_all_filters.py new file mode 100644 index 0000000..5677715 --- /dev/null +++ b/backtester/v11_moneyline_all_filters.py @@ -0,0 +1,321 @@ +""" +v11 "Money Line All Filters" indicator implementation for backtesting. + +CRITICAL DIFFERENCE FROM v9: +- v11: ALL filters actually applied to signals (useQualityFilters toggle) +- v9 bug: Filters calculated but signals ignored them + +Based on moneyline_v11_all_filters.pinescript lines 271-272: + finalLongSignal = buyReady and (not useQualityFilters or (longOk and adxOk and longBufferOk and longPositionOk and volumeOk and rsiLongOk)) + finalShortSignal = sellReady and (not useQualityFilters or (shortOk and adxOk and shortBufferOk and shortPositionOk and volumeOk and rsiShortOk)) + +Test sweep parameters (8 params × 2 values = 256 combinations): +- flip_threshold: 0.5, 0.6 +- adx_min: 18, 21 +- long_pos_max: 75, 80 +- short_pos_min: 20, 25 +- vol_min: 0.8, 1.0 +- entry_buffer_atr: 0.15, 0.20 +- rsi_long_min: 35, 40 +- rsi_short_max: 65, 70 +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +try: + from typing import Literal +except ImportError: + from typing_extensions import Literal + +import numpy as np +import pandas as pd + +from backtester.math_utils import calculate_adx, calculate_atr, rma + +Direction = Literal["long", "short"] + + +@dataclass +class MoneyLineV11Inputs: + """v11 Money Line indicator parameters for test sweep.""" + + # Basic Money Line parameters (fixed for test) + confirm_bars: int = 0 # Immediate signals + cooldown_bars: int = 3 # Prevent overtrading + + # ATR profile (fixed for test - 5-minute chart defaults) + atr_period: int = 12 # ATR calculation length + multiplier: float = 3.8 # ATR band multiplier + + # Filter parameters (8 parameters being optimized) + flip_threshold: float = 0.5 # % price must move to flip (TEST: 0.5, 0.6) + adx_min: float = 21 # Minimum ADX for signal (TEST: 18, 21) + long_pos_max: float = 75 # Don't long above X% of range (TEST: 75, 80) + short_pos_min: float = 20 # Don't short below X% of range (TEST: 20, 25) + vol_min: float = 1.0 # Minimum volume ratio (TEST: 0.8, 1.0) + entry_buffer_atr: float = 0.20 # ATR buffer beyond line (TEST: 0.15, 0.20) + rsi_long_min: float = 35 # RSI minimum for longs (TEST: 35, 40) + rsi_short_max: float = 70 # RSI maximum for shorts (TEST: 65, 70) + + # Fixed filter parameters (not being optimized in test) + adx_length: int = 16 # ADX calculation length + rsi_length: int = 14 # RSI calculation length + vol_max: float = 3.5 # Maximum volume ratio + rsi_long_max: float = 70 # RSI maximum for longs + rsi_short_min: float = 30 # RSI minimum for shorts + + +@dataclass +class MoneyLineV11Signal: + timestamp: pd.Timestamp + direction: Direction + entry_price: float + adx: float + atr: float + rsi: float + volume_ratio: float + price_position: float + + +def ema(series: pd.Series, length: int) -> pd.Series: + """Exponential Moving Average.""" + return series.ewm(span=length, adjust=False).mean() + + +def rolling_volume_ratio(volume: pd.Series, length: int = 20) -> pd.Series: + """Volume ratio vs moving average.""" + avg = volume.rolling(length).mean() + return volume / avg + + +def price_position(high: pd.Series, low: pd.Series, close: pd.Series, length: int = 100) -> pd.Series: + """Price position in percentage of range (0-100).""" + highest = high.rolling(length).max() + lowest = low.rolling(length).min() + return 100.0 * (close - lowest) / (highest - lowest) + + +def rsi(series: pd.Series, length: int) -> pd.Series: + """Relative Strength Index.""" + delta = series.diff() + gain = np.where(delta > 0, delta, 0.0) + loss = np.where(delta < 0, -delta, 0.0) + avg_gain = rma(pd.Series(gain), length) + avg_loss = rma(pd.Series(loss), length) + rs = avg_gain / avg_loss.replace(0, np.nan) + rsi_series = 100 - (100 / (1 + rs)) + return rsi_series.fillna(50.0) + + +def supertrend_v11(df: pd.DataFrame, atr_period: int, multiplier: float, + flip_threshold: float, confirm_bars: int) -> tuple[pd.Series, pd.Series]: + """ + Calculate v11 Money Line (Supertrend with flip threshold). + + Returns: + (supertrend_line, trend): Line values and trend direction (1=bull, -1=bear) + """ + # Use chart prices (not Heikin Ashi for test) + high, low, close = df['high'], df['low'], df['close'] + + # Calculate ATR + tr = pd.concat([ + high - low, + (high - close.shift(1)).abs(), + (low - close.shift(1)).abs() + ], axis=1).max(axis=1) + atr = rma(tr, atr_period) + + # Supertrend bands + src = (high + low) / 2 + up = src - (multiplier * atr) + dn = src + (multiplier * atr) + + # Initialize tracking arrays + up1 = up.copy() + dn1 = dn.copy() + trend = pd.Series(1, index=df.index) # Start bullish + tsl = up1.copy() # Trailing stop line + + # Momentum tracking for anti-whipsaw + bull_momentum = pd.Series(0, index=df.index) + bear_momentum = pd.Series(0, index=df.index) + + # Calculate flip threshold + threshold = flip_threshold / 100.0 + + for i in range(1, len(df)): + # Update bands + if close.iloc[i-1] > up1.iloc[i-1]: + up1.iloc[i] = max(up.iloc[i], up1.iloc[i-1]) + else: + up1.iloc[i] = up.iloc[i] + + if close.iloc[i-1] < dn1.iloc[i-1]: + dn1.iloc[i] = min(dn.iloc[i], dn1.iloc[i-1]) + else: + dn1.iloc[i] = dn.iloc[i] + + # Get previous trend and tsl + prev_trend = trend.iloc[i-1] + prev_tsl = tsl.iloc[i-1] + + # Update TSL based on trend + if prev_trend == 1: + tsl.iloc[i] = max(up1.iloc[i], prev_tsl) + else: + tsl.iloc[i] = min(dn1.iloc[i], prev_tsl) + + # Check for flip with threshold and momentum + threshold_amount = tsl.iloc[i] * threshold + + if prev_trend == 1: + # Currently bullish - check for bearish flip + if close.iloc[i] < (tsl.iloc[i] - threshold_amount): + bear_momentum.iloc[i] = bear_momentum.iloc[i-1] + 1 + bull_momentum.iloc[i] = 0 + else: + bear_momentum.iloc[i] = 0 + bull_momentum.iloc[i] = 0 + + # Flip after confirm_bars + 1 consecutive bearish bars + if bear_momentum.iloc[i] >= (confirm_bars + 1): + trend.iloc[i] = -1 + else: + trend.iloc[i] = 1 + else: + # Currently bearish - check for bullish flip + if close.iloc[i] > (tsl.iloc[i] + threshold_amount): + bull_momentum.iloc[i] = bull_momentum.iloc[i-1] + 1 + bear_momentum.iloc[i] = 0 + else: + bull_momentum.iloc[i] = 0 + bear_momentum.iloc[i] = 0 + + # Flip after confirm_bars + 1 consecutive bullish bars + if bull_momentum.iloc[i] >= (confirm_bars + 1): + trend.iloc[i] = 1 + else: + trend.iloc[i] = -1 + + return tsl, trend + + +def money_line_v11_signals(df: pd.DataFrame, inputs: Optional[MoneyLineV11Inputs] = None) -> list[MoneyLineV11Signal]: + """ + v11 "Money Line All Filters" signal generation. + + CRITICAL: ALL filters applied to signals (this is what makes v11 different from v9 bug). + + From pinescript lines 271-272: + finalLongSignal = buyReady and (longOk and adxOk and longBufferOk and longPositionOk and volumeOk and rsiLongOk) + finalShortSignal = sellReady and (shortOk and adxOk and shortBufferOk and shortPositionOk and volumeOk and rsiShortOk) + + Filters applied: + - ADX minimum (trend strength) + - Entry buffer (price beyond line by X*ATR) + - Price position (don't chase extremes) + - Volume ratio (avoid dead/overheated) + - RSI boundaries (momentum confirmation) + """ + if inputs is None: + inputs = MoneyLineV11Inputs() + + data = df.copy() + data = data.sort_index() + + # Calculate Money Line + supertrend, trend = supertrend_v11( + data, + inputs.atr_period, + inputs.multiplier, + inputs.flip_threshold, + inputs.confirm_bars + ) + data['supertrend'] = supertrend + data['trend'] = trend + + # Calculate indicators + data["rsi"] = rsi(data["close"], inputs.rsi_length) + data["atr"] = calculate_atr(data, inputs.atr_period) + data["adx"] = calculate_adx(data, inputs.adx_length) + data["volume_ratio"] = rolling_volume_ratio(data["volume"]) + data["price_position"] = price_position(data["high"], data["low"], data["close"]) + + signals: list[MoneyLineV11Signal] = [] + cooldown_remaining = 0 + + # Skip warmup period (200 bars for price position) + warmup_bars = 200 + + for idx in range(max(1, warmup_bars), len(data)): + row = data.iloc[idx] + prev = data.iloc[idx - 1] + + # Detect trend flip (buyReady/sellReady in pinescript) + flip_long = prev.trend == -1 and row.trend == 1 + flip_short = prev.trend == 1 and row.trend == -1 + + if cooldown_remaining > 0: + cooldown_remaining -= 1 + continue + + # V11 CRITICAL: Apply ALL filters (this is what was broken in v9) + + # ADX filter (adxOk) + adx_ok = row.adx >= inputs.adx_min + + # Volume filter (volumeOk) + volume_ok = inputs.vol_min <= row.volume_ratio <= inputs.vol_max + + if flip_long: + # Entry buffer check (longBufferOk) + entry_buffer_ok = row.close > (row.supertrend + inputs.entry_buffer_atr * row.atr) + + # Long filters + rsi_ok = inputs.rsi_long_min <= row.rsi <= inputs.rsi_long_max # rsiLongOk + pos_ok = row.price_position < inputs.long_pos_max # longPositionOk + + # V11: ALL filters must pass (this is the fix from v9) + if adx_ok and volume_ok and rsi_ok and pos_ok and entry_buffer_ok: + signals.append( + MoneyLineV11Signal( + timestamp=row.name, + direction="long", + entry_price=float(row.close), + adx=float(row.adx), + atr=float(row.atr), + rsi=float(row.rsi), + volume_ratio=float(row.volume_ratio), + price_position=float(row.price_position), + ) + ) + cooldown_remaining = inputs.cooldown_bars + + elif flip_short: + # Entry buffer check (shortBufferOk) + entry_buffer_ok = row.close < (row.supertrend - inputs.entry_buffer_atr * row.atr) + + # Short filters + rsi_ok = inputs.rsi_short_min <= row.rsi <= inputs.rsi_short_max # rsiShortOk + pos_ok = row.price_position > inputs.short_pos_min # shortPositionOk + + # V11: ALL filters must pass (this is the fix from v9) + if adx_ok and volume_ok and rsi_ok and pos_ok and entry_buffer_ok: + signals.append( + MoneyLineV11Signal( + timestamp=row.name, + direction="short", + entry_price=float(row.close), + adx=float(row.adx), + atr=float(row.atr), + rsi=float(row.rsi), + volume_ratio=float(row.volume_ratio), + price_position=float(row.price_position), + ) + ) + cooldown_remaining = inputs.cooldown_bars + + return signals diff --git a/cluster/__pycache__/v11_test_coordinator.cpython-312.pyc b/cluster/__pycache__/v11_test_coordinator.cpython-312.pyc new file mode 100644 index 0000000..f0720cc Binary files /dev/null and b/cluster/__pycache__/v11_test_coordinator.cpython-312.pyc differ diff --git a/cluster/__pycache__/v11_test_worker.cpython-312.pyc b/cluster/__pycache__/v11_test_worker.cpython-312.pyc new file mode 100644 index 0000000..e6a4f33 Binary files /dev/null and b/cluster/__pycache__/v11_test_worker.cpython-312.pyc differ diff --git a/cluster/run_v11_test_sweep.sh b/cluster/run_v11_test_sweep.sh new file mode 100755 index 0000000..441d290 --- /dev/null +++ b/cluster/run_v11_test_sweep.sh @@ -0,0 +1,57 @@ +#!/bin/bash +# V11 Test Parameter Sweep Launch Script +# Initializes database and starts coordinator for 256-combination test sweep + +set -e # Exit on error + +echo "================================================================" +echo "V11 TEST PARAMETER SWEEP" +echo "================================================================" +echo "Combinations: 256 (2^8 parameters)" +echo "Chunks: 2 × 128 combinations" +echo "Worker 1: Always available (27 cores)" +echo "Worker 2: Office hours aware (27 cores nights/weekends only)" +echo "Expected runtime: 6-25 minutes" +echo "================================================================" +echo "" + +cd "$(dirname "$0")" + +# Check if data file exists +if [ ! -f "data/solusdt_5m.csv" ]; then + echo "✗ Error: data/solusdt_5m.csv not found" + echo " Please ensure market data is available" + exit 1 +fi + +echo "✓ Market data found" + +# Check if coordinator script exists +if [ ! -f "v11_test_coordinator.py" ]; then + echo "✗ Error: v11_test_coordinator.py not found" + exit 1 +fi + +echo "✓ Coordinator script found" + +# Launch coordinator in background +echo "" +echo "🚀 Starting coordinator..." +nohup python3 v11_test_coordinator.py > coordinator_v11_test.log 2>&1 & +COORDINATOR_PID=$! + +echo "✓ Coordinator started (PID: $COORDINATOR_PID)" +echo "" +echo "================================================================" +echo "MONITORING" +echo "================================================================" +echo "Log file: tail -f coordinator_v11_test.log" +echo "Database: sqlite3 exploration.db" +echo "Results: cluster/v11_test_results/*.csv" +echo "" +echo "To check status:" +echo " sqlite3 exploration.db \"SELECT * FROM v11_test_chunks\"" +echo "" +echo "To stop sweep:" +echo " kill $COORDINATOR_PID" +echo "================================================================" diff --git a/cluster/v11_test_coordinator.py b/cluster/v11_test_coordinator.py new file mode 100755 index 0000000..dce3c55 --- /dev/null +++ b/cluster/v11_test_coordinator.py @@ -0,0 +1,437 @@ +#!/usr/bin/env python3 +""" +V11 Test Parameter Sweep Coordinator + +Coordinates 256-combination test sweep across 2 workers with smart scheduling. +Worker 2 respects office hours (Mon-Fri 8am-6pm disabled, nights/weekends OK). + +Test sweep: 2 chunks × 128 combinations = 256 total +Expected runtime: 6-25 minutes depending on worker availability +""" + +import sqlite3 +import subprocess +import time +import signal +import sys +from pathlib import Path +from datetime import datetime +import urllib.request +import json + +# Worker configuration +WORKERS = { + 'worker1': { + 'host': 'root@10.10.254.106', + 'workspace': '/home/comprehensive_sweep', + 'cores': 27, + }, + 'worker2': { + 'host': 'root@10.20.254.100', + 'workspace': '/home/backtest_dual/backtest', + 'ssh_hop': 'root@10.10.254.106', + 'cores': 27, + 'time_restricted': True, + 'allowed_start_hour': 18, # 6 PM + 'allowed_end_hour': 8, # 8 AM + } +} + +DATA_FILE = 'data/solusdt_5m.csv' +DB_PATH = 'exploration.db' +CHUNK_SIZE = 128 # Each chunk processes 128 combinations + +# Telegram configuration +TELEGRAM_BOT_TOKEN = '8240234365:AAEm6hg_XOm54x8ctnwpNYreFKRAEvWU3uY' +TELEGRAM_CHAT_ID = '579304651' + + +def send_telegram_message(message: str): + """Send notification to Telegram""" + try: + url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" + data = { + 'chat_id': TELEGRAM_CHAT_ID, + 'text': message, + 'parse_mode': 'HTML' + } + + req = urllib.request.Request( + url, + data=json.dumps(data).encode('utf-8'), + headers={'Content-Type': 'application/json'} + ) + + with urllib.request.urlopen(req, timeout=10) as response: + if response.status == 200: + print(f"✓ Telegram notification sent") + else: + print(f"⚠️ Telegram notification failed: {response.status}") + except Exception as e: + print(f"⚠️ Error sending Telegram notification: {e}") + + +def is_worker2_available() -> bool: + """Check if Worker 2 can run (respects office hours)""" + now = datetime.now() + + # Weekend (Sat=5, Sun=6): Available 24/7 + if now.weekday() >= 5: + return True + + # Weekday: Only 6 PM - 8 AM (avoid office hours 8am-6pm) + hour = now.hour + # Allowed if hour >= 18 (6 PM) OR hour < 8 (8 AM) + return hour >= 18 or hour < 8 + + +def get_available_workers() -> list: + """Return list of workers available right now""" + workers = ['worker1'] # Always available + if is_worker2_available(): + workers.append('worker2') + print("✓ Worker 2 available (outside office hours)") + else: + print("⚠️ Worker 2 unavailable (office hours Mon-Fri 8am-6pm)") + return workers + + +def init_database(): + """Initialize database tables for v11 test sweep""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + # Drop existing test tables if present + cursor.execute("DROP TABLE IF EXISTS v11_test_chunks") + cursor.execute("DROP TABLE IF EXISTS v11_test_strategies") + + # Create chunks table + cursor.execute(""" + CREATE TABLE v11_test_chunks ( + id TEXT PRIMARY KEY, + start_combo INTEGER, + end_combo INTEGER, + total_combos INTEGER, + status TEXT, + assigned_worker TEXT, + started_at INTEGER, + completed_at INTEGER + ) + """) + + # Create strategies table + cursor.execute(""" + CREATE TABLE v11_test_strategies ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + chunk_id TEXT, + params TEXT, + pnl REAL, + win_rate REAL, + profit_factor REAL, + max_drawdown REAL, + total_trades INTEGER, + FOREIGN KEY (chunk_id) REFERENCES v11_test_chunks(id) + ) + """) + + # Register 2 chunks (256 combinations total) + chunks = [ + ('v11_test_chunk_0000', 0, 128, 128), + ('v11_test_chunk_0001', 128, 256, 128), + ] + + for chunk_id, start, end, total in chunks: + cursor.execute( + "INSERT INTO v11_test_chunks (id, start_combo, end_combo, total_combos, status) VALUES (?, ?, ?, ?, 'pending')", + (chunk_id, start, end, total) + ) + + conn.commit() + conn.close() + print("✓ Database initialized with 2 chunks") + + +def get_pending_chunks() -> list: + """Get list of pending chunks""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute("SELECT id, start_combo FROM v11_test_chunks WHERE status='pending'") + chunks = cursor.fetchall() + conn.close() + return chunks + + +def assign_chunk(chunk_id: str, worker_name: str): + """Mark chunk as assigned to worker""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute( + "UPDATE v11_test_chunks SET status='running', assigned_worker=?, started_at=? WHERE id=?", + (worker_name, int(time.time()), chunk_id) + ) + conn.commit() + conn.close() + + +def deploy_worker(worker_name: str, chunk_id: str, start_combo: int): + """Deploy worker to EPYC server via SSH""" + worker = WORKERS[worker_name] + + print(f"\n{'='*60}") + print(f"Deploying {worker_name} for {chunk_id}") + print(f"{'='*60}") + + # Build SSH command + workspace = worker['workspace'] + + # Copy v11 test worker script + print(f"📦 Copying v11_test_worker.py to {worker_name}...") + + if 'ssh_hop' in worker: + # Worker 2: Use SSH hop through worker 1 + scp_cmd = [ + 'scp', + '-o', 'StrictHostKeyChecking=no', + '-o', f'ProxyJump={worker["ssh_hop"]}', + 'cluster/v11_test_worker.py', + f'{worker["host"]}:{workspace}/' + ] + else: + # Worker 1: Direct connection + scp_cmd = [ + 'scp', + '-o', 'StrictHostKeyChecking=no', + 'cluster/v11_test_worker.py', + f'{worker["host"]}:{workspace}/' + ] + + result = subprocess.run(scp_cmd, capture_output=True, text=True) + if result.returncode != 0: + print(f"✗ Failed to copy worker script: {result.stderr}") + return False + + print(f"✓ Worker script deployed") + + # Copy v11 indicator module + print(f"📦 Copying v11 indicator to {worker_name}...") + + if 'ssh_hop' in worker: + scp_cmd = [ + 'scp', + '-o', 'StrictHostKeyChecking=no', + '-o', f'ProxyJump={worker["ssh_hop"]}', + 'backtester/v11_moneyline_all_filters.py', + f'{worker["host"]}:{workspace}/backtester/' + ] + else: + scp_cmd = [ + 'scp', + '-o', 'StrictHostKeyChecking=no', + 'backtester/v11_moneyline_all_filters.py', + f'{worker["host"]}:{workspace}/backtester/' + ] + + result = subprocess.run(scp_cmd, capture_output=True, text=True) + if result.returncode != 0: + print(f"✗ Failed to copy indicator: {result.stderr}") + return False + + print(f"✓ Indicator deployed") + + # Start worker + print(f"🚀 Starting worker process...") + + worker_cmd = f"cd {workspace} && nohup python3 v11_test_worker.py {DATA_FILE} {chunk_id} {start_combo} > {chunk_id}_worker.log 2>&1 &" + + if 'ssh_hop' in worker: + ssh_cmd = [ + 'ssh', + '-o', 'StrictHostKeyChecking=no', + '-o', f'ProxyJump={worker["ssh_hop"]}', + worker['host'], + worker_cmd + ] + else: + ssh_cmd = [ + 'ssh', + '-o', 'StrictHostKeyChecking=no', + worker['host'], + worker_cmd + ] + + result = subprocess.run(ssh_cmd, capture_output=True, text=True) + if result.returncode != 0: + print(f"✗ Failed to start worker: {result.stderr}") + return False + + print(f"✓ Worker started on {worker_name}") + return True + + +def check_chunk_completion(worker_name: str, chunk_id: str) -> bool: + """Check if chunk has completed by looking for results CSV""" + worker = WORKERS[worker_name] + workspace = worker['workspace'] + + check_cmd = f"test -f {workspace}/v11_test_results/{chunk_id}_results.csv && echo 'exists'" + + if 'ssh_hop' in worker: + ssh_cmd = [ + 'ssh', + '-o', 'StrictHostKeyChecking=no', + '-o', f'ProxyJump={worker["ssh_hop"]}', + worker['host'], + check_cmd + ] + else: + ssh_cmd = [ + 'ssh', + '-o', 'StrictHostKeyChecking=no', + worker['host'], + check_cmd + ] + + result = subprocess.run(ssh_cmd, capture_output=True, text=True, timeout=10) + return 'exists' in result.stdout + + +def mark_chunk_complete(chunk_id: str): + """Mark chunk as completed in database""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + cursor.execute( + "UPDATE v11_test_chunks SET status='completed', completed_at=? WHERE id=?", + (int(time.time()), chunk_id) + ) + conn.commit() + conn.close() + + +def signal_handler(sig, frame): + """Handle termination signals""" + message = ( + "⚠️ V11 Test Sweep STOPPED\n\n" + "Coordinator received termination signal.\n" + "Sweep stopped prematurely.\n\n" + f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + ) + send_telegram_message(message) + sys.exit(0) + + +def main(): + """Main coordinator loop""" + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + print("\n" + "="*60) + print("V11 TEST PARAMETER SWEEP COORDINATOR") + print("="*60) + print(f"Total combinations: 256 (2^8)") + print(f"Chunks: 2 × 128 combinations") + print(f"Workers: 2 × 27 cores (85% CPU)") + print(f"Expected runtime: 6-25 minutes") + print("="*60 + "\n") + + # Initialize database + print("📊 Initializing database...") + init_database() + + # Send start notification + available_workers = get_available_workers() + start_msg = ( + f"🚀 V11 Test Sweep STARTED\n\n" + f"Combinations: 256 (2^8)\n" + f"Chunks: 2 × 128 combos\n" + f"Workers: {len(available_workers)} available\n" + f"- Worker 1: Always on (27 cores)\n" + ) + if 'worker2' in available_workers: + start_msg += f"- Worker 2: Active (27 cores)\n" + else: + start_msg += f"- Worker 2: Office hours (waiting for 6 PM)\n" + start_msg += f"\nStart: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + + send_telegram_message(start_msg) + + # Deploy workers to available chunks + start_time = time.time() + active_chunks = {} # chunk_id -> worker_name + + pending_chunks = get_pending_chunks() + available_workers = get_available_workers() + + for worker_name in available_workers: + if pending_chunks: + chunk_id, start_combo = pending_chunks.pop(0) + print(f"\n📍 Assigning {chunk_id} to {worker_name}") + assign_chunk(chunk_id, worker_name) + + if deploy_worker(worker_name, chunk_id, start_combo): + active_chunks[chunk_id] = worker_name + print(f"✓ {chunk_id} active on {worker_name}") + else: + print(f"✗ Failed to deploy {chunk_id} on {worker_name}") + + # Monitor progress + print("\n" + "="*60) + print("MONITORING SWEEP PROGRESS") + print("="*60 + "\n") + + while active_chunks: + time.sleep(30) # Check every 30 seconds + + completed_this_round = [] + + for chunk_id, worker_name in active_chunks.items(): + if check_chunk_completion(worker_name, chunk_id): + print(f"✓ {chunk_id} COMPLETED on {worker_name}") + mark_chunk_complete(chunk_id) + completed_this_round.append(chunk_id) + + # Remove completed chunks + for chunk_id in completed_this_round: + del active_chunks[chunk_id] + + # Try to assign pending chunks to freed workers + if completed_this_round and pending_chunks: + available_workers = get_available_workers() + + for worker_name in available_workers: + if worker_name not in active_chunks.values() and pending_chunks: + chunk_id, start_combo = pending_chunks.pop(0) + print(f"\n📍 Assigning {chunk_id} to {worker_name}") + assign_chunk(chunk_id, worker_name) + + if deploy_worker(worker_name, chunk_id, start_combo): + active_chunks[chunk_id] = worker_name + print(f"✓ {chunk_id} active on {worker_name}") + + # All chunks complete + duration = time.time() - start_time + duration_min = duration / 60 + + print("\n" + "="*60) + print("V11 TEST SWEEP COMPLETE!") + print("="*60) + print(f"Duration: {duration_min:.1f} minutes") + print(f"Chunks: 2/2 completed") + print(f"Strategies: 256 tested") + print("="*60 + "\n") + + # Send completion notification + complete_msg = ( + f"✅ V11 Test Sweep COMPLETE\n\n" + f"Duration: {duration_min:.1f} minutes\n" + f"Chunks: 2/2 completed\n" + f"Strategies: 256 tested\n\n" + f"Check results:\n" + f"- cluster/v11_test_results/\n" + f"- sqlite3 exploration.db\n\n" + f"Completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + ) + send_telegram_message(complete_msg) + + +if __name__ == '__main__': + main() diff --git a/cluster/v11_test_worker.py b/cluster/v11_test_worker.py new file mode 100755 index 0000000..c13de9b --- /dev/null +++ b/cluster/v11_test_worker.py @@ -0,0 +1,298 @@ +#!/usr/bin/env python3 +""" +V11 Test Parameter Sweep Worker + +Processes chunks of v11 test parameter configurations (256 combinations total). +Uses 27 cores (85% CPU) for multiprocessing. + +Test parameter grid (2 values each = 2^8 = 256 combinations): +- flip_threshold: 0.5, 0.6 +- adx_min: 18, 21 +- long_pos_max: 75, 80 +- short_pos_min: 20, 25 +- vol_min: 0.8, 1.0 +- entry_buffer_atr: 0.15, 0.20 +- rsi_long_min: 35, 40 +- rsi_short_max: 65, 70 +""" + +import sys +import csv +import pandas as pd +from pathlib import Path +from typing import Dict, List, Any +from multiprocessing import Pool +import functools +import itertools + +# Add backtester to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from backtester.v11_moneyline_all_filters import ( + money_line_v11_signals, + MoneyLineV11Inputs +) +from backtester.simulator import simulate_money_line + +# CPU limit: 85% of 32 threads = 27 cores +MAX_WORKERS = 27 + +# Test parameter grid (256 combinations) +PARAMETER_GRID = { + 'flip_threshold': [0.5, 0.6], + 'adx_min': [18, 21], + 'long_pos_max': [75, 80], + 'short_pos_min': [20, 25], + 'vol_min': [0.8, 1.0], + 'entry_buffer_atr': [0.15, 0.20], + 'rsi_long_min': [35, 40], + 'rsi_short_max': [65, 70], +} + + +def load_market_data(csv_file: str) -> pd.DataFrame: + """Load OHLCV data from CSV""" + df = pd.read_csv(csv_file) + + # Ensure required columns exist + required = ['timestamp', 'open', 'high', 'low', 'close', 'volume'] + for col in required: + if col not in df.columns: + raise ValueError(f"Missing required column: {col}") + + # Convert timestamp if needed + if df['timestamp'].dtype == 'object': + df['timestamp'] = pd.to_datetime(df['timestamp']) + + df = df.set_index('timestamp') + print(f"✓ Loaded {len(df):,} bars from {csv_file}") + return df + + +def backtest_config(df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]: + """ + Run backtest for single v11 test parameter configuration + + Returns dict with: + - params: original config dict + - pnl: total P&L + - trades: number of trades + - win_rate: % winners + - profit_factor: wins/losses ratio + - max_drawdown: max drawdown $ + """ + try: + # Create v11 inputs + inputs = MoneyLineV11Inputs( + flip_threshold=config['flip_threshold'], + adx_min=config['adx_min'], + long_pos_max=config['long_pos_max'], + short_pos_min=config['short_pos_min'], + vol_min=config['vol_min'], + entry_buffer_atr=config['entry_buffer_atr'], + rsi_long_min=config['rsi_long_min'], + rsi_short_max=config['rsi_short_max'], + ) + + # Generate signals + signals = money_line_v11_signals(df, inputs) + + if not signals: + return { + 'params': config, + 'pnl': 0.0, + 'trades': 0, + 'win_rate': 0.0, + 'profit_factor': 0.0, + 'max_drawdown': 0.0, + } + + # Simple backtesting: track equity curve + equity = 1000.0 # Starting capital + peak_equity = equity + max_drawdown = 0.0 + wins = 0 + losses = 0 + win_pnl = 0.0 + loss_pnl = 0.0 + + for signal in signals: + # Simple trade simulation + # TP1 at +0.86%, SL at -1.29% (ATR-based defaults) + entry = signal.entry_price + + # Look ahead in data to see if TP or SL hit + signal_idx = df.index.get_loc(signal.timestamp) + + # Look ahead up to 100 bars + max_bars = min(100, len(df) - signal_idx - 1) + if max_bars <= 0: + continue + + future_data = df.iloc[signal_idx+1:signal_idx+1+max_bars] + + if signal.direction == "long": + tp_price = entry * 1.0086 # +0.86% + sl_price = entry * 0.9871 # -1.29% + + # Check if TP or SL hit + hit_tp = (future_data['high'] >= tp_price).any() + hit_sl = (future_data['low'] <= sl_price).any() + + if hit_tp: + pnl = 1000.0 * 0.0086 # $8.60 on $1000 position + equity += pnl + wins += 1 + win_pnl += pnl + elif hit_sl: + pnl = -1000.0 * 0.0129 # -$12.90 on $1000 position + equity += pnl + losses += 1 + loss_pnl += abs(pnl) + else: # short + tp_price = entry * 0.9914 # -0.86% + sl_price = entry * 1.0129 # +1.29% + + # Check if TP or SL hit + hit_tp = (future_data['low'] <= tp_price).any() + hit_sl = (future_data['high'] >= sl_price).any() + + if hit_tp: + pnl = 1000.0 * 0.0086 # $8.60 on $1000 position + equity += pnl + wins += 1 + win_pnl += pnl + elif hit_sl: + pnl = -1000.0 * 0.0129 # -$12.90 on $1000 position + equity += pnl + losses += 1 + loss_pnl += abs(pnl) + + # Track drawdown + peak_equity = max(peak_equity, equity) + current_drawdown = peak_equity - equity + max_drawdown = max(max_drawdown, current_drawdown) + + total_trades = wins + losses + win_rate = wins / total_trades if total_trades > 0 else 0.0 + profit_factor = win_pnl / loss_pnl if loss_pnl > 0 else (float('inf') if win_pnl > 0 else 0.0) + total_pnl = equity - 1000.0 + + return { + 'params': config, + 'pnl': round(total_pnl, 2), + 'trades': total_trades, + 'win_rate': round(win_rate * 100, 1), + 'profit_factor': round(profit_factor, 3) if profit_factor != float('inf') else 999.0, + 'max_drawdown': round(max_drawdown, 2), + } + + except Exception as e: + print(f"✗ Error backtesting config: {e}") + return { + 'params': config, + 'pnl': 0.0, + 'trades': 0, + 'win_rate': 0.0, + 'profit_factor': 0.0, + 'max_drawdown': 0.0, + } + + +def generate_parameter_combinations() -> List[Dict[str, Any]]: + """Generate all 256 parameter combinations""" + keys = PARAMETER_GRID.keys() + values = PARAMETER_GRID.values() + + combinations = [] + for combo in itertools.product(*values): + config = dict(zip(keys, combo)) + combinations.append(config) + + return combinations + + +def process_chunk(data_file: str, chunk_id: str, start_idx: int, end_idx: int): + """Process a chunk of parameter combinations""" + print(f"\n{'='*60}") + print(f"V11 Test Worker - {chunk_id}") + print(f"Processing combinations {start_idx} to {end_idx-1}") + print(f"{'='*60}\n") + + # Load market data + df = load_market_data(data_file) + + # Generate all combinations + all_combos = generate_parameter_combinations() + print(f"✓ Generated {len(all_combos)} total combinations") + + # Get this chunk's combinations + chunk_combos = all_combos[start_idx:end_idx] + print(f"✓ Processing {len(chunk_combos)} combinations in this chunk\n") + + # Backtest with multiprocessing + print(f"⚡ Starting {MAX_WORKERS}-core backtest...\n") + + with Pool(processes=MAX_WORKERS) as pool: + backtest_func = functools.partial(backtest_config, df) + results = pool.map(backtest_func, chunk_combos) + + print(f"\n✓ Completed {len(results)} backtests") + + # Write results to CSV + output_dir = Path('v11_test_results') + output_dir.mkdir(exist_ok=True) + + csv_file = output_dir / f"{chunk_id}_results.csv" + + with open(csv_file, 'w', newline='') as f: + writer = csv.writer(f) + + # Header + writer.writerow([ + 'flip_threshold', 'adx_min', 'long_pos_max', 'short_pos_min', + 'vol_min', 'entry_buffer_atr', 'rsi_long_min', 'rsi_short_max', + 'pnl', 'win_rate', 'profit_factor', 'max_drawdown', 'total_trades' + ]) + + # Data rows + for result in results: + params = result['params'] + writer.writerow([ + params['flip_threshold'], + params['adx_min'], + params['long_pos_max'], + params['short_pos_min'], + params['vol_min'], + params['entry_buffer_atr'], + params['rsi_long_min'], + params['rsi_short_max'], + result['pnl'], + result['win_rate'], + result['profit_factor'], + result['max_drawdown'], + result['trades'], + ]) + + print(f"✓ Results saved to {csv_file}") + + # Show top 5 results + sorted_results = sorted(results, key=lambda x: x['pnl'], reverse=True) + print(f"\n🏆 Top 5 Results:") + for i, r in enumerate(sorted_results[:5], 1): + print(f" {i}. PnL: ${r['pnl']:,.2f} | Trades: {r['trades']} | WR: {r['win_rate']}%") + + +if __name__ == '__main__': + if len(sys.argv) != 4: + print("Usage: python v11_test_worker.py ") + sys.exit(1) + + data_file = sys.argv[1] + chunk_id = sys.argv[2] + start_idx = int(sys.argv[3]) + + # Calculate end index (128 combos per chunk) + end_idx = start_idx + 128 + + process_chunk(data_file, chunk_id, start_idx, end_idx)