feat: implement real-time price monitoring with automatic analysis triggering
New Features: - Real-time price monitoring service with 5-minute update cycles - Automatic analysis triggering when prices approach TP/SL levels (15%/25% thresholds) - Comprehensive price monitoring UI component with live updates - Integration with automation service for smart analysis scheduling - PnL tracking and position status monitoring - EventEmitter-based real-time updates - CoinGecko API integration with rate limiting - TP/SL approach detection with configurable thresholds - Alert system for critical price movements - Database integration for trade tracking - Price monitor startup/shutdown with automation lifecycle - Event listeners for TP_APPROACH, SL_APPROACH, CRITICAL alerts - Automatic screenshot capture and AI analysis on price triggers - Enhanced progress tracking for price-based analysis - Intelligent analysis context with price movement data - RealTimePriceMonitor component with live price display - Trade monitoring cards with P&L and distance to TP/SL - Active alerts panel with price threshold notifications - Monitoring service controls (start/stop/force update) - Integration with automation page for comprehensive oversight - GET: Retrieve monitoring data, alerts, and current prices - POST: Control monitoring service and force price updates - Real-time data formatting and status management - Comprehensive price monitor integration tests - Basic functionality validation scripts - API endpoint testing capabilities This implements the user's request for real-time price monitoring with automatic analysis triggering when prices approach critical levels, providing enhanced oversight of active trading positions.
This commit is contained in:
377
lib/price-monitor.ts
Normal file
377
lib/price-monitor.ts
Normal file
@@ -0,0 +1,377 @@
|
||||
// Real-time price monitoring service with automatic analysis triggering
|
||||
import { EventEmitter } from 'events'
|
||||
import { PrismaClient } from '@prisma/client'
|
||||
import PriceFetcher from './price-fetcher'
|
||||
|
||||
const prisma = new PrismaClient()
|
||||
|
||||
export interface PriceAlert {
|
||||
id: string
|
||||
symbol: string
|
||||
tradeId: string
|
||||
alertType: 'TP_APPROACH' | 'SL_APPROACH' | 'BREAKOUT' | 'REVERSAL'
|
||||
currentPrice: number
|
||||
targetPrice: number
|
||||
threshold: number
|
||||
triggered: boolean
|
||||
createdAt: Date
|
||||
}
|
||||
|
||||
export interface TradeMonitoring {
|
||||
tradeId: string
|
||||
symbol: string
|
||||
side: 'BUY' | 'SELL'
|
||||
entryPrice: number
|
||||
stopLoss?: number
|
||||
takeProfit?: number
|
||||
currentPrice?: number
|
||||
currentPnL?: number
|
||||
pnlPercentage?: number
|
||||
distanceToTP?: number
|
||||
distanceToSL?: number
|
||||
status: 'ACTIVE' | 'APPROACHING_TP' | 'APPROACHING_SL' | 'CRITICAL'
|
||||
}
|
||||
|
||||
class PriceMonitor extends EventEmitter {
|
||||
private static instance: PriceMonitor
|
||||
private monitoringInterval: NodeJS.Timeout | null = null
|
||||
private priceCache: Map<string, { price: number; timestamp: number }> = new Map()
|
||||
private alerts: Map<string, PriceAlert> = new Map()
|
||||
private isRunning = false
|
||||
private readonly UPDATE_INTERVAL = 5 * 60 * 1000 // 5 minutes
|
||||
private readonly CACHE_DURATION = 6 * 60 * 1000 // 6 minutes (slightly longer than update)
|
||||
|
||||
// Thresholds for triggering analysis
|
||||
private readonly TP_APPROACH_THRESHOLD = 0.15 // 15% away from TP
|
||||
private readonly SL_APPROACH_THRESHOLD = 0.25 // 25% away from SL
|
||||
private readonly CRITICAL_THRESHOLD = 0.05 // 5% away from TP/SL
|
||||
|
||||
private constructor() {
|
||||
super()
|
||||
}
|
||||
|
||||
static getInstance(): PriceMonitor {
|
||||
if (!PriceMonitor.instance) {
|
||||
PriceMonitor.instance = new PriceMonitor()
|
||||
}
|
||||
return PriceMonitor.instance
|
||||
}
|
||||
|
||||
async startMonitoring(): Promise<void> {
|
||||
if (this.isRunning) {
|
||||
console.log('📊 Price monitor already running')
|
||||
return
|
||||
}
|
||||
|
||||
this.isRunning = true
|
||||
console.log('🚀 Starting real-time price monitoring service')
|
||||
console.log(`⏱️ Update interval: ${this.UPDATE_INTERVAL / 1000}s (5 minutes)`)
|
||||
|
||||
// Initial price check
|
||||
await this.updatePricesAndAnalyze()
|
||||
|
||||
// Set up periodic monitoring
|
||||
this.monitoringInterval = setInterval(async () => {
|
||||
try {
|
||||
await this.updatePricesAndAnalyze()
|
||||
} catch (error) {
|
||||
console.error('❌ Error in price monitoring cycle:', error)
|
||||
}
|
||||
}, this.UPDATE_INTERVAL)
|
||||
|
||||
console.log('✅ Price monitoring service started')
|
||||
}
|
||||
|
||||
async stopMonitoring(): Promise<void> {
|
||||
if (this.monitoringInterval) {
|
||||
clearInterval(this.monitoringInterval)
|
||||
this.monitoringInterval = null
|
||||
}
|
||||
this.isRunning = false
|
||||
console.log('⏹️ Price monitoring service stopped')
|
||||
}
|
||||
|
||||
private async updatePricesAndAnalyze(): Promise<void> {
|
||||
console.log('📊 Price monitor: Updating prices and checking positions...')
|
||||
|
||||
try {
|
||||
// Get all active trades
|
||||
const activeTrades = await this.getActiveTradesForMonitoring()
|
||||
|
||||
if (activeTrades.length === 0) {
|
||||
console.log('📊 No active trades to monitor')
|
||||
return
|
||||
}
|
||||
|
||||
console.log(`📊 Monitoring ${activeTrades.length} active trades`)
|
||||
|
||||
// Get unique symbols
|
||||
const symbols = [...new Set(activeTrades.map(trade => trade.symbol))]
|
||||
|
||||
// Update prices for all symbols
|
||||
const priceUpdates = await this.fetchPricesForSymbols(symbols)
|
||||
|
||||
// Analyze each trade
|
||||
const analysisRequests: string[] = []
|
||||
const updatedTrades: TradeMonitoring[] = []
|
||||
|
||||
for (const trade of activeTrades) {
|
||||
const currentPrice = priceUpdates.get(trade.symbol)
|
||||
|
||||
if (!currentPrice) {
|
||||
console.log(`⚠️ Could not get price for ${trade.symbol}`)
|
||||
continue
|
||||
}
|
||||
|
||||
const monitoring = await this.analyzeTradePosition(trade, currentPrice)
|
||||
updatedTrades.push(monitoring)
|
||||
|
||||
// Update trade in database with current PnL
|
||||
await this.updateTradeCurrentData(trade.id, currentPrice, monitoring.currentPnL!)
|
||||
|
||||
// Check if analysis is needed
|
||||
const needsAnalysis = this.shouldTriggerAnalysis(monitoring)
|
||||
if (needsAnalysis) {
|
||||
analysisRequests.push(trade.symbol)
|
||||
console.log(`🎯 Analysis triggered for ${trade.symbol}: ${monitoring.status}`)
|
||||
}
|
||||
}
|
||||
|
||||
// Emit events for UI updates
|
||||
this.emit('tradesUpdated', updatedTrades)
|
||||
this.emit('pricesUpdated', priceUpdates)
|
||||
|
||||
// Trigger analysis for symbols that need it (deduplicated)
|
||||
const uniqueAnalysisRequests = [...new Set(analysisRequests)]
|
||||
for (const symbol of uniqueAnalysisRequests) {
|
||||
await this.triggerSmartAnalysis(symbol, 'PRICE_MOVEMENT')
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
console.error('❌ Error in price monitoring update:', error)
|
||||
}
|
||||
}
|
||||
|
||||
private async getActiveTradesForMonitoring(): Promise<any[]> {
|
||||
return await prisma.trade.findMany({
|
||||
where: {
|
||||
status: 'OPEN',
|
||||
isAutomated: true
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
symbol: true,
|
||||
side: true,
|
||||
amount: true,
|
||||
price: true,
|
||||
entryPrice: true,
|
||||
stopLoss: true,
|
||||
takeProfit: true,
|
||||
leverage: true,
|
||||
createdAt: true
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private async fetchPricesForSymbols(symbols: string[]): Promise<Map<string, number>> {
|
||||
const priceUpdates = new Map<string, number>()
|
||||
|
||||
for (const symbol of symbols) {
|
||||
try {
|
||||
// Check cache first
|
||||
const cached = this.priceCache.get(symbol)
|
||||
if (cached && Date.now() - cached.timestamp < this.CACHE_DURATION) {
|
||||
priceUpdates.set(symbol, cached.price)
|
||||
continue
|
||||
}
|
||||
|
||||
// Fetch new price
|
||||
const price = await PriceFetcher.getCurrentPrice(symbol)
|
||||
if (price > 0) {
|
||||
priceUpdates.set(symbol, price)
|
||||
this.priceCache.set(symbol, { price, timestamp: Date.now() })
|
||||
console.log(`💰 ${symbol}: $${price.toFixed(2)}`)
|
||||
}
|
||||
|
||||
// Rate limiting - small delay between requests
|
||||
await new Promise(resolve => setTimeout(resolve, 100))
|
||||
|
||||
} catch (error) {
|
||||
console.error(`❌ Error fetching price for ${symbol}:`, error)
|
||||
}
|
||||
}
|
||||
|
||||
return priceUpdates
|
||||
}
|
||||
|
||||
private async analyzeTradePosition(trade: any, currentPrice: number): Promise<TradeMonitoring> {
|
||||
const entryPrice = trade.entryPrice || trade.price
|
||||
const leverage = trade.leverage || 1
|
||||
|
||||
// Calculate PnL
|
||||
const priceChange = trade.side === 'BUY' ?
|
||||
(currentPrice - entryPrice) :
|
||||
(entryPrice - currentPrice)
|
||||
|
||||
const currentPnL = priceChange * trade.amount * leverage
|
||||
const pnlPercentage = (priceChange / entryPrice) * 100 * leverage
|
||||
|
||||
// Calculate distances to TP/SL
|
||||
let distanceToTP: number | undefined
|
||||
let distanceToSL: number | undefined
|
||||
|
||||
if (trade.takeProfit) {
|
||||
distanceToTP = Math.abs(currentPrice - trade.takeProfit) / Math.abs(trade.takeProfit - entryPrice)
|
||||
}
|
||||
|
||||
if (trade.stopLoss) {
|
||||
distanceToSL = Math.abs(currentPrice - trade.stopLoss) / Math.abs(entryPrice - trade.stopLoss)
|
||||
}
|
||||
|
||||
// Determine status
|
||||
let status: TradeMonitoring['status'] = 'ACTIVE'
|
||||
|
||||
if (distanceToTP !== undefined && distanceToTP <= this.CRITICAL_THRESHOLD) {
|
||||
status = 'CRITICAL'
|
||||
} else if (distanceToSL !== undefined && distanceToSL <= this.CRITICAL_THRESHOLD) {
|
||||
status = 'CRITICAL'
|
||||
} else if (distanceToTP !== undefined && distanceToTP <= this.TP_APPROACH_THRESHOLD) {
|
||||
status = 'APPROACHING_TP'
|
||||
} else if (distanceToSL !== undefined && distanceToSL <= this.SL_APPROACH_THRESHOLD) {
|
||||
status = 'APPROACHING_SL'
|
||||
}
|
||||
|
||||
return {
|
||||
tradeId: trade.id,
|
||||
symbol: trade.symbol,
|
||||
side: trade.side,
|
||||
entryPrice,
|
||||
stopLoss: trade.stopLoss,
|
||||
takeProfit: trade.takeProfit,
|
||||
currentPrice,
|
||||
currentPnL,
|
||||
pnlPercentage,
|
||||
distanceToTP,
|
||||
distanceToSL,
|
||||
status
|
||||
}
|
||||
}
|
||||
|
||||
private shouldTriggerAnalysis(monitoring: TradeMonitoring): boolean {
|
||||
// Generate alert ID
|
||||
const alertId = `${monitoring.tradeId}_${monitoring.status}_${Date.now()}`
|
||||
|
||||
// Check if we already triggered for this situation recently
|
||||
const recentAlert = Array.from(this.alerts.values()).find(alert =>
|
||||
alert.tradeId === monitoring.tradeId &&
|
||||
alert.alertType.includes(monitoring.status.split('_')[1] || monitoring.status) &&
|
||||
Date.now() - alert.createdAt.getTime() < 30 * 60 * 1000 // 30 minutes cooldown
|
||||
)
|
||||
|
||||
if (recentAlert) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Trigger analysis for critical situations or approaches
|
||||
const shouldTrigger = monitoring.status === 'CRITICAL' ||
|
||||
monitoring.status === 'APPROACHING_TP' ||
|
||||
monitoring.status === 'APPROACHING_SL'
|
||||
|
||||
if (shouldTrigger) {
|
||||
// Create alert
|
||||
const alert: PriceAlert = {
|
||||
id: alertId,
|
||||
symbol: monitoring.symbol,
|
||||
tradeId: monitoring.tradeId,
|
||||
alertType: monitoring.status === 'APPROACHING_TP' ? 'TP_APPROACH' :
|
||||
monitoring.status === 'APPROACHING_SL' ? 'SL_APPROACH' : 'BREAKOUT',
|
||||
currentPrice: monitoring.currentPrice!,
|
||||
targetPrice: monitoring.status === 'APPROACHING_TP' ? monitoring.takeProfit! : monitoring.stopLoss!,
|
||||
threshold: monitoring.status === 'APPROACHING_TP' ? monitoring.distanceToTP! : monitoring.distanceToSL!,
|
||||
triggered: true,
|
||||
createdAt: new Date()
|
||||
}
|
||||
|
||||
this.alerts.set(alertId, alert)
|
||||
this.emit('alert', alert)
|
||||
}
|
||||
|
||||
return shouldTrigger
|
||||
}
|
||||
|
||||
private async updateTradeCurrentData(tradeId: string, currentPrice: number, currentPnL: number): Promise<void> {
|
||||
try {
|
||||
await prisma.trade.update({
|
||||
where: { id: tradeId },
|
||||
data: {
|
||||
// Store current price and PnL for reference
|
||||
learningData: {
|
||||
currentPrice,
|
||||
currentPnL,
|
||||
lastUpdated: new Date().toISOString()
|
||||
}
|
||||
}
|
||||
})
|
||||
} catch (error) {
|
||||
console.error(`❌ Error updating trade ${tradeId}:`, error)
|
||||
}
|
||||
}
|
||||
|
||||
private async triggerSmartAnalysis(symbol: string, reason: string): Promise<void> {
|
||||
try {
|
||||
console.log(`🎯 Triggering smart analysis for ${symbol} (${reason})`)
|
||||
|
||||
// Import automation service
|
||||
const { AutomationService } = await import('./automation-service-simple')
|
||||
|
||||
// Get the service instance or create analysis request
|
||||
// This would trigger a new analysis cycle for the specific symbol
|
||||
this.emit('analysisRequested', { symbol, reason, timestamp: new Date() })
|
||||
|
||||
} catch (error) {
|
||||
console.error(`❌ Error triggering analysis for ${symbol}:`, error)
|
||||
}
|
||||
}
|
||||
|
||||
// Public methods for external access
|
||||
getCurrentPrice(symbol: string): number | null {
|
||||
const cached = this.priceCache.get(symbol)
|
||||
return cached ? cached.price : null
|
||||
}
|
||||
|
||||
getActiveAlerts(): PriceAlert[] {
|
||||
return Array.from(this.alerts.values()).filter(alert => alert.triggered)
|
||||
}
|
||||
|
||||
async getTradeMonitoringData(): Promise<TradeMonitoring[]> {
|
||||
const activeTrades = await this.getActiveTradesForMonitoring()
|
||||
const results: TradeMonitoring[] = []
|
||||
|
||||
for (const trade of activeTrades) {
|
||||
const currentPrice = this.getCurrentPrice(trade.symbol)
|
||||
if (currentPrice) {
|
||||
const monitoring = await this.analyzeTradePosition(trade, currentPrice)
|
||||
results.push(monitoring)
|
||||
}
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
// Manual price update (for immediate checks)
|
||||
async forceUpdatePrice(symbol: string): Promise<number | null> {
|
||||
try {
|
||||
const price = await PriceFetcher.getCurrentPrice(symbol)
|
||||
if (price > 0) {
|
||||
this.priceCache.set(symbol, { price, timestamp: Date.now() })
|
||||
return price
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`❌ Error force updating price for ${symbol}:`, error)
|
||||
}
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
export const priceMonitor = PriceMonitor.getInstance()
|
||||
export default priceMonitor
|
||||
Reference in New Issue
Block a user