/**
 * Drift Protocol Client
 *
 * Handles connection to Drift Protocol and basic operations
 */

import { Connection, PublicKey, Keypair } from '@solana/web3.js'
import { logger } from '../utils/logger'
import { DriftClient, initialize, User, PerpMarkets } from '@drift-labs/sdk'
import bs58 from 'bs58'
import { getDriftHealthMonitor } from '../monitoring/drift-health-monitor'

// Manual wallet interface (more compatible than SDK Wallet class)
interface ManualWallet {
  publicKey: PublicKey
  signTransaction: (tx: any) => Promise<any>
  signAllTransactions: (txs: any[]) => Promise<any[]>
}

/** Settings needed to construct a {@link DriftService}. */
export interface DriftConfig {
  /** RPC endpoint used for the Drift SDK client and its subscriptions. */
  rpcUrl: string
  /** Optional: Use Alchemy for trade operations (better sustained rate limits) */
  alchemyRpcUrl?: string
  /** Wallet secret key, either a JSON byte array ("[91,24,...]") or a base58 string. */
  walletPrivateKey: string
  env: 'mainnet-beta' | 'devnet'
}

/**
 * Thin service wrapper around the Drift SDK client.
 *
 * Owns the RPC connections, the signing keypair, the `DriftClient` and its
 * `User`, and exposes read helpers (balance, positions, oracle price, funding
 * rate, account health). All read helpers throw if `initialize()` has not
 * completed successfully.
 */
export class DriftService {
  /**
   * errno-style markers treated as transient network failures. Each one is
   * looked for in the error message, in `error.code` and in `error.cause.code`.
   */
  private static readonly TRANSIENT_ERROR_CODES = [
    'EAI_AGAIN',
    'ENOTFOUND',
    'ETIMEDOUT',
    'ECONNREFUSED',
  ]

  /**
   * `console.error` is process-global, so the interception wrapper must be
   * installed at most once no matter how many times a service is
   * (re)initialized.
   */
  private static consoleErrorPatched = false

  private connection: Connection
  // Separate connection for trade operations (uses Alchemy if available)
  private tradeConnection: Connection
  private wallet: ManualWallet
  private keypair: Keypair
  private driftClient: DriftClient | null = null
  private user: User | null = null
  private isInitialized: boolean = false

  /**
   * Builds the RPC connections and the signing wallet. Performs no Drift
   * subscription; call `initialize()` for that.
   *
   * @throws Error if the private key cannot be parsed or is not a valid secret key.
   */
  constructor(private config: DriftConfig) {
    // Helius connection for Drift SDK initialization (handles burst subscriptions well)
    this.connection = new Connection(config.rpcUrl, 'confirmed')

    // Alchemy connection for trade operations (better sustained rate limits)
    // Falls back to Helius if Alchemy not configured
    this.tradeConnection = config.alchemyRpcUrl
      ? new Connection(config.alchemyRpcUrl, 'confirmed')
      : this.connection

    if (config.alchemyRpcUrl) {
      logger.log('🔀 Hybrid RPC mode: Helius for init, Alchemy for trades')
    } else {
      logger.log('📡 Single RPC mode: Helius for all operations')
    }

    // Create wallet from private key (JSON array or base58 string)
    this.keypair = Keypair.fromSecretKey(
      DriftService.parseSecretKey(config.walletPrivateKey)
    )

    // Create manual wallet interface (more reliable than SDK Wallet)
    this.wallet = {
      publicKey: this.keypair.publicKey,
      signTransaction: async (tx) => this.signWithKeypair(tx),
      signAllTransactions: async (txs) => txs.map((tx) => this.signWithKeypair(tx)),
    }

    logger.log('✅ Drift service created for wallet:', this.wallet.publicKey.toString())
  }

  /**
   * Decodes a wallet secret key given in either supported format:
   *   1. JSON array: [91,24,199,...] (from Phantom export as array)
   *   2. Base58 string: "5Jm7X..." (from Phantom export as string)
   *
   * Surrounding whitespace is ignored. The key material is never included in
   * error messages.
   *
   * @throws Error if the JSON form is not an array of byte values.
   */
  private static parseSecretKey(rawKey: string): Uint8Array {
    const trimmed = rawKey.trim()

    if (!trimmed.startsWith('[')) {
      // Base58 string format
      return bs58.decode(trimmed)
    }

    // JSON array format
    const parsed: unknown = JSON.parse(trimmed)
    if (
      !Array.isArray(parsed) ||
      !parsed.every((b) => Number.isInteger(b) && b >= 0 && b <= 255)
    ) {
      throw new Error('Wallet private key JSON must be an array of byte values (0-255)')
    }
    return new Uint8Array(parsed)
  }

  /**
   * Signs one transaction in place with the service keypair and returns it.
   * Objects exposing `partialSign` are signed through it; otherwise objects
   * exposing `sign` are signed with a signer array. Anything else is returned
   * untouched.
   */
  private signWithKeypair(tx: any): any {
    if (typeof tx.partialSign === 'function') {
      tx.partialSign(this.keypair)
    } else if (typeof tx.sign === 'function') {
      tx.sign([this.keypair])
    }
    return tx
  }

  /**
   * Reports whether an error looks like a transient network failure
   * (DNS, timeout, refused connection, failed fetch).
   */
  private static isTransientError(error: any): boolean {
    const message: string = typeof error?.message === 'string' ? error.message : ''
    if (message.includes('fetch failed')) {
      return true
    }
    return DriftService.TRANSIENT_ERROR_CODES.some(
      (code) =>
        message.includes(code) || error?.code === code || error?.cause?.code === code
    )
  }

  /**
   * Retry helper for handling transient network failures (DNS, timeouts)
   *
   * @param operation     Async work to attempt.
   * @param maxRetries    Maximum number of attempts (not additional retries).
   * @param delayMs       Fixed pause between attempts.
   * @param operationName Label used in log lines and the fallback error.
   * @returns The value produced by the first successful attempt.
   * @throws The original error when it is not transient or when the last
   *         attempt fails.
   */
  private async retryOperation<T>(
    operation: () => Promise<T>,
    maxRetries: number = 3,
    delayMs: number = 2000,
    operationName: string = 'operation'
  ): Promise<T> {
    let lastError: Error | null = null

    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        return await operation()
      } catch (error: any) {
        lastError = error

        // Check if it's a transient network error
        const isTransient = DriftService.isTransientError(error)

        logger.log(`🔍 Error detection: isTransient=${isTransient}, attempt=${attempt}/${maxRetries}`)
        logger.log(`🔍 Error details: message="${error?.message}", code="${error?.code}", cause.code="${error?.cause?.code}"`)

        if (!isTransient || attempt === maxRetries) {
          // Non-transient error or max retries reached - fail immediately
          logger.log(`❌ Not retrying: isTransient=${isTransient}, maxed=${attempt === maxRetries}`)
          throw error
        }

        logger.log(`⚠️ ${operationName} failed (attempt ${attempt}/${maxRetries}): ${error?.message || error}`)
        logger.log(`⏳ Retrying in ${delayMs}ms...`)

        // Wait before retry
        await new Promise((resolve) => setTimeout(resolve, delayMs))
      }
    }

    // Only reachable when maxRetries < 1 (the loop body never ran).
    throw lastError || new Error(`${operationName} failed after ${maxRetries} retries`)
  }

  /**
   * Initialize Drift client and subscribe to account updates
   * Includes automatic retry for transient network failures (DNS, timeouts)
   *
   * Calling it again while already initialized is a logged no-op.
   *
   * @throws The underlying error if initialization ultimately fails.
   */
  async initialize(): Promise<void> {
    if (this.isInitialized) {
      logger.log('⚠️ Drift service already initialized')
      return
    }

    try {
      logger.log('🚀 Initializing Drift Protocol client...')

      // Wrap initialization in retry logic to handle DNS failures
      await this.retryOperation(async () => {
        // Initialize Drift SDK (gets program IDs and config)
        const sdkConfig = initialize({
          env: this.config.env === 'devnet' ? 'devnet' : 'mainnet-beta'
        })

        // Create Drift client with manual wallet and SDK config
        const client = new DriftClient({
          connection: this.connection,
          wallet: this.wallet as any, // Type assertion for compatibility
          programID: new PublicKey(sdkConfig.DRIFT_PROGRAM_ID),
          opts: {
            commitment: 'confirmed',
          },
        })

        // Subscribe to Drift account updates (this makes RPC calls)
        try {
          await client.subscribe()
        } catch (subscribeError) {
          // Best-effort teardown so a retried attempt does not leave a
          // half-subscribed client behind; the subscribe error is what matters.
          try {
            await client.unsubscribe()
          } catch {
            // ignore cleanup failure
          }
          throw subscribeError
        }
        logger.log('✅ Drift client subscribed to account updates')

        // Publish the client only once it is subscribed, then get user account
        this.driftClient = client
        this.user = client.getUser()
      }, 3, 2000, 'Drift initialization')

      this.isInitialized = true
      logger.log('✅ Drift service initialized successfully')

      // CRITICAL FIX (Nov 25, 2025): Intercept errors BEFORE starting monitor
      // Without this, errors aren't recorded and auto-restart never triggers
      logger.log('🔧 Setting up error interception for health monitoring...')
      this.interceptWebSocketErrors()
      logger.log('✅ Error interception active')

      // Start health monitoring (error-based restart instead of blind timer)
      const monitor = getDriftHealthMonitor()
      monitor.start()
    } catch (error) {
      console.error('❌ Failed to initialize Drift service after retries:', error)
      throw error
    }
  }

  /**
   * Intercept WebSocket errors for health monitoring
   *
   * Wraps the global `console.error` so that messages containing
   * 'accountUnsubscribe error' or 'readyState was 2' are reported to the
   * health monitor before being forwarded to the original implementation.
   * The wrapper is installed once per process; later calls are no-ops so
   * errors are not recorded multiple times after a disconnect/re-initialize.
   */
  private interceptWebSocketErrors(): void {
    if (DriftService.consoleErrorPatched) {
      return
    }
    DriftService.consoleErrorPatched = true

    // NOTE(review): assumes getDriftHealthMonitor() keeps returning the same
    // monitor for the life of the process — confirm in drift-health-monitor.
    const monitor = getDriftHealthMonitor()

    // Patch console.error to catch accountUnsubscribe errors
    const originalConsoleError = console.error
    console.error = (...args: any[]) => {
      const errorMessage = args.join(' ')

      // Detect accountUnsubscribe errors (Drift SDK memory leak symptom)
      if (
        errorMessage.includes('accountUnsubscribe error') ||
        errorMessage.includes('readyState was 2')
      ) {
        monitor.recordError('accountUnsubscribe')
      }

      // Call original console.error
      originalConsoleError.apply(console, args)
    }
  }

  /**
   * Get the Solana connection (Helius - for SDK operations)
   */
  getConnection(): Connection {
    return this.connection
  }

  /**
   * Get the trade connection (Alchemy if configured, otherwise Helius)
   * Use this for all trade operations (open, close, place orders)
   */
  getTradeConnection(): Connection {
    return this.tradeConnection
  }

  /**
   * Get current USDC balance
   *
   * @returns Asset value of spot market 0, divided by 1e6.
   * @throws If the service is not initialized or the SDK call fails.
   */
  async getUSDCBalance(): Promise<number> {
    this.ensureInitialized()

    try {
      // USDC spot balance (in quote currency)
      const spotBalance = this.user!.getSpotMarketAssetValue(0) // 0 = USDC market
      return Number(spotBalance) / 1e6 // USDC has 6 decimals
    } catch (error) {
      console.error('❌ Failed to get USDC balance:', error)
      throw error
    }
  }

  /**
   * Get current position for a market
   *
   * @returns A summary of the perp position, or `null` when there is no
   *          position, its base amount is zero, or the lookup fails (the
   *          failure is logged, not thrown).
   * @throws If the service is not initialized.
   */
  async getPosition(marketIndex: number): Promise<{
    size: number
    entryPrice: number
    unrealizedPnL: number
    side: 'long' | 'short' | 'none'
  } | null> {
    this.ensureInitialized()

    try {
      const position = this.user!.getPerpPosition(marketIndex)

      if (!position || position.baseAssetAmount.eqn(0)) {
        return null
      }

      const baseAssetAmount = Number(position.baseAssetAmount) / 1e9 // 9 decimals
      const quoteAssetAmount = Number(position.quoteAssetAmount) / 1e6 // 6 decimals

      // Calculate entry price
      // NOTE(review): this is derived from quoteAssetAmount; confirm against
      // the SDK whether quoteEntryAmount is the intended field for entry price.
      const entryPrice = Math.abs(quoteAssetAmount / baseAssetAmount)

      // Get unrealized P&L
      const unrealizedPnL = Number(this.user!.getUnrealizedPNL(false, marketIndex)) / 1e6

      const side = baseAssetAmount > 0 ? 'long' : baseAssetAmount < 0 ? 'short' : 'none'

      return {
        size: Math.abs(baseAssetAmount),
        entryPrice,
        unrealizedPnL,
        side,
      }
    } catch (error) {
      console.error(`❌ Failed to get position for market ${marketIndex}:`, error)
      return null
    }
  }

  /**
   * Get all active positions
   *
   * Only the hard-coded markets below (indices 0-2) are inspected.
   *
   * @throws If the service is not initialized.
   */
  async getAllPositions(): Promise<Array<{
    marketIndex: number
    symbol: string
    size: number
    entryPrice: number
    unrealizedPnL: number
    side: 'long' | 'short'
  }>> {
    this.ensureInitialized()

    const positions: Array<{
      marketIndex: number
      symbol: string
      size: number
      entryPrice: number
      unrealizedPnL: number
      side: 'long' | 'short'
    }> = []

    // Check common markets (SOL, BTC, ETH)
    const markets = [
      { index: 0, symbol: 'SOL-PERP' },
      { index: 1, symbol: 'BTC-PERP' },
      { index: 2, symbol: 'ETH-PERP' },
    ]

    for (const market of markets) {
      const position = await this.getPosition(market.index)
      if (position && position.side !== 'none') {
        positions.push({
          marketIndex: market.index,
          symbol: market.symbol,
          ...position,
          side: position.side as 'long' | 'short',
        })
      }
    }

    return positions
  }

  /**
   * Get current oracle price for a market
   *
   * @returns The oracle price divided by 1e6.
   * @throws If the service is not initialized or the SDK call fails.
   */
  async getOraclePrice(marketIndex: number): Promise<number> {
    this.ensureInitialized()

    try {
      const oracleData = this.driftClient!.getOracleDataForPerpMarket(marketIndex)
      return Number(oracleData.price) / 1e6
    } catch (error) {
      console.error(`❌ Failed to get oracle price for market ${marketIndex}:`, error)
      throw error
    }
  }

  /**
   * Get funding rate for a perpetual market
   *
   * @returns `amm.lastFundingRate` divided by 1e9, or `null` when the market
   *          account is missing or the lookup fails (logged, not thrown).
   *          NOTE(review): earlier documentation described this as a
   *          percentage per 8 hours (0.01 = 1%); confirm the unit and funding
   *          period against the SDK before relying on it.
   * @throws If the service is not initialized.
   */
  async getFundingRate(marketIndex: number): Promise<number | null> {
    this.ensureInitialized()

    try {
      const perpMarketAccount = this.driftClient!.getPerpMarketAccount(marketIndex)
      if (!perpMarketAccount) {
        console.warn(`⚠️ No perp market account found for index ${marketIndex}`)
        return null
      }

      // Funding rate is stored as a number with 9 decimals (1e9)
      const fundingRate = Number(perpMarketAccount.amm.lastFundingRate) / 1e9

      return fundingRate
    } catch (error) {
      console.error(`❌ Failed to get funding rate for market ${marketIndex}:`, error)
      return null
    }
  }

  /**
   * Get account health (margin ratio)
   *
   * `marginRatio` is totalCollateral / totalLiability, or `Infinity` when
   * there is no liability. All amounts are the SDK values divided by 1e6.
   *
   * @throws If the service is not initialized or an SDK call fails.
   */
  async getAccountHealth(): Promise<{
    totalCollateral: number
    totalLiability: number
    freeCollateral: number
    marginRatio: number
  }> {
    this.ensureInitialized()

    try {
      const totalCollateral = Number(this.user!.getTotalCollateral()) / 1e6
      const totalLiability = Number(this.user!.getTotalLiabilityValue()) / 1e6
      const freeCollateral = Number(this.user!.getFreeCollateral()) / 1e6

      const marginRatio = totalLiability > 0
        ? totalCollateral / totalLiability
        : Infinity

      return {
        totalCollateral,
        totalLiability,
        freeCollateral,
        marginRatio,
      }
    } catch (error) {
      console.error('❌ Failed to get account health:', error)
      throw error
    }
  }

  /**
   * Get Drift client instance
   *
   * @throws If the service is not initialized.
   */
  getClient(): DriftClient {
    this.ensureInitialized()
    return this.driftClient!
  }

  /**
   * Get user instance
   *
   * @throws If the service is not initialized.
   */
  getUser(): User {
    this.ensureInitialized()
    return this.user!
  }

  /**
   * Disconnect from Drift
   *
   * Unsubscribes the client (if one exists) and marks the service as not
   * initialized so `initialize()` can be called again.
   */
  async disconnect(): Promise<void> {
    if (this.driftClient) {
      await this.driftClient.unsubscribe()
      logger.log('✅ Drift client disconnected')
    }

    this.isInitialized = false
  }

  /**
   * Ensure service is initialized
   *
   * @throws Error when `initialize()` has not completed successfully.
   */
  private ensureInitialized(): void {
    if (!this.isInitialized || !this.driftClient || !this.user) {
      throw new Error('Drift service not initialized. Call initialize() first.')
    }
  }
}

// Singleton instance with better persistence
let driftServiceInstance: DriftService | null = null
// In-flight initialization, shared so concurrent callers await the same work
let initializationPromise: Promise<DriftService> | null = null

/**
 * Returns the process-wide `DriftService`, creating it from environment
 * variables on first use (SOLANA_RPC_URL, ALCHEMY_RPC_URL,
 * DRIFT_WALLET_PRIVATE_KEY, DRIFT_ENV). The returned service is not
 * necessarily initialized.
 *
 * @throws Error if DRIFT_WALLET_PRIVATE_KEY is not set.
 */
export function getDriftService(): DriftService {
  if (!driftServiceInstance) {
    const config: DriftConfig = {
      rpcUrl: process.env.SOLANA_RPC_URL || 'https://api.mainnet-beta.solana.com',
      alchemyRpcUrl: process.env.ALCHEMY_RPC_URL, // Optional: Alchemy for trade operations
      walletPrivateKey: process.env.DRIFT_WALLET_PRIVATE_KEY || '',
      // Anything other than 'devnet' (including unset) selects mainnet-beta
      env: process.env.DRIFT_ENV === 'devnet' ? 'devnet' : 'mainnet-beta',
    }

    if (!config.walletPrivateKey) {
      throw new Error('DRIFT_WALLET_PRIVATE_KEY not set in environment')
    }

    driftServiceInstance = new DriftService(config)
    logger.log('🔄 Created new Drift service singleton')
  } else {
    logger.log('♻️ Reusing existing Drift service instance')
  }

  return driftServiceInstance
}

/**
 * Returns the singleton service after making sure it is initialized.
 * Concurrent callers share a single in-flight initialization; the cached
 * promise is cleared once it settles so a failed attempt can be retried.
 *
 * @throws Whatever `getDriftService()` or `DriftService.initialize()` throws.
 */
export async function initializeDriftService(): Promise<DriftService> {
  // If already initializing, return the same promise to avoid multiple concurrent inits
  if (initializationPromise) {
    logger.log('⏳ Waiting for ongoing initialization...')
    return initializationPromise
  }

  const service = getDriftService()

  // If already initialized, return immediately
  if (service['isInitialized']) {
    logger.log('✅ Drift service already initialized')
    return service
  }

  // Start initialization and cache the promise
  initializationPromise = service
    .initialize()
    .then(() => service)
    .finally(() => {
      // Clear after completion or error so it can be retried
      initializationPromise = null
    })

  return initializationPromise
}