Features: - FastAPI backend with stocks, news, signals, watchlist, analytics endpoints - React frontend with TailwindCSS dark mode trading dashboard - Celery workers for news fetching, sentiment analysis, pattern detection - TimescaleDB schema for time-series stock data - Docker Compose setup for all services - OpenAI integration for sentiment analysis
138 lines
4.8 KiB
Python
138 lines
4.8 KiB
Python
"""
|
|
Sentiment analysis tasks
|
|
"""
|
|
|
|
from typing import Optional
|
|
import structlog
|
|
from openai import OpenAI
|
|
|
|
from app.workers.celery_app import celery_app
|
|
from app.core.config import settings
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
def get_openai_client() -> Optional[OpenAI]:
|
|
"""Get OpenAI client if configured."""
|
|
if settings.OPENAI_API_KEY:
|
|
return OpenAI(api_key=settings.OPENAI_API_KEY)
|
|
return None
|
|
|
|
|
|
@celery_app.task(name="app.workers.tasks.sentiment_tasks.process_unanalyzed_news")
|
|
def process_unanalyzed_news():
|
|
"""Process all news articles that haven't been sentiment analyzed."""
|
|
logger.info("Starting sentiment analysis batch")
|
|
|
|
# Placeholder - would query database for unprocessed articles
|
|
processed_count = 0
|
|
|
|
logger.info("Sentiment analysis complete", processed=processed_count)
|
|
return {"processed": processed_count}
|
|
|
|
|
|
@celery_app.task(name="app.workers.tasks.sentiment_tasks.analyze_sentiment")
|
|
def analyze_sentiment(article_id: str, title: str, content: str):
|
|
"""Analyze sentiment of a single article using OpenAI."""
|
|
logger.info("Analyzing sentiment", article_id=article_id)
|
|
|
|
client = get_openai_client()
|
|
if not client:
|
|
logger.warning("OpenAI not configured, using fallback")
|
|
return fallback_sentiment_analysis(title, content)
|
|
|
|
try:
|
|
# Prepare text (limit length)
|
|
text = f"Title: {title}\n\nContent: {content[:2000]}"
|
|
|
|
response = client.chat.completions.create(
|
|
model=settings.OPENAI_MODEL,
|
|
messages=[
|
|
{
|
|
"role": "system",
|
|
"content": """You are a financial sentiment analyzer. Analyze the given news article and respond with a JSON object containing:
|
|
- score: a number from -100 (extremely negative/panic) to +100 (extremely positive/euphoric)
|
|
- label: one of "negative", "neutral", or "positive"
|
|
- confidence: a number from 0 to 1 indicating confidence in the analysis
|
|
- stocks: list of stock symbols mentioned (if any)
|
|
- summary: one-sentence summary of the sentiment
|
|
|
|
Focus on:
|
|
- Financial impact
|
|
- Market reaction implications
|
|
- Panic/fear indicators
|
|
- Earnings/guidance implications
|
|
"""
|
|
},
|
|
{
|
|
"role": "user",
|
|
"content": text
|
|
}
|
|
],
|
|
response_format={"type": "json_object"},
|
|
temperature=0.3,
|
|
)
|
|
|
|
result = response.choices[0].message.content
|
|
logger.info("Sentiment analyzed", article_id=article_id, result=result)
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error("Sentiment analysis failed", article_id=article_id, error=str(e))
|
|
return fallback_sentiment_analysis(title, content)
|
|
|
|
|
|
def fallback_sentiment_analysis(title: str, content: str) -> dict:
|
|
"""Simple keyword-based sentiment analysis as fallback."""
|
|
text = f"{title} {content}".lower()
|
|
|
|
negative_words = [
|
|
"crash", "plunge", "collapse", "scandal", "fraud", "lawsuit",
|
|
"investigation", "bankruptcy", "layoffs", "miss", "decline",
|
|
"warning", "downgrade", "sell", "bear", "crisis", "fear",
|
|
"panic", "loss", "debt", "default", "recession"
|
|
]
|
|
|
|
positive_words = [
|
|
"surge", "rally", "growth", "profit", "beat", "upgrade",
|
|
"buy", "bull", "record", "breakout", "opportunity",
|
|
"dividend", "expansion", "innovation", "deal", "acquisition"
|
|
]
|
|
|
|
neg_count = sum(1 for word in negative_words if word in text)
|
|
pos_count = sum(1 for word in positive_words if word in text)
|
|
|
|
total = neg_count + pos_count
|
|
if total == 0:
|
|
score = 0
|
|
label = "neutral"
|
|
else:
|
|
score = ((pos_count - neg_count) / total) * 100
|
|
if score < -20:
|
|
label = "negative"
|
|
elif score > 20:
|
|
label = "positive"
|
|
else:
|
|
label = "neutral"
|
|
|
|
return {
|
|
"score": round(score, 2),
|
|
"label": label,
|
|
"confidence": min(0.3 + (total * 0.1), 0.7), # Low confidence for fallback
|
|
"stocks": [],
|
|
"summary": "Analyzed using keyword matching (fallback)",
|
|
}
|
|
|
|
|
|
@celery_app.task(name="app.workers.tasks.sentiment_tasks.batch_analyze")
|
|
def batch_analyze(article_ids: list):
|
|
"""Analyze multiple articles in batch."""
|
|
logger.info("Starting batch analysis", count=len(article_ids))
|
|
|
|
results = []
|
|
for article_id in article_ids:
|
|
# Would fetch article from database and analyze
|
|
results.append({"article_id": article_id, "status": "pending"})
|
|
|
|
return {"analyzed": len(results), "results": results}
|