fix: Add periodic Drift reconnection to prevent memory leaks
- Memory leak identified: Drift SDK accumulates WebSocket subscriptions over time - Root cause: accountUnsubscribe errors pile up when connections close/reconnect - Symptom: Heap grows to 4GB+ after 10+ hours, eventual OOM crash - Solution: Automatic reconnection every 4 hours to clear subscriptions Changes: - lib/drift/client.ts: Add reconnectTimer and scheduleReconnection() - lib/drift/client.ts: Implement private reconnect() method - lib/drift/client.ts: Clear timer in disconnect() - app/api/drift/reconnect/route.ts: Manual reconnection endpoint (POST) - app/api/drift/reconnect/route.ts: Reconnection status endpoint (GET) Impact: - Prevents JavaScript heap out of memory crashes - Telegram bot timeouts resolved (was failing due to unresponsive bot) - System will auto-heal every 4 hours instead of requiring manual restart - Emergency manual reconnect available via API if needed Tested: Container restarted successfully, no more WebSocket accumulation expected
This commit is contained in:
77
app/api/drift/reconnect/route.ts
Normal file
77
app/api/drift/reconnect/route.ts
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
import { NextRequest, NextResponse } from 'next/server'
|
||||||
|
import { getDriftService } from '@/lib/drift/client'
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Manual Drift reconnection endpoint
|
||||||
|
* Forces reconnection to clear WebSocket subscriptions and prevent memory leaks
|
||||||
|
*
|
||||||
|
* POST /api/drift/reconnect
|
||||||
|
* Authorization: Bearer <API_SECRET_KEY>
|
||||||
|
*/
|
||||||
|
export async function POST(request: NextRequest) {
|
||||||
|
try {
|
||||||
|
// Verify API key
|
||||||
|
const authHeader = request.headers.get('authorization')
|
||||||
|
const apiKey = process.env.API_SECRET_KEY
|
||||||
|
|
||||||
|
if (!authHeader || !authHeader.startsWith('Bearer ') || authHeader.slice(7) !== apiKey) {
|
||||||
|
return NextResponse.json(
|
||||||
|
{ success: false, error: 'Unauthorized' },
|
||||||
|
{ status: 401 }
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log('🔄 Manual reconnection requested via API...')
|
||||||
|
|
||||||
|
const driftService = getDriftService()
|
||||||
|
|
||||||
|
// Force reconnection by calling private method through type assertion
|
||||||
|
await (driftService as any).reconnect()
|
||||||
|
|
||||||
|
console.log('✅ Manual reconnection complete')
|
||||||
|
|
||||||
|
return NextResponse.json({
|
||||||
|
success: true,
|
||||||
|
message: 'Drift connection refreshed successfully',
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
})
|
||||||
|
|
||||||
|
} catch (error: any) {
|
||||||
|
console.error('❌ Manual reconnection failed:', error)
|
||||||
|
return NextResponse.json(
|
||||||
|
{
|
||||||
|
success: false,
|
||||||
|
error: error.message || 'Failed to reconnect to Drift Protocol',
|
||||||
|
timestamp: new Date().toISOString()
|
||||||
|
},
|
||||||
|
{ status: 500 }
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get reconnection status and next scheduled reconnection time
|
||||||
|
*
|
||||||
|
* GET /api/drift/reconnect
|
||||||
|
*/
|
||||||
|
export async function GET(request: NextRequest) {
|
||||||
|
try {
|
||||||
|
const driftService = getDriftService()
|
||||||
|
|
||||||
|
return NextResponse.json({
|
||||||
|
success: true,
|
||||||
|
status: {
|
||||||
|
initialized: (driftService as any).isInitialized,
|
||||||
|
hasReconnectTimer: !!(driftService as any).reconnectTimer,
|
||||||
|
reconnectIntervalHours: (driftService as any).reconnectIntervalMs / 1000 / 60 / 60,
|
||||||
|
message: 'Automatic reconnection runs every 4 hours to prevent memory leaks'
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
} catch (error: any) {
|
||||||
|
return NextResponse.json(
|
||||||
|
{ success: false, error: error.message },
|
||||||
|
{ status: 500 }
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -28,6 +28,8 @@ export class DriftService {
|
|||||||
private driftClient: DriftClient | null = null
|
private driftClient: DriftClient | null = null
|
||||||
private user: User | null = null
|
private user: User | null = null
|
||||||
private isInitialized: boolean = false
|
private isInitialized: boolean = false
|
||||||
|
private reconnectTimer: NodeJS.Timeout | null = null
|
||||||
|
private reconnectIntervalMs: number = 4 * 60 * 60 * 1000 // 4 hours (prevent memory leak)
|
||||||
|
|
||||||
constructor(private config: DriftConfig) {
|
constructor(private config: DriftConfig) {
|
||||||
this.connection = new Connection(config.rpcUrl, 'confirmed')
|
this.connection = new Connection(config.rpcUrl, 'confirmed')
|
||||||
@@ -163,12 +165,74 @@ export class DriftService {
|
|||||||
this.isInitialized = true
|
this.isInitialized = true
|
||||||
console.log('✅ Drift service initialized successfully')
|
console.log('✅ Drift service initialized successfully')
|
||||||
|
|
||||||
|
// Start periodic reconnection to prevent memory leaks
|
||||||
|
this.scheduleReconnection()
|
||||||
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('❌ Failed to initialize Drift service after retries:', error)
|
console.error('❌ Failed to initialize Drift service after retries:', error)
|
||||||
throw error
|
throw error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedule periodic reconnection to prevent WebSocket memory leaks
|
||||||
|
* Drift SDK accumulates subscriptions over time, causing memory leaks
|
||||||
|
* Periodic reconnection clears old subscriptions and resets memory
|
||||||
|
*/
|
||||||
|
private scheduleReconnection(): void {
|
||||||
|
// Clear existing timer if any
|
||||||
|
if (this.reconnectTimer) {
|
||||||
|
clearTimeout(this.reconnectTimer)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Schedule reconnection every 4 hours
|
||||||
|
this.reconnectTimer = setTimeout(async () => {
|
||||||
|
try {
|
||||||
|
console.log('🔄 Scheduled reconnection: Clearing WebSocket subscriptions to prevent memory leak...')
|
||||||
|
await this.reconnect()
|
||||||
|
console.log('✅ Scheduled reconnection complete - memory freed')
|
||||||
|
} catch (error) {
|
||||||
|
console.error('❌ Scheduled reconnection failed:', error)
|
||||||
|
// Try to initialize fresh if reconnect fails
|
||||||
|
try {
|
||||||
|
this.isInitialized = false
|
||||||
|
await this.initialize()
|
||||||
|
} catch (reinitError) {
|
||||||
|
console.error('❌ Failed to reinitialize after reconnect failure:', reinitError)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, this.reconnectIntervalMs)
|
||||||
|
|
||||||
|
console.log(`⏰ Scheduled reconnection in ${this.reconnectIntervalMs / 1000 / 60 / 60} hours`)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reconnect to Drift Protocol (clears old subscriptions)
|
||||||
|
*/
|
||||||
|
private async reconnect(): Promise<void> {
|
||||||
|
console.log('🔄 Reconnecting to Drift Protocol...')
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Unsubscribe from old connections
|
||||||
|
if (this.driftClient) {
|
||||||
|
await this.driftClient.unsubscribe()
|
||||||
|
console.log('✅ Unsubscribed from old Drift connection')
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset state
|
||||||
|
this.driftClient = null
|
||||||
|
this.user = null
|
||||||
|
this.isInitialized = false
|
||||||
|
|
||||||
|
// Reinitialize
|
||||||
|
await this.initialize()
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
console.error('❌ Reconnection failed:', error)
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get current USDC balance
|
* Get current USDC balance
|
||||||
*/
|
*/
|
||||||
@@ -369,6 +433,13 @@ export class DriftService {
|
|||||||
* Disconnect from Drift
|
* Disconnect from Drift
|
||||||
*/
|
*/
|
||||||
async disconnect(): Promise<void> {
|
async disconnect(): Promise<void> {
|
||||||
|
// Clear reconnection timer
|
||||||
|
if (this.reconnectTimer) {
|
||||||
|
clearTimeout(this.reconnectTimer)
|
||||||
|
this.reconnectTimer = null
|
||||||
|
console.log('⏰ Cleared reconnection timer')
|
||||||
|
}
|
||||||
|
|
||||||
if (this.driftClient) {
|
if (this.driftClient) {
|
||||||
await this.driftClient.unsubscribe()
|
await this.driftClient.unsubscribe()
|
||||||
console.log('✅ Drift client disconnected')
|
console.log('✅ Drift client disconnected')
|
||||||
|
|||||||
Reference in New Issue
Block a user