Files
trading_bot_v4/telegram_command_bot.py
mindesbunister dde25ad2c1 Add position validation endpoint and Telegram /validate command
- New API endpoint: /api/trading/validate-positions
- Validates TP1, TP2, SL, leverage, and position size against current settings
- Fixed position size calculation: config stores collateral, positions store total value
- Added /validate command to Telegram bot for remote checking
- Returns detailed report of any mismatches with expected vs actual values
2025-10-27 19:20:36 +01:00

264 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Telegram Trade Bot - SECURE Command-based
Only responds to YOUR commands in YOUR chat
"""
import asyncio
import functools
import os

import requests
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
# Configuration - every value is read from the process environment at import time.
# Token handed to Application.builder().token() in main(); None when unset.
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN')
# URL that trade commands are POSTed to as plain text; None when unset.
N8N_WEBHOOK_URL = os.environ.get('N8N_WEBHOOK_URL')
# Base URL used for the /api/trading/... requests.
TRADING_BOT_URL = os.environ.get('TRADING_BOT_URL', 'http://trading-bot-v4:3000')
# Sent as the Bearer token on trading bot API requests.
API_SECRET_KEY = os.environ.get('API_SECRET_KEY', '')
# Handlers reject commands from any chat whose id differs from this one.
ALLOWED_CHAT_ID = int(os.environ.get('TELEGRAM_CHAT_ID', '579304651'))
async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /status command - show current open positions.

    Fetches the positions list from the trading bot API and replies with one
    Markdown message per position. HTTP failures, an unsuccessful payload and
    any exception raised while fetching or formatting are reported back to
    the chat rather than propagated.

    Args:
        update: Incoming Telegram update that carried the command.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    # CommandHandler also dispatches edited messages, for which
    # update.message is None; ignore those instead of crashing on .chat_id.
    incoming = update.message
    if incoming is None:
        return

    # Only process from YOUR chat
    if incoming.chat_id != ALLOWED_CHAT_ID:
        await incoming.reply_text("❌ Unauthorized")
        return

    print("📊 /status command received", flush=True)
    try:
        # requests is blocking; run it in the default executor so the event
        # loop stays responsive while the API call is pending.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            functools.partial(
                requests.get,
                f"{TRADING_BOT_URL}/api/trading/positions",
                headers={'Authorization': f'Bearer {API_SECRET_KEY}'},
                timeout=10,
            ),
        )
        print(f"📥 API Response: {response.status_code}", flush=True)

        if not response.ok:
            await incoming.reply_text(f"❌ Error fetching positions: {response.status_code}")
            return

        data = response.json()
        if not data.get('success'):
            await incoming.reply_text("❌ Failed to fetch positions")
            return

        # Check if there are active positions
        positions = data.get('positions', [])
        if not positions:
            await incoming.reply_text(
                "📊 *No open positions*\n\nAll clear! Ready for new signals.",
                parse_mode='Markdown',
            )
            return

        # One reply per position keeps each message short.
        for pos in positions:
            await incoming.reply_text(_format_position_status(pos), parse_mode='Markdown')

        print(f"✅ Status sent: {len(positions)} position(s)", flush=True)
    except Exception as e:
        # Handler boundary: surface the failure to the operator's chat.
        print(f"❌ Error: {e}", flush=True)
        await incoming.reply_text(f"❌ Error: {str(e)}")


def _format_position_status(pos):
    """Render one position dict from the positions API as a Markdown message.

    Reads the keys symbol, direction, entryPrice, currentPrice, currentSize,
    leverage, profitPercent, accountPnL, unrealizedPnL, stopLoss,
    takeProfit1, takeProfit2, tp1Hit and ageMinutes.

    Raises:
        KeyError: If any of those keys is missing from ``pos``.
    """
    symbol = pos['symbol']
    direction = pos['direction'].upper()
    account_pnl = pos['accountPnL']

    # Green for a gain, red for a loss, neutral marker when flat.
    if account_pnl > 0:
        pnl_emoji = "🟢"
    elif account_pnl < 0:
        pnl_emoji = "🔴"
    else:
        pnl_emoji = "⚪"
    direction_emoji = "📈" if direction == "LONG" else "📉"
    # Previously both branches were empty, so the TP1 state was never shown.
    tp1_marker = '✅' if pos['tp1Hit'] else '⏳'

    # NOTE(review): values are interpolated into Markdown unescaped; a symbol
    # containing '_' or '*' would alter the formatting - confirm API output.
    return "".join([
        f"{pnl_emoji} *{symbol}* {direction_emoji} {direction}\n\n",
        f"💰 *P&L:* ${pos['unrealizedPnL']:.2f} ({account_pnl:+.2f}% account)\n",
        f"📊 *Price Change:* {pos['profitPercent']:+.2f}%\n\n",
        f"*Entry:* ${pos['entryPrice']:.4f}\n",
        f"*Current:* ${pos['currentPrice']:.4f}\n\n",
        "*Targets:*\n",
        f" TP1: ${pos['takeProfit1']:.4f} {tp1_marker}\n",
        f" TP2: ${pos['takeProfit2']:.4f}\n",
        f" SL: ${pos['stopLoss']:.4f}\n\n",
        f"*Position:* ${pos['currentSize']:.2f} @ {pos['leverage']}x\n",
        f"*Age:* {pos['ageMinutes']} min",
    ])
async def validate_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /validate command - check if positions match settings.

    POSTs to the trading bot's validate-positions endpoint and replies with a
    single Markdown report: the current settings, a summary, and the issues
    of every position flagged as invalid. HTTP failures, an unsuccessful
    payload and any exception are reported back to the chat rather than
    propagated.

    Args:
        update: Incoming Telegram update that carried the command.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    # CommandHandler also dispatches edited messages, for which
    # update.message is None; ignore those instead of crashing on .chat_id.
    incoming = update.message
    if incoming is None:
        return

    # Only process from YOUR chat
    if incoming.chat_id != ALLOWED_CHAT_ID:
        await incoming.reply_text("❌ Unauthorized")
        return

    print("🔍 /validate command received", flush=True)
    try:
        # requests is blocking; run it in the default executor so the event
        # loop stays responsive while the API call is pending.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            functools.partial(
                requests.post,
                f"{TRADING_BOT_URL}/api/trading/validate-positions",
                headers={'Authorization': f'Bearer {API_SECRET_KEY}'},
                timeout=10,
            ),
        )
        print(f"📥 API Response: {response.status_code}", flush=True)

        if not response.ok:
            await incoming.reply_text(f"❌ Error validating positions: {response.status_code}")
            return

        data = response.json()
        if not data.get('success'):
            await incoming.reply_text("❌ Failed to validate positions")
            return

        positions = data.get('positions', [])
        if not positions:
            await incoming.reply_text(
                "📊 *No positions to validate*\n\nAll clear!",
                parse_mode='Markdown',
            )
            return

        report = _format_validation_report(
            data.get('config', {}),
            data.get('summary', {}),
            positions,
        )
        await incoming.reply_text(report, parse_mode='Markdown')
        print("✅ Validation sent", flush=True)
    except Exception as e:
        # Handler boundary: surface the failure to the operator's chat.
        print(f"❌ Error: {e}", flush=True)
        await incoming.reply_text(f"❌ Error: {str(e)}")


def _format_validation_report(config, summary, positions):
    """Build the Markdown validation report from the API response parts.

    Args:
        config: Mapping read for leverage, positionSize, tp1Percent,
            tp2Percent and stopLossPercent (missing keys render as None).
        summary: Mapping read for totalPositions, validPositions and
            positionsWithIssues (missing keys render as None).
        positions: Iterable of position dicts; each must provide isValid,
            and invalid ones also symbol, direction, entryPrice and issues
            (a list of dicts with type and message).

    Raises:
        KeyError: If a position lacks one of the required keys.
    """
    parts = [
        "🔍 *Position Validation Report*\n\n",
        "*Current Settings:*\n",
        f" Leverage: {config.get('leverage')}x\n",
        f" Position Size: ${config.get('positionSize')}\n",
        f" TP1: {config.get('tp1Percent')}%\n",
        f" TP2: {config.get('tp2Percent')}%\n",
        f" SL: {config.get('stopLossPercent')}%\n\n",
        "*Summary:*\n",
        f" Total: {summary.get('totalPositions')}\n",
        f" ✅ Valid: {summary.get('validPositions')}\n",
        f" ⚠️ Issues: {summary.get('positionsWithIssues')}\n\n",
    ]

    # Show each position with issues
    for pos in positions:
        if pos['isValid']:
            continue
        parts.append(f"*{pos['symbol']} {pos['direction'].upper()}*\n")
        parts.append(f"Entry: ${pos['entryPrice']:.4f}\n")
        for issue in pos['issues']:
            # Errors previously rendered with an empty marker.
            marker = "❌" if issue['type'] == 'error' else "⚠️"
            parts.append(f"{marker} {issue['message']}\n")
        parts.append("\n")

    # Require an actual total: with a missing summary both counts are None
    # and None == None would wrongly announce that everything is valid.
    total = summary.get('totalPositions')
    if total is not None and summary.get('validPositions') == total:
        parts.insert(0, "✅ *All positions valid!*\n\n")

    return "".join(parts)
async def trade_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle trade commands like /buySOL, /sellBTC, etc.

    Parses the command into ``"<action> <symbol>"`` (lower-case) and POSTs it
    as a plain-text body to N8N_WEBHOOK_URL, then reports the outcome to the
    chat. Exceptions raised while sending are reported to the chat rather
    than propagated.

    Args:
        update: Incoming Telegram update that carried the command.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    # CommandHandler also dispatches edited messages, for which
    # update.message is None. Ignoring them avoids the crash on .chat_id and
    # ensures that editing an old message can never fire a trade.
    incoming = update.message
    if incoming is None:
        return

    # Only process from YOUR chat
    if incoming.chat_id != ALLOWED_CHAT_ID:
        await incoming.reply_text("❌ Unauthorized")
        return

    parsed = _parse_trade_command(incoming.text or "")
    if parsed is None:
        await incoming.reply_text("❓ Unknown command")
        return
    action, symbol = parsed

    message = f"{action} {symbol}"
    print(f"📨 Command: {message}", flush=True)

    # Forward to n8n webhook - send as plain text body like TradingView does
    try:
        print(f"📤 Sending: {message}", flush=True)
        # requests is blocking; run it in the default executor so the event
        # loop stays responsive while the webhook call is pending.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            functools.partial(
                requests.post,
                N8N_WEBHOOK_URL,
                data=message,  # Plain text, not JSON
                headers={'Content-Type': 'text/plain'},
                timeout=10,
            ),
        )
        print(f"📥 Response status: {response.status_code}", flush=True)
        print(f"📥 Response body: {response.text[:200]}", flush=True)

        if response.ok:
            print(f"✅ Sent: {message}", flush=True)
            await incoming.reply_text(
                f"🤖 {action.upper()} {symbol.upper()}\n"
                f"✅ Trade command sent!"
            )
        else:
            print(f"❌ Error: {response.status_code}", flush=True)
            await incoming.reply_text(f"❌ Error: {response.status_code}")
    except Exception as e:
        # Handler boundary: surface the failure to the operator's chat.
        print(f"❌ Error: {e}", flush=True)
        await incoming.reply_text(f"❌ Error: {str(e)}")


def _parse_trade_command(text):
    """Split a command such as ``/buySOL`` into ``('buy', 'sol')``.

    Only the first whitespace-separated token is considered and a trailing
    ``@botname`` suffix is dropped, so ``/buySOL@MyBot 5`` still yields
    ``('buy', 'sol')`` instead of leaking the mention or arguments into the
    symbol.

    Returns:
        ``(action, symbol)`` in lower case, or ``None`` when the text is not
        a buy/sell command or carries no symbol.
    """
    tokens = text.split()
    if not tokens:
        return None

    # Drop the leading '/' and any '@botname' suffix.
    command = tokens[0][1:].partition('@')[0].lower()
    for action in ('buy', 'sell'):
        if command.startswith(action):
            symbol = command[len(action):]
            return (action, symbol) if symbol else None
    return None
def main():
    """Print the startup banner, register the command handlers and poll.

    Builds the python-telegram-bot Application from TELEGRAM_BOT_TOKEN and
    calls run_polling() on it.
    """
    banner_lines = (
        "🚀 Telegram Trade Bot Starting...",
        f"📱 Allowed Chat ID: {ALLOWED_CHAT_ID}",
        f"🔗 Webhook: {N8N_WEBHOOK_URL}",
        f"🤖 Trading Bot: {TRADING_BOT_URL}",
        "\n✅ Commands:",
        " /status - Show open positions",
        " /validate - Validate positions against settings",
        " /buySOL, /sellSOL",
        " /buyBTC, /sellBTC",
        " /buyETH, /sellETH",
    )
    for line in banner_lines:
        print(line, flush=True)

    # Create application
    application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()

    # Command name -> callback, registered in this order.
    routes = (
        ("status", status_command),
        ("validate", validate_command),
        ("buySOL", trade_command),
        ("sellSOL", trade_command),
        ("buyBTC", trade_command),
        ("sellBTC", trade_command),
        ("buyETH", trade_command),
        ("sellETH", trade_command),
    )
    for command_name, callback in routes:
        application.add_handler(CommandHandler(command_name, callback))

    # Start polling
    print("\n🤖 Bot ready! Send commands to your Telegram.\n", flush=True)
    application.run_polling(allowed_updates=Update.ALL_TYPES)
# Start the bot only when this file is executed as a script.
if __name__ == "__main__":
    main()