Stream TweetStream tweets with Python + websockets

Working Python example using the websockets package — copy, set your API key, and run.

Why use Python for TweetStream

TweetStream speaks raw WebSocket with a JSON envelope — every language that can open a WebSocket connection works. Python fits well when your trading logic already lives in NumPy, pandas, CCXT, or an LLM pipeline.

This page uses the websockets package (not websocket-client), which is the de-facto async library and handles subprotocol negotiation cleanly.

1. Install

pip install websockets

2. Minimal example

The smallest working stream. Prints tweet content as it arrives. Good for a sanity check before wiring in your downstream system.

import asyncio
import json
import os
import websockets

API_KEY = os.environ["TWEETSTREAM_API_KEY"]
WS_URL = "wss://ws.tweetstream.io/ws"
SUBPROTOCOLS = ["tweetstream.v1", f"tweetstream.auth.token.{API_KEY}"]


async def stream():
    async with websockets.connect(WS_URL, subprotocols=SUBPROTOCOLS) as ws:
        async for raw in ws:
            envelope = json.loads(raw)
            if envelope["t"] == "tweet" and envelope["op"] == "content":
                tweet = envelope["d"]
                handle = tweet["author"].get("handle", "unknown")
                print(f"[{handle}] {tweet['text']}")


asyncio.run(stream())

3. Production pattern

Adds structured logging, envelope dispatch, and an exponential-backoff reconnect loop. This is the skeleton most trading bots end up with.

import asyncio
import json
import logging
import os
from typing import Any

import websockets
from websockets.exceptions import ConnectionClosed

API_KEY = os.environ["TWEETSTREAM_API_KEY"]
WS_URL = "wss://ws.tweetstream.io/ws"
SUBPROTOCOLS = ["tweetstream.v1", f"tweetstream.auth.token.{API_KEY}"]

RECONNECT_DELAY_SECONDS = 5
MAX_BACKOFF_SECONDS = 60

log = logging.getLogger("tweetstream")


async def handle_envelope(envelope: dict[str, Any]) -> None:
    t, op, data = envelope["t"], envelope["op"], envelope["d"]

    if t == "tweet" and op == "content":
        handle = data["author"].get("handle", "unknown")
        log.info("tweet %s @%s: %s", data["tweetId"], handle, data["text"])
        # Fan-out to your trading bot / queue / database here.
        return

    if t == "tweet" and op == "meta":
        tokens = (data.get("detected") or {}).get("tokens") or []
        for token in tokens:
            log.info(
                "token %s priceUsd=%s chain=%s",
                token.get("symbol"),
                token.get("priceUsd"),
                token.get("chain"),
            )
        return

    if t == "account" and op == "profile_update":
        log.info("profile update %s", data)


async def stream_forever() -> None:
    backoff = RECONNECT_DELAY_SECONDS
    while True:
        try:
            async with websockets.connect(WS_URL, subprotocols=SUBPROTOCOLS) as ws:
                log.info("connected to TweetStream")
                backoff = RECONNECT_DELAY_SECONDS
                async for raw in ws:
                    try:
                        await handle_envelope(json.loads(raw))
                    except Exception:
                        log.exception("error handling envelope")
        except ConnectionClosed as exc:
            log.warning("connection closed: %s", exc)
        except Exception:
            log.exception("stream error")

        log.info("reconnecting in %ss", backoff)
        await asyncio.sleep(backoff)
        backoff = min(backoff * 2, MAX_BACKOFF_SECONDS)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(stream_forever())

Envelope fields you will use

  • t: "tweet", op: "content" — tweet text and author metadata
  • t: "tweet", op: "meta" — detected tokens, contract addresses, live prices, and OCR text
  • t: "account", op: "profile_update" — avatar, bio, handle changes on tracked accounts
  • t: "account", op: "follow" — new follows from tracked accounts

The complete envelope reference lives in the payloads docs.

Deployment tips

  • Run the worker as a long-lived process (systemd, Docker, Kubernetes Deployment). WebSocket sessions are stateless on our side — reconnecting is cheap.
  • Put a queue (Redis Streams, SQS, or NATS) between the stream worker and your trading logic. Handlers should return in under a second; longer work goes on the queue.
  • Store the last-seen tweet ID if you need gap detection. Use the history REST API on Elite plans to backfill if your worker is offline.

立即开启实时 Twitter WebSocket 提醒

内置 WebSocket 交付、OCR 与代币检测的 Twitter API 替代方案。

开始 7 天试用

起价 $199/月 · Basic/Elite 含 7 天试用 · OCR + 代币检测

相关页面