- Enhanced DNS failover monitor on secondary (72.62.39.24) - Auto-promotes database: pg_ctl promote on failover - Creates DEMOTED flag on primary via SSH (split-brain protection) - Telegram notifications with database promotion status - Startup safety script ready (integration pending) - 90-second automatic recovery vs 10-30 min manual - Zero-cost 95% enterprise HA benefit Status: DEPLOYED and MONITORING (14:52 CET) Next: Controlled failover test during maintenance
536 lines
19 KiB
TypeScript
536 lines
19 KiB
TypeScript
/**
|
|
* Smart Entry Validation Queue
|
|
*
|
|
* Purpose: Monitor blocked signals (quality 50-89) and validate them using 1-minute price action.
|
|
* Instead of hard-blocking marginal quality signals, we "Block & Watch" - enter if price confirms
|
|
* the direction, abandon if price moves against us.
|
|
*
|
|
* This system bridges the gap between:
|
|
* - High quality signals (90+): Immediate execution
|
|
* - Marginal quality signals (50-89): Block & Watch → Execute if confirmed
|
|
* - Low quality signals (<50): Hard block, no validation
|
|
*/
|
|
|
|
import { getMarketDataCache } from './market-data-cache'
|
|
import { logger } from '../utils/logger'
|
|
import { getMergedConfig } from '../../config/trading'
|
|
import { sendValidationNotification } from '../notifications/telegram'
|
|
|
|
interface QueuedSignal {
|
|
id: string
|
|
symbol: string
|
|
direction: 'long' | 'short'
|
|
originalPrice: number
|
|
originalSignalData: {
|
|
atr?: number
|
|
adx?: number
|
|
rsi?: number
|
|
volumeRatio?: number
|
|
pricePosition?: number
|
|
indicatorVersion?: string
|
|
timeframe?: string
|
|
}
|
|
qualityScore: number
|
|
blockedAt: number // Unix timestamp
|
|
entryWindowMinutes: number // How long to watch (default: 10)
|
|
confirmationThreshold: number // % move needed to confirm (default: 0.3%)
|
|
maxDrawdown: number // % move against to abandon (default: -0.4%)
|
|
highestPrice?: number // Track highest price seen (for longs)
|
|
lowestPrice?: number // Track lowest price seen (for shorts)
|
|
status: 'pending' | 'confirmed' | 'abandoned' | 'expired' | 'executed'
|
|
validatedAt?: number
|
|
executedAt?: number
|
|
executionPrice?: number
|
|
tradeId?: string
|
|
}
|
|
|
|
class SmartValidationQueue {
|
|
private queue: Map<string, QueuedSignal> = new Map()
|
|
private monitoringInterval?: NodeJS.Timeout
|
|
private isMonitoring = false
|
|
|
|
constructor() {
|
|
logger.log('🧠 Smart Validation Queue initialized')
|
|
}
|
|
|
|
/**
|
|
* Add a blocked signal to validation queue
|
|
*/
|
|
async addSignal(params: {
|
|
blockReason: string
|
|
symbol: string
|
|
direction: 'long' | 'short'
|
|
originalPrice: number
|
|
qualityScore: number
|
|
atr?: number
|
|
adx?: number
|
|
rsi?: number
|
|
volumeRatio?: number
|
|
pricePosition?: number
|
|
indicatorVersion?: string
|
|
timeframe?: string
|
|
}): Promise<QueuedSignal | null> {
|
|
const config = getMergedConfig()
|
|
|
|
// Only queue signals blocked for quality (not cooldown, rate limits, etc.)
|
|
if (params.blockReason !== 'QUALITY_SCORE_TOO_LOW') {
|
|
return null
|
|
}
|
|
|
|
// Only queue marginal quality signals (50-89)
|
|
// Below 50: Too low quality, don't validate
|
|
// 90+: Should have been executed, not blocked
|
|
if (params.qualityScore < 50 || params.qualityScore >= 90) {
|
|
return null
|
|
}
|
|
|
|
const signalId = `${params.symbol}_${params.direction}_${Date.now()}`
|
|
|
|
const queuedSignal: QueuedSignal = {
|
|
id: signalId,
|
|
symbol: params.symbol,
|
|
direction: params.direction,
|
|
originalPrice: params.originalPrice,
|
|
originalSignalData: {
|
|
atr: params.atr,
|
|
adx: params.adx,
|
|
rsi: params.rsi,
|
|
volumeRatio: params.volumeRatio,
|
|
pricePosition: params.pricePosition,
|
|
indicatorVersion: params.indicatorVersion,
|
|
timeframe: params.timeframe,
|
|
},
|
|
qualityScore: params.qualityScore,
|
|
blockedAt: Date.now(),
|
|
entryWindowMinutes: 90, // Two-stage: watch for 90 minutes
|
|
confirmationThreshold: 0.3, // Two-stage: need +0.3% move to confirm
|
|
maxDrawdown: -1.0, // Abandon if -1.0% against direction (widened from 0.4%)
|
|
highestPrice: params.originalPrice,
|
|
lowestPrice: params.originalPrice,
|
|
status: 'pending',
|
|
}
|
|
|
|
this.queue.set(signalId, queuedSignal)
|
|
console.log(`⏰ Smart validation queued: ${params.symbol} ${params.direction.toUpperCase()} @ $${params.originalPrice.toFixed(2)} (quality: ${params.qualityScore})`)
|
|
console.log(` Two-stage watch ${queuedSignal.entryWindowMinutes}min: +${queuedSignal.confirmationThreshold}% confirms, ${queuedSignal.maxDrawdown}% abandons`)
|
|
|
|
// Send Telegram notification
|
|
await sendValidationNotification({
|
|
event: 'queued',
|
|
symbol: params.symbol,
|
|
direction: params.direction,
|
|
originalPrice: params.originalPrice,
|
|
qualityScore: params.qualityScore,
|
|
})
|
|
|
|
// Start monitoring if not already running
|
|
if (!this.isMonitoring) {
|
|
this.startMonitoring()
|
|
}
|
|
|
|
return queuedSignal
|
|
}
|
|
|
|
/**
|
|
* Start monitoring queued signals
|
|
*/
|
|
private startMonitoring(): void {
|
|
if (this.isMonitoring) {
|
|
return
|
|
}
|
|
|
|
this.isMonitoring = true
|
|
console.log('👁️ Smart validation monitoring started (checks every 30s)')
|
|
|
|
// Check every 30 seconds
|
|
this.monitoringInterval = setInterval(async () => {
|
|
await this.checkValidations()
|
|
}, 30000)
|
|
}
|
|
|
|
/**
|
|
* Stop monitoring
|
|
*/
|
|
stopMonitoring(): void {
|
|
if (this.monitoringInterval) {
|
|
clearInterval(this.monitoringInterval)
|
|
this.monitoringInterval = undefined
|
|
}
|
|
this.isMonitoring = false
|
|
console.log('⏸️ Smart validation monitoring stopped')
|
|
}
|
|
|
|
/**
|
|
* Check all pending signals for validation
|
|
*/
|
|
private async checkValidations(): Promise<void> {
|
|
const pending = Array.from(this.queue.values()).filter(s => s.status === 'pending')
|
|
|
|
if (pending.length === 0) {
|
|
// No pending signals, stop monitoring to save resources
|
|
this.stopMonitoring()
|
|
return
|
|
}
|
|
|
|
console.log(`👁️ Smart validation check: ${pending.length} pending signals`)
|
|
|
|
for (const signal of pending) {
|
|
try {
|
|
await this.validateSignal(signal)
|
|
} catch (error) {
|
|
console.error(`❌ Error validating signal ${signal.id}:`, error)
|
|
}
|
|
}
|
|
|
|
// Clean up completed signals older than 1 hour
|
|
this.cleanupOldSignals()
|
|
}
|
|
|
|
/**
|
|
* Validate a single signal using current price
|
|
*/
|
|
private async validateSignal(signal: QueuedSignal): Promise<void> {
|
|
const now = Date.now()
|
|
const ageMinutes = (now - signal.blockedAt) / (1000 * 60)
|
|
|
|
// Check if expired (beyond entry window)
|
|
if (ageMinutes > signal.entryWindowMinutes) {
|
|
signal.status = 'expired'
|
|
logger.log(`⏰ Signal expired: ${signal.symbol} ${signal.direction} (${ageMinutes.toFixed(1)}min old)`)
|
|
|
|
// Send Telegram notification
|
|
await sendValidationNotification({
|
|
event: 'expired',
|
|
symbol: signal.symbol,
|
|
direction: signal.direction,
|
|
originalPrice: signal.originalPrice,
|
|
qualityScore: signal.qualityScore,
|
|
validationTime: (now - signal.blockedAt) / 1000,
|
|
})
|
|
|
|
return
|
|
}
|
|
|
|
// CRITICAL FIX (Dec 11, 2025): Query database for latest 1-minute data instead of cache
|
|
// Cache singleton issue: API routes and validation queue have separate instances
|
|
// Database is single source of truth for market data
|
|
let currentPrice: number
|
|
let priceDataAge: number
|
|
|
|
try {
|
|
const { getPrismaClient } = await import('../database/trades')
|
|
const prisma = getPrismaClient()
|
|
|
|
// Get most recent market data within last 2 minutes
|
|
const recentData = await prisma.marketData.findFirst({
|
|
where: {
|
|
symbol: signal.symbol,
|
|
timestamp: {
|
|
gte: new Date(Date.now() - 2 * 60 * 1000) // Last 2 minutes
|
|
}
|
|
},
|
|
orderBy: {
|
|
timestamp: 'desc'
|
|
}
|
|
})
|
|
|
|
if (!recentData) {
|
|
console.log(`⚠️ No recent market data for ${signal.symbol} in database (last 2 min), skipping validation`)
|
|
return
|
|
}
|
|
|
|
currentPrice = recentData.price
|
|
priceDataAge = Math.round((Date.now() - recentData.timestamp.getTime()) / 1000)
|
|
|
|
console.log(`✅ Using database market data for ${signal.symbol} (${priceDataAge}s old, price: $${currentPrice.toFixed(2)})`)
|
|
} catch (dbError) {
|
|
console.error(`❌ Database query failed for ${signal.symbol}:`, dbError)
|
|
return
|
|
}
|
|
|
|
const priceChange = ((currentPrice - signal.originalPrice) / signal.originalPrice) * 100
|
|
|
|
console.log(`📊 ${signal.symbol} ${signal.direction.toUpperCase()}: Original $${signal.originalPrice.toFixed(2)} → Current $${currentPrice.toFixed(2)} = ${priceChange >= 0 ? '+' : ''}${priceChange.toFixed(2)}%`)
|
|
|
|
// Update price extremes
|
|
if (!signal.highestPrice || currentPrice > signal.highestPrice) {
|
|
signal.highestPrice = currentPrice
|
|
}
|
|
if (!signal.lowestPrice || currentPrice < signal.lowestPrice) {
|
|
signal.lowestPrice = currentPrice
|
|
}
|
|
|
|
// Validation logic based on direction
|
|
if (signal.direction === 'long') {
|
|
// LONG: Need price to move UP to confirm
|
|
if (priceChange >= signal.confirmationThreshold) {
|
|
// Price moved up enough - CONFIRMED!
|
|
signal.status = 'confirmed'
|
|
signal.validatedAt = now
|
|
logger.log(`✅ LONG CONFIRMED: ${signal.symbol} moved +${priceChange.toFixed(2)}% ($${signal.originalPrice.toFixed(2)} → $${currentPrice.toFixed(2)})`)
|
|
logger.log(` Validation time: ${ageMinutes.toFixed(1)} minutes, executing trade...`)
|
|
|
|
// Send Telegram notification
|
|
await sendValidationNotification({
|
|
event: 'confirmed',
|
|
symbol: signal.symbol,
|
|
direction: signal.direction,
|
|
originalPrice: signal.originalPrice,
|
|
currentPrice: currentPrice,
|
|
qualityScore: signal.qualityScore,
|
|
validationTime: (now - signal.blockedAt) / 1000,
|
|
priceChange: priceChange,
|
|
})
|
|
|
|
// Execute the trade
|
|
await this.executeTrade(signal, currentPrice)
|
|
} else if (priceChange <= signal.maxDrawdown) {
|
|
// Price moved down too much - ABANDON
|
|
signal.status = 'abandoned'
|
|
logger.log(`❌ LONG ABANDONED: ${signal.symbol} dropped ${priceChange.toFixed(2)}% ($${signal.originalPrice.toFixed(2)} → $${currentPrice.toFixed(2)})`)
|
|
logger.log(` Saved from potential loser after ${ageMinutes.toFixed(1)} minutes`)
|
|
|
|
// Send Telegram notification
|
|
await sendValidationNotification({
|
|
event: 'abandoned',
|
|
symbol: signal.symbol,
|
|
direction: signal.direction,
|
|
originalPrice: signal.originalPrice,
|
|
currentPrice: currentPrice,
|
|
qualityScore: signal.qualityScore,
|
|
validationTime: (now - signal.blockedAt) / 1000,
|
|
priceChange: priceChange,
|
|
})
|
|
} else {
|
|
// Still pending, log progress
|
|
logger.log(`⏳ LONG watching: ${signal.symbol} at ${priceChange.toFixed(2)}% (need +${signal.confirmationThreshold}%, abandon at ${signal.maxDrawdown}%) - ${ageMinutes.toFixed(1)}min`)
|
|
}
|
|
} else {
|
|
// SHORT: Need price to move DOWN to confirm
|
|
if (priceChange <= -signal.confirmationThreshold) {
|
|
// Price moved down enough - CONFIRMED!
|
|
signal.status = 'confirmed'
|
|
signal.validatedAt = now
|
|
logger.log(`✅ SHORT CONFIRMED: ${signal.symbol} moved ${priceChange.toFixed(2)}% ($${signal.originalPrice.toFixed(2)} → $${currentPrice.toFixed(2)})`)
|
|
logger.log(` Validation time: ${ageMinutes.toFixed(1)} minutes, executing trade...`)
|
|
|
|
// Send Telegram notification
|
|
await sendValidationNotification({
|
|
event: 'confirmed',
|
|
symbol: signal.symbol,
|
|
direction: signal.direction,
|
|
originalPrice: signal.originalPrice,
|
|
currentPrice: currentPrice,
|
|
qualityScore: signal.qualityScore,
|
|
validationTime: (now - signal.blockedAt) / 1000,
|
|
priceChange: priceChange,
|
|
})
|
|
|
|
// Execute the trade
|
|
await this.executeTrade(signal, currentPrice)
|
|
} else if (priceChange >= -signal.maxDrawdown) {
|
|
// Price moved up too much - ABANDON
|
|
signal.status = 'abandoned'
|
|
logger.log(`❌ SHORT ABANDONED: ${signal.symbol} rose +${priceChange.toFixed(2)}% ($${signal.originalPrice.toFixed(2)} → $${currentPrice.toFixed(2)})`)
|
|
logger.log(` Saved from potential loser after ${ageMinutes.toFixed(1)} minutes`)
|
|
|
|
// Send Telegram notification
|
|
await sendValidationNotification({
|
|
event: 'abandoned',
|
|
symbol: signal.symbol,
|
|
direction: signal.direction,
|
|
originalPrice: signal.originalPrice,
|
|
currentPrice: currentPrice,
|
|
qualityScore: signal.qualityScore,
|
|
validationTime: (now - signal.blockedAt) / 1000,
|
|
priceChange: priceChange,
|
|
})
|
|
} else {
|
|
// Still pending, log progress
|
|
logger.log(`⏳ SHORT watching: ${signal.symbol} at ${priceChange.toFixed(2)}% (need ${-signal.confirmationThreshold}%, abandon at +${-signal.maxDrawdown}%) - ${ageMinutes.toFixed(1)}min`)
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Execute validated trade via API
|
|
*/
|
|
private async executeTrade(signal: QueuedSignal, currentPrice: number): Promise<void> {
|
|
try {
|
|
// Call the execute endpoint with the validated signal
|
|
const executeUrl = 'http://localhost:3000/api/trading/execute'
|
|
|
|
const payload = {
|
|
symbol: signal.symbol,
|
|
direction: signal.direction,
|
|
signalPrice: currentPrice,
|
|
currentPrice: currentPrice,
|
|
timeframe: signal.originalSignalData.timeframe || '5',
|
|
atr: signal.originalSignalData.atr || 0,
|
|
adx: signal.originalSignalData.adx || 0,
|
|
rsi: signal.originalSignalData.rsi || 0,
|
|
volumeRatio: signal.originalSignalData.volumeRatio || 0,
|
|
pricePosition: signal.originalSignalData.pricePosition || 0,
|
|
indicatorVersion: signal.originalSignalData.indicatorVersion || 'v9',
|
|
validatedEntry: true, // Flag to indicate this is a validated entry
|
|
originalQualityScore: signal.qualityScore,
|
|
validationDelayMinutes: (Date.now() - signal.blockedAt) / (1000 * 60),
|
|
}
|
|
|
|
logger.log(`🚀 Executing validated trade: ${signal.symbol} ${signal.direction.toUpperCase()} @ $${currentPrice.toFixed(2)}`)
|
|
logger.log(` Original signal: $${signal.originalPrice.toFixed(2)}, Quality: ${signal.qualityScore}`)
|
|
logger.log(` Entry delay: ${payload.validationDelayMinutes.toFixed(1)} minutes`)
|
|
|
|
const response = await fetch(executeUrl, {
|
|
method: 'POST',
|
|
headers: {
|
|
'Content-Type': 'application/json',
|
|
'Authorization': `Bearer ${process.env.API_SECRET_KEY}`,
|
|
},
|
|
body: JSON.stringify(payload),
|
|
})
|
|
|
|
const result = await response.json()
|
|
|
|
if (response.ok && result.success) {
|
|
signal.status = 'executed'
|
|
signal.executedAt = Date.now()
|
|
signal.executionPrice = currentPrice
|
|
signal.tradeId = result.trade?.id
|
|
logger.log(`✅ Trade executed successfully: ${signal.symbol} ${signal.direction}`)
|
|
logger.log(` Trade ID: ${signal.tradeId}`)
|
|
logger.log(` Entry: $${currentPrice.toFixed(2)}, Size: $${result.trade?.positionSizeUSD || 'unknown'}`)
|
|
|
|
// Send execution notification
|
|
const slippage = ((currentPrice - signal.originalPrice) / signal.originalPrice) * 100
|
|
await sendValidationNotification({
|
|
event: 'executed',
|
|
symbol: signal.symbol,
|
|
direction: signal.direction,
|
|
originalPrice: signal.originalPrice,
|
|
currentPrice: currentPrice,
|
|
qualityScore: signal.qualityScore,
|
|
validationTime: (signal.executedAt - signal.blockedAt) / 1000,
|
|
priceChange: slippage,
|
|
})
|
|
} else {
|
|
console.error(`❌ Trade execution failed: ${result.error || result.message}`)
|
|
signal.status = 'abandoned' // Mark as abandoned if execution fails
|
|
}
|
|
} catch (error) {
|
|
console.error(`❌ Error executing validated trade:`, error)
|
|
signal.status = 'abandoned'
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Clean up old signals to prevent memory leak
|
|
*/
|
|
private cleanupOldSignals(): void {
|
|
const oneHourAgo = Date.now() - (60 * 60 * 1000)
|
|
let cleaned = 0
|
|
|
|
for (const [id, signal] of this.queue) {
|
|
if (signal.status !== 'pending' && signal.blockedAt < oneHourAgo) {
|
|
this.queue.delete(id)
|
|
cleaned++
|
|
}
|
|
}
|
|
|
|
if (cleaned > 0) {
|
|
logger.log(`🧹 Cleaned up ${cleaned} old validated signals`)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get queue status
|
|
*/
|
|
getStatus(): {
|
|
pending: number
|
|
confirmed: number
|
|
abandoned: number
|
|
expired: number
|
|
executed: number
|
|
total: number
|
|
} {
|
|
const signals = Array.from(this.queue.values())
|
|
return {
|
|
pending: signals.filter(s => s.status === 'pending').length,
|
|
confirmed: signals.filter(s => s.status === 'confirmed').length,
|
|
abandoned: signals.filter(s => s.status === 'abandoned').length,
|
|
expired: signals.filter(s => s.status === 'expired').length,
|
|
executed: signals.filter(s => s.status === 'executed').length,
|
|
total: signals.length,
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get all queued signals (for debugging)
|
|
*/
|
|
getQueue(): QueuedSignal[] {
|
|
return Array.from(this.queue.values())
|
|
}
|
|
}
|
|
|
|
// Singleton instance
|
|
let queueInstance: SmartValidationQueue | null = null
|
|
|
|
export function getSmartValidationQueue(): SmartValidationQueue {
|
|
if (!queueInstance) {
|
|
queueInstance = new SmartValidationQueue()
|
|
}
|
|
return queueInstance
|
|
}
|
|
|
|
export async function startSmartValidation(): Promise<void> {
|
|
const queue = getSmartValidationQueue()
|
|
|
|
// CRITICAL FIX (Dec 9, 2025): Load pending signals from database on startup
|
|
// Bug: Queue is in-memory only, container restart loses all queued signals
|
|
// Solution: Query BlockedSignal table for signals within entry window
|
|
try {
|
|
const { getPrismaClient } = await import('../database/trades')
|
|
const prisma = getPrismaClient()
|
|
|
|
// Find signals blocked within last 90 minutes (two-stage entry window)
|
|
const ninetyMinutesAgo = new Date(Date.now() - 90 * 60 * 1000)
|
|
|
|
const recentBlocked = await prisma.blockedSignal.findMany({
|
|
where: {
|
|
blockReason: 'SMART_VALIDATION_QUEUED', // FIXED Dec 12, 2025: Look for queued signals only
|
|
createdAt: { gte: ninetyMinutesAgo }, // Match entry window (90 minutes)
|
|
},
|
|
orderBy: { createdAt: 'desc' },
|
|
})
|
|
|
|
console.log(`🔄 Restoring ${recentBlocked.length} pending signals from database`)
|
|
|
|
// Re-queue each signal
|
|
for (const signal of recentBlocked) {
|
|
await queue.addSignal({
|
|
blockReason: 'SMART_VALIDATION_QUEUED',
|
|
symbol: signal.symbol,
|
|
direction: signal.direction as 'long' | 'short',
|
|
originalPrice: signal.signalPrice,
|
|
qualityScore: signal.signalQualityScore || 0,
|
|
atr: signal.atr || undefined,
|
|
adx: signal.adx || undefined,
|
|
rsi: signal.rsi || undefined,
|
|
volumeRatio: signal.volumeRatio || undefined,
|
|
pricePosition: signal.pricePosition || undefined,
|
|
indicatorVersion: signal.indicatorVersion || 'v5',
|
|
timeframe: signal.timeframe || '5',
|
|
})
|
|
}
|
|
|
|
if (recentBlocked.length > 0) {
|
|
console.log(`✅ Smart validation restored ${recentBlocked.length} signals, monitoring started`)
|
|
} else {
|
|
console.log('🧠 Smart validation system ready (no pending signals)')
|
|
}
|
|
} catch (error) {
|
|
console.error('❌ Error restoring smart validation queue:', error)
|
|
console.log('🧠 Smart validation system ready (error loading signals)')
|
|
}
|
|
}
|