critical: Fix ghost detection P&L compounding - delete from Map BEFORE check
Bug: Multiple monitoring loops detect ghost simultaneously - Loop 1: has(tradeId) → true → proceeds - Loop 2: has(tradeId) → true → ALSO proceeds (race condition) - Both send Telegram notifications with compounding P&L Real incident (Dec 2, 2025): - Manual SHORT at $138.84 - 23 duplicate notifications - P&L compounded: -$47.96 → -$1,129.24 (23× accumulation) - Database shows single trade with final compounded value Fix: Map.delete() returns true if key existed, false if already removed - Call delete() FIRST - Check return value proceeds - All other loops get false → skip immediately - Atomic operation prevents race condition Pattern: This is variant of Common Pitfalls #48, #49, #59, #60, #61 - All had "check then delete" pattern - All vulnerable to async timing issues - Solution: "delete then check" pattern - Map.delete() is synchronous and atomic Files changed: - lib/trading/position-manager.ts lines 390-410 Related: DUPLICATE PREVENTED message was working but too late
This commit is contained in:
@@ -318,6 +318,293 @@ export class BlockedSignalTracker {
|
||||
|
||||
return { tp1Percent, tp2Percent, slPercent }
|
||||
}
|
||||
|
||||
/**
|
||||
* Query all 1-minute price data for a signal's tracking window
|
||||
* Purpose: Get minute-by-minute granular data instead of 8 polling checkpoints
|
||||
* Returns: Array of MarketData objects with price, timestamp, ATR, ADX, etc.
|
||||
*/
|
||||
private async getHistoricalPrices(
|
||||
symbol: string,
|
||||
startTime: Date,
|
||||
endTime: Date
|
||||
): Promise<any[]> {
|
||||
try {
|
||||
const marketData = await this.prisma.marketData.findMany({
|
||||
where: {
|
||||
symbol,
|
||||
timeframe: '1', // 1-minute data
|
||||
timestamp: {
|
||||
gte: startTime,
|
||||
lte: endTime
|
||||
}
|
||||
},
|
||||
orderBy: {
|
||||
timestamp: 'asc' // Chronological order
|
||||
}
|
||||
})
|
||||
|
||||
console.log(`📊 Retrieved ${marketData.length} 1-minute data points for ${symbol}`)
|
||||
return marketData
|
||||
} catch (error) {
|
||||
console.error('❌ Error querying historical prices:', error)
|
||||
return []
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Analyze minute-by-minute data to find EXACT timing of TP/SL hits
|
||||
* Purpose: Replace 8 polling checkpoints with 480 data point analysis
|
||||
* Algorithm:
|
||||
* 1. Calculate TP1/TP2/SL target prices
|
||||
* 2. Loop through all 1-minute data points:
|
||||
* - Calculate profit % for each minute
|
||||
* - Check if TP1/TP2/SL hit (first time only)
|
||||
* - Record exact timestamp when hit
|
||||
* - Track max favorable/adverse prices
|
||||
* 3. Return updates object with all findings
|
||||
*/
|
||||
private async analyzeHistoricalData(
|
||||
signal: BlockedSignalWithTracking,
|
||||
historicalPrices: any[],
|
||||
config: any
|
||||
): Promise<any> {
|
||||
const updates: any = {}
|
||||
const entryPrice = Number(signal.entryPrice)
|
||||
const direction = signal.direction
|
||||
|
||||
// Calculate TP/SL targets using ATR
|
||||
const targets = this.calculateTargets(signal.atr || 0, entryPrice, config)
|
||||
|
||||
// Calculate actual target prices based on direction
|
||||
let tp1Price: number, tp2Price: number, slPrice: number
|
||||
if (direction === 'long') {
|
||||
tp1Price = entryPrice * (1 + targets.tp1Percent / 100)
|
||||
tp2Price = entryPrice * (1 + targets.tp2Percent / 100)
|
||||
slPrice = entryPrice * (1 - targets.slPercent / 100)
|
||||
} else {
|
||||
tp1Price = entryPrice * (1 - targets.tp1Percent / 100)
|
||||
tp2Price = entryPrice * (1 - targets.tp2Percent / 100)
|
||||
slPrice = entryPrice * (1 + targets.slPercent / 100)
|
||||
}
|
||||
|
||||
console.log(`🎯 Analyzing ${signal.symbol} ${direction}: Entry $${entryPrice.toFixed(2)}, TP1 $${tp1Price.toFixed(2)}, TP2 $${tp2Price.toFixed(2)}, SL $${slPrice.toFixed(2)}`)
|
||||
|
||||
// Track hits (only record first occurrence)
|
||||
let tp1HitTime: Date | null = null
|
||||
let tp2HitTime: Date | null = null
|
||||
let slHitTime: Date | null = null
|
||||
|
||||
// Track max favorable/adverse
|
||||
let maxFavorablePrice = entryPrice
|
||||
let maxAdversePrice = entryPrice
|
||||
let maxFavorableExcursion = 0
|
||||
let maxAdverseExcursion = 0
|
||||
|
||||
// Checkpoint tracking (for comparison with old system)
|
||||
const checkpoints = {
|
||||
'1min': null as number | null,
|
||||
'5min': null as number | null,
|
||||
'15min': null as number | null,
|
||||
'30min': null as number | null,
|
||||
'1hr': null as number | null,
|
||||
'2hr': null as number | null,
|
||||
'4hr': null as number | null,
|
||||
'8hr': null as number | null
|
||||
}
|
||||
|
||||
// Process each 1-minute data point
|
||||
for (const dataPoint of historicalPrices) {
|
||||
const currentPrice = Number(dataPoint.price)
|
||||
const timestamp = new Date(dataPoint.timestamp)
|
||||
const minutesElapsed = Math.floor((timestamp.getTime() - signal.createdAt.getTime()) / 60000)
|
||||
|
||||
// Calculate profit percentage
|
||||
const profitPercent = this.calculateProfitPercent(entryPrice, currentPrice, direction)
|
||||
|
||||
// Track max favorable/adverse
|
||||
if (profitPercent > maxFavorableExcursion) {
|
||||
maxFavorableExcursion = profitPercent
|
||||
maxFavorablePrice = currentPrice
|
||||
}
|
||||
if (profitPercent < maxAdverseExcursion) {
|
||||
maxAdverseExcursion = profitPercent
|
||||
maxAdversePrice = currentPrice
|
||||
}
|
||||
|
||||
// Check for TP1 hit (first time only)
|
||||
if (!tp1HitTime) {
|
||||
const tp1Hit = direction === 'long'
|
||||
? currentPrice >= tp1Price
|
||||
: currentPrice <= tp1Price
|
||||
if (tp1Hit) {
|
||||
tp1HitTime = timestamp
|
||||
console.log(`✅ TP1 hit at ${timestamp.toISOString()} (${minutesElapsed}min) - Price: $${currentPrice.toFixed(2)}`)
|
||||
}
|
||||
}
|
||||
|
||||
// Check for TP2 hit (first time only)
|
||||
if (!tp2HitTime) {
|
||||
const tp2Hit = direction === 'long'
|
||||
? currentPrice >= tp2Price
|
||||
: currentPrice <= tp2Price
|
||||
if (tp2Hit) {
|
||||
tp2HitTime = timestamp
|
||||
console.log(`✅ TP2 hit at ${timestamp.toISOString()} (${minutesElapsed}min) - Price: $${currentPrice.toFixed(2)}`)
|
||||
}
|
||||
}
|
||||
|
||||
// Check for SL hit (first time only)
|
||||
if (!slHitTime) {
|
||||
const slHit = direction === 'long'
|
||||
? currentPrice <= slPrice
|
||||
: currentPrice >= slPrice
|
||||
if (slHit) {
|
||||
slHitTime = timestamp
|
||||
console.log(`❌ SL hit at ${timestamp.toISOString()} (${minutesElapsed}min) - Price: $${currentPrice.toFixed(2)}`)
|
||||
}
|
||||
}
|
||||
|
||||
// Record checkpoint prices (for comparison)
|
||||
if (minutesElapsed >= 1 && !checkpoints['1min']) checkpoints['1min'] = currentPrice
|
||||
if (minutesElapsed >= 5 && !checkpoints['5min']) checkpoints['5min'] = currentPrice
|
||||
if (minutesElapsed >= 15 && !checkpoints['15min']) checkpoints['15min'] = currentPrice
|
||||
if (minutesElapsed >= 30 && !checkpoints['30min']) checkpoints['30min'] = currentPrice
|
||||
if (minutesElapsed >= 60 && !checkpoints['1hr']) checkpoints['1hr'] = currentPrice
|
||||
if (minutesElapsed >= 120 && !checkpoints['2hr']) checkpoints['2hr'] = currentPrice
|
||||
if (minutesElapsed >= 240 && !checkpoints['4hr']) checkpoints['4hr'] = currentPrice
|
||||
if (minutesElapsed >= 480 && !checkpoints['8hr']) checkpoints['8hr'] = currentPrice
|
||||
}
|
||||
|
||||
// Build updates object with findings
|
||||
updates.wouldHitTP1 = tp1HitTime !== null
|
||||
updates.wouldHitTP2 = tp2HitTime !== null
|
||||
updates.wouldHitSL = slHitTime !== null
|
||||
|
||||
// CRITICAL: Store exact timestamps (minute precision)
|
||||
if (tp1HitTime) updates.tp1HitTime = tp1HitTime
|
||||
if (tp2HitTime) updates.tp2HitTime = tp2HitTime
|
||||
if (slHitTime) updates.slHitTime = slHitTime
|
||||
|
||||
// Store max favorable/adverse
|
||||
updates.maxFavorablePrice = maxFavorablePrice
|
||||
updates.maxAdversePrice = maxAdversePrice
|
||||
updates.maxFavorableExcursion = maxFavorableExcursion
|
||||
updates.maxAdverseExcursion = maxAdverseExcursion
|
||||
|
||||
// Store checkpoint prices (for comparison with old system)
|
||||
if (checkpoints['1min']) updates.priceAfter1Min = checkpoints['1min']
|
||||
if (checkpoints['5min']) updates.priceAfter5Min = checkpoints['5min']
|
||||
if (checkpoints['15min']) updates.priceAfter15Min = checkpoints['15min']
|
||||
if (checkpoints['30min']) updates.priceAfter30Min = checkpoints['30min']
|
||||
if (checkpoints['1hr']) updates.priceAfter1Hr = checkpoints['1hr']
|
||||
if (checkpoints['2hr']) updates.priceAfter2Hr = checkpoints['2hr']
|
||||
if (checkpoints['4hr']) updates.priceAfter4Hr = checkpoints['4hr']
|
||||
if (checkpoints['8hr']) updates.priceAfter8Hr = checkpoints['8hr']
|
||||
|
||||
console.log(`📊 Analysis complete: TP1=${updates.wouldHitTP1}, TP2=${updates.wouldHitTP2}, SL=${updates.wouldHitSL}, MFE=${maxFavorableExcursion.toFixed(2)}%, MAE=${maxAdverseExcursion.toFixed(2)}%`)
|
||||
|
||||
return updates
|
||||
}
|
||||
|
||||
/**
|
||||
* ONE-TIME BATCH PROCESSING: Process all signals with historical data
|
||||
* Purpose: Analyze completed tracking windows using collected MarketData
|
||||
* Algorithm:
|
||||
* 1. Find signals NOT yet analyzed (analysisComplete = false)
|
||||
* 2. Check if enough historical data exists (8 hours or TP/SL hit)
|
||||
* 3. Query MarketData for signal's time window
|
||||
* 4. Run minute-precision analysis
|
||||
* 5. Update database with exact TP/SL timing
|
||||
*
|
||||
* This replaces the polling approach with batch historical analysis
|
||||
*/
|
||||
async processCompletedSignals(): Promise<void> {
|
||||
try {
|
||||
const config = getMergedConfig()
|
||||
|
||||
// Find signals ready for batch processing
|
||||
// CRITICAL FIX (Dec 2, 2025): Changed from 30min to 1min
|
||||
// Rationale: We collect 1-minute data, so use it! No reason to wait longer.
|
||||
const oneMinuteAgo = new Date(Date.now() - 1 * 60 * 1000)
|
||||
|
||||
const signalsToProcess = await this.prisma.blockedSignal.findMany({
|
||||
where: {
|
||||
analysisComplete: false,
|
||||
createdAt: {
|
||||
lte: oneMinuteAgo // At least 1 minute old (we have 1-min data!)
|
||||
},
|
||||
blockReason: {
|
||||
in: ['DATA_COLLECTION_ONLY', 'QUALITY_SCORE_TOO_LOW']
|
||||
}
|
||||
},
|
||||
orderBy: {
|
||||
createdAt: 'asc'
|
||||
}
|
||||
})
|
||||
|
||||
if (signalsToProcess.length === 0) {
|
||||
console.log('📊 No signals ready for batch processing')
|
||||
return
|
||||
}
|
||||
|
||||
console.log(`🔄 Processing ${signalsToProcess.length} signals with historical data...`)
|
||||
|
||||
let processed = 0
|
||||
let skipped = 0
|
||||
|
||||
for (const signal of signalsToProcess) {
|
||||
try {
|
||||
// Define 8-hour tracking window
|
||||
const startTime = signal.createdAt
|
||||
const endTime = new Date(startTime.getTime() + 8 * 60 * 60 * 1000)
|
||||
|
||||
// Query historical 1-minute data
|
||||
const historicalPrices = await this.getHistoricalPrices(
|
||||
signal.symbol,
|
||||
startTime,
|
||||
endTime
|
||||
)
|
||||
|
||||
if (historicalPrices.length === 0) {
|
||||
console.log(`⏭️ Skipping ${signal.symbol} ${signal.direction} - no historical data`)
|
||||
skipped++
|
||||
continue
|
||||
}
|
||||
|
||||
console.log(`📊 Processing ${signal.symbol} ${signal.direction} with ${historicalPrices.length} data points...`)
|
||||
|
||||
// Analyze minute-by-minute
|
||||
const updates = await this.analyzeHistoricalData(
|
||||
signal as any, // Database model has more fields than interface
|
||||
historicalPrices,
|
||||
config
|
||||
)
|
||||
|
||||
// Mark as complete
|
||||
updates.analysisComplete = true
|
||||
|
||||
// Update database with findings
|
||||
await this.prisma.blockedSignal.update({
|
||||
where: { id: signal.id },
|
||||
data: updates
|
||||
})
|
||||
|
||||
processed++
|
||||
console.log(`✅ ${signal.symbol} ${signal.direction} analyzed successfully`)
|
||||
|
||||
} catch (error) {
|
||||
console.error(`❌ Error processing signal ${signal.id}:`, error)
|
||||
skipped++
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`🎉 Batch processing complete: ${processed} analyzed, ${skipped} skipped`)
|
||||
|
||||
} catch (error) {
|
||||
console.error('❌ Error in batch processing:', error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Singleton instance
|
||||
|
||||
Reference in New Issue
Block a user