Files
marketscanner/backend/app/workers/tasks/pattern_tasks.py
mindesbunister 074787f067 Initial project structure: MarketScanner - Fear-to-Fortune Trading Intelligence
Features:
- FastAPI backend with stocks, news, signals, watchlist, analytics endpoints
- React frontend with TailwindCSS dark mode trading dashboard
- Celery workers for news fetching, sentiment analysis, pattern detection
- TimescaleDB schema for time-series stock data
- Docker Compose setup for all services
- OpenAI integration for sentiment analysis
2026-01-08 14:15:51 +01:00

143 lines
4.8 KiB
Python

"""
Pattern detection and buy signal generation tasks
"""
from datetime import datetime, timedelta
import structlog
from app.workers.celery_app import celery_app
from app.core.config import settings
# Module-level structlog logger used by the task functions below.
logger = structlog.get_logger()
@celery_app.task(name="app.workers.tasks.pattern_tasks.detect_panic_events")
def detect_panic_events():
    """Scan for new panic events (placeholder implementation).

    Intended detection criteria, none of which are implemented yet:
      1. Sentiment score falls below a threshold.
      2. Price falls significantly (>5% within 24h).
      3. News volume spikes.

    A full implementation would query recent per-stock news sentiment,
    check price movements, and create panic_events records.

    Returns:
        dict: ``{"detected": <count>}``. The count is currently always 0
        because no detection logic runs.
    """
    logger.info("Starting panic event detection")

    # Nothing is evaluated yet, so the summary always reports zero events.
    summary = {"detected": 0}

    logger.info("Panic detection complete", detected=summary["detected"])
    return summary
@celery_app.task(name="app.workers.tasks.pattern_tasks.generate_buy_signals")
def generate_buy_signals():
    """Produce buy signals from historical patterns (placeholder implementation).

    Intended signal criteria, none of which are implemented yet:
      1. An active panic event exists.
      2. Similar historical events showed a good recovery.
      3. Price is near or past the typical bottom.
      4. Volume indicates capitulation.

    A full implementation would find stocks with active panic events,
    match them against historical patterns, calculate confidence scores,
    and create buy_signals records.

    Returns:
        dict: ``{"generated": <count>}``. The count is currently always 0
        because no signal logic runs.
    """
    logger.info("Starting buy signal generation")

    # No matching logic exists yet; report an empty run.
    generated = 0

    logger.info("Signal generation complete", signals=generated)
    return {"generated": generated}
@celery_app.task(name="app.workers.tasks.pattern_tasks.analyze_historical_pattern")
def analyze_historical_pattern(stock_id: str, event_type: str):
    """Analyze historical patterns for one stock and event type (placeholder).

    A full implementation would query past panic events for the stock and
    compute statistics such as:
      - average/median drawdown
      - average/median recovery time
      - average/median recovery percentage
      - success rate (how often the price recovered)

    Args:
        stock_id: Identifier of the stock to analyze; echoed back in the result.
        event_type: Event type to analyze; echoed back in the result.

    Returns:
        dict: The two inputs plus a ``"pattern"`` entry, which is currently
        always ``None`` because no analysis is performed.
    """
    logger.info(
        "Analyzing historical pattern",
        stock_id=stock_id,
        event_type=event_type,
    )

    # "pattern" is reserved for the computed statistics once implemented.
    analysis = dict(stock_id=stock_id, event_type=event_type, pattern=None)
    return analysis
@celery_app.task(name="app.workers.tasks.pattern_tasks.calculate_confidence_score")
def calculate_confidence_score(
    stock_id: str,
    current_drawdown: float,
    current_sentiment: float,
    historical_pattern: dict,
) -> float:
    """Score a potential buy signal on a 0-1 scale.

    Starts from a base of 0.5 and adds bonuses:
      - +0.2 when the current drawdown is within 80%-120% of the pattern's
        ``avg_drawdown``.
      - up to +0.2 when sentiment is below ``settings.PANIC_THRESHOLD``,
        scaled by the distance below the threshold divided by 50.
      - +0.1 when the pattern's ``event_count`` is at least 3.

    Market conditions (sector/overall market) are listed as a design factor
    but are not part of the calculation.

    Args:
        stock_id: Identifier of the stock; not used by the calculation.
        current_drawdown: Current drawdown, compared as a ratio against the
            pattern's ``avg_drawdown``.
        current_sentiment: Current sentiment score, compared against
            ``settings.PANIC_THRESHOLD``.
        historical_pattern: Mapping that may provide ``avg_drawdown`` and
            ``event_count``; a falsy value skips both pattern bonuses.

    Returns:
        The score clamped to the range 0-1.
    """
    confidence = 0.5  # Base score

    # Treat a missing/empty pattern as an empty mapping so lookups stay uniform.
    pattern = historical_pattern or {}

    # Drawdown match: a zero or absent average is skipped, which also avoids
    # dividing by zero.
    typical_drawdown = pattern.get("avg_drawdown")
    if typical_drawdown:
        ratio = current_drawdown / typical_drawdown
        if 0.8 <= ratio <= 1.2:
            confidence += 0.2

    # Sentiment: the further below the panic threshold, the larger the bonus,
    # capped at 0.2.
    if current_sentiment < settings.PANIC_THRESHOLD:
        depth = abs(current_sentiment - settings.PANIC_THRESHOLD) / 50
        confidence += min(depth * 0.2, 0.2)

    # Reliability: several historical examples make the pattern more credible.
    if pattern.get("event_count", 0) >= 3:
        confidence += 0.1

    return min(max(confidence, 0), 1)  # Clamp to 0-1
@celery_app.task(name="app.workers.tasks.pattern_tasks.update_panic_event_status")
def update_panic_event_status():
    """Refresh the status of panic events (placeholder implementation).

    A full implementation would inspect active (incomplete) panic events
    and mark one complete when:
      - price has recovered to pre-panic levels,
      - sentiment has normalized, or
      - enough time has passed.

    Returns:
        dict: ``{"updated": <count>}``. The count is currently always 0
        because no events are examined.
    """
    logger.info("Updating panic event statuses")

    # No events are inspected yet, so nothing is ever updated.
    outcome = {"updated": 0}

    logger.info("Panic status update complete", updated=outcome["updated"])
    return outcome
@celery_app.task(name="app.workers.tasks.pattern_tasks.rebuild_patterns")
def rebuild_patterns(stock_id: str = None):
    """Rebuild historical patterns from panic events (placeholder implementation).

    A full implementation would aggregate all completed panic events,
    group them by stock and event type, and calculate pattern statistics.

    Args:
        stock_id: Optional stock identifier. It is only used for logging
            here; a falsy value is logged as ``"all"``.

    Returns:
        dict: ``{"rebuilt": <count>}``. The count is currently always 0
        because no aggregation is performed.
    """
    scope = stock_id if stock_id else "all"
    logger.info("Rebuilding patterns", stock_id=scope)

    # Aggregation is not implemented; report that no patterns were rebuilt.
    rebuilt = 0

    logger.info("Pattern rebuild complete", patterns=rebuilt)
    return {"rebuilt": rebuilt}