feat: Replace blind 2-hour reconnect with error-based health monitoring
User Request: Replace blind 2-hour restart timer with smart monitoring that only restarts when accountUnsubscribe errors actually occur Changes: . Health Monitor (NEW): - Created lib/monitoring/drift-health-monitor.ts - Tracks accountUnsubscribe errors in 30-second sliding window - Triggers container restart via flag file when 50+ errors detected - Prevents unnecessary restarts when SDK healthy . Drift Client: - Removed blind scheduleReconnection() and 2-hour timer - Added interceptWebSocketErrors() to catch SDK errors - Patches console.error to monitor for accountUnsubscribe patterns - Starts health monitor after successful initialization - Removed unused reconnect() method and reconnectTimer field . Health API (NEW): - GET /api/drift/health - Check current error count and health status - Returns: healthy boolean, errorCount, threshold, message - Useful for external monitoring and debugging Impact: - System only restarts when actual memory leak detected - Prevents unnecessary downtime every 2 hours - More targeted response to SDK issues - Better operational stability Files: - lib/monitoring/drift-health-monitor.ts (NEW - 165 lines) - lib/drift/client.ts (removed timer, added error interception) - app/api/drift/health/route.ts (NEW - health check endpoint) Testing: - Health monitor starts on initialization: ✅ - API endpoint returns healthy status: ✅ - No blind reconnection scheduled: ✅
This commit is contained in:
29
app/api/drift/health/route.ts
Normal file
29
app/api/drift/health/route.ts
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
/**
|
||||||
|
* Drift Health Check API
|
||||||
|
*
|
||||||
|
* GET /api/drift/health - Get current health status
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { NextRequest, NextResponse } from 'next/server'
|
||||||
|
import { getDriftHealthMonitor } from '@/lib/monitoring/drift-health-monitor'
|
||||||
|
|
||||||
|
export async function GET(req: NextRequest) {
|
||||||
|
try {
|
||||||
|
const monitor = getDriftHealthMonitor()
|
||||||
|
const status = monitor.getHealthStatus()
|
||||||
|
|
||||||
|
return NextResponse.json({
|
||||||
|
success: true,
|
||||||
|
...status,
|
||||||
|
message: status.healthy
|
||||||
|
? 'Drift SDK connections healthy'
|
||||||
|
: `Warning: ${status.errorCount} accountUnsubscribe errors detected`
|
||||||
|
})
|
||||||
|
|
||||||
|
} catch (error: any) {
|
||||||
|
return NextResponse.json({
|
||||||
|
success: false,
|
||||||
|
error: error.message
|
||||||
|
}, { status: 500 })
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,6 +7,7 @@
|
|||||||
import { Connection, PublicKey, Keypair } from '@solana/web3.js'
|
import { Connection, PublicKey, Keypair } from '@solana/web3.js'
|
||||||
import { DriftClient, initialize, User, PerpMarkets } from '@drift-labs/sdk'
|
import { DriftClient, initialize, User, PerpMarkets } from '@drift-labs/sdk'
|
||||||
import bs58 from 'bs58'
|
import bs58 from 'bs58'
|
||||||
|
import { getDriftHealthMonitor } from '../monitoring/drift-health-monitor'
|
||||||
|
|
||||||
// Manual wallet interface (more compatible than SDK Wallet class)
|
// Manual wallet interface (more compatible than SDK Wallet class)
|
||||||
interface ManualWallet {
|
interface ManualWallet {
|
||||||
@@ -30,8 +31,6 @@ export class DriftService {
|
|||||||
private driftClient: DriftClient | null = null
|
private driftClient: DriftClient | null = null
|
||||||
private user: User | null = null
|
private user: User | null = null
|
||||||
private isInitialized: boolean = false
|
private isInitialized: boolean = false
|
||||||
private reconnectTimer: NodeJS.Timeout | null = null
|
|
||||||
private reconnectIntervalMs: number = 2 * 60 * 60 * 1000 // 2 hours (aggressive - Drift SDK memory leak is severe)
|
|
||||||
|
|
||||||
constructor(private config: DriftConfig) {
|
constructor(private config: DriftConfig) {
|
||||||
// Helius connection for Drift SDK initialization (handles burst subscriptions well)
|
// Helius connection for Drift SDK initialization (handles burst subscriptions well)
|
||||||
@@ -173,6 +172,9 @@ export class DriftService {
|
|||||||
await this.driftClient.subscribe()
|
await this.driftClient.subscribe()
|
||||||
console.log('✅ Drift client subscribed to account updates')
|
console.log('✅ Drift client subscribed to account updates')
|
||||||
|
|
||||||
|
// Intercept WebSocket errors for health monitoring
|
||||||
|
this.interceptWebSocketErrors()
|
||||||
|
|
||||||
// Get user account
|
// Get user account
|
||||||
this.user = this.driftClient.getUser()
|
this.user = this.driftClient.getUser()
|
||||||
}, 3, 2000, 'Drift initialization')
|
}, 3, 2000, 'Drift initialization')
|
||||||
@@ -180,8 +182,9 @@ export class DriftService {
|
|||||||
this.isInitialized = true
|
this.isInitialized = true
|
||||||
console.log('✅ Drift service initialized successfully')
|
console.log('✅ Drift service initialized successfully')
|
||||||
|
|
||||||
// Start periodic reconnection to prevent memory leaks
|
// Start health monitoring (error-based restart instead of blind timer)
|
||||||
this.scheduleReconnection()
|
const monitor = getDriftHealthMonitor()
|
||||||
|
monitor.start()
|
||||||
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('❌ Failed to initialize Drift service after retries:', error)
|
console.error('❌ Failed to initialize Drift service after retries:', error)
|
||||||
@@ -190,61 +193,24 @@ export class DriftService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Schedule periodic reconnection to prevent WebSocket memory leaks
|
* Intercept WebSocket errors for health monitoring
|
||||||
* Drift SDK accumulates subscriptions over time, causing memory leaks
|
|
||||||
* Periodic reconnection clears old subscriptions and resets memory
|
|
||||||
*/
|
*/
|
||||||
private scheduleReconnection(): void {
|
private interceptWebSocketErrors(): void {
|
||||||
// Clear existing timer if any
|
const monitor = getDriftHealthMonitor()
|
||||||
if (this.reconnectTimer) {
|
|
||||||
clearTimeout(this.reconnectTimer)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Schedule reconnection every 2 hours
|
// Patch console.error to catch accountUnsubscribe errors
|
||||||
this.reconnectTimer = setTimeout(async () => {
|
const originalConsoleError = console.error
|
||||||
try {
|
console.error = (...args: any[]) => {
|
||||||
console.log('🔄 Scheduled reconnection: Clearing WebSocket subscriptions to prevent memory leak...')
|
const errorMessage = args.join(' ')
|
||||||
await this.reconnect()
|
|
||||||
console.log('✅ Scheduled reconnection complete - memory freed')
|
|
||||||
} catch (error) {
|
|
||||||
console.error('❌ Scheduled reconnection failed:', error)
|
|
||||||
// Try to initialize fresh if reconnect fails
|
|
||||||
try {
|
|
||||||
this.isInitialized = false
|
|
||||||
await this.initialize()
|
|
||||||
} catch (reinitError) {
|
|
||||||
console.error('❌ Failed to reinitialize after reconnect failure:', reinitError)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, this.reconnectIntervalMs)
|
|
||||||
|
|
||||||
console.log(`⏰ Scheduled reconnection in ${this.reconnectIntervalMs / 1000 / 60 / 60} hours`)
|
// Detect accountUnsubscribe errors (Drift SDK memory leak symptom)
|
||||||
}
|
if (errorMessage.includes('accountUnsubscribe error') ||
|
||||||
|
errorMessage.includes('readyState was 2')) {
|
||||||
/**
|
monitor.recordError('accountUnsubscribe')
|
||||||
* Reconnect to Drift Protocol (clears old subscriptions)
|
|
||||||
*/
|
|
||||||
private async reconnect(): Promise<void> {
|
|
||||||
console.log('🔄 Reconnecting to Drift Protocol...')
|
|
||||||
|
|
||||||
try {
|
|
||||||
// Unsubscribe from old connections
|
|
||||||
if (this.driftClient) {
|
|
||||||
await this.driftClient.unsubscribe()
|
|
||||||
console.log('✅ Unsubscribed from old Drift connection')
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset state
|
// Call original console.error
|
||||||
this.driftClient = null
|
originalConsoleError.apply(console, args)
|
||||||
this.user = null
|
|
||||||
this.isInitialized = false
|
|
||||||
|
|
||||||
// Reinitialize
|
|
||||||
await this.initialize()
|
|
||||||
|
|
||||||
} catch (error) {
|
|
||||||
console.error('❌ Reconnection failed:', error)
|
|
||||||
throw error
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -456,13 +422,6 @@ export class DriftService {
|
|||||||
* Disconnect from Drift
|
* Disconnect from Drift
|
||||||
*/
|
*/
|
||||||
async disconnect(): Promise<void> {
|
async disconnect(): Promise<void> {
|
||||||
// Clear reconnection timer
|
|
||||||
if (this.reconnectTimer) {
|
|
||||||
clearTimeout(this.reconnectTimer)
|
|
||||||
this.reconnectTimer = null
|
|
||||||
console.log('⏰ Cleared reconnection timer')
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.driftClient) {
|
if (this.driftClient) {
|
||||||
await this.driftClient.unsubscribe()
|
await this.driftClient.unsubscribe()
|
||||||
console.log('✅ Drift client disconnected')
|
console.log('✅ Drift client disconnected')
|
||||||
|
|||||||
162
lib/monitoring/drift-health-monitor.ts
Normal file
162
lib/monitoring/drift-health-monitor.ts
Normal file
@@ -0,0 +1,162 @@
|
|||||||
|
/**
|
||||||
|
* Drift SDK Health Monitor
|
||||||
|
*
|
||||||
|
* Monitors for accountUnsubscribe errors that indicate WebSocket connection issues.
|
||||||
|
* When detected, triggers container restart via flag file for watch-restart.sh
|
||||||
|
*/
|
||||||
|
|
||||||
|
import fs from 'fs'
|
||||||
|
import path from 'path'
|
||||||
|
|
||||||
|
class DriftHealthMonitor {
|
||||||
|
private errorCounts: Map<string, number> = new Map()
|
||||||
|
private errorWindow: number = 30000 // 30 second window
|
||||||
|
private errorThreshold: number = 50 // 50 errors in 30 seconds = problem
|
||||||
|
private checkInterval: NodeJS.Timeout | null = null
|
||||||
|
private isMonitoring: boolean = false
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start monitoring for Drift SDK errors
|
||||||
|
*/
|
||||||
|
start(): void {
|
||||||
|
if (this.isMonitoring) {
|
||||||
|
console.log('⚠️ Drift health monitor already running')
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
this.isMonitoring = true
|
||||||
|
console.log('🏥 Drift health monitor started')
|
||||||
|
console.log(` Threshold: ${this.errorThreshold} accountUnsubscribe errors in ${this.errorWindow/1000}s`)
|
||||||
|
|
||||||
|
// Check error counts every 10 seconds
|
||||||
|
this.checkInterval = setInterval(() => {
|
||||||
|
this.checkErrorThreshold()
|
||||||
|
}, 10000)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop monitoring
|
||||||
|
*/
|
||||||
|
stop(): void {
|
||||||
|
if (this.checkInterval) {
|
||||||
|
clearInterval(this.checkInterval)
|
||||||
|
this.checkInterval = null
|
||||||
|
}
|
||||||
|
this.isMonitoring = false
|
||||||
|
console.log('🏥 Drift health monitor stopped')
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record an accountUnsubscribe error
|
||||||
|
*/
|
||||||
|
recordError(errorType: string = 'accountUnsubscribe'): void {
|
||||||
|
const now = Date.now()
|
||||||
|
const key = `${errorType}-${now}`
|
||||||
|
this.errorCounts.set(key, now)
|
||||||
|
|
||||||
|
// Clean up old errors outside the window
|
||||||
|
this.cleanupOldErrors()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove errors older than the error window
|
||||||
|
*/
|
||||||
|
private cleanupOldErrors(): void {
|
||||||
|
const now = Date.now()
|
||||||
|
const cutoff = now - this.errorWindow
|
||||||
|
|
||||||
|
for (const [key, timestamp] of this.errorCounts.entries()) {
|
||||||
|
if (timestamp < cutoff) {
|
||||||
|
this.errorCounts.delete(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if error threshold exceeded
|
||||||
|
*/
|
||||||
|
private checkErrorThreshold(): void {
|
||||||
|
this.cleanupOldErrors()
|
||||||
|
|
||||||
|
const errorCount = this.errorCounts.size
|
||||||
|
|
||||||
|
if (errorCount >= this.errorThreshold) {
|
||||||
|
console.error(`🚨 CRITICAL: ${errorCount} Drift SDK errors in ${this.errorWindow/1000}s (threshold: ${this.errorThreshold})`)
|
||||||
|
console.error('🔄 Triggering container restart to clear WebSocket connection leak...')
|
||||||
|
|
||||||
|
this.triggerRestart()
|
||||||
|
|
||||||
|
// Stop monitoring to prevent multiple restart triggers
|
||||||
|
this.stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Trigger container restart via flag file
|
||||||
|
*/
|
||||||
|
private triggerRestart(): void {
|
||||||
|
const restartFlagPath = '/tmp/trading-bot-restart.flag'
|
||||||
|
|
||||||
|
try {
|
||||||
|
fs.writeFileSync(
|
||||||
|
restartFlagPath,
|
||||||
|
`Drift SDK health check failed: ${this.errorCounts.size} accountUnsubscribe errors\nTimestamp: ${new Date().toISOString()}\n`,
|
||||||
|
'utf-8'
|
||||||
|
)
|
||||||
|
console.log(`✅ Restart flag created at ${restartFlagPath}`)
|
||||||
|
console.log(' watch-restart.sh will restart container within 10 seconds')
|
||||||
|
} catch (error) {
|
||||||
|
console.error('❌ Failed to create restart flag:', error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get current error count
|
||||||
|
*/
|
||||||
|
getErrorCount(): number {
|
||||||
|
this.cleanupOldErrors()
|
||||||
|
return this.errorCounts.size
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get health status
|
||||||
|
*/
|
||||||
|
getHealthStatus(): { healthy: boolean; errorCount: number; threshold: number } {
|
||||||
|
const errorCount = this.getErrorCount()
|
||||||
|
return {
|
||||||
|
healthy: errorCount < this.errorThreshold,
|
||||||
|
errorCount,
|
||||||
|
threshold: this.errorThreshold
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Singleton instance
|
||||||
|
let monitorInstance: DriftHealthMonitor | null = null
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the Drift health monitor singleton
|
||||||
|
*/
|
||||||
|
export function getDriftHealthMonitor(): DriftHealthMonitor {
|
||||||
|
if (!monitorInstance) {
|
||||||
|
monitorInstance = new DriftHealthMonitor()
|
||||||
|
}
|
||||||
|
return monitorInstance
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start Drift health monitoring
|
||||||
|
*/
|
||||||
|
export function startDriftHealthMonitoring(): void {
|
||||||
|
const monitor = getDriftHealthMonitor()
|
||||||
|
monitor.start()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop Drift health monitoring
|
||||||
|
*/
|
||||||
|
export function stopDriftHealthMonitoring(): void {
|
||||||
|
if (monitorInstance) {
|
||||||
|
monitorInstance.stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user