Files
trading_bot_v4/lib/drift/client.ts
mindesbunister f505db4ac8 fix: Reduce Drift SDK auto-reconnect interval from 4h to 2h
Problem: Bot froze after only 1 hour of runtime with API timeouts,
despite having 4-hour auto-reconnect protection for Drift SDK memory leak.

Investigation showed:
- Singleton pattern working correctly (reusing same instance)
- Hundreds of accountUnsubscribe errors (WebSocket leak)
- Container froze at ~1 hour, not 4 hours

Root Cause: Drift SDK's memory leak is MORE SEVERE than expected.
Even with single instance, subscriptions accumulate faster than anticipated.
4-hour interval too long - system hits memory/connection limits before cleanup.

Solution: Reduce auto-reconnect interval to 2 hours (more aggressive).
This ensures cleanup happens before critical thresholds reached.

Code change (lib/drift/client.ts):
- reconnectIntervalMs: 4 hours → 2 hours
- Updated log messages to reflect new interval

Impact: System now self-heals every 2 hours instead of 4,
preventing the freeze that occurred tonight at 1-hour mark.

Related: Common Pitfall #1 (Drift SDK memory leak)
2025-11-16 02:15:01 +01:00

535 lines
16 KiB
TypeScript

/**
* Drift Protocol Client
*
* Handles connection to Drift Protocol and basic operations
*/
import { Connection, PublicKey, Keypair } from '@solana/web3.js'
import { DriftClient, initialize, User, PerpMarkets } from '@drift-labs/sdk'
import bs58 from 'bs58'
// Manual wallet interface (more compatible than SDK Wallet class)
interface ManualWallet {
publicKey: PublicKey
signTransaction: (tx: any) => Promise<any>
signAllTransactions: (txs: any[]) => Promise<any[]>
}
export interface DriftConfig {
rpcUrl: string
alchemyRpcUrl?: string // Optional: Use Alchemy for trade operations (better sustained rate limits)
walletPrivateKey: string
env: 'mainnet-beta' | 'devnet'
}
export class DriftService {
private connection: Connection
private tradeConnection: Connection // Separate connection for trade operations (uses Alchemy if available)
private wallet: ManualWallet
private keypair: Keypair
private driftClient: DriftClient | null = null
private user: User | null = null
private isInitialized: boolean = false
private reconnectTimer: NodeJS.Timeout | null = null
private reconnectIntervalMs: number = 2 * 60 * 60 * 1000 // 2 hours (aggressive - Drift SDK memory leak is severe)
constructor(private config: DriftConfig) {
// Helius connection for Drift SDK initialization (handles burst subscriptions well)
this.connection = new Connection(config.rpcUrl, 'confirmed')
// Alchemy connection for trade operations (better sustained rate limits)
// Falls back to Helius if Alchemy not configured
this.tradeConnection = config.alchemyRpcUrl
? new Connection(config.alchemyRpcUrl, 'confirmed')
: this.connection
if (config.alchemyRpcUrl) {
console.log('🔀 Hybrid RPC mode: Helius for init, Alchemy for trades')
} else {
console.log('📡 Single RPC mode: Helius for all operations')
}
// Create wallet from private key
// Support both formats:
// 1. JSON array: [91,24,199,...] (from Phantom export as array)
// 2. Base58 string: "5Jm7X..." (from Phantom export as string)
let secretKey: Uint8Array
if (config.walletPrivateKey.startsWith('[')) {
// JSON array format
const keyArray = JSON.parse(config.walletPrivateKey)
secretKey = new Uint8Array(keyArray)
} else {
// Base58 string format
secretKey = bs58.decode(config.walletPrivateKey)
}
this.keypair = Keypair.fromSecretKey(secretKey)
// Create manual wallet interface (more reliable than SDK Wallet)
this.wallet = {
publicKey: this.keypair.publicKey,
signTransaction: async (tx) => {
if (typeof tx.partialSign === 'function') {
tx.partialSign(this.keypair)
} else if (typeof tx.sign === 'function') {
tx.sign([this.keypair])
}
return tx
},
signAllTransactions: async (txs) => {
return txs.map(tx => {
if (typeof tx.partialSign === 'function') {
tx.partialSign(this.keypair)
} else if (typeof tx.sign === 'function') {
tx.sign([this.keypair])
}
return tx
})
}
}
console.log('✅ Drift service created for wallet:', this.wallet.publicKey.toString())
}
/**
* Retry helper for handling transient network failures (DNS, timeouts)
*/
private async retryOperation<T>(
operation: () => Promise<T>,
maxRetries: number = 3,
delayMs: number = 2000,
operationName: string = 'operation'
): Promise<T> {
let lastError: Error | null = null
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await operation()
} catch (error: any) {
lastError = error
// Check if it's a transient network error
const isTransient =
error?.message?.includes('fetch failed') ||
error?.message?.includes('EAI_AGAIN') ||
error?.message?.includes('ENOTFOUND') ||
error?.message?.includes('ETIMEDOUT') ||
error?.message?.includes('ECONNREFUSED') ||
error?.code === 'EAI_AGAIN' ||
error?.cause?.code === 'EAI_AGAIN'
console.log(`🔍 Error detection: isTransient=${isTransient}, attempt=${attempt}/${maxRetries}`)
console.log(`🔍 Error details: message="${error?.message}", code="${error?.code}", cause.code="${error?.cause?.code}"`)
if (!isTransient || attempt === maxRetries) {
// Non-transient error or max retries reached - fail immediately
console.log(`❌ Not retrying: isTransient=${isTransient}, maxed=${attempt === maxRetries}`)
throw error
}
console.log(`⚠️ ${operationName} failed (attempt ${attempt}/${maxRetries}): ${error?.message || error}`)
console.log(`⏳ Retrying in ${delayMs}ms...`)
// Wait before retry
await new Promise(resolve => setTimeout(resolve, delayMs))
}
}
throw lastError || new Error(`${operationName} failed after ${maxRetries} retries`)
}
/**
* Initialize Drift client and subscribe to account updates
* Includes automatic retry for transient network failures (DNS, timeouts)
*/
async initialize(): Promise<void> {
if (this.isInitialized) {
console.log('⚠️ Drift service already initialized')
return
}
try {
console.log('🚀 Initializing Drift Protocol client...')
// Wrap initialization in retry logic to handle DNS failures
await this.retryOperation(async () => {
// Initialize Drift SDK (gets program IDs and config)
const sdkConfig = initialize({
env: this.config.env === 'devnet' ? 'devnet' : 'mainnet-beta'
})
// Create Drift client with manual wallet and SDK config
this.driftClient = new DriftClient({
connection: this.connection,
wallet: this.wallet as any, // Type assertion for compatibility
programID: new PublicKey(sdkConfig.DRIFT_PROGRAM_ID),
opts: {
commitment: 'confirmed',
},
})
// Subscribe to Drift account updates (this makes RPC calls)
await this.driftClient.subscribe()
console.log('✅ Drift client subscribed to account updates')
// Get user account
this.user = this.driftClient.getUser()
}, 3, 2000, 'Drift initialization')
this.isInitialized = true
console.log('✅ Drift service initialized successfully')
// Start periodic reconnection to prevent memory leaks
this.scheduleReconnection()
} catch (error) {
console.error('❌ Failed to initialize Drift service after retries:', error)
throw error
}
}
/**
* Schedule periodic reconnection to prevent WebSocket memory leaks
* Drift SDK accumulates subscriptions over time, causing memory leaks
* Periodic reconnection clears old subscriptions and resets memory
*/
private scheduleReconnection(): void {
// Clear existing timer if any
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
}
// Schedule reconnection every 2 hours
this.reconnectTimer = setTimeout(async () => {
try {
console.log('🔄 Scheduled reconnection: Clearing WebSocket subscriptions to prevent memory leak...')
await this.reconnect()
console.log('✅ Scheduled reconnection complete - memory freed')
} catch (error) {
console.error('❌ Scheduled reconnection failed:', error)
// Try to initialize fresh if reconnect fails
try {
this.isInitialized = false
await this.initialize()
} catch (reinitError) {
console.error('❌ Failed to reinitialize after reconnect failure:', reinitError)
}
}
}, this.reconnectIntervalMs)
console.log(`⏰ Scheduled reconnection in ${this.reconnectIntervalMs / 1000 / 60 / 60} hours`)
}
/**
* Reconnect to Drift Protocol (clears old subscriptions)
*/
private async reconnect(): Promise<void> {
console.log('🔄 Reconnecting to Drift Protocol...')
try {
// Unsubscribe from old connections
if (this.driftClient) {
await this.driftClient.unsubscribe()
console.log('✅ Unsubscribed from old Drift connection')
}
// Reset state
this.driftClient = null
this.user = null
this.isInitialized = false
// Reinitialize
await this.initialize()
} catch (error) {
console.error('❌ Reconnection failed:', error)
throw error
}
}
/**
* Get the Solana connection (Helius - for SDK operations)
*/
getConnection(): Connection {
return this.connection
}
/**
* Get the trade connection (Alchemy if configured, otherwise Helius)
* Use this for all trade operations (open, close, place orders)
*/
getTradeConnection(): Connection {
return this.tradeConnection
}
/**
* Get current USDC balance
*/
async getUSDCBalance(): Promise<number> {
this.ensureInitialized()
try {
const accountData = this.user!.getUserAccount()
// USDC spot balance (in quote currency)
const spotBalance = this.user!.getSpotMarketAssetValue(0) // 0 = USDC market
return Number(spotBalance) / 1e6 // USDC has 6 decimals
} catch (error) {
console.error('❌ Failed to get USDC balance:', error)
throw error
}
}
/**
* Get current position for a market
*/
async getPosition(marketIndex: number): Promise<{
size: number
entryPrice: number
unrealizedPnL: number
side: 'long' | 'short' | 'none'
} | null> {
this.ensureInitialized()
try {
const position = this.user!.getPerpPosition(marketIndex)
if (!position || position.baseAssetAmount.eqn(0)) {
return null
}
const baseAssetAmount = Number(position.baseAssetAmount) / 1e9 // 9 decimals
const quoteAssetAmount = Number(position.quoteAssetAmount) / 1e6 // 6 decimals
// Calculate entry price
const entryPrice = Math.abs(quoteAssetAmount / baseAssetAmount)
// Get unrealized P&L
const unrealizedPnL = Number(this.user!.getUnrealizedPNL(false, marketIndex)) / 1e6
const side = baseAssetAmount > 0 ? 'long' : baseAssetAmount < 0 ? 'short' : 'none'
return {
size: Math.abs(baseAssetAmount),
entryPrice,
unrealizedPnL,
side,
}
} catch (error) {
console.error(`❌ Failed to get position for market ${marketIndex}:`, error)
return null
}
}
/**
* Get all active positions
*/
async getAllPositions(): Promise<Array<{
marketIndex: number
symbol: string
size: number
entryPrice: number
unrealizedPnL: number
side: 'long' | 'short'
}>> {
this.ensureInitialized()
const positions = []
// Check common markets (SOL, BTC, ETH)
const markets = [
{ index: 0, symbol: 'SOL-PERP' },
{ index: 1, symbol: 'BTC-PERP' },
{ index: 2, symbol: 'ETH-PERP' },
]
for (const market of markets) {
const position = await this.getPosition(market.index)
if (position && position.side !== 'none') {
positions.push({
marketIndex: market.index,
symbol: market.symbol,
...position,
side: position.side as 'long' | 'short',
})
}
}
return positions
}
/**
* Get current oracle price for a market
*/
async getOraclePrice(marketIndex: number): Promise<number> {
this.ensureInitialized()
try {
const oracleData = this.driftClient!.getOracleDataForPerpMarket(marketIndex)
return Number(oracleData.price) / 1e6
} catch (error) {
console.error(`❌ Failed to get oracle price for market ${marketIndex}:`, error)
throw error
}
}
/**
* Get funding rate for a perpetual market
* Returns funding rate as percentage (e.g., 0.01 = 1% per 8 hours)
*/
async getFundingRate(marketIndex: number): Promise<number | null> {
this.ensureInitialized()
try {
const perpMarketAccount = this.driftClient!.getPerpMarketAccount(marketIndex)
if (!perpMarketAccount) {
console.warn(`⚠️ No perp market account found for index ${marketIndex}`)
return null
}
// Funding rate is stored as a number with 9 decimals (1e9)
// Convert to percentage
const fundingRate = Number(perpMarketAccount.amm.lastFundingRate) / 1e9
return fundingRate
} catch (error) {
console.error(`❌ Failed to get funding rate for market ${marketIndex}:`, error)
return null
}
}
/**
* Get account health (margin ratio)
*/
async getAccountHealth(): Promise<{
totalCollateral: number
totalLiability: number
freeCollateral: number
marginRatio: number
}> {
this.ensureInitialized()
try {
const totalCollateral = Number(this.user!.getTotalCollateral()) / 1e6
const totalLiability = Number(this.user!.getTotalLiabilityValue()) / 1e6
const freeCollateral = Number(this.user!.getFreeCollateral()) / 1e6
const marginRatio = totalLiability > 0
? totalCollateral / totalLiability
: Infinity
return {
totalCollateral,
totalLiability,
freeCollateral,
marginRatio,
}
} catch (error) {
console.error('❌ Failed to get account health:', error)
throw error
}
}
/**
* Get Drift client instance
*/
getClient(): DriftClient {
this.ensureInitialized()
return this.driftClient!
}
/**
* Get user instance
*/
getUser(): User {
this.ensureInitialized()
return this.user!
}
/**
* Disconnect from Drift
*/
async disconnect(): Promise<void> {
// Clear reconnection timer
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
console.log('⏰ Cleared reconnection timer')
}
if (this.driftClient) {
await this.driftClient.unsubscribe()
console.log('✅ Drift client disconnected')
}
this.isInitialized = false
}
/**
* Ensure service is initialized
*/
private ensureInitialized(): void {
if (!this.isInitialized || !this.driftClient || !this.user) {
throw new Error('Drift service not initialized. Call initialize() first.')
}
}
}
// Singleton instance with better persistence
let driftServiceInstance: DriftService | null = null
let initializationPromise: Promise<DriftService> | null = null
export function getDriftService(): DriftService {
if (!driftServiceInstance) {
const config: DriftConfig = {
rpcUrl: process.env.SOLANA_RPC_URL || 'https://api.mainnet-beta.solana.com',
alchemyRpcUrl: process.env.ALCHEMY_RPC_URL, // Optional: Alchemy for trade operations
walletPrivateKey: process.env.DRIFT_WALLET_PRIVATE_KEY || '',
env: (process.env.DRIFT_ENV as 'mainnet-beta' | 'devnet') || 'mainnet-beta',
}
if (!config.walletPrivateKey) {
throw new Error('DRIFT_WALLET_PRIVATE_KEY not set in environment')
}
driftServiceInstance = new DriftService(config)
console.log('🔄 Created new Drift service singleton')
} else {
console.log('♻️ Reusing existing Drift service instance')
}
return driftServiceInstance
}
export async function initializeDriftService(): Promise<DriftService> {
// If already initializing, return the same promise to avoid multiple concurrent inits
if (initializationPromise) {
console.log('⏳ Waiting for ongoing initialization...')
return initializationPromise
}
const service = getDriftService()
// If already initialized, return immediately
if (service['isInitialized']) {
console.log('✅ Drift service already initialized')
return service
}
// Start initialization and cache the promise
initializationPromise = service.initialize().then(() => {
initializationPromise = null // Clear after completion
return service
}).catch((error) => {
initializationPromise = null // Clear on error so it can be retried
throw error
})
return initializationPromise
}