Skip to main content

WebSocket Trading Bot

Build a real-time bot that reacts to price changes instantly via WebSocket streaming, instead of polling the REST API.

Architecture


Full Source Code

#!/usr/bin/env python3
"""
WebSocket Trading Bot for PolySimulator

Streams real-time prices via WebSocket and places orders via REST
when price signals are detected.

Usage:
    export POLYSIM_API_KEY="ps_live_..."
    python ws_bot.py
"""

import asyncio
import json
import os
import time

import aiohttp


# API key sent in the X-API-Key header of every REST call. Indexing os.environ
# directly makes the script fail fast with KeyError when the variable is unset.
API_KEY = os.environ["POLYSIM_API_KEY"]
# REST base URL. Trailing slashes are stripped so that f"{BASE_URL}/v1/..."
# never produces a "//v1/..." path when the variable is set to e.g.
# "https://host/".
BASE_URL = os.environ.get("POLYSIM_BASE_URL", "http://localhost:8000").rstrip("/")
# WebSocket base URL: BASE_URL with the scheme mapped http -> ws, https -> wss.
WS_URL = BASE_URL.replace("http://", "ws://").replace("https://", "wss://")


async def get_ws_token(session):
    """Obtain a short-lived WebSocket authentication token.

    Args:
        session: An open ``aiohttp.ClientSession`` used to issue the request.

    Returns:
        The value of the ``token`` field in the JSON response body.

    Raises:
        aiohttp.ClientResponseError: If the API answers with a 4xx/5xx status.
        KeyError: If a successful response does not contain ``token``.
    """
    async with session.post(
        f"{BASE_URL}/v1/keys/ws-token",
        headers={"X-API-Key": API_KEY},
    ) as resp:
        # Surface HTTP failures (bad key, rate limit, server error) as a clear
        # status error instead of an opaque KeyError on the missing field.
        resp.raise_for_status()
        data = await resp.json()
        return data["token"]


async def place_order(session, market_id, side, outcome, quantity):
    """Submit a market order through the REST API.

    Args:
        session: An open ``aiohttp.ClientSession`` used to issue the request.
        market_id: Identifier of the market to trade.
        side: Order side, passed through unchanged (the bot uses "BUY"/"SELL").
        outcome: Outcome to trade, passed through unchanged (the bot uses "Yes").
        quantity: Order size; sent to the API as a string.

    Returns:
        The decoded JSON body of the API response.
    """
    # The key combines market, side and the current whole second, so repeated
    # calls for the same market and side within one second share a key.
    idempotency_key = f"ws-{market_id}-{side}-{int(time.time())}"
    request_headers = {
        "X-API-Key": API_KEY,
        "Content-Type": "application/json",
        "Idempotency-Key": idempotency_key,
    }
    order = dict(
        market_id=market_id,
        side=side,
        outcome=outcome,
        quantity=str(quantity),
        order_type="market",
    )
    endpoint = f"{BASE_URL}/v1/orders"
    async with session.post(endpoint, headers=request_headers, json=order) as reply:
        return await reply.json()


async def _handle_price_update(session, data):
    """Apply the threshold strategy to a single ``price_update`` message.

    Buys "Yes" when its price is below 0.35, sells when it is above 0.65 and
    does nothing in between. A malformed update is reported and skipped so
    that one bad message does not tear down the whole stream.
    """
    try:
        market_id = data["market_id"]
        yes_price = float(data["prices"]["yes"])
    except (KeyError, TypeError, ValueError) as exc:
        print(f"Skipping malformed price update: {exc!r}")
        return

    # Strategy: buy when Yes < 0.35, sell when > 0.65
    if yes_price < 0.35:
        side = "BUY"
    elif yes_price > 0.65:
        side = "SELL"
    else:
        return

    print(f"{side} signal: {market_id} @ {yes_price:.2f}")
    result = await place_order(session, market_id, side, "Yes", 5)
    print(f"  → {result.get('status')} @ {result.get('price')}")


async def run_ws_bot(market_ids: list[str]):
    """Run one WebSocket session: authenticate, subscribe, trade on updates.

    Fetches a WebSocket token, connects to the price stream, subscribes to
    ``market_ids`` and processes messages until the connection closes or
    reports an error. The function then returns; reconnecting is left to the
    caller.

    Args:
        market_ids: Market identifiers to subscribe to.
    """
    async with aiohttp.ClientSession() as session:
        # 1. Get WebSocket token
        token = await get_ws_token(session)
        print("Got WS token, connecting...")

        # 2. Connect to WebSocket
        async with session.ws_connect(
            f"{WS_URL}/v1/ws/prices?token={token}"
        ) as ws:
            # 3. Subscribe to markets
            await ws.send_json({
                "type": "subscribe",
                "channel": "prices",
                "market_ids": market_ids,
            })
            print(f"Subscribed to {len(market_ids)} markets")

            # 4. Process price updates
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"WebSocket error: {ws.exception()}")
                    break
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue

                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError as exc:
                    print(f"Skipping non-JSON message: {exc}")
                    continue
                if not isinstance(data, dict):
                    continue

                if data.get("type") == "price_update":
                    await _handle_price_update(session, data)
                # Any other message type (e.g. "pong" heartbeat replies)
                # needs no action.
            else:
                # aiohttp ends the iteration on CLOSE/CLOSING/CLOSED frames
                # instead of yielding them, so a closed connection is
                # detected here rather than inside the loop body.
                print("WebSocket closed, reconnecting...")


async def main():
    """Run the bot forever, starting a fresh session after every disconnect.

    Each pass runs one WebSocket session via ``run_ws_bot``. Whether the
    session ends normally or raises, the loop waits five seconds and starts
    again.
    """
    # Markets to monitor. Replace the placeholders with actual condition IDs.
    watched_markets = ["0x1234abcd...", "0x5678efgh..."]

    while True:
        try:
            await run_ws_bot(watched_markets)
        except Exception as err:
            # Top-level boundary: report the failure and fall through to the
            # retry delay instead of letting the bot exit.
            print(f"Connection error: {err}")

        print("Reconnecting in 5s...")
        await asyncio.sleep(5)


# Start the bot only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    asyncio.run(main())

Why WebSocket Over Polling?

| Metric | REST Polling (5s) | WebSocket |
|---|---|---|
| Latency | 0–5,000 ms | <100 ms |
| API calls/hour | 720 per market | 1 (connection) |
| Rate limit risk | High | None |
| Data freshness | Stale up to 5s | Real-time |
Use REST API for order placement and account queries. Use WebSocket for price monitoring and signal detection.

Connection Management

WebSocket tokens expire after 5 minutes. Your bot must handle reconnection:
  1. Detect close/error events
  2. Request a new token via POST /v1/keys/ws-token
  3. Re-subscribe to all channels

Next Steps