""" Alert notification tasks """ import httpx import structlog from app.workers.celery_app import celery_app from app.core.config import settings logger = structlog.get_logger() @celery_app.task(name="app.workers.tasks.alert_tasks.send_telegram_alert") def send_telegram_alert(message: str): """Send alert via Telegram.""" if not settings.TELEGRAM_BOT_TOKEN or not settings.TELEGRAM_CHAT_ID: logger.warning("Telegram not configured") return {"sent": False, "reason": "not_configured"} try: url = f"https://api.telegram.org/bot{settings.TELEGRAM_BOT_TOKEN}/sendMessage" payload = { "chat_id": settings.TELEGRAM_CHAT_ID, "text": message, "parse_mode": "Markdown", } with httpx.Client() as client: response = client.post(url, json=payload) response.raise_for_status() logger.info("Telegram alert sent") return {"sent": True} except Exception as e: logger.error("Failed to send Telegram alert", error=str(e)) return {"sent": False, "error": str(e)} @celery_app.task(name="app.workers.tasks.alert_tasks.send_discord_alert") def send_discord_alert(message: str, embed: dict = None): """Send alert via Discord webhook.""" if not settings.DISCORD_WEBHOOK_URL: logger.warning("Discord not configured") return {"sent": False, "reason": "not_configured"} try: payload = {"content": message} if embed: payload["embeds"] = [embed] with httpx.Client() as client: response = client.post(settings.DISCORD_WEBHOOK_URL, json=payload) response.raise_for_status() logger.info("Discord alert sent") return {"sent": True} except Exception as e: logger.error("Failed to send Discord alert", error=str(e)) return {"sent": False, "error": str(e)} @celery_app.task(name="app.workers.tasks.alert_tasks.send_buy_signal_alert") def send_buy_signal_alert(signal_data: dict): """Send formatted buy signal alert to all configured channels.""" logger.info("Sending buy signal alert", symbol=signal_data.get("symbol")) # Format message symbol = signal_data.get("symbol", "UNKNOWN") confidence = signal_data.get("confidence", 0) * 100 current_price = signal_data.get("price", 0) drawdown = signal_data.get("drawdown", 0) expected_recovery = signal_data.get("expected_recovery", 0) message = f""" 🚨 *BUY SIGNAL: ${symbol}* 🚨 📊 *Confidence:* {confidence:.1f}% 💰 *Current Price:* ${current_price:.2f} 📉 *Drawdown:* {drawdown:.1f}% 📈 *Expected Recovery:* {expected_recovery:.1f}% _"Buy when there's blood in the streets"_ """.strip() results = { "telegram": None, "discord": None, } # Send to Telegram if settings.TELEGRAM_BOT_TOKEN: results["telegram"] = send_telegram_alert.delay(message).get() # Send to Discord with embed if settings.DISCORD_WEBHOOK_URL: embed = { "title": f"🚨 BUY SIGNAL: ${symbol}", "color": 0x00ff00, # Green "fields": [ {"name": "Confidence", "value": f"{confidence:.1f}%", "inline": True}, {"name": "Price", "value": f"${current_price:.2f}", "inline": True}, {"name": "Drawdown", "value": f"{drawdown:.1f}%", "inline": True}, {"name": "Expected Recovery", "value": f"{expected_recovery:.1f}%", "inline": True}, ], "footer": {"text": "MarketScanner • Buy the Fear"}, } results["discord"] = send_discord_alert.delay("", embed).get() return results @celery_app.task(name="app.workers.tasks.alert_tasks.send_panic_alert") def send_panic_alert(panic_data: dict): """Send formatted panic detection alert.""" logger.info("Sending panic alert", symbol=panic_data.get("symbol")) symbol = panic_data.get("symbol", "UNKNOWN") sentiment = panic_data.get("sentiment", 0) price_drop = panic_data.get("price_drop", 0) news_count = panic_data.get("news_count", 0) message = f""" 🔴 *PANIC DETECTED: ${symbol}* 🔴 😱 *Sentiment Score:* {sentiment:.1f} 📉 *Price Drop:* {price_drop:.1f}% 📰 *News Volume:* {news_count} articles ⏳ Monitoring for buying opportunity... """.strip() results = {} if settings.TELEGRAM_BOT_TOKEN: results["telegram"] = send_telegram_alert.delay(message).get() if settings.DISCORD_WEBHOOK_URL: embed = { "title": f"🔴 PANIC DETECTED: ${symbol}", "color": 0xff0000, # Red "fields": [ {"name": "Sentiment", "value": f"{sentiment:.1f}", "inline": True}, {"name": "Price Drop", "value": f"{price_drop:.1f}%", "inline": True}, {"name": "News Volume", "value": f"{news_count} articles", "inline": True}, ], "footer": {"text": "MarketScanner • Watching for opportunity"}, } results["discord"] = send_discord_alert.delay("", embed).get() return results