// Real-time price monitoring service with automatic analysis triggering
import { EventEmitter } from 'events'
import { PrismaClient } from '@prisma/client'
import PriceFetcher from './price-fetcher'

const prisma = new PrismaClient()

/** An alert raised when a monitored trade gets close to its take-profit or stop-loss. */
export interface PriceAlert {
  id: string
  symbol: string
  tradeId: string
  alertType: 'TP_APPROACH' | 'SL_APPROACH' | 'BREAKOUT' | 'REVERSAL'
  currentPrice: number
  targetPrice: number
  threshold: number
  triggered: boolean
  createdAt: Date
}

/** Snapshot of one open trade as computed by a monitoring pass. */
export interface TradeMonitoring {
  tradeId: string
  symbol: string
  side: 'BUY' | 'SELL'
  entryPrice: number
  stopLoss?: number
  takeProfit?: number
  currentPrice?: number
  currentPnL?: number
  pnlPercentage?: number
  /** |current - TP| divided by |TP - entry|; 0 means the price is at the take-profit. */
  distanceToTP?: number
  /** |current - SL| divided by |entry - SL|; 0 means the price is at the stop-loss. */
  distanceToSL?: number
  status: 'ACTIVE' | 'APPROACHING_TP' | 'APPROACHING_SL' | 'CRITICAL'
}

/** A fetched price together with the time (ms since epoch) it was stored. */
interface CachedPrice {
  price: number
  timestamp: number
}

/**
 * Singleton that periodically fetches prices for open automated trades,
 * stores the live P&L on each trade, closes trades whose TP/SL was hit and
 * emits events for everything else.
 *
 * Events emitted:
 * - `pricesUpdated`     (Map<string, number>)   after every pass
 * - `tradesUpdated`     (TradeMonitoring[])     when there are open trades
 * - `alert`             (PriceAlert)            when a trade nears TP/SL
 * - `analysisRequested` ({ symbol, reason, timestamp })
 */
class PriceMonitor extends EventEmitter {
  private static instance: PriceMonitor

  private monitoringInterval: NodeJS.Timeout | null = null
  private priceCache: Map<string, CachedPrice> = new Map()
  private alerts: Map<string, PriceAlert> = new Map()
  private isRunning = false

  private readonly UPDATE_INTERVAL = 1 * 60 * 1000 // 1 minute for more responsive monitoring
  // NOTE(review): the cache outlives the update interval, so a pass can reuse the
  // price fetched by the previous pass instead of fetching a fresh one - confirm
  // this is intended for a 1-minute monitor.
  private readonly CACHE_DURATION = 2 * 60 * 1000 // 2 minutes

  // Thresholds for triggering analysis, expressed in the same unit as
  // distanceToTP / distanceToSL (fraction of the entry-to-target range left).
  private readonly TP_APPROACH_THRESHOLD = 0.15 // 15% away from TP
  private readonly SL_APPROACH_THRESHOLD = 0.25 // 25% away from SL
  private readonly CRITICAL_THRESHOLD = 0.05 // 5% away from TP/SL

  // A trade does not raise the same alert type twice within this window.
  private readonly ALERT_COOLDOWN = 30 * 60 * 1000 // 30 minutes
  // Alerts older than this are dropped so the alert map cannot grow without bound.
  private readonly ALERT_RETENTION = 24 * 60 * 60 * 1000 // 24 hours

  private constructor() {
    super()
  }

  /** Returns the process-wide monitor, creating it on first use. */
  static getInstance(): PriceMonitor {
    if (!PriceMonitor.instance) {
      PriceMonitor.instance = new PriceMonitor()
    }
    return PriceMonitor.instance
  }

  /**
   * Runs one monitoring pass immediately and then one every UPDATE_INTERVAL.
   * Calling it while already running is a no-op.
   */
  async startMonitoring(): Promise<void> {
    if (this.isRunning) {
      console.log('📊 Price monitor already running')
      return
    }

    this.isRunning = true
    console.log('🚀 Starting real-time price monitoring service')
    console.log(`⏱️ Update interval: ${this.UPDATE_INTERVAL / 1000}s`)

    // Initial price check
    await this.updatePricesAndAnalyze()

    // stopMonitoring() may have been called while the first pass was running;
    // installing the timer now would leave it running with isRunning === false.
    if (!this.isRunning) {
      return
    }

    // Set up periodic monitoring
    this.monitoringInterval = setInterval(async () => {
      try {
        await this.updatePricesAndAnalyze()
      } catch (error) {
        console.error('❌ Error in price monitoring cycle:', error)
      }
    }, this.UPDATE_INTERVAL)

    console.log('✅ Price monitoring service started')
  }

  /** Cancels the periodic pass; cached prices and alerts are kept. */
  async stopMonitoring(): Promise<void> {
    if (this.monitoringInterval) {
      clearInterval(this.monitoringInterval)
      this.monitoringInterval = null
    }
    this.isRunning = false
    console.log('⏹️ Price monitoring service stopped')
  }

  /** Whether startMonitoring() has been called without a later stopMonitoring(). */
  isMonitoring(): boolean {
    return this.isRunning
  }

  /**
   * One monitoring pass: refresh prices, update every open trade, close the
   * ones that hit TP/SL and request analysis for the ones getting close.
   * Errors are logged, never thrown.
   */
  private async updatePricesAndAnalyze(): Promise<void> {
    console.log('📊 Price monitor: Updating prices and checking positions...')

    try {
      // Get all active trades
      const activeTrades = await this.getActiveTradesForMonitoring()

      // Always fetch prices for common trading symbols, even without active trades
      const baseSymbols = ['SOLUSD', 'BTCUSD', 'ETHUSD'] // Common trading pairs
      const tradeSymbols: string[] = activeTrades.map(trade => trade.symbol)
      const symbols = [...new Set([...baseSymbols, ...tradeSymbols])]

      console.log(`📊 Updating prices for symbols: ${symbols.join(', ')}`)

      if (activeTrades.length === 0) {
        console.log('📊 No active trades to monitor, but updating base symbol prices')

        // Still update prices for base symbols
        const basePrices = await this.fetchPricesForSymbols(symbols)

        // Emit price updates for UI
        this.emit('pricesUpdated', basePrices)
        return
      }

      console.log(`📊 Monitoring ${activeTrades.length} active trades`)

      // Update prices for all symbols
      const priceUpdates = await this.fetchPricesForSymbols(symbols)

      // Analyze each trade
      const analysisRequests: string[] = []
      const updatedTrades: TradeMonitoring[] = []

      for (const trade of activeTrades) {
        const currentPrice = priceUpdates.get(trade.symbol)
        if (!currentPrice) {
          console.log(`⚠️ Could not get price for ${trade.symbol}`)
          continue
        }

        const monitoring = await this.analyzeTradePosition(trade, currentPrice)
        updatedTrades.push(monitoring)

        // Update trade in database with current PnL
        await this.updateTradeCurrentData(trade.id, currentPrice, monitoring.currentPnL!)

        // Check if trade should be closed (TP/SL hit)
        const shouldClose = await this.checkTradeClose(trade, currentPrice)
        if (shouldClose) {
          const closed = await this.closeTrade(trade.id, currentPrice, shouldClose.reason)
          // closeTrade logs its own failure; only announce a close that was persisted.
          if (closed) {
            console.log(`🔒 Trade ${trade.id.slice(-8)} closed: ${shouldClose.reason} at $${currentPrice}`)
          }
          continue // Skip further processing for this trade
        }

        // Check if analysis is needed
        if (this.shouldTriggerAnalysis(monitoring)) {
          analysisRequests.push(trade.symbol)
          console.log(`🎯 Analysis triggered for ${trade.symbol}: ${monitoring.status}`)
        }
      }

      // Emit events for UI updates
      this.emit('tradesUpdated', updatedTrades)
      this.emit('pricesUpdated', priceUpdates)

      // Trigger analysis for symbols that need it (deduplicated)
      const uniqueAnalysisRequests = [...new Set(analysisRequests)]
      for (const symbol of uniqueAnalysisRequests) {
        await this.triggerSmartAnalysis(symbol, 'PRICE_MOVEMENT')
      }
    } catch (error) {
      console.error('❌ Error in price monitoring update:', error)
    }
  }

  /** Loads every trade with status OPEN and isAutomated set. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- rows are consumed as `any` by the helpers below
  private async getActiveTradesForMonitoring(): Promise<any[]> {
    return await prisma.trade.findMany({
      where: {
        status: 'OPEN',
        isAutomated: true
      },
      select: {
        id: true,
        // Needed by analyzeTradePosition: without it the session lookup's
        // `userId` filter is undefined, which Prisma treats as "no filter".
        userId: true,
        symbol: true,
        side: true,
        amount: true,
        price: true,
        entryPrice: true,
        stopLoss: true,
        takeProfit: true,
        leverage: true,
        createdAt: true
      }
    })
  }

  /**
   * Resolves a price for each symbol, from the cache when the entry is younger
   * than CACHE_DURATION and from PriceFetcher otherwise. Symbols whose fetch
   * fails or returns a non-positive price are left out of the result.
   */
  private async fetchPricesForSymbols(symbols: string[]): Promise<Map<string, number>> {
    const priceUpdates = new Map<string, number>()

    for (const symbol of symbols) {
      try {
        // Check cache first
        const cached = this.priceCache.get(symbol)
        if (cached && Date.now() - cached.timestamp < this.CACHE_DURATION) {
          priceUpdates.set(symbol, cached.price)
          continue
        }

        // Fetch new price
        const price = await PriceFetcher.getCurrentPrice(symbol)
        if (price > 0) {
          priceUpdates.set(symbol, price)
          this.priceCache.set(symbol, { price, timestamp: Date.now() })
          console.log(`💰 ${symbol}: $${price.toFixed(2)}`)
        }

        // Rate limiting - small delay between requests
        await new Promise(resolve => setTimeout(resolve, 100))
      } catch (error) {
        console.error(`❌ Error fetching price for ${symbol}:`, error)
      }
    }

    return priceUpdates
  }

  /**
   * Computes live P&L, the remaining distance to TP/SL and the resulting
   * status for one trade at the given price.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- trade is a partial Prisma row
  private async analyzeTradePosition(trade: any, currentPrice: number): Promise<TradeMonitoring> {
    const entryPrice = trade.entryPrice || trade.price
    const leverage = trade.leverage || 1

    // Get actual trading amount from the most recent automation session
    // for this user and symbol.
    const session = await prisma.automationSession.findFirst({
      where: { userId: trade.userId, symbol: trade.symbol },
      orderBy: { createdAt: 'desc' }
    })

    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- settings is untyped JSON
    const sessionSettings = session?.settings as any
    // NOTE(review): 34 is the fallback amount used when neither the trade nor
    // the session provides one - confirm this default is still wanted.
    const actualTradingAmount = trade.tradingAmount || sessionSettings?.tradingAmount || 34
    const storedPositionValue = trade.amount * trade.price
    // A zero or invalid stored value would turn the ratio into Infinity/NaN
    // and poison every figure derived from it, so fall back to no adjustment.
    const adjustmentRatio = storedPositionValue > 0 ? actualTradingAmount / storedPositionValue : 1

    // Calculate PnL based on actual investment amount
    const priceChange = trade.side === 'BUY'
      ? currentPrice - entryPrice
      : entryPrice - currentPrice
    const rawPnL = priceChange * trade.amount * leverage
    const currentPnL = rawPnL * adjustmentRatio // Adjust for actual investment
    const pnlPercentage = (currentPnL / actualTradingAmount) * 100

    console.log(`💰 Price Monitor P&L Calculation:`, {
      tradeId: trade.id,
      actualTradingAmount,
      storedPositionValue: storedPositionValue.toFixed(2),
      adjustmentRatio: adjustmentRatio.toFixed(4),
      rawPnL: rawPnL.toFixed(2),
      adjustedPnL: currentPnL.toFixed(2)
    })

    // Calculate distances to TP/SL
    const distanceToTP = trade.takeProfit
      ? this.relativeDistance(currentPrice, trade.takeProfit, entryPrice)
      : undefined
    const distanceToSL = trade.stopLoss
      ? this.relativeDistance(currentPrice, trade.stopLoss, entryPrice)
      : undefined

    // Determine status; the critical band wins over the wider approach bands.
    let status: TradeMonitoring['status'] = 'ACTIVE'
    if (distanceToTP !== undefined && distanceToTP <= this.CRITICAL_THRESHOLD) {
      status = 'CRITICAL'
    } else if (distanceToSL !== undefined && distanceToSL <= this.CRITICAL_THRESHOLD) {
      status = 'CRITICAL'
    } else if (distanceToTP !== undefined && distanceToTP <= this.TP_APPROACH_THRESHOLD) {
      status = 'APPROACHING_TP'
    } else if (distanceToSL !== undefined && distanceToSL <= this.SL_APPROACH_THRESHOLD) {
      status = 'APPROACHING_SL'
    }

    return {
      tradeId: trade.id,
      symbol: trade.symbol,
      side: trade.side,
      entryPrice,
      stopLoss: trade.stopLoss,
      takeProfit: trade.takeProfit,
      currentPrice,
      currentPnL,
      pnlPercentage,
      distanceToTP,
      distanceToSL,
      status
    }
  }

  /**
   * |current - target| as a fraction of |target - entry|. Returns undefined
   * when target equals entry (or the range is not a number), because the
   * fraction is not defined there.
   */
  private relativeDistance(currentPrice: number, target: number, entryPrice: number): number | undefined {
    const range = Math.abs(target - entryPrice)
    if (!(range > 0)) {
      return undefined
    }
    return Math.abs(currentPrice - target) / range
  }

  /**
   * Decides whether this snapshot warrants a new analysis. Returns true (and
   * records + emits a PriceAlert) for any non-ACTIVE status unless the same
   * trade already raised the same alert type within ALERT_COOLDOWN.
   */
  private shouldTriggerAnalysis(monitoring: TradeMonitoring): boolean {
    const status = monitoring.status
    if (status === 'ACTIVE') {
      return false
    }

    const now = Date.now()
    this.pruneExpiredAlerts(now)

    const alertType: PriceAlert['alertType'] =
      status === 'APPROACHING_TP' ? 'TP_APPROACH' :
      status === 'APPROACHING_SL' ? 'SL_APPROACH' : 'BREAKOUT'

    // Compare against the alert type that is actually stored, so CRITICAL
    // (stored as BREAKOUT) is subject to the cooldown like the other statuses.
    const inCooldown = Array.from(this.alerts.values()).some(alert =>
      alert.tradeId === monitoring.tradeId &&
      alert.alertType === alertType &&
      now - alert.createdAt.getTime() < this.ALERT_COOLDOWN
    )
    if (inCooldown) {
      return false
    }

    // A CRITICAL status can come from either side; report the side that caused it.
    const nearTakeProfit =
      status === 'APPROACHING_TP' ||
      (status === 'CRITICAL' &&
        monitoring.distanceToTP !== undefined &&
        monitoring.distanceToTP <= this.CRITICAL_THRESHOLD)

    // analyzeTradePosition only assigns a non-ACTIVE status when the matching
    // distance (and therefore its target price) was computed.
    const alert: PriceAlert = {
      id: `${monitoring.tradeId}_${status}_${now}`,
      symbol: monitoring.symbol,
      tradeId: monitoring.tradeId,
      alertType,
      currentPrice: monitoring.currentPrice!,
      targetPrice: nearTakeProfit ? monitoring.takeProfit! : monitoring.stopLoss!,
      threshold: nearTakeProfit ? monitoring.distanceToTP! : monitoring.distanceToSL!,
      triggered: true,
      createdAt: new Date(now)
    }

    this.alerts.set(alert.id, alert)
    this.emit('alert', alert)
    return true
  }

  /** Drops alerts created more than ALERT_RETENTION before `now`. */
  private pruneExpiredAlerts(now: number): void {
    for (const [id, alert] of this.alerts) {
      if (now - alert.createdAt.getTime() >= this.ALERT_RETENTION) {
        this.alerts.delete(id)
      }
    }
  }

  /** Persists the latest price and P&L on the trade; failures are logged only. */
  private async updateTradeCurrentData(tradeId: string, currentPrice: number, currentPnL: number): Promise<void> {
    try {
      await prisma.trade.update({
        where: { id: tradeId },
        data: {
          // Store current price and PnL for reference
          // NOTE(review): this replaces the whole learningData value; if other
          // code stores data in that field it is overwritten here - confirm.
          learningData: {
            currentPrice,
            currentPnL,
            lastUpdated: new Date().toISOString()
          }
        }
      })
    } catch (error) {
      console.error(`❌ Error updating trade ${tradeId}:`, error)
    }
  }

  /** Emits `analysisRequested` for the symbol; failures are logged only. */
  private async triggerSmartAnalysis(symbol: string, reason: string): Promise<void> {
    try {
      console.log(`🎯 Triggering smart analysis for ${symbol} (${reason})`)

      // NOTE(review): nothing exported by this module is used here; the import
      // is kept in case loading it matters - confirm and remove if not.
      await import('./automation-service-simple')

      // Listeners are expected to start a new analysis cycle for the symbol.
      this.emit('analysisRequested', { symbol, reason, timestamp: new Date() })
    } catch (error) {
      console.error(`❌ Error triggering analysis for ${symbol}:`, error)
    }
  }

  // Public methods for external access

  /** Last cached price for the symbol regardless of its age, or null if none. */
  getCurrentPrice(symbol: string): number | null {
    return this.priceCache.get(symbol)?.price ?? null
  }

  /** Alerts raised within the retention window as of the last monitoring pass. */
  getActiveAlerts(): PriceAlert[] {
    return Array.from(this.alerts.values()).filter(alert => alert.triggered)
  }

  /**
   * Monitoring snapshots for all open automated trades, computed from cached
   * prices. Trades without a cached price are omitted.
   */
  async getTradeMonitoringData(): Promise<TradeMonitoring[]> {
    const activeTrades = await this.getActiveTradesForMonitoring()
    const results: TradeMonitoring[] = []

    for (const trade of activeTrades) {
      const currentPrice = this.getCurrentPrice(trade.symbol)
      if (currentPrice) {
        results.push(await this.analyzeTradePosition(trade, currentPrice))
      }
    }

    return results
  }

  /**
   * Manual price update (for immediate checks): fetches the symbol's price,
   * bypassing the cache, and stores it. Returns null when the fetch fails or
   * yields a non-positive price.
   */
  async forceUpdatePrice(symbol: string): Promise<number | null> {
    try {
      const price = await PriceFetcher.getCurrentPrice(symbol)
      if (price > 0) {
        this.priceCache.set(symbol, { price, timestamp: Date.now() })
        return price
      }
    } catch (error) {
      console.error(`❌ Error force updating price for ${symbol}:`, error)
    }
    return null
  }

  /**
   * Check if a trade should be closed based on TP/SL. Take-profit is tested
   * first. Returns the close reason, or null when neither level was reached.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- trade is a partial Prisma row
  private async checkTradeClose(trade: any, currentPrice: number): Promise<{ reason: string } | null> {
    // Check Take Profit
    if (trade.takeProfit) {
      const tpHit =
        (trade.side === 'BUY' && currentPrice >= trade.takeProfit) ||
        (trade.side === 'SELL' && currentPrice <= trade.takeProfit)
      if (tpHit) {
        return { reason: 'TAKE_PROFIT' }
      }
    }

    // Check Stop Loss
    if (trade.stopLoss) {
      const slHit =
        (trade.side === 'BUY' && currentPrice <= trade.stopLoss) ||
        (trade.side === 'SELL' && currentPrice >= trade.stopLoss)
      if (slHit) {
        return { reason: 'STOP_LOSS' }
      }
    }

    return null
  }

  /**
   * Close a trade by updating its status and exit data.
   * Returns true when the update was written, false when the trade does not
   * exist or the update failed (the failure is logged). `reason` is not stored.
   */
  private async closeTrade(tradeId: string, exitPrice: number, reason: string): Promise<boolean> {
    try {
      const trade = await prisma.trade.findUnique({ where: { id: tradeId } })
      if (!trade) return false

      const entryPrice = trade.entryPrice || trade.price
      // NOTE(review): unlike the live P&L in analyzeTradePosition, this figure
      // ignores leverage and the trading-amount adjustment - confirm intended.
      const pnl = this.calculatePnL(trade.side, entryPrice, exitPrice, trade.amount)
      const tradingAmount = trade.amount * entryPrice // Estimate trading amount
      // Avoid writing NaN/Infinity when the estimated amount is zero.
      const pnlPercent = tradingAmount > 0 ? (pnl / tradingAmount) * 100 : 0

      await prisma.trade.update({
        where: { id: tradeId },
        data: {
          status: 'COMPLETED',
          exitPrice: exitPrice,
          closedAt: new Date(),
          profit: pnl,
          pnlPercent: pnlPercent,
          outcome: pnl > 0 ? 'WIN' : pnl < 0 ? 'LOSS' : 'BREAK_EVEN'
        }
      })
      return true
    } catch (error) {
      console.error('Error closing trade:', error)
      return false
    }
  }

  /** Calculate P&L for a trade: price move in the trade's favour times amount. */
  private calculatePnL(side: string, entryPrice: number, exitPrice: number, amount: number): number {
    if (side === 'BUY') {
      return (exitPrice - entryPrice) * amount
    } else {
      return (entryPrice - exitPrice) * amount
    }
  }
}

export const priceMonitor = PriceMonitor.getInstance()
export default priceMonitor