Files
trading_bot_v4/scripts/export_binance_ohlcv.py
mindesbunister 5f7702469e remove: V10 momentum system - backtest proved it adds no value
- Removed v10 TradingView indicator (moneyline_v10_momentum_dots.pinescript)
- Removed v10 penalty system from signal-quality.ts (-30/-25 point penalties)
- Removed backtest result files (sweep_*.csv)
- Updated copilot-instructions.md to remove v10 references
- Simplified direction-specific quality thresholds (LONG 90+, SHORT 80+)

Rationale:
- 1,944 parameter combinations tested in backtest
- All top results IDENTICAL (568 trades, $498 P&L, 61.09% WR)
- Momentum parameters had ZERO impact on trade selection
- Profit factor 1.027 too low (barely profitable after fees)
- Max drawdown -$1,270 vs +$498 profit = terrible risk-reward
- v10 penalties were blocking good trades (bug: applied to wrong positions)

Keeping v9 as production system - simpler, proven, effective.
2025-11-28 22:35:32 +01:00

162 lines
4.7 KiB
Python

#!/usr/bin/env python3
"""Download historical OHLCV data from Binance and store it as CSV."""
from __future__ import annotations
import argparse
import json
import sys
import time
from pathlib import Path
from typing import Dict, List
from urllib.parse import urlencode
from urllib.request import Request, urlopen
import pandas as pd
BINANCE_REST_BASE = "https://api.binance.com/api/v3/klines"
# Binance caps a single /klines response at 1000 rows.
MAX_LIMIT = 1000

# Candle duration in milliseconds per Binance interval code, expressed as
# multiples of one minute (values match the Binance REST documentation).
_MINUTE_MS = 60_000
INTERVAL_MS: Dict[str, int] = {
    "1m": _MINUTE_MS,
    "3m": 3 * _MINUTE_MS,
    "5m": 5 * _MINUTE_MS,
    "15m": 15 * _MINUTE_MS,
    "30m": 30 * _MINUTE_MS,
    "1h": 60 * _MINUTE_MS,
    "2h": 120 * _MINUTE_MS,
    "4h": 240 * _MINUTE_MS,
    "6h": 360 * _MINUTE_MS,
    "8h": 480 * _MINUTE_MS,
    "12h": 720 * _MINUTE_MS,
    "1d": 1440 * _MINUTE_MS,
}
def parse_args(argv: List[str] | None = None) -> argparse.Namespace:
    """Parse command-line options for the exporter.

    Args:
        argv: Argument list to parse. Defaults to ``None``, which makes
            argparse read ``sys.argv[1:]`` — so existing callers are
            unaffected, while tests can pass an explicit list.

    Returns:
        Namespace with ``symbol``, ``interval``, ``start``, ``end``,
        ``output`` (a ``Path``) and ``rate_limit_wait`` attributes.
    """
    parser = argparse.ArgumentParser(description="Export OHLCV candles from Binance")
    parser.add_argument("--symbol", default="SOLUSDT", help="Binance symbol, e.g. SOLUSDT")
    parser.add_argument(
        "--interval",
        default="5m",
        choices=sorted(INTERVAL_MS.keys()),
        help="Binance interval (default: 5m)",
    )
    parser.add_argument(
        "--start",
        required=True,
        help="Start timestamp (ISO 8601, e.g. 2024-01-01 or 2024-01-01T00:00:00Z)",
    )
    parser.add_argument(
        "--end",
        required=True,
        help="End timestamp (ISO 8601, inclusive)",
    )
    parser.add_argument(
        "--output",
        type=Path,
        required=True,
        help="Path to output CSV file (directories created automatically)",
    )
    parser.add_argument(
        "--rate-limit-wait",
        type=float,
        default=0.2,
        help="Seconds to sleep between paginated requests (default: 0.2s)",
    )
    return parser.parse_args(argv)
def to_millis(value: str) -> int:
    """Convert an ISO 8601 timestamp string to epoch milliseconds (UTC).

    Naive timestamps are interpreted as UTC; timezone-aware timestamps are
    converted to UTC before the epoch conversion.
    """
    stamp = pd.Timestamp(value)
    stamp = stamp.tz_localize("UTC") if stamp.tzinfo is None else stamp.tz_convert("UTC")
    return int(stamp.timestamp() * 1000)
def fetch_chunk(symbol: str, interval: str, start_ms: int, end_ms: int) -> List[List[float]]:
    """Request a single page of klines from the Binance REST API.

    Args:
        symbol: Trading pair; upper-cased before sending (e.g. "SOLUSDT").
        interval: Binance interval code such as "5m".
        start_ms: Window start, epoch milliseconds.
        end_ms: Window end, epoch milliseconds.

    Returns:
        The decoded JSON payload — a list of kline rows, at most MAX_LIMIT.

    Raises:
        RuntimeError: If Binance returns an error object instead of klines.
    """
    query = urlencode(
        {
            "symbol": symbol.upper(),
            "interval": interval,
            "startTime": start_ms,
            "endTime": end_ms,
            "limit": MAX_LIMIT,
        }
    )
    request = Request(
        f"{BINANCE_REST_BASE}?{query}",
        headers={"User-Agent": "binance-export/1.0"},
    )
    with urlopen(request, timeout=30) as response:
        data = json.loads(response.read())
    # Error responses arrive as a JSON object with "code"/"msg" fields
    # rather than the usual list of kline rows.
    if isinstance(data, dict) and data.get("code"):
        raise RuntimeError(f"Binance error {data['code']}: {data.get('msg')}")
    return data  # type: ignore[return-value]
def main() -> int:
    """Entry point: download the requested candle range and write it as CSV.

    Paginates through the Binance klines endpoint, assembles a DataFrame,
    drops unused columns, and saves the result to ``--output``.

    Returns:
        0 on success. Raises ValueError for a bad time range and
        RuntimeError when Binance returns no data.
    """
    args = parse_args()
    interval = args.interval
    # Defensive re-check; argparse choices already restrict the value.
    if interval not in INTERVAL_MS:
        raise ValueError(f"Unsupported interval: {interval}")

    start_ms = to_millis(args.start)
    end_ms = to_millis(args.end)
    if end_ms <= start_ms:
        raise ValueError("End time must be after start time")

    step_ms = INTERVAL_MS[interval]
    candles: List[List[float]] = []
    requests_made = 0
    position = start_ms
    # Each response holds at most MAX_LIMIT candles; advance the cursor
    # one interval past the last open time until the window is exhausted.
    while position < end_ms:
        page = fetch_chunk(args.symbol, interval, position, end_ms)
        if not page:
            break
        candles.extend(page)
        requests_made += 1
        position = page[-1][0] + step_ms
        if len(page) < MAX_LIMIT:
            break  # short page means we reached the end of the range
        time.sleep(args.rate_limit_wait)

    if not candles:
        raise RuntimeError("No data returned from Binance")

    # Column order as documented for the Binance klines payload.
    columns = [
        "open_time",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "close_time",
        "quote_asset_volume",
        "number_of_trades",
        "taker_buy_base",
        "taker_buy_quote",
        "ignore",
    ]
    frame = pd.DataFrame(candles, columns=columns)
    frame = frame.drop(columns=["close_time", "quote_asset_volume", "ignore"])
    # Human-readable naive-UTC timestamp derived from the open time.
    frame["timestamp"] = pd.to_datetime(frame["open_time"], unit="ms", utc=True).dt.tz_convert(None)
    frame = frame[
        [
            "timestamp",
            "open",
            "high",
            "low",
            "close",
            "volume",
            "number_of_trades",
            "taker_buy_base",
            "taker_buy_quote",
        ]
    ]
    ohlcv_cols = ["open", "high", "low", "close", "volume"]
    frame[ohlcv_cols] = frame[ohlcv_cols].astype(float)

    args.output.parent.mkdir(parents=True, exist_ok=True)
    frame.to_csv(args.output, index=False)

    first = frame.iloc[0].timestamp
    last = frame.iloc[-1].timestamp
    duration_days = (last - first).days
    print(f"Saved {len(frame):,} candles for {args.symbol} ({interval}) spanning ~{duration_days} days")
    print(f"Requests made: {requests_made}")
    print(f"Output: {args.output}")
    return 0
if __name__ == "__main__":
    # sys.exit raises SystemExit(main()), so the return value becomes
    # the process exit code exactly as before.
    sys.exit(main())