Features: - FastAPI backend with stocks, news, signals, watchlist, analytics endpoints - React frontend with TailwindCSS dark mode trading dashboard - Celery workers for news fetching, sentiment analysis, pattern detection - TimescaleDB schema for time-series stock data - Docker Compose setup for all services - OpenAI integration for sentiment analysis
103 lines
3.1 KiB
Python
103 lines
3.1 KiB
Python
"""
|
|
Stock data fetching tasks
|
|
"""
|
|
|
|
import yfinance as yf
|
|
import structlog
|
|
|
|
from app.workers.celery_app import celery_app
|
|
from app.core.config import settings
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
@celery_app.task(name="app.workers.tasks.stock_tasks.update_stock_prices")
|
|
def update_stock_prices():
|
|
"""Update prices for all tracked stocks."""
|
|
logger.info("Starting stock price update")
|
|
|
|
# Placeholder - would get active stocks from database
|
|
# For now, just demonstrate the concept
|
|
|
|
updated_count = 0
|
|
|
|
logger.info("Stock prices updated", count=updated_count)
|
|
return {"updated": updated_count}
|
|
|
|
|
|
@celery_app.task(name="app.workers.tasks.stock_tasks.fetch_stock_price")
|
|
def fetch_stock_price(symbol: str):
|
|
"""Fetch current price for a single stock."""
|
|
logger.info("Fetching price", symbol=symbol)
|
|
|
|
try:
|
|
ticker = yf.Ticker(symbol)
|
|
info = ticker.info
|
|
|
|
return {
|
|
"symbol": symbol,
|
|
"price": info.get("currentPrice") or info.get("regularMarketPrice"),
|
|
"previous_close": info.get("previousClose"),
|
|
"volume": info.get("volume"),
|
|
"market_cap": info.get("marketCap"),
|
|
}
|
|
except Exception as e:
|
|
logger.error("Failed to fetch price", symbol=symbol, error=str(e))
|
|
return None
|
|
|
|
|
|
@celery_app.task(name="app.workers.tasks.stock_tasks.fetch_historical_data")
|
|
def fetch_historical_data(symbol: str, period: str = "10y"):
|
|
"""Fetch historical price data for a stock."""
|
|
logger.info("Fetching historical data", symbol=symbol, period=period)
|
|
|
|
try:
|
|
ticker = yf.Ticker(symbol)
|
|
hist = ticker.history(period=period)
|
|
|
|
# Convert to list of dicts for storage
|
|
records = []
|
|
for idx, row in hist.iterrows():
|
|
records.append({
|
|
"time": idx.isoformat(),
|
|
"open": row["Open"],
|
|
"high": row["High"],
|
|
"low": row["Low"],
|
|
"close": row["Close"],
|
|
"volume": row["Volume"],
|
|
})
|
|
|
|
logger.info(
|
|
"Historical data fetched",
|
|
symbol=symbol,
|
|
records=len(records)
|
|
)
|
|
return {"symbol": symbol, "records": len(records)}
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to fetch historical data", symbol=symbol, error=str(e))
|
|
return None
|
|
|
|
|
|
@celery_app.task(name="app.workers.tasks.stock_tasks.update_stock_info")
|
|
def update_stock_info(symbol: str):
|
|
"""Update stock metadata (sector, industry, market cap, etc.)."""
|
|
logger.info("Updating stock info", symbol=symbol)
|
|
|
|
try:
|
|
ticker = yf.Ticker(symbol)
|
|
info = ticker.info
|
|
|
|
return {
|
|
"symbol": symbol,
|
|
"name": info.get("longName") or info.get("shortName"),
|
|
"sector": info.get("sector"),
|
|
"industry": info.get("industry"),
|
|
"market_cap": info.get("marketCap"),
|
|
"exchange": info.get("exchange"),
|
|
"country": info.get("country"),
|
|
}
|
|
except Exception as e:
|
|
logger.error("Failed to update stock info", symbol=symbol, error=str(e))
|
|
return None
|