Files
trading_bot_v4/telegram_command_bot.py
mindesbunister 23277b7c87 feat: Manual trades wait for fresh 1-minute ATR datapoint
PHASE 2 ENHANCED: Manual trades now wait for next 1-minute datapoint
instead of using cached/stale data. Guarantees fresh ATR (<60s old).

User requirement: 'when i send a telegram message to enter the market,
the bot will simply wait for the next 1 minute datapoint'

Implementation:
- Add wait_for_fresh_market_data() async helper function
- Polls market data cache every 5 seconds (max 60s)
- Detects fresh data by timestamp change
- Extracts real ATR/ADX/RSI from 1-minute TradingView data
- User sees waiting message + confirmation when fresh data arrives
- Falls back to preset ATR 0.43 on timeout (fail-safe)

Benefits:
- Adaptive targets match CURRENT volatility (not historical)
- No stale data risk (guaranteed <60s old)
- Better than Phase 2 v1 (5-minute tolerance)
- Consistent with automated trades (same 1-min data source)

User Experience:
1. User: /long sol
2. Bot:  Waiting for next 1-minute datapoint...
3. [Wait 15-45 seconds typically]
4. Bot:  Fresh ATR: 0.4523 | ADX: 34.2 | RSI: 56.8
5. Bot:  Position opened with adaptive targets

Changes:
- Add asyncio import for async sleep
- Add wait_for_fresh_market_data() before manual_trade_handler
- Replace Phase 2 v1 (5min tolerance) with polling logic
- Add 3 user messages (waiting, confirmation, timeout)
- Extract ATR/ADX/RSI from fresh data or fallback

Files:
- telegram_command_bot.py: +70 lines polling logic
2025-12-02 19:35:24 +01:00

898 lines
35 KiB
Python

#!/usr/bin/env python3
"""
Telegram Trade Bot - SECURE Command-based
Only responds to YOUR commands in YOUR chat
"""
import os
import time
import asyncio
import requests
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
def retry_request(func, max_retries=3, initial_delay=2):
"""
Retry wrapper for DNS/connection failures
Similar to Node.js retryOperation() logic
"""
for attempt in range(max_retries):
try:
return func()
except (requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
Exception) as e:
error_msg = str(e).lower()
# Check for transient DNS/connection errors
if 'name or service not known' in error_msg or \
'failed to resolve' in error_msg or \
'connection' in error_msg:
if attempt < max_retries - 1:
delay = initial_delay * (2 ** attempt)
print(f"⏳ DNS/connection error (attempt {attempt + 1}/{max_retries}): {e}", flush=True)
print(f" Retrying in {delay}s...", flush=True)
time.sleep(delay)
continue
# Non-transient error or max retries reached
raise
raise Exception(f"Max retries ({max_retries}) exceeded")
# Configuration
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
N8N_WEBHOOK_URL = os.getenv('N8N_WEBHOOK_URL')
TRADING_BOT_URL = os.getenv('TRADING_BOT_URL', 'http://trading-bot-v4:3000')
API_SECRET_KEY = os.getenv('API_SECRET_KEY', '')
ALLOWED_CHAT_ID = int(os.getenv('TELEGRAM_CHAT_ID', '579304651'))
SYMBOL_MAP = {
'sol': {
'tradingview': 'SOLUSDT',
'label': 'SOL'
},
'eth': {
'tradingview': 'ETHUSDT',
'label': 'ETH'
},
'btc': {
'tradingview': 'BTCUSDT',
'label': 'BTC'
},
}
MANUAL_METRICS = {
'long': {
'atr': 0.43, # Updated Nov 17, 2025: Based on 162 SOL trades, median = 0.43 (~0.32% of price)
'adx': 32,
'rsi': 58,
'volumeRatio': 1.25,
'pricePosition': 55,
},
'short': {
'atr': 0.43, # Updated Nov 17, 2025: Based on 162 SOL trades, median = 0.43 (~0.32% of price)
'adx': 32,
'rsi': 42,
'volumeRatio': 1.25,
'pricePosition': 45,
},
}
async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /status command - show current open positions"""
# Only process from YOUR chat
if update.message.chat_id != ALLOWED_CHAT_ID:
await update.message.reply_text("❌ Unauthorized")
return
print(f"📊 /status command received", flush=True)
try:
# Fetch positions from trading bot API with retry logic
response = retry_request(lambda: requests.get(
f"{TRADING_BOT_URL}/api/trading/positions",
headers={'Authorization': f'Bearer {API_SECRET_KEY}'},
timeout=10
))
print(f"📥 API Response: {response.status_code}", flush=True)
if not response.ok:
await update.message.reply_text(f"❌ Error fetching positions: {response.status_code}")
return
data = response.json()
if not data.get('success'):
await update.message.reply_text("❌ Failed to fetch positions")
return
# Check if there are active positions
positions = data.get('positions', [])
if not positions:
await update.message.reply_text("📊 *No open positions*\n\nAll clear! Ready for new signals.", parse_mode='Markdown')
return
# Format position information
for pos in positions:
symbol = pos['symbol']
direction = pos['direction'].upper()
entry = pos['entryPrice']
current = pos['currentPrice']
size = pos['currentSize']
leverage = pos['leverage']
# P&L
pnl_pct = pos['profitPercent']
account_pnl = pos['accountPnL']
unrealized_pnl = pos['unrealizedPnL']
# Targets
sl = pos['stopLoss']
tp1 = pos['takeProfit1']
tp2 = pos['takeProfit2']
tp1_hit = pos['tp1Hit']
# Age
age_min = pos['ageMinutes']
# Build status message
emoji = "🟢" if account_pnl > 0 else "🔴" if account_pnl < 0 else ""
direction_emoji = "📈" if direction == "LONG" else "📉"
message = f"{emoji} *{symbol}* {direction_emoji} {direction}\n\n"
message += f"💰 *P&L:* ${unrealized_pnl:.2f} ({account_pnl:+.2f}% account)\n"
message += f"📊 *Price Change:* {pnl_pct:+.2f}%\n\n"
message += f"*Entry:* ${entry:.4f}\n"
message += f"*Current:* ${current:.4f}\n\n"
message += f"*Targets:*\n"
message += f" TP1: ${tp1:.4f} {'' if tp1_hit else ''}\n"
message += f" TP2: ${tp2:.4f}\n"
message += f" SL: ${sl:.4f}\n\n"
message += f"*Position:* ${size:.2f} @ {leverage}x\n"
message += f"*Age:* {age_min} min"
await update.message.reply_text(message, parse_mode='Markdown')
print(f"✅ Status sent: {len(positions)} position(s)", flush=True)
except Exception as e:
print(f"❌ Error: {e}", flush=True)
await update.message.reply_text(f"❌ Error: {str(e)}")
async def scale_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /scale command - add to existing position and adjust TP/SL"""
# Only process from YOUR chat
if update.message.chat_id != ALLOWED_CHAT_ID:
await update.message.reply_text("❌ Unauthorized")
return
print(f"📈 /scale command received", flush=True)
try:
# First, get the current open position
pos_response = requests.get(
f"{TRADING_BOT_URL}/api/trading/positions",
headers={'Authorization': f'Bearer {API_SECRET_KEY}'},
timeout=10
)
if not pos_response.ok:
await update.message.reply_text(f"❌ Error fetching positions: {pos_response.status_code}")
return
pos_data = pos_response.json()
positions = pos_data.get('positions', [])
if not positions:
await update.message.reply_text("❌ No open positions to scale")
return
if len(positions) > 1:
await update.message.reply_text("❌ Multiple positions open. Please close extras first.")
return
position = positions[0]
trade_id = position['id']
# Determine scale percent from command argument
scale_percent = 50 # Default
if context.args and len(context.args) > 0:
try:
scale_percent = int(context.args[0])
if scale_percent < 10 or scale_percent > 200:
await update.message.reply_text("❌ Scale percent must be between 10 and 200")
return
except ValueError:
await update.message.reply_text("❌ Invalid scale percent. Usage: /scale [percent]")
return
# Send scaling request
response = requests.post(
f"{TRADING_BOT_URL}/api/trading/scale-position",
headers={'Authorization': f'Bearer {API_SECRET_KEY}'},
json={'tradeId': trade_id, 'scalePercent': scale_percent},
timeout=30
)
print(f"📥 API Response: {response.status_code}", flush=True)
if not response.ok:
data = response.json()
await update.message.reply_text(f"❌ Error: {data.get('message', 'Unknown error')}")
return
data = response.json()
if not data.get('success'):
await update.message.reply_text(f"{data.get('message', 'Failed to scale position')}")
return
# Build success message
message = f"✅ *Position Scaled by {scale_percent}%*\n\n"
message += f"*{position['symbol']} {position['direction'].upper()}*\n\n"
message += f"*Entry Price:*\n"
message += f" Old: ${data['oldEntry']:.2f}\n"
message += f" New: ${data['newEntry']:.2f}\n\n"
message += f"*Position Size:*\n"
message += f" Old: ${data['oldSize']:.0f}\n"
message += f" New: ${data['newSize']:.0f}\n\n"
message += f"*New Targets:*\n"
message += f" TP1: ${data['newTP1']:.2f}\n"
message += f" TP2: ${data['newTP2']:.2f}\n"
message += f" SL: ${data['newSL']:.2f}\n\n"
message += f"🎯 All TP/SL orders updated!"
await update.message.reply_text(message, parse_mode='Markdown')
print(f"✅ Position scaled: {scale_percent}%", flush=True)
except Exception as e:
print(f"❌ Error: {e}", flush=True)
await update.message.reply_text(f"❌ Error: {str(e)}")
async def reduce_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /reduce command - take partial profits and adjust TP/SL"""
# Only process from YOUR chat
if update.message.chat_id != ALLOWED_CHAT_ID:
await update.message.reply_text("❌ Unauthorized")
return
print(f"📉 /reduce command received", flush=True)
try:
# First, get the current open position
pos_response = requests.get(
f"{TRADING_BOT_URL}/api/trading/positions",
headers={'Authorization': f'Bearer {API_SECRET_KEY}'},
timeout=10
)
if not pos_response.ok:
await update.message.reply_text(f"❌ Error fetching positions: {pos_response.status_code}")
return
pos_data = pos_response.json()
positions = pos_data.get('positions', [])
if not positions:
await update.message.reply_text("❌ No open positions to reduce")
return
if len(positions) > 1:
await update.message.reply_text("❌ Multiple positions open. Please close extras first.")
return
position = positions[0]
trade_id = position['id']
# Determine reduce percent from command argument
reduce_percent = 50 # Default
if context.args and len(context.args) > 0:
try:
reduce_percent = int(context.args[0])
if reduce_percent < 10 or reduce_percent > 100:
await update.message.reply_text("❌ Reduce percent must be between 10 and 100")
return
except ValueError:
await update.message.reply_text("❌ Invalid reduce percent. Usage: /reduce [percent]")
return
# Send reduce request
response = requests.post(
f"{TRADING_BOT_URL}/api/trading/reduce-position",
headers={'Authorization': f'Bearer {API_SECRET_KEY}'},
json={'tradeId': trade_id, 'reducePercent': reduce_percent},
timeout=30
)
print(f"📥 API Response: {response.status_code}", flush=True)
if not response.ok:
data = response.json()
await update.message.reply_text(f"❌ Error: {data.get('message', 'Unknown error')}")
return
data = response.json()
if not data.get('success'):
await update.message.reply_text(f"{data.get('message', 'Failed to reduce position')}")
return
# Build success message
message = f"✅ *Position Reduced by {reduce_percent}%*\n\n"
message += f"*{position['symbol']} {position['direction'].upper()}*\n\n"
message += f"*Closed:*\n"
message += f" Size: ${data['closedSize']:.0f}\n"
message += f" Price: ${data['closePrice']:.2f}\n"
message += f" P&L: ${data['realizedPnL']:.2f}\n\n"
message += f"*Remaining:*\n"
message += f" Size: ${data['remainingSize']:.0f}\n"
message += f" Entry: ${position['entryPrice']:.2f}\n\n"
message += f"*Updated Targets:*\n"
message += f" TP1: ${data['newTP1']:.2f}\n"
message += f" TP2: ${data['newTP2']:.2f}\n"
message += f" SL: ${data['newSL']:.2f}\n\n"
message += f"🎯 TP/SL orders updated for remaining size!"
await update.message.reply_text(message, parse_mode='Markdown')
print(f"✅ Position reduced: {reduce_percent}%", flush=True)
except Exception as e:
print(f"❌ Error: {e}", flush=True)
await update.message.reply_text(f"❌ Error: {str(e)}")
async def close_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /close command - close entire position and cancel all orders"""
# Only process from YOUR chat
if update.message.chat_id != ALLOWED_CHAT_ID:
await update.message.reply_text("❌ Unauthorized")
return
print(f"🔴 /close command received", flush=True)
try:
# First, get the current open position
pos_response = requests.get(
f"{TRADING_BOT_URL}/api/trading/positions",
headers={'Authorization': f'Bearer {API_SECRET_KEY}'},
timeout=10
)
if not pos_response.ok:
await update.message.reply_text(f"❌ Error fetching positions: {pos_response.status_code}")
return
pos_data = pos_response.json()
positions = pos_data.get('positions', [])
if not positions:
await update.message.reply_text("❌ No open positions to close")
return
if len(positions) > 1:
await update.message.reply_text("❌ Multiple positions open. Specify symbol or use /reduce")
return
position = positions[0]
symbol = position['symbol']
direction = position['direction'].upper()
entry = position['entryPrice']
size = position['currentSize']
# Close position at market (100%)
response = requests.post(
f"{TRADING_BOT_URL}/api/trading/close",
headers={'Authorization': f'Bearer {API_SECRET_KEY}'},
json={'symbol': symbol, 'percentToClose': 100},
timeout=30
)
print(f"📥 API Response: {response.status_code}", flush=True)
if not response.ok:
data = response.json()
await update.message.reply_text(f"❌ Error: {data.get('message', 'Unknown error')}")
return
data = response.json()
if not data.get('success'):
await update.message.reply_text(f"{data.get('message', 'Failed to close position')}")
return
# Build success message
close_price = data.get('closePrice', 0)
realized_pnl = data.get('realizedPnL', 0)
emoji = "💚" if realized_pnl > 0 else "❤️" if realized_pnl < 0 else "💛"
message = f"{emoji} *Position Closed*\n\n"
message += f"*{symbol} {direction}*\n\n"
message += f"*Entry:* ${entry:.4f}\n"
message += f"*Exit:* ${close_price:.4f}\n"
message += f"*Size:* ${size:.2f}\n\n"
message += f"*P&L:* ${realized_pnl:.2f}\n\n"
message += f"✅ Position closed at market\n"
message += f"✅ All TP/SL orders cancelled"
await update.message.reply_text(message, parse_mode='Markdown')
print(f"✅ Position closed: {symbol} | P&L: ${realized_pnl:.2f}", flush=True)
except Exception as e:
print(f"❌ Error: {e}", flush=True)
await update.message.reply_text(f"❌ Error: {str(e)}")
async def validate_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /validate command - check position consistency"""
# Only process from YOUR chat
if update.message.chat_id != ALLOWED_CHAT_ID:
await update.message.reply_text("❌ Unauthorized")
return
print(f"🔍 /validate command received", flush=True)
try:
# Fetch validation from trading bot API
response = requests.post(
f"{TRADING_BOT_URL}/api/trading/validate-positions",
headers={'Authorization': f'Bearer {API_SECRET_KEY}'},
timeout=10
)
print(f"📥 API Response: {response.status_code}", flush=True)
if not response.ok:
await update.message.reply_text(f"❌ Error validating positions: {response.status_code}")
return
data = response.json()
if not data.get('success'):
await update.message.reply_text("❌ Failed to validate positions")
return
# Get summary
summary = data.get('summary', {})
config = data.get('config', {})
positions = data.get('positions', [])
if not positions:
await update.message.reply_text("📊 *No positions to validate*\n\nAll clear!", parse_mode='Markdown')
return
# Build validation report
message = "🔍 *Position Validation Report*\n\n"
message += f"*Current Settings:*\n"
message += f" Leverage: {config.get('leverage')}x\n"
message += f" Position Size: ${config.get('positionSize')}\n"
message += f" TP1: {config.get('tp1Percent')}%\n"
message += f" TP2: {config.get('tp2Percent')}%\n"
message += f" SL: {config.get('stopLossPercent')}%\n\n"
message += f"*Summary:*\n"
message += f" Total: {summary.get('totalPositions')}\n"
message += f" ✅ Valid: {summary.get('validPositions')}\n"
message += f" ⚠️ Issues: {summary.get('positionsWithIssues')}\n\n"
# Show each position with issues
for pos in positions:
if not pos['isValid']:
message += f"*{pos['symbol']} {pos['direction'].upper()}*\n"
message += f"Entry: ${pos['entryPrice']:.4f}\n"
for issue in pos['issues']:
emoji = "" if issue['type'] == 'error' else "⚠️"
message += f"{emoji} {issue['message']}\n"
message += "\n"
if summary.get('validPositions') == summary.get('totalPositions'):
message = "✅ *All positions valid!*\n\n" + message
await update.message.reply_text(message, parse_mode='Markdown')
print(f"✅ Validation sent", flush=True)
except Exception as e:
print(f"❌ Error: {e}", flush=True)
await update.message.reply_text(f"❌ Error: {str(e)}")
async def trade_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle trade commands like /buySOL, /sellBTC, etc."""
# Only process from YOUR chat
if update.message.chat_id != ALLOWED_CHAT_ID:
await update.message.reply_text("❌ Unauthorized")
return
# Extract command (remove the /)
command = update.message.text[1:].lower() # e.g., "buysol"
# Parse action and symbol
if command.startswith('buy'):
action = 'buy'
symbol = command[3:] # e.g., "sol"
elif command.startswith('sell'):
action = 'sell'
symbol = command[4:] # e.g., "btc"
else:
await update.message.reply_text("❓ Unknown command")
return
message = f"{action} {symbol}"
print(f"📨 Command: {message}", flush=True)
# Forward to n8n webhook - send as plain text body like TradingView does
try:
print(f"📤 Sending: {message}", flush=True)
response = requests.post(
N8N_WEBHOOK_URL,
data=message, # Plain text, not JSON
headers={'Content-Type': 'text/plain'},
timeout=10
)
print(f"📥 Response status: {response.status_code}", flush=True)
print(f"📥 Response body: {response.text[:200]}", flush=True)
if response.ok:
print(f"✅ Sent: {message}", flush=True)
await update.message.reply_text(
f"🤖 {action.upper()} {symbol.upper()}\n"
f"✅ Trade command sent!"
)
else:
print(f"❌ Error: {response.status_code}", flush=True)
await update.message.reply_text(f"❌ Error: {response.status_code}")
except Exception as e:
print(f"❌ Error: {e}", flush=True)
await update.message.reply_text(f"❌ Error: {str(e)}")
async def wait_for_fresh_market_data(symbol: str, max_wait: int = 60):
"""
Poll market data cache until fresh data arrives (new timestamp detected).
Args:
symbol: Drift symbol (e.g., 'SOL-PERP', 'ETH-PERP')
max_wait: Maximum seconds to wait (default 60)
Returns:
dict: Fresh market data with atr/adx/rsi/timestamp
None: Timeout or error
"""
start_time = time.time()
last_timestamp = None
poll_count = 0
print(f"⏳ Waiting for fresh 1-minute data: {symbol} (max {max_wait}s)", flush=True)
while (time.time() - start_time) < max_wait:
try:
response = requests.get(
f"{TRADING_BOT_URL}/api/trading/market-data",
timeout=5
)
if response.ok:
data = response.json()
if data.get('success') and data.get('cache'):
symbol_data = data['cache'].get(symbol)
if symbol_data:
current_timestamp = symbol_data.get('timestamp')
data_age = symbol_data.get('ageSeconds', 999)
poll_count += 1
print(f"🔍 Poll #{poll_count}: timestamp={current_timestamp}, age={data_age}s", flush=True)
# Fresh data detected (timestamp changed from last poll)
if last_timestamp and current_timestamp != last_timestamp:
print(f"✅ Fresh data detected after {poll_count} polls ({time.time() - start_time:.1f}s)", flush=True)
return symbol_data
last_timestamp = current_timestamp
else:
print(f"⚠️ No data for {symbol} in cache (poll #{poll_count + 1})", flush=True)
poll_count += 1
else:
print(f"⚠️ No cache data in response (poll #{poll_count + 1})", flush=True)
poll_count += 1
else:
print(f"⚠️ Market data fetch failed: {response.status_code} (poll #{poll_count + 1})", flush=True)
poll_count += 1
except Exception as e:
print(f"⚠️ Market data poll error: {e} (poll #{poll_count + 1})", flush=True)
poll_count += 1
# Wait 5 seconds before next poll
await asyncio.sleep(5)
print(f"❌ Timeout after {poll_count} polls ({max_wait}s) - no fresh data received", flush=True)
return None
async def manual_trade_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Execute manual long/short commands sent as plain text with analytics validation."""
if update.message is None:
return
if update.message.chat_id != ALLOWED_CHAT_ID:
return
text = update.message.text.strip().lower()
parts = text.split()
# Check for --force flag
force_trade = '--force' in parts
if force_trade:
parts.remove('--force')
if len(parts) != 2:
return
direction, symbol_key = parts[0], parts[1]
if direction not in ('long', 'short'):
return
symbol_info = SYMBOL_MAP.get(symbol_key)
if not symbol_info:
return
# Convert to Drift format for analytics check
drift_symbol_map = {
'sol': 'SOL-PERP',
'eth': 'ETH-PERP',
'btc': 'BTC-PERP'
}
drift_symbol = drift_symbol_map.get(symbol_key)
# 🆕 PHASE 1: Check analytics before executing (unless forced)
if not force_trade:
try:
print(f"🔍 Checking re-entry analytics for {direction.upper()} {drift_symbol}", flush=True)
analytics_response = requests.post(
f"{TRADING_BOT_URL}/api/analytics/reentry-check",
json={'symbol': drift_symbol, 'direction': direction},
timeout=10
)
if analytics_response.ok:
analytics = analytics_response.json()
if not analytics.get('should_enter'):
# Build rejection message with data source info
data_source = analytics.get('data_source', 'unknown')
data_age = analytics.get('data_age_seconds')
data_emoji = {
'tradingview_real': '',
'fallback_historical': '⚠️',
'no_data': ''
}
data_icon = data_emoji.get(data_source, '')
data_age_text = f" ({data_age}s old)" if data_age else ""
message = (
f"🛑 *Analytics suggest NOT entering {direction.upper()} {symbol_info['label']}*\n\n"
f"*Reason:* {analytics.get('reason', 'Unknown')}\n"
f"*Score:* {analytics.get('score', 0)}/100\n"
f"*Data:* {data_icon} {data_source}{data_age_text}\n\n"
f"Use `{text} --force` to override"
)
await update.message.reply_text(message, parse_mode='Markdown')
print(f"❌ Trade blocked by analytics (score: {analytics.get('score')})", flush=True)
return
# Analytics passed - show confirmation
data_age = analytics.get('data_age_seconds')
data_source = analytics.get('data_source', 'unknown')
data_age_text = f" ({data_age}s old)" if data_age else ""
confirm_message = (
f"✅ *Analytics check passed ({analytics.get('score')}/100)*\n"
f"Data: {data_source}{data_age_text}\n"
f"Proceeding with {direction.upper()} {symbol_info['label']}..."
)
await update.message.reply_text(confirm_message, parse_mode='Markdown')
print(f"✅ Analytics passed (score: {analytics.get('score')})", flush=True)
else:
# Analytics endpoint failed - proceed with trade (fail-open)
print(f"⚠️ Analytics check failed ({analytics_response.status_code}) - proceeding anyway", flush=True)
except Exception as analytics_error:
# Analytics check error - proceed with trade (fail-open)
print(f"⚠️ Analytics error: {analytics_error} - proceeding anyway", flush=True)
# 🆕 PHASE 2 ENHANCED: Wait for fresh 1-minute datapoint (Dec 2, 2025)
# User requirement: "when i send a telegram message to enter the market,
# the bot will simply wait for the next 1 minute datapoint"
# Send waiting message to user
await update.message.reply_text(
f"⏳ *Waiting for next 1-minute datapoint...*\n"
f"Will execute with fresh ATR (max 60s)",
parse_mode='Markdown'
)
# Poll for fresh data (new timestamp = new datapoint arrived)
fresh_data = await wait_for_fresh_market_data(drift_symbol, max_wait=60)
# Extract metrics from fresh data or fallback to preset
metrics = MANUAL_METRICS[direction] # Start with preset defaults
if fresh_data:
# Use real-time metrics from fresh 1-minute data
fresh_atr = fresh_data.get('atr')
fresh_adx = fresh_data.get('adx')
fresh_rsi = fresh_data.get('rsi')
data_age = fresh_data.get('ageSeconds', 0)
if fresh_atr and fresh_atr > 0:
metrics = {
'atr': fresh_atr,
'adx': fresh_adx if fresh_adx else metrics['adx'], # Fallback if missing
'rsi': fresh_rsi if fresh_rsi else metrics['rsi'], # Fallback if missing
'volumeRatio': metrics['volumeRatio'], # Keep preset (not in 1-min data)
'pricePosition': metrics['pricePosition'], # Keep preset (not in 1-min data)
}
print(f"✅ Using fresh metrics: ATR={metrics['atr']:.4f}, ADX={metrics['adx']:.1f}, RSI={metrics['rsi']:.1f} ({data_age}s old)", flush=True)
await update.message.reply_text(
f"✅ *Fresh data received*\n"
f"ATR: {metrics['atr']:.4f} | ADX: {metrics['adx']:.1f} | RSI: {metrics['rsi']:.1f}\n"
f"Executing {direction.upper()} {symbol_info['label']}...",
parse_mode='Markdown'
)
else:
print(f"⚠️ Fresh data invalid (ATR={fresh_atr}), using preset metrics", flush=True)
await update.message.reply_text(
f"⚠️ *Fresh data invalid*\n"
f"Using preset ATR: {metrics['atr']}\n"
f"Executing {direction.upper()} {symbol_info['label']}...",
parse_mode='Markdown'
)
else:
# Timeout - fallback to preset with warning
print(f"⚠️ Timeout waiting for fresh data - using preset metrics: ATR={metrics['atr']}", flush=True)
await update.message.reply_text(
f"⚠️ *Timeout waiting for fresh data*\n"
f"Using preset ATR: {metrics['atr']}\n"
f"Executing {direction.upper()} {symbol_info['label']}...",
parse_mode='Markdown'
)
# Execute the trade with fresh or fallback metrics
payload = {
'symbol': symbol_info['tradingview'],
'direction': direction,
'timeframe': 'manual',
'signalStrength': 'manual',
'atr': metrics['atr'], # 🆕 Fresh ATR from 1-minute data or preset fallback
'adx': metrics['adx'],
'rsi': metrics['rsi'],
'volumeRatio': metrics['volumeRatio'],
'pricePosition': metrics['pricePosition'],
}
try:
print(f"🚀 Manual trade: {direction.upper()} {symbol_info['label']}{' (FORCED)' if force_trade else ''}", flush=True)
response = retry_request(lambda: requests.post(
f"{TRADING_BOT_URL}/api/trading/execute",
headers={'Authorization': f'Bearer {API_SECRET_KEY}'},
json=payload,
timeout=60,
))
print(f"📥 Manual trade response: {response.status_code}", flush=True)
# Parse JSON even for error responses to get detailed error messages
try:
data = response.json()
except Exception:
await update.message.reply_text(f"❌ Execution error ({response.status_code})")
return
if not data.get('success'):
# CRITICAL: Show detailed error message (may contain "CLOSE POSITION MANUALLY")
message = data.get('message') or data.get('error') or 'Trade rejected'
await update.message.reply_text(f"{message}")
return
entry_price = data.get('entryPrice')
notional = data.get('positionSize')
leverage = data.get('leverage')
tp1 = data.get('takeProfit1')
tp2 = data.get('takeProfit2')
sl = data.get('stopLoss')
entry_text = f"${entry_price:.4f}" if entry_price is not None else 'n/a'
size_text = (
f"${notional:.2f} @ {leverage}x"
if notional is not None and leverage is not None
else 'n/a'
)
tp1_text = f"${tp1:.4f}" if tp1 is not None else 'n/a'
tp2_text = f"${tp2:.4f}" if tp2 is not None else 'n/a'
sl_text = f"${sl:.4f}" if sl is not None else 'n/a'
force_indicator = " (FORCED)" if force_trade else ""
success_message = (
f"✅ OPENED {direction.upper()} {symbol_info['label']}{force_indicator}\n"
f"Entry: {entry_text}\n"
f"Size: {size_text}\n"
f"TP1: {tp1_text}\nTP2: {tp2_text}\nSL: {sl_text}"
)
await update.message.reply_text(success_message)
except Exception as exc:
print(f"❌ Manual trade failed: {exc}", flush=True)
await update.message.reply_text(f"❌ Error: {exc}")
def main():
"""Start the bot"""
print(f"🚀 Telegram Trade Bot Starting...", flush=True)
print(f"📱 Allowed Chat ID: {ALLOWED_CHAT_ID}", flush=True)
print(f"🔗 Webhook: {N8N_WEBHOOK_URL}", flush=True)
print(f"🤖 Trading Bot: {TRADING_BOT_URL}", flush=True)
print(f"\n✅ Commands:", flush=True)
print(f" /status - Show open positions", flush=True)
print(f" /validate - Validate positions against settings", flush=True)
print(f" /scale [percent] - Scale position (default 50%)", flush=True)
print(f" /reduce [percent] - Take partial profits (default 50%)", flush=True)
print(f" /buySOL, /sellSOL", flush=True)
print(f" /buyBTC, /sellBTC", flush=True)
print(f" /buyETH, /sellETH", flush=True)
print(f" long sol | short btc (plain text)", flush=True)
# Create application
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
# Add command handlers
application.add_handler(CommandHandler("status", status_command))
application.add_handler(CommandHandler("close", close_command))
application.add_handler(CommandHandler("validate", validate_command))
application.add_handler(CommandHandler("scale", scale_command))
application.add_handler(CommandHandler("reduce", reduce_command))
application.add_handler(CommandHandler("buySOL", trade_command))
application.add_handler(CommandHandler("sellSOL", trade_command))
application.add_handler(CommandHandler("buyBTC", trade_command))
application.add_handler(CommandHandler("sellBTC", trade_command))
application.add_handler(CommandHandler("buyETH", trade_command))
application.add_handler(CommandHandler("sellETH", trade_command))
application.add_handler(MessageHandler(
filters.TEXT & (~filters.COMMAND),
manual_trade_handler,
))
# Start polling
print("\n🤖 Bot ready! Send commands to your Telegram.\n", flush=True)
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == '__main__':
main()