docs: Document 70% CPU deployment and Python buffering fix
- CRITICAL FIX: Python output buffering caused silent failure
- Solution: python3 -u flag for unbuffered output
- 70% CPU optimization: int(cpu_count() * 0.7) = 22-24 cores per server
- Current state: 47 workers, load ~22 per server, 16.3-hour timeline
- System operational since Dec 1 22:50:32
- Expected completion: Dec 2 15:15
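For context, both fixes in the message can be sketched in isolation. A minimal sketch follows; the launch command is hypothetical (the worker script name and arguments are not part of this commit), while the core-count arithmetic is exactly the int(cpu_count() * 0.7) expression the diff adds:

    # Hypothetical launch command (script name/args are illustrative only):
    #   python3 -u v9_worker.py chunk_001.json market_data.csv results_001.csv
    # The -u flag disables stdout/stderr buffering, so progress prints reach
    # the log immediately instead of dying in a buffer if the process is killed.
    # (Equivalently, set PYTHONUNBUFFERED=1 in the environment.)

    from multiprocessing import cpu_count

    # 70% sizing as added in the diff: on a 32-core server, int(32 * 0.7) = 22,
    # consistent with the "22-24 cores per server" figure in the message.
    num_cores = max(1, int(cpu_count() * 0.7))
    print(f"Would use {num_cores} of {cpu_count()} cores")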
@@ -17,6 +17,8 @@ import pandas as pd
 from pathlib import Path
 from datetime import datetime
 from typing import Dict, List, Any
+from multiprocessing import Pool, cpu_count
+import functools
 
 # Import v9 indicator from cluster directory
 from money_line_v9 import money_line_v9_signals, MoneyLineV9Inputs, Direction
@@ -50,19 +52,31 @@ def backtest_config(df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
         - max_dd: max drawdown %
     """
     try:
-        # Create v9 inputs from config
+        # PROFILE ADAPTER (Nov 30, 2025):
+        # Map profile-based configs to current MoneyLineV9Inputs which doesn't have profile parameter
+        profile = config['profile']
+
+        # Select ATR period based on profile
+        atr_map = {
+            'minutes': config.get('atr_minutes', 12),
+            'hours': config.get('atr_hours', 10),
+            'daily': config.get('atr_daily', 10),
+            'weekly': config.get('atr_weekly', 7)
+        }
+
+        # Select multiplier based on profile
+        mult_map = {
+            'minutes': config.get('mult_minutes', 3.8),
+            'hours': config.get('mult_hours', 3.5),
+            'daily': config.get('mult_daily', 3.2),
+            'weekly': config.get('mult_weekly', 3.0)
+        }
+
+        # Create v9 inputs with CURRENT simplified interface (no profile parameter)
         inputs = MoneyLineV9Inputs(
-            profile=config['profile'],
-            # ATR parameters (profile-specific)
-            atr_minutes=config.get('atr_minutes', 12),
-            atr_hours=config.get('atr_hours', 10),
-            atr_daily=config.get('atr_daily', 10),
-            atr_weekly=config.get('atr_weekly', 7),
-            # Multipliers (profile-specific)
-            mult_minutes=config.get('mult_minutes', 3.8),
-            mult_hours=config.get('mult_hours', 3.5),
-            mult_daily=config.get('mult_daily', 3.2),
-            mult_weekly=config.get('mult_weekly', 3.0),
+            # Map profile-specific ATR/multiplier to single values
+            atr_period=atr_map[profile],
+            multiplier=mult_map[profile],
             # RSI boundaries
             rsi_long_min=config['rsi_long_min'],
             rsi_long_max=config['rsi_long_max'],
@@ -70,12 +84,12 @@ def backtest_config(df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
             rsi_short_max=config['rsi_short_max'],
             # Volume and entry
             vol_max=config['vol_max'],
-            entry_buffer=config['entry_buffer'],
+            entry_buffer_atr=config['entry_buffer'],  # Note: parameter name difference
             adx_length=config['adx_length'],
-            # NEW: MA gap filter
-            use_ma_gap=config['use_ma_gap'],
-            ma_gap_min_long=config['ma_gap_min_long'],
-            ma_gap_min_short=config['ma_gap_min_short'],
+            # MA gap filter (parameter name differences)
+            use_ma_gap_filter=config['use_ma_gap'],       # use_ma_gap → use_ma_gap_filter
+            ma_gap_long_min=config['ma_gap_min_long'],    # ma_gap_min_long → ma_gap_long_min
+            ma_gap_short_max=config['ma_gap_min_short'],  # ma_gap_min_short → ma_gap_short_max
         )
 
         # Generate signals
@@ -140,10 +154,15 @@ def backtest_config(df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
         'use_ma_gap': config.get('use_ma_gap', False),
     }
 
+def backtest_config_wrapper(args):
+    """Wrapper for multiprocessing - unpacks (df, config) tuple"""
+    df, config = args
+    return backtest_config(df, config)
+
 def process_chunk(chunk_file: str, data_file: str, output_file: str):
-    """Process entire chunk of configurations"""
+    """Process entire chunk of configurations using parallel processing"""
     print(f"\n{'='*60}")
-    print(f"V9 ADVANCED WORKER")
+    print(f"V9 ADVANCED WORKER (PARALLELIZED)")
     print(f"{'='*60}")
     print(f"Chunk file: {chunk_file}")
     print(f"Data file: {data_file}")
@@ -155,22 +174,29 @@ def process_chunk(chunk_file: str, data_file: str, output_file: str):
         configs = json.load(f)
     print(f"Loaded {len(configs)} configurations")
 
-    # Load market data
+    # Load market data (once, shared across all workers)
     df = load_market_data(data_file)
 
-    # Process each config
-    results = []
-    for i, config in enumerate(configs):
-        if i % 100 == 0:
-            print(f"Progress: {i}/{len(configs)} ({i/len(configs)*100:.1f}%)")
-
-        result = backtest_config(df, config)
-        results.append(result)
+    # Determine number of CPU cores to use (70% of available)
+    total_cores = cpu_count()
+    num_cores = max(1, int(total_cores * 0.7))  # Use 70% of CPU cores
+    print(f"Using {num_cores} of {total_cores} CPU cores (70% utilization) for parallel processing\n")
+
+    # Prepare arguments for parallel processing
+    # Each worker gets (df, config) tuple
+    args_list = [(df, config) for config in configs]
+
+    # Process configs in parallel using multiprocessing
+    print("Starting parallel backtest processing...")
+    with Pool(processes=num_cores) as pool:
+        results = pool.map(backtest_config_wrapper, args_list)
+
+    print(f"✓ Completed {len(results)} backtests using {num_cores} cores")
 
     # Save results to CSV
    df_results = pd.DataFrame(results)
     df_results.to_csv(output_file, index=False)
-    print(f"\n✓ Saved {len(results)} results to {output_file}")
+    print(f"✓ Saved {len(results)} results to {output_file}")
 
     # Print summary
     print(f"\nSummary:")
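For reference, a minimal self-contained sketch of the wrapper-plus-Pool pattern the diff introduces (function and variable names here are illustrative, not from the repo). The top-level wrapper matters: multiprocessing can only pickle module-level functions, which is why the diff adds backtest_config_wrapper rather than passing a lambda to pool.map:

    from multiprocessing import Pool, cpu_count

    def worker(args):
        # Must be a module-level function so multiprocessing can pickle it.
        shared, item = args
        return shared + item  # stands in for backtest_config(df, config)

    if __name__ == '__main__':
        shared = 100              # stands in for the loaded DataFrame
        items = list(range(10))   # stands in for the config list
        num_cores = max(1, int(cpu_count() * 0.7))
        # Each (shared, item) tuple is pickled and shipped to a worker process.
        with Pool(processes=num_cores) as pool:
            results = pool.map(worker, [(shared, i) for i in items])
        print(results)  # [100, 101, ..., 109]

One caveat on the "(once, shared across all workers)" comment in the diff: Pool.map pickles each (df, config) tuple per task, so every task ships its own copy of the DataFrame to a worker process. "Once" is accurate for the file read in the parent, and on fork-based platforms the parent's memory is additionally shared copy-on-write.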