Files
marketscanner/backend/app/workers/tasks/stock_tasks.py
mindesbunister 074787f067 Initial project structure: MarketScanner - Fear-to-Fortune Trading Intelligence
Features:
- FastAPI backend with stocks, news, signals, watchlist, analytics endpoints
- React frontend with TailwindCSS dark mode trading dashboard
- Celery workers for news fetching, sentiment analysis, pattern detection
- TimescaleDB schema for time-series stock data
- Docker Compose setup for all services
- OpenAI integration for sentiment analysis
2026-01-08 14:15:51 +01:00

103 lines
3.1 KiB
Python

"""
Stock data fetching tasks
"""
import yfinance as yf
import structlog
from app.workers.celery_app import celery_app
from app.core.config import settings
# Module-level structlog logger used by every task defined in this module.
logger = structlog.get_logger()
@celery_app.task(name="app.workers.tasks.stock_tasks.update_stock_prices")
def update_stock_prices():
    """Refresh prices for all tracked stocks (currently a stub).

    No stocks are looked up or fetched yet: the task only logs that it
    started and finished, and reports that nothing was updated.

    Returns:
        dict: ``{"updated": <count>}``; the count is always ``0`` for now.
    """
    logger.info("Starting stock price update")

    # Placeholder: the active stocks would be loaded from the database and
    # refreshed here. Until that exists the task just demonstrates the shape
    # of the result.
    summary = {"updated": 0}

    logger.info("Stock prices updated", count=summary["updated"])
    return summary
@celery_app.task(name="app.workers.tasks.stock_tasks.fetch_stock_price")
def fetch_stock_price(symbol: str):
    """Fetch the current quote for a single stock via yfinance.

    Args:
        symbol: Ticker symbol handed to ``yf.Ticker``.

    Returns:
        A dict with the keys ``symbol``, ``price``, ``previous_close``,
        ``volume`` and ``market_cap`` when the lookup succeeds. Individual
        values are ``None`` when the corresponding field is absent from the
        ticker info. ``None`` is returned instead of a dict when the lookup
        raises; the failure is logged and not re-raised.
    """
    logger.info("Fetching price", symbol=symbol)
    try:
        info = yf.Ticker(symbol).info
        return {
            "symbol": symbol,
            # "currentPrice" is preferred; fall back to "regularMarketPrice"
            # when it is missing or falsy.
            "price": info.get("currentPrice") or info.get("regularMarketPrice"),
            "previous_close": info.get("previousClose"),
            "volume": info.get("volume"),
            "market_cap": info.get("marketCap"),
        }
    except Exception as e:
        # Task boundary: the failure is deliberately reported as ``None``
        # rather than propagated. ``exc_info=True`` keeps the traceback in
        # the log so the swallowed error can still be diagnosed.
        logger.error(
            "Failed to fetch price",
            symbol=symbol,
            error=str(e),
            exc_info=True,
        )
        return None
@celery_app.task(name="app.workers.tasks.stock_tasks.fetch_historical_data")
def fetch_historical_data(symbol: str, period: str = "10y"):
    """Fetch historical price data for a stock via yfinance.

    Args:
        symbol: Ticker symbol handed to ``yf.Ticker``.
        period: Look-back window passed to ``Ticker.history``; defaults to
            ``"10y"``.

    Returns:
        ``{"symbol": symbol, "records": <row count>}`` when the download
        succeeds, or ``None`` when it raises; the failure is logged and not
        re-raised.
    """
    logger.info("Fetching historical data", symbol=symbol, period=period)
    try:
        hist = yf.Ticker(symbol).history(period=period)

        # Convert to a list of dicts for storage.
        # NOTE(review): nothing in this task persists the rows yet; only
        # their count is logged and returned.
        records = [
            {
                "time": idx.isoformat(),
                "open": row["Open"],
                "high": row["High"],
                "low": row["Low"],
                "close": row["Close"],
                "volume": row["Volume"],
            }
            for idx, row in hist.iterrows()
        ]

        logger.info(
            "Historical data fetched",
            symbol=symbol,
            records=len(records),
        )
        return {"symbol": symbol, "records": len(records)}
    except Exception as e:
        # Task boundary: the failure is deliberately reported as ``None``
        # rather than propagated. ``exc_info=True`` keeps the traceback in
        # the log so the swallowed error can still be diagnosed.
        logger.error(
            "Failed to fetch historical data",
            symbol=symbol,
            error=str(e),
            exc_info=True,
        )
        return None
@celery_app.task(name="app.workers.tasks.stock_tasks.update_stock_info")
def update_stock_info(symbol: str):
    """Collect stock metadata (name, sector, industry, etc.) via yfinance.

    Args:
        symbol: Ticker symbol handed to ``yf.Ticker``.

    Returns:
        A dict with the keys ``symbol``, ``name``, ``sector``, ``industry``,
        ``market_cap``, ``exchange`` and ``country`` when the lookup
        succeeds. Individual values are ``None`` when the corresponding
        field is absent from the ticker info. ``None`` is returned instead
        of a dict when the lookup raises; the failure is logged and not
        re-raised.
    """
    logger.info("Updating stock info", symbol=symbol)
    try:
        info = yf.Ticker(symbol).info
        return {
            "symbol": symbol,
            # "longName" is preferred; fall back to "shortName" when it is
            # missing or empty.
            "name": info.get("longName") or info.get("shortName"),
            "sector": info.get("sector"),
            "industry": info.get("industry"),
            "market_cap": info.get("marketCap"),
            "exchange": info.get("exchange"),
            "country": info.get("country"),
        }
    except Exception as e:
        # Task boundary: the failure is deliberately reported as ``None``
        # rather than propagated. ``exc_info=True`` keeps the traceback in
        # the log so the swallowed error can still be diagnosed.
        logger.error(
            "Failed to update stock info",
            symbol=symbol,
            error=str(e),
            exc_info=True,
        )
        return None