/** * Pyth Price Feed Integration * * Real-time price monitoring using Pyth Network oracles */ import { Connection, PublicKey } from '@solana/web3.js' import { PriceServiceConnection } from '@pythnetwork/price-service-client' import { getMarketConfig } from '../../config/trading' export interface PriceUpdate { symbol: string price: number confidence: number timestamp: number slot?: number expo: number } export interface PriceMonitorConfig { symbols: string[] // e.g., ['SOL-PERP', 'BTC-PERP'] onPriceUpdate: (update: PriceUpdate) => void | Promise onError?: (error: Error) => void } /** * Pyth Price Monitor * * Monitors prices via WebSocket with RPC polling fallback */ export class PythPriceMonitor { private priceService: PriceServiceConnection private connection: Connection private isMonitoring: boolean = false private priceCache: Map = new Map() private pollingIntervals: Map = new Map() private lastUpdateTime: Map = new Map() constructor( connection: Connection, hermesUrl: string = 'https://hermes.pyth.network' ) { this.connection = connection this.priceService = new PriceServiceConnection(hermesUrl, { priceFeedRequestConfig: { binary: true, }, }) console.log('✅ Pyth price monitor created') } /** * Start monitoring prices for multiple symbols */ async start(config: PriceMonitorConfig): Promise { if (this.isMonitoring) { console.warn('⚠️ Price monitor already running') return } console.log('🚀 Starting Pyth price monitor for:', config.symbols) try { // Get Pyth price feed IDs for all symbols const priceIds = config.symbols.map(symbol => { const marketConfig = getMarketConfig(symbol) return marketConfig.pythPriceFeedId }) console.log('📡 Subscribing to Pyth WebSocket...') // Subscribe to Pyth WebSocket for real-time updates this.priceService.subscribePriceFeedUpdates(priceIds, (priceFeed) => { try { const price = priceFeed.getPriceUnchecked() // Find which symbol this feed belongs to const symbol = config.symbols.find(sym => { const marketConfig = getMarketConfig(sym) return marketConfig.pythPriceFeedId === `0x${priceFeed.id}` }) if (symbol && price) { const priceNumber = Number(price.price) * Math.pow(10, price.expo) const confidenceNumber = Number(price.conf) * Math.pow(10, price.expo) const update: PriceUpdate = { symbol, price: priceNumber, confidence: confidenceNumber, timestamp: Date.now(), expo: price.expo, } // Cache the update this.priceCache.set(symbol, update) this.lastUpdateTime.set(symbol, Date.now()) // Notify callback Promise.resolve(config.onPriceUpdate(update)).catch(error => { if (config.onError) { config.onError(error as Error) } }) } } catch (error) { console.error('❌ Error processing Pyth price update:', error) if (config.onError) { config.onError(error as Error) } } }) console.log('✅ Pyth WebSocket subscribed') // Start polling fallback (every 2 seconds) in case WebSocket fails this.startPollingFallback(config) this.isMonitoring = true console.log('✅ Price monitoring active') } catch (error) { console.error('❌ Failed to start price monitor:', error) throw error } } /** * Polling fallback - checks prices every 2 seconds via RPC */ private startPollingFallback(config: PriceMonitorConfig): void { console.log('🔄 Starting polling fallback (every 2s)...') for (const symbol of config.symbols) { const interval = setInterval(async () => { try { // Only poll if WebSocket hasn't updated in 5 seconds const lastUpdate = this.lastUpdateTime.get(symbol) || 0 const timeSinceUpdate = Date.now() - lastUpdate if (timeSinceUpdate > 5000) { console.log(`⚠️ WebSocket stale for ${symbol}, using polling fallback`) await this.fetchPriceViaRPC(symbol, config.onPriceUpdate) } } catch (error) { console.error(`❌ Polling error for ${symbol}:`, error) if (config.onError) { config.onError(error as Error) } } }, 2000) // Poll every 2 seconds this.pollingIntervals.set(symbol, interval) } console.log('✅ Polling fallback active') } /** * Fetch price via RPC (fallback method) */ private async fetchPriceViaRPC( symbol: string, onUpdate: (update: PriceUpdate) => void | Promise ): Promise { try { const priceIds = [getMarketConfig(symbol).pythPriceFeedId] const priceFeeds = await this.priceService.getLatestPriceFeeds(priceIds) if (priceFeeds && priceFeeds.length > 0) { const priceFeed = priceFeeds[0] const price = priceFeed.getPriceUnchecked() const priceNumber = Number(price.price) * Math.pow(10, price.expo) const confidenceNumber = Number(price.conf) * Math.pow(10, price.expo) const update: PriceUpdate = { symbol, price: priceNumber, confidence: confidenceNumber, timestamp: Date.now(), expo: price.expo, } this.priceCache.set(symbol, update) this.lastUpdateTime.set(symbol, Date.now()) await onUpdate(update) } } catch (error) { console.error(`❌ RPC fetch failed for ${symbol}:`, error) throw error } } /** * Get cached price (instant, no network call) */ getCachedPrice(symbol: string): PriceUpdate | null { return this.priceCache.get(symbol) || null } /** * Get all cached prices */ getAllCachedPrices(): Map { return new Map(this.priceCache) } /** * Check if monitoring is active */ isActive(): boolean { return this.isMonitoring } /** * Stop monitoring */ async stop(): Promise { if (!this.isMonitoring) { return } console.log('🛑 Stopping price monitor...') // Clear polling intervals this.pollingIntervals.forEach(interval => clearInterval(interval)) this.pollingIntervals.clear() // Close Pyth WebSocket (if implemented by library) // Note: PriceServiceConnection doesn't have explicit close method // WebSocket will be garbage collected this.priceCache.clear() this.lastUpdateTime.clear() this.isMonitoring = false console.log('✅ Price monitor stopped') } } // Singleton instance let pythPriceMonitorInstance: PythPriceMonitor | null = null export function getPythPriceMonitor(): PythPriceMonitor { if (!pythPriceMonitorInstance) { const connection = new Connection( process.env.SOLANA_RPC_URL || 'https://api.mainnet-beta.solana.com', 'confirmed' ) const hermesUrl = process.env.PYTH_HERMES_URL || 'https://hermes.pyth.network' pythPriceMonitorInstance = new PythPriceMonitor(connection, hermesUrl) } return pythPriceMonitorInstance }