API Reference
DBot OfficialDBot DashboardPricing

订阅新上交易对(WS)

用于订阅最新创建的流动池 / 交易对

FreePlusProEnterprise积分消耗
0

URL

wss://api-data-v1.dbotx.com/data/ws/

说明

为保证WebSocket连接的可用性和稳定性,需要至少每1分钟(建议每30-55秒)进行一次心跳订阅,否则系统会自动断开超时链接

请求示例

{
  "method": "subscribe", // 进行的操作,“subscribe”表示订阅,“unsubscribe”表示取消订阅
  "type": "newPairInfo", // 订阅的类型,“tx”表示实时交易,“pairsInfo”表示多交易对数据更新,“newPairInfo”表示新上交易对,“pairInfo”表示单交易对数据更新
  "args": {
    "pairsInfoInterval": "1h", // 订阅的时间区间,包括:1m/5m/1h/6h/24h
    "newPairInfoDex": "pump" // 订阅的Dex,包括:pump/boop/raydium_launchlab,不填表示订阅全部Dex
  }
}

响应数据

{
  "status": "ack", // 订阅状态,“ack”代表成功
  "method": "subscribeResponse",
  "result": {
    "type": "newPairInfo", // 订阅的类型,“tx”表示实时交易,“pairsInfo”表示多交易对数据更新,“newPairInfo”表示新上交易对,“pairInfo”表示单交易对数据更新
    "t": 1751008703051,
    "subscribed": [
      "newPairInfo"
    ],
    "message": "Connected and subscribed"
  }
}

以NodeJS为例

const WebSocket = require('ws')
function main() {
  const ws = new WebSocket('wss://api-data-v1.dbotx.com/data/ws/', {
    headers: {
      'x-api-key': 'YOUR_API_KEY',
    },
  })
  ws.on('open', () => {
    ws.send(
      JSON.stringify({
        method: 'subscribe',
        type: 'tx',
        args: {
          pair: [
            'CzhUkaHFjo8s9sp2SuM6QMrV518pRsu3KmqKRhNvMLeN',
            '7P1f3rzUV9493vEoA7nbrEJoV9NnWfUaGePgjGUb6EX9'
          ],
        },
      })
    )
    setInterval(() => {
      ws.ping()
    }, 30000)
  })
  ws.on('message', res => {
    console.log('res:', res.toString('utf-8'))
  })
}
main()

以Python为例

import asyncio
import websockets
import json

async def main():
    uri = "wss://api-data-v1.dbotx.com/data/ws/"
    headers = {"x-api-key": "YOUR_API_KEY"}
    msg = {
        "method": "subscribe",
        "type": "tx",
        "args": {
            "pair": [
                "CzhUkaHFjo8s9sp2SuM6QMrV518pRsu3KmqKRhNvMLeN",
                "7P1f3rzUV9493vEoA7nbrEJoV9NnWfUaGePgjGUb6EX9"
            ]
        }
    }

    async with websockets.connect(uri, additional_headers=headers) as ws:
        await ws.send(json.dumps(msg))

        async def keep_alive():
            while True:
                await ws.ping()
                await asyncio.sleep(30)

        async def listen_for_messages():
            async for message in ws:
                print(message)

        await asyncio.gather(keep_alive(), listen_for_messages())

if name == "main":
    asyncio.run(main())