- Added telegram_command_bot.py with slash commands (/buySOL, /sellBTC, etc.)
- Docker Compose setup with DNS configuration
- Sends trades as plain text to the n8n webhook (same format as TradingView)
- Improved Telegram success message formatting
- Only responds to the authorized chat ID (579304651)
- Commands: /buySOL, /sellSOL, /buyBTC, /sellBTC, /buyETH, /sellETH
81 lines
2.7 KiB
Python
81 lines
2.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Telegram to n8n Webhook Bridge
|
|
Monitors your Telegram chat for trade commands and forwards them to an n8n webhook.
|
|
"""
|
|
import os
|
|
import requests
|
|
from telegram import Update
|
|
from telegram.ext import Application, MessageHandler, filters, ContextTypes
|
|
|
|
# Configuration
# Each setting is read from the environment once, at import time. The token
# and webhook fallbacks are placeholder values that main() refuses to run with.
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN', 'YOUR_BOT_TOKEN')
N8N_WEBHOOK_URL = os.environ.get(
    'N8N_WEBHOOK_URL',
    'https://your-n8n.com/webhook/manual-telegram-trade',
)
# The only chat whose messages are processed; stored as an int so it can be
# compared directly with the chat_id of incoming messages.
ALLOWED_CHAT_ID = int(os.environ.get('TELEGRAM_CHAT_ID', '579304651'))
|
|
|
|
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Forward a trade command from the authorized chat to the n8n webhook.

    The update is ignored when it carries no new text message, when it comes
    from a chat other than ALLOWED_CHAT_ID, or when its text contains none of
    the trade keywords (buy/sell/long/short). Otherwise the lower-cased text
    is POSTed to N8N_WEBHOOK_URL as ``{'message': <text>}`` and the outcome is
    reported back to the chat as a reply.

    Args:
        update: Incoming Telegram update.
        context: Handler context passed in by the framework (unused here).
    """
    # Imported locally: only needed to run the blocking HTTP call off the loop.
    import asyncio
    import functools

    message = update.message
    # update.message is optional (e.g. edited messages arrive in a different
    # field), and main() asks for every update type, so guard before use.
    if message is None or message.text is None:
        return

    # Only process messages from your chat
    if message.chat_id != ALLOWED_CHAT_ID:
        return

    message_text = message.text.lower()

    # Only process trade commands (containing buy/sell/long/short)
    if not any(word in message_text for word in ('buy', 'sell', 'long', 'short')):
        return

    print(f"📨 Received trade command: {message_text}")

    # Forward to n8n webhook. requests is synchronous, so the call is handed
    # to the default executor; awaiting it keeps the event loop free while the
    # request is in flight (it may take up to the 10 s timeout).
    post_to_webhook = functools.partial(
        requests.post,
        N8N_WEBHOOK_URL,
        json={'message': message_text},
        timeout=10,
    )
    try:
        response = await asyncio.get_running_loop().run_in_executor(
            None, post_to_webhook
        )
    except Exception as e:
        # Only the webhook call is guarded, so a failure while sending the
        # confirmation reply below is never reported as a forwarding error.
        print(f"❌ Error forwarding to webhook: {e}")
        await message.reply_text(f"❌ Error: {str(e)}")
        return

    if response.status_code == 200:
        print(f"✅ Forwarded to n8n: {message_text}")
        # Send confirmation
        await message.reply_text(
            f"🤖 Processing: {message_text}\n"
            f"Forwarded to trading bot..."
        )
    else:
        print(f"❌ Webhook error: {response.status_code}")
        await message.reply_text(f"❌ Error: Webhook returned {response.status_code}")
|
|
|
|
def main():
    """Validate the configuration, then build the bot and start polling.

    Prints an error and returns without starting anything if either
    TELEGRAM_BOT_TOKEN or N8N_WEBHOOK_URL still holds its placeholder value.
    """
    # (current value, placeholder default, message shown when still unset)
    required_settings = (
        (
            TELEGRAM_BOT_TOKEN,
            'YOUR_BOT_TOKEN',
            "❌ Error: Set TELEGRAM_BOT_TOKEN environment variable",
        ),
        (
            N8N_WEBHOOK_URL,
            'https://your-n8n.com/webhook/manual-telegram-trade',
            "❌ Error: Set N8N_WEBHOOK_URL environment variable",
        ),
    )
    for current, placeholder, complaint in required_settings:
        if current == placeholder:
            print(complaint)
            return

    startup_banner = (
        "🚀 Starting Telegram to n8n bridge...",
        f"📱 Monitoring chat ID: {ALLOWED_CHAT_ID}",
        f"🔗 Webhook URL: {N8N_WEBHOOK_URL}",
    )
    for banner_line in startup_banner:
        print(banner_line)

    # Create application
    builder = Application.builder()
    application = builder.token(TELEGRAM_BOT_TOKEN).build()

    # Add message handler: plain text only, slash commands are excluded
    text_without_commands = filters.TEXT & ~filters.COMMAND
    application.add_handler(MessageHandler(text_without_commands, handle_message))

    # Start polling
    print("✅ Bot started! Send trade commands to your Telegram chat.")
    application.run_polling(allowed_updates=Update.ALL_TYPES)
|
|
|
|
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|