#!/usr/bin/env python3
"""
Telegram Trade Bot - SECURE Command-based
Only responds to YOUR commands in YOUR chat
"""

import asyncio
import functools
import os
import re
import time

import requests
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters


def retry_request(func, max_retries=3, initial_delay=2):
    """Call ``func`` and retry it on transient DNS/connection failures.

    Similar to the Node.js retryOperation() logic: the delay doubles after
    every failed attempt (``initial_delay * 2 ** attempt`` seconds).

    Args:
        func: Zero-argument callable performing the request.
        max_retries: Maximum number of attempts.
        initial_delay: Delay in seconds before the first retry.

    Returns:
        Whatever ``func`` returns on the first successful attempt.

    Raises:
        Exception: The original error when it is not transient or when the
            last attempt fails; a generic ``Exception`` when ``max_retries``
            allows no attempt at all.
    """
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            error_msg = str(e).lower()
            # A read timeout means the request reached the server and may have
            # been processed. Its message contains "HTTPConnectionPool", which
            # the substring test below would treat as a connection error, so
            # exclude it explicitly: re-sending a trade could duplicate it.
            is_transient = not isinstance(e, requests.exceptions.ReadTimeout) and (
                'name or service not known' in error_msg
                or 'failed to resolve' in error_msg
                or 'connection' in error_msg
            )
            if is_transient and attempt < max_retries - 1:
                delay = initial_delay * (2 ** attempt)
                print(f"ā³ DNS/connection error (attempt {attempt + 1}/{max_retries}): {e}", flush=True)
                print(f" Retrying in {delay}s...", flush=True)
                time.sleep(delay)
                continue
            # Non-transient error or max retries reached
            raise
    raise Exception(f"Max retries ({max_retries}) exceeded")


# Configuration
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
N8N_WEBHOOK_URL = os.getenv('N8N_WEBHOOK_URL')
TRADING_BOT_URL = os.getenv('TRADING_BOT_URL', 'http://trading-bot-v4:3000')
API_SECRET_KEY = os.getenv('API_SECRET_KEY', '')
ALLOWED_CHAT_ID = int(os.getenv('TELEGRAM_CHAT_ID', '579304651'))

SYMBOL_MAP = {
    'sol': {
        'tradingview': 'SOLUSDT',
        'label': 'SOL'
    },
    'eth': {
        'tradingview': 'ETHUSDT',
        'label': 'ETH'
    },
    'btc': {
        'tradingview': 'BTCUSDT',
        'label': 'BTC'
    },
}

MANUAL_METRICS = {
    'long': {
        'atr': 0.43,  # Updated Nov 17, 2025: Based on 162 SOL trades, median = 0.43 (~0.32% of price)
        'adx': 32,
        'rsi': 58,
        'volumeRatio': 1.25,
        'pricePosition': 55,
    },
    'short': {
        'atr': 0.43,  # Updated Nov 17, 2025: Based on 162 SOL trades, median = 0.43 (~0.32% of price)
        'adx': 32,
        'rsi': 42,
        'volumeRatio': 1.25,
        'pricePosition': 45,
    },
}

# Characters that start an entity in Telegram's legacy "Markdown" parse mode.
_MARKDOWN_SPECIAL_CHARS = re.compile(r'([_*`\[])')


def _escape_markdown(value):
    """Backslash-escape legacy-Markdown control characters in ``value``."""
    return _MARKDOWN_SPECIAL_CHARS.sub(r'\\\1', str(value))


def _auth_headers():
    """Return the Authorization header expected by the trading bot API."""
    return {'Authorization': f'Bearer {API_SECRET_KEY}'}


async def _run_blocking(func, *args, **kwargs):
    """Run a blocking callable in the default executor so the event loop stays responsive."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


async def _is_authorized_command(update):
    """Return True when the command comes from the allowed chat.

    Updates without ``update.message`` (e.g. an edited command) are ignored;
    other chats receive an "Unauthorized" reply.
    """
    message = update.message
    if message is None:
        return False
    if message.chat_id != ALLOWED_CHAT_ID:
        await message.reply_text("āŒ Unauthorized")
        return False
    return True


async def _reply_best_effort(update, text, **kwargs):
    """Send a reply and log (instead of raising) when delivery fails."""
    try:
        await update.message.reply_text(text, **kwargs)
    except Exception as reply_error:
        print(f"Failed to send reply: {reply_error}", flush=True)


def _error_message(response, default='Unknown error'):
    """Extract the API error message, tolerating non-JSON error bodies."""
    try:
        data = response.json()
    except ValueError:
        return f"{default} ({response.status_code})"
    if isinstance(data, dict):
        return data.get('message', default)
    return default


async def _fetch_single_position(update, action, multiple_hint):
    """Fetch open positions and return the only one, or None after replying why not."""
    pos_response = await _run_blocking(
        requests.get,
        f"{TRADING_BOT_URL}/api/trading/positions",
        headers=_auth_headers(),
        timeout=10,
    )
    if not pos_response.ok:
        await update.message.reply_text(f"āŒ Error fetching positions: {pos_response.status_code}")
        return None

    positions = pos_response.json().get('positions', [])
    if not positions:
        await update.message.reply_text(f"āŒ No open positions to {action}")
        return None
    if len(positions) > 1:
        await update.message.reply_text(f"āŒ Multiple positions open. {multiple_hint}")
        return None
    return positions[0]


async def _parse_percent(update, context, name, low, high):
    """Parse the optional percent argument (default 50); return None after replying on bad input."""
    if not context.args:
        return 50
    try:
        percent = int(context.args[0])
    except ValueError:
        await update.message.reply_text(f"āŒ Invalid {name} percent. Usage: /{name} [percent]")
        return None
    if percent < low or percent > high:
        await update.message.reply_text(
            f"āŒ {name.capitalize()} percent must be between {low} and {high}"
        )
        return None
    return percent


async def _post_trading_action(update, endpoint, payload, failure_message):
    """POST ``payload`` to a trading endpoint; return the JSON body or None after replying on failure."""
    response = await _run_blocking(
        requests.post,
        f"{TRADING_BOT_URL}{endpoint}",
        headers=_auth_headers(),
        json=payload,
        timeout=30,
    )
    print(f"šŸ“„ API Response: {response.status_code}", flush=True)

    if not response.ok:
        await update.message.reply_text(f"āŒ Error: {_error_message(response)}")
        return None

    data = response.json()
    if not data.get('success'):
        await update.message.reply_text(f"āŒ {data.get('message', failure_message)}")
        return None
    return data


async def _fetch_reentry_analytics(drift_symbol, direction):
    """Return the re-entry analytics dict, or None when the check is unavailable.

    Only the HTTP call and payload parsing are fail-open here; the caller
    decides what to do with a successfully fetched verdict.
    """
    try:
        print(f"šŸ” Checking re-entry analytics for {direction.upper()} {drift_symbol}", flush=True)
        analytics_response = await _run_blocking(
            requests.post,
            f"{TRADING_BOT_URL}/api/analytics/reentry-check",
            json={'symbol': drift_symbol, 'direction': direction},
            timeout=10,
        )
        if not analytics_response.ok:
            # Analytics endpoint failed - proceed with trade (fail-open)
            print(f"āš ļø Analytics check failed ({analytics_response.status_code}) - proceeding anyway", flush=True)
            return None
        analytics = analytics_response.json()
        if not isinstance(analytics, dict):
            raise ValueError('unexpected analytics payload')
        return analytics
    except Exception as analytics_error:
        # Analytics check error - proceed with trade (fail-open)
        print(f"āš ļø Analytics error: {analytics_error} - proceeding anyway", flush=True)
        return None


async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /status command - show current open positions"""
    # Only process from YOUR chat
    if not await _is_authorized_command(update):
        return

    print("šŸ“Š /status command received", flush=True)

    try:
        # Fetch positions from trading bot API with retry logic
        response = await _run_blocking(
            retry_request,
            lambda: requests.get(
                f"{TRADING_BOT_URL}/api/trading/positions",
                headers=_auth_headers(),
                timeout=10,
            ),
        )
        print(f"šŸ“„ API Response: {response.status_code}", flush=True)

        if not response.ok:
            await update.message.reply_text(f"āŒ Error fetching positions: {response.status_code}")
            return

        data = response.json()
        if not data.get('success'):
            await update.message.reply_text("āŒ Failed to fetch positions")
            return

        # Check if there are active positions
        positions = data.get('positions', [])
        if not positions:
            await update.message.reply_text(
                "šŸ“Š *No open positions*\n\nAll clear! Ready for new signals.",
                parse_mode='Markdown',
            )
            return

        # One message per open position
        for pos in positions:
            symbol = pos['symbol']
            direction = pos['direction'].upper()
            entry = pos['entryPrice']
            current = pos['currentPrice']
            size = pos['currentSize']
            leverage = pos['leverage']

            # P&L
            pnl_pct = pos['profitPercent']
            account_pnl = pos['accountPnL']
            unrealized_pnl = pos['unrealizedPnL']

            # Targets
            sl = pos['stopLoss']
            tp1 = pos['takeProfit1']
            tp2 = pos['takeProfit2']
            tp1_hit = pos['tp1Hit']

            # Age
            age_min = pos['ageMinutes']

            # Build status message
            emoji = "🟢" if account_pnl > 0 else "šŸ”“" if account_pnl < 0 else "⚪"
            direction_emoji = "šŸ“ˆ" if direction == "LONG" else "šŸ“‰"

            message = (
                f"{emoji} *{symbol}* {direction_emoji} {direction}\n\n"
                f"šŸ’° *P&L:* ${unrealized_pnl:.2f} ({account_pnl:+.2f}% account)\n"
                f"šŸ“Š *Price Change:* {pnl_pct:+.2f}%\n\n"
                f"*Entry:* ${entry:.4f}\n"
                f"*Current:* ${current:.4f}\n\n"
                "*Targets:*\n"
                f" TP1: ${tp1:.4f} {'āœ…' if tp1_hit else 'ā³'}\n"
                f" TP2: ${tp2:.4f}\n"
                f" SL: ${sl:.4f}\n\n"
                f"*Position:* ${size:.2f} @ {leverage}x\n"
                f"*Age:* {age_min} min"
            )
            await update.message.reply_text(message, parse_mode='Markdown')

        print(f"āœ… Status sent: {len(positions)} position(s)", flush=True)

    except Exception as e:
        print(f"āŒ Error: {e}", flush=True)
        await update.message.reply_text(f"āŒ Error: {str(e)}")


async def scale_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /scale command - add to existing position and adjust TP/SL"""
    # Only process from YOUR chat
    if not await _is_authorized_command(update):
        return

    print("šŸ“ˆ /scale command received", flush=True)

    try:
        # First, get the current open position
        position = await _fetch_single_position(update, 'scale', 'Please close extras first.')
        if position is None:
            return
        trade_id = position['id']

        # Determine scale percent from command argument (10-200, default 50)
        scale_percent = await _parse_percent(update, context, 'scale', 10, 200)
        if scale_percent is None:
            return

        # Send scaling request
        data = await _post_trading_action(
            update,
            '/api/trading/scale-position',
            {'tradeId': trade_id, 'scalePercent': scale_percent},
            'Failed to scale position',
        )
        if data is None:
            return

        # Build success message
        message = (
            f"āœ… *Position Scaled by {scale_percent}%*\n\n"
            f"*{position['symbol']} {position['direction'].upper()}*\n\n"
            "*Entry Price:*\n"
            f" Old: ${data['oldEntry']:.2f}\n"
            f" New: ${data['newEntry']:.2f}\n\n"
            "*Position Size:*\n"
            f" Old: ${data['oldSize']:.0f}\n"
            f" New: ${data['newSize']:.0f}\n\n"
            "*New Targets:*\n"
            f" TP1: ${data['newTP1']:.2f}\n"
            f" TP2: ${data['newTP2']:.2f}\n"
            f" SL: ${data['newSL']:.2f}\n\n"
            "šŸŽÆ All TP/SL orders updated!"
        )
        await update.message.reply_text(message, parse_mode='Markdown')
        print(f"āœ… Position scaled: {scale_percent}%", flush=True)

    except Exception as e:
        print(f"āŒ Error: {e}", flush=True)
        await update.message.reply_text(f"āŒ Error: {str(e)}")


async def reduce_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /reduce command - take partial profits and adjust TP/SL"""
    # Only process from YOUR chat
    if not await _is_authorized_command(update):
        return

    print("šŸ“‰ /reduce command received", flush=True)

    try:
        # First, get the current open position
        position = await _fetch_single_position(update, 'reduce', 'Please close extras first.')
        if position is None:
            return
        trade_id = position['id']

        # Determine reduce percent from command argument (10-100, default 50)
        reduce_percent = await _parse_percent(update, context, 'reduce', 10, 100)
        if reduce_percent is None:
            return

        # Send reduce request
        data = await _post_trading_action(
            update,
            '/api/trading/reduce-position',
            {'tradeId': trade_id, 'reducePercent': reduce_percent},
            'Failed to reduce position',
        )
        if data is None:
            return

        # Build success message
        message = (
            f"āœ… *Position Reduced by {reduce_percent}%*\n\n"
            f"*{position['symbol']} {position['direction'].upper()}*\n\n"
            "*Closed:*\n"
            f" Size: ${data['closedSize']:.0f}\n"
            f" Price: ${data['closePrice']:.2f}\n"
            f" P&L: ${data['realizedPnL']:.2f}\n\n"
            "*Remaining:*\n"
            f" Size: ${data['remainingSize']:.0f}\n"
            f" Entry: ${position['entryPrice']:.2f}\n\n"
            "*Updated Targets:*\n"
            f" TP1: ${data['newTP1']:.2f}\n"
            f" TP2: ${data['newTP2']:.2f}\n"
            f" SL: ${data['newSL']:.2f}\n\n"
            "šŸŽÆ TP/SL orders updated for remaining size!"
        )
        await update.message.reply_text(message, parse_mode='Markdown')
        print(f"āœ… Position reduced: {reduce_percent}%", flush=True)

    except Exception as e:
        print(f"āŒ Error: {e}", flush=True)
        await update.message.reply_text(f"āŒ Error: {str(e)}")


async def close_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /close command - close entire position and cancel all orders"""
    # Only process from YOUR chat
    if not await _is_authorized_command(update):
        return

    print("šŸ”“ /close command received", flush=True)

    try:
        # First, get the current open position
        position = await _fetch_single_position(update, 'close', 'Specify symbol or use /reduce')
        if position is None:
            return

        symbol = position['symbol']
        direction = position['direction'].upper()
        entry = position['entryPrice']
        size = position['currentSize']

        # Close position at market (100%)
        data = await _post_trading_action(
            update,
            '/api/trading/close',
            {'symbol': symbol, 'percentToClose': 100},
            'Failed to close position',
        )
        if data is None:
            return

        # Build success message
        close_price = data.get('closePrice', 0)
        realized_pnl = data.get('realizedPnL', 0)
        emoji = "šŸ’š" if realized_pnl > 0 else "ā¤ļø" if realized_pnl < 0 else "šŸ’›"

        message = (
            f"{emoji} *Position Closed*\n\n"
            f"*{symbol} {direction}*\n\n"
            f"*Entry:* ${entry:.4f}\n"
            f"*Exit:* ${close_price:.4f}\n"
            f"*Size:* ${size:.2f}\n\n"
            f"*P&L:* ${realized_pnl:.2f}\n\n"
            "āœ… Position closed at market\n"
            "āœ… All TP/SL orders cancelled"
        )
        await update.message.reply_text(message, parse_mode='Markdown')
        print(f"āœ… Position closed: {symbol} | P&L: ${realized_pnl:.2f}", flush=True)

    except Exception as e:
        print(f"āŒ Error: {e}", flush=True)
        await update.message.reply_text(f"āŒ Error: {str(e)}")


async def validate_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /validate command - check position consistency"""
    # Only process from YOUR chat
    if not await _is_authorized_command(update):
        return

    print("šŸ” /validate command received", flush=True)

    try:
        # Fetch validation from trading bot API
        response = await _run_blocking(
            requests.post,
            f"{TRADING_BOT_URL}/api/trading/validate-positions",
            headers=_auth_headers(),
            timeout=10,
        )
        print(f"šŸ“„ API Response: {response.status_code}", flush=True)

        if not response.ok:
            await update.message.reply_text(f"āŒ Error validating positions: {response.status_code}")
            return

        data = response.json()
        if not data.get('success'):
            await update.message.reply_text("āŒ Failed to validate positions")
            return

        # Get summary
        summary = data.get('summary', {})
        config = data.get('config', {})
        positions = data.get('positions', [])

        if not positions:
            await update.message.reply_text(
                "šŸ“Š *No positions to validate*\n\nAll clear!",
                parse_mode='Markdown',
            )
            return

        # Build validation report
        message = (
            "šŸ” *Position Validation Report*\n\n"
            "*Current Settings:*\n"
            f" Leverage: {config.get('leverage')}x\n"
            f" Position Size: ${config.get('positionSize')}\n"
            f" TP1: {config.get('tp1Percent')}%\n"
            f" TP2: {config.get('tp2Percent')}%\n"
            f" SL: {config.get('stopLossPercent')}%\n\n"
            "*Summary:*\n"
            f" Total: {summary.get('totalPositions')}\n"
            f" āœ… Valid: {summary.get('validPositions')}\n"
            f" āš ļø Issues: {summary.get('positionsWithIssues')}\n\n"
        )

        # Show each position with issues
        for pos in positions:
            if pos['isValid']:
                continue
            message += f"*{pos['symbol']} {pos['direction'].upper()}*\n"
            message += f"Entry: ${pos['entryPrice']:.4f}\n"
            for issue in pos['issues']:
                emoji = "āŒ" if issue['type'] == 'error' else "āš ļø"
                # Issue text is free-form; escape it so stray "_" or "*"
                # cannot make Telegram reject the whole report.
                message += f"{emoji} {_escape_markdown(issue['message'])}\n"
            message += "\n"

        if summary.get('validPositions') == summary.get('totalPositions'):
            message = "āœ… *All positions valid!*\n\n" + message

        await update.message.reply_text(message, parse_mode='Markdown')
        print("āœ… Validation sent", flush=True)

    except Exception as e:
        print(f"āŒ Error: {e}", flush=True)
        await update.message.reply_text(f"āŒ Error: {str(e)}")


async def trade_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle trade commands like /buySOL, /sellBTC, etc."""
    # Only process from YOUR chat
    if not await _is_authorized_command(update):
        return

    # Use only the command token: drop the leading "/", any "@botname"
    # suffix and any trailing arguments, e.g. "/buySOL@MyBot now" -> "buysol".
    tokens = (update.message.text or '').split()
    command = tokens[0][1:].split('@', 1)[0].lower() if tokens else ''

    # Parse action and symbol
    if command.startswith('buy'):
        action = 'buy'
        symbol = command[3:]  # e.g., "sol"
    elif command.startswith('sell'):
        action = 'sell'
        symbol = command[4:]  # e.g., "btc"
    else:
        await update.message.reply_text("ā“ Unknown command")
        return

    if not N8N_WEBHOOK_URL:
        print("āŒ Error: N8N_WEBHOOK_URL is not configured", flush=True)
        await update.message.reply_text("āŒ Error: N8N_WEBHOOK_URL is not configured")
        return

    message = f"{action} {symbol}"
    print(f"šŸ“Ø Command: {message}", flush=True)

    # Forward to n8n webhook - send as plain text body like TradingView does
    try:
        print(f"šŸ“¤ Sending: {message}", flush=True)

        response = await _run_blocking(
            requests.post,
            N8N_WEBHOOK_URL,
            data=message,  # Plain text, not JSON
            headers={'Content-Type': 'text/plain'},
            timeout=10,
        )

        print(f"šŸ“„ Response status: {response.status_code}", flush=True)
        print(f"šŸ“„ Response body: {response.text[:200]}", flush=True)

        if response.ok:
            print(f"āœ… Sent: {message}", flush=True)
            await update.message.reply_text(
                f"šŸ¤– {action.upper()} {symbol.upper()}\n"
                f"āœ… Trade command sent!"
            )
        else:
            print(f"āŒ Error: {response.status_code}", flush=True)
            await update.message.reply_text(f"āŒ Error: {response.status_code}")

    except Exception as e:
        print(f"āŒ Error: {e}", flush=True)
        await update.message.reply_text(f"āŒ Error: {str(e)}")


async def manual_trade_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Execute manual long/short commands sent as plain text with analytics validation.

    Accepts "<long|short> <sol|eth|btc>" with an optional "--force" flag that
    skips the analytics check. Anything else, and any message from another
    chat, is ignored silently.
    """
    if update.message is None:
        return

    if update.message.chat_id != ALLOWED_CHAT_ID:
        return

    text = update.message.text.strip().lower()
    parts = text.split()

    # Check for --force flag
    force_trade = '--force' in parts
    if force_trade:
        parts.remove('--force')

    if len(parts) != 2:
        return

    direction, symbol_key = parts
    if direction not in ('long', 'short'):
        return

    symbol_info = SYMBOL_MAP.get(symbol_key)
    if not symbol_info:
        return

    # Convert to Drift format for analytics check
    drift_symbol_map = {
        'sol': 'SOL-PERP',
        'eth': 'ETH-PERP',
        'btc': 'BTC-PERP'
    }
    drift_symbol = drift_symbol_map.get(symbol_key)

    # PHASE 1: Check analytics before executing (unless forced)
    if not force_trade:
        analytics = await _fetch_reentry_analytics(drift_symbol, direction)

        if analytics is not None:
            data_source = analytics.get('data_source', 'unknown')
            data_age = analytics.get('data_age_seconds')
            data_age_text = f" ({data_age}s old)" if data_age is not None else ""
            # Sources such as "tradingview_real" contain "_", which legacy
            # Markdown would treat as an unterminated italic entity.
            source_text = _escape_markdown(f"{data_source}{data_age_text}")

            if not analytics.get('should_enter'):
                # Build rejection message with data source info
                data_emoji = {
                    'tradingview_real': 'āœ…',
                    'fallback_historical': 'āš ļø',
                    'no_data': 'āŒ'
                }
                data_icon = data_emoji.get(data_source, 'ā“')

                message = (
                    f"šŸ›‘ *Analytics suggest NOT entering {direction.upper()} {symbol_info['label']}*\n\n"
                    f"*Reason:* {_escape_markdown(analytics.get('reason', 'Unknown'))}\n"
                    f"*Score:* {analytics.get('score', 0)}/100\n"
                    f"*Data:* {data_icon} {source_text}\n\n"
                    f"Use `{text} --force` to override"
                )
                print(f"āŒ Trade blocked by analytics (score: {analytics.get('score')})", flush=True)
                # The veto stands even if the reply cannot be delivered; a
                # send failure must never fall through to execution.
                await _reply_best_effort(update, message, parse_mode='Markdown')
                return

            # Analytics passed - show confirmation
            confirm_message = (
                f"āœ… *Analytics check passed ({analytics.get('score')}/100)*\n"
                f"Data: {source_text}\n"
                f"Proceeding with {direction.upper()} {symbol_info['label']}..."
            )
            await _reply_best_effort(update, confirm_message, parse_mode='Markdown')
            print(f"āœ… Analytics passed (score: {analytics.get('score')})", flush=True)

    # Execute the trade
    metrics = MANUAL_METRICS[direction]

    payload = {
        'symbol': symbol_info['tradingview'],
        'direction': direction,
        'timeframe': 'manual',
        'signalStrength': 'manual',
        'atr': metrics['atr'],
        'adx': metrics['adx'],
        'rsi': metrics['rsi'],
        'volumeRatio': metrics['volumeRatio'],
        'pricePosition': metrics['pricePosition'],
    }

    try:
        print(f"šŸš€ Manual trade: {direction.upper()} {symbol_info['label']}{' (FORCED)' if force_trade else ''}", flush=True)

        response = await _run_blocking(
            retry_request,
            lambda: requests.post(
                f"{TRADING_BOT_URL}/api/trading/execute",
                headers=_auth_headers(),
                json=payload,
                timeout=60,
            ),
        )

        print(f"šŸ“„ Manual trade response: {response.status_code}", flush=True)

        # Parse JSON even for error responses to get detailed error messages
        try:
            data = response.json()
        except Exception:
            await update.message.reply_text(f"āŒ Execution error ({response.status_code})")
            return

        if not data.get('success'):
            # CRITICAL: Show detailed error message (may contain "CLOSE POSITION MANUALLY")
            message = data.get('message') or data.get('error') or 'Trade rejected'
            await update.message.reply_text(f"āŒ {message}")
            return

        entry_price = data.get('entryPrice')
        notional = data.get('positionSize')
        leverage = data.get('leverage')
        tp1 = data.get('takeProfit1')
        tp2 = data.get('takeProfit2')
        sl = data.get('stopLoss')

        entry_text = f"${entry_price:.4f}" if entry_price is not None else 'n/a'
        size_text = (
            f"${notional:.2f} @ {leverage}x"
            if notional is not None and leverage is not None
            else 'n/a'
        )
        tp1_text = f"${tp1:.4f}" if tp1 is not None else 'n/a'
        tp2_text = f"${tp2:.4f}" if tp2 is not None else 'n/a'
        sl_text = f"${sl:.4f}" if sl is not None else 'n/a'

        force_indicator = " (FORCED)" if force_trade else ""

        success_message = (
            f"āœ… OPENED {direction.upper()} {symbol_info['label']}{force_indicator}\n"
            f"Entry: {entry_text}\n"
            f"Size: {size_text}\n"
            f"TP1: {tp1_text}\nTP2: {tp2_text}\nSL: {sl_text}"
        )

        await update.message.reply_text(success_message)

    except Exception as exc:
        print(f"āŒ Manual trade failed: {exc}", flush=True)
        await update.message.reply_text(f"āŒ Error: {exc}")


def main():
    """Start the bot"""
    print("šŸš€ Telegram Trade Bot Starting...", flush=True)
    print(f"šŸ“± Allowed Chat ID: {ALLOWED_CHAT_ID}", flush=True)
    print(f"šŸ”— Webhook: {N8N_WEBHOOK_URL}", flush=True)
    print(f"šŸ¤– Trading Bot: {TRADING_BOT_URL}", flush=True)
    print("\nāœ… Commands:", flush=True)
    print(" /status - Show open positions", flush=True)
    print(" /close - Close entire position at market", flush=True)
    print(" /validate - Validate positions against settings", flush=True)
    print(" /scale [percent] - Scale position (default 50%)", flush=True)
    print(" /reduce [percent] - Take partial profits (default 50%)", flush=True)
    print(" /buySOL, /sellSOL", flush=True)
    print(" /buyBTC, /sellBTC", flush=True)
    print(" /buyETH, /sellETH", flush=True)
    print(" long sol | short btc (plain text)", flush=True)

    # Create application
    application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()

    # Add command handlers
    command_handlers = (
        ("status", status_command),
        ("close", close_command),
        ("validate", validate_command),
        ("scale", scale_command),
        ("reduce", reduce_command),
        ("buySOL", trade_command),
        ("sellSOL", trade_command),
        ("buyBTC", trade_command),
        ("sellBTC", trade_command),
        ("buyETH", trade_command),
        ("sellETH", trade_command),
    )
    for command_name, callback in command_handlers:
        application.add_handler(CommandHandler(command_name, callback))

    # Plain-text "long sol" / "short btc" messages
    application.add_handler(MessageHandler(
        filters.TEXT & (~filters.COMMAND),
        manual_trade_handler,
    ))

    # Start polling
    print("\nšŸ¤– Bot ready! Send commands to your Telegram.\n", flush=True)
    application.run_polling(allowed_updates=Update.ALL_TYPES)


if __name__ == '__main__':
    main()