跳转到主要内容
本节介绍如何使用 StreamManagerV2 从 Injective Indexer API 流式获取实时数据。

StreamManagerV2

StreamManagerV2 提供了一个基于事件的架构,用于管理 gRPC 流,具有自动重试、指数退避和全面的错误处理功能。

主要特性

  • 基于事件的生命周期 - 监听连接、断开、错误和数据事件
  • 自动重试 - 可配置的指数退避和重试限制
  • 错误处理 - 区分可重试和不可重试的错误
  • 持久模式 - 在达到最大尝试次数后继续重试
  • 细粒度控制 - 启动、停止和管理流生命周期

基本用法

import { getNetworkEndpoints, Network } from '@injectivelabs/networks'
import { 
  StreamManagerV2,
  IndexerGrpcSpotStreamV2 
} from '@injectivelabs/sdk-ts/client/indexer'

const endpoints = getNetworkEndpoints(Network.Testnet)
const stream = new IndexerGrpcSpotStreamV2(endpoints.indexer)

const streamManager = new StreamManagerV2({
  id: 'my-stream',
  streamFactory: () => stream.streamOrders({ 
    marketId: '0x...',
    callback: (response) => {
      streamManager.emit('data', response)
    }
  }),
  onData: (data) => {
    console.log(data)
  },
  retryConfig: {
    enabled: true,
    maxAttempts: 5,
    initialDelayMs: 1000,
    maxDelayMs: 30000,
    backoffMultiplier: 2,
    persistent: true
  }
})

// 事件监听器
streamManager.on('connect', () => console.log('Connected'))
streamManager.on('disconnect', (reason) => console.log('Disconnected:', reason))
streamManager.on('error', (error) => console.error('Error:', error))
streamManager.on('stateChange', ({ from, to }) => console.log(`State: ${from} -> ${to}`))

// 启动/停止
streamManager.start()
streamManager.stop()

可用的 Stream 类

  • IndexerGrpcAccountStreamV2 - 账户余额和交易流
  • IndexerGrpcAccountPortfolioStreamV2 - 投资组合价值流
  • IndexerGrpcArchiverStreamV2 - Archiver 数据流
  • IndexerGrpcAuctionStreamV2 - 拍卖竞标流
  • IndexerGrpcDerivativesStreamV2 - 衍生品市场流
  • IndexerGrpcExplorerStreamV2 - 区块链浏览器流
  • IndexerGrpcMitoStreamV2 - Mito vault 流
  • IndexerGrpcOracleStreamV2 - Oracle 价格源流
  • IndexerGrpcSpotStreamV2 - 现货市场流
  • IndexerGrpcTradingStreamV2 - 交易自动化流

重试配置

retryConfig: {
  enabled: true,           // 启用/禁用重试
  maxAttempts: 5,          // 最大重试次数(0 = 无限)
  initialDelayMs: 1000,    // 初始退避延迟
  maxDelayMs: 30000,       // 最大退避延迟
  backoffMultiplier: 2,    // 指数退避乘数
  persistent: true         // 在 maxAttempts 后以最大延迟继续
}

事件类型

  • connect - 流成功连接
  • disconnect - 流断开连接及原因
  • error - 流发生错误
  • data - 收到新数据
  • stateChange - 流状态改变
  • retry - 重试尝试开始
  • warn - 警告消息

流示例