pip install websockets
import asyncio
import json
import time
import websockets
API_KEY = "dsk_your_key_here"
WS_URL = "wss://cryptolisting.ws"
# Filter exchanges (optional):
# WS_URL = "wss://cryptolisting.ws?cex=binance,upbit"
MAX_RETRIES = 20
def on_announcement(msg: dict):
now_us = int(time.time() * 1_000_000)
network_ms = (now_us - msg["dispatchTimestampUs"]) / 1000
print(f"[{msg['listingType']}] {msg['ticker']} on {msg['publisher']}")
print(f" {msg['title']}")
print(f" network={network_ms:.2f}ms")
async def run():
headers = {"X-API-Key": API_KEY}
for attempt in range(MAX_RETRIES):
try:
async with websockets.connect(WS_URL, extra_headers=headers) as ws:
print("connected")
async for raw in ws:
msg = json.loads(raw)
if msg["type"] == "welcome":
print(f"welcome: tier={msg['tier']} cex={msg['allowedCex']}")
await asyncio.sleep(15)
await ws.send(json.dumps({"type": "test"}))
elif msg["type"] in ("announcement", "test_announcement"):
if msg["type"] == "test_announcement":
print("[TEST] ", end="")
on_announcement(msg)
except websockets.ConnectionClosed as e:
reason = e.rcvd.reason if e.rcvd else ""
if reason in ("key_expired", "key_invalidated"):
print(f"key invalid: {reason}"); return
print(f"closed: {reason}")
except Exception as e:
print(f"error: {e}")
backoff = min(2 ** attempt, 300)
print(f"reconnecting in {backoff}s")
await asyncio.sleep(backoff)
if __name__ == "__main__":
asyncio.run(run())
npm install ws
const WebSocket = require("ws");
const API_KEY = "dsk_your_key_here";
const WS_URL = "wss://cryptolisting.ws";
// Filter exchanges (optional):
// const WS_URL = "wss://cryptolisting.ws?cex=binance,upbit";
const MAX_RETRIES = 20;
function onAnnouncement(msg) {
const nowUs = Date.now() * 1000;
const networkMs = (nowUs - msg.dispatchTimestampUs) / 1000;
console.log(`[${msg.listingType}] ${msg.ticker} on ${msg.publisher}`);
console.log(` ${msg.title}`);
console.log(` network=${networkMs.toFixed(2)}ms`);
}
function connect(attempt = 0) {
if (attempt >= MAX_RETRIES) return console.log("max retries");
const ws = new WebSocket(WS_URL, { headers: { "X-API-Key": API_KEY } });
ws.on("open", () => console.log("connected"));
ws.on("message", (data) => {
const msg = JSON.parse(data);
if (msg.type === "welcome") {
console.log(`welcome: tier=${msg.tier} cex=${msg.allowedCex}`);
setTimeout(() => ws.send(JSON.stringify({ type: "test" })), 15_000);
} else if (msg.type === "announcement" || msg.type === "test_announcement") {
if (msg.type === "test_announcement") process.stdout.write("[TEST] ");
onAnnouncement(msg);
}
});
ws.on("close", (code, reason) => {
const r = reason.toString();
if (r === "key_expired" || r === "key_invalidated") {
return console.log(`key invalid: ${r}`);
}
const backoff = Math.min(2 ** attempt, 300);
console.log(`closed (${code}); reconnecting in ${backoff}s`);
setTimeout(() => connect(attempt + 1), backoff * 1000);
});
ws.on("error", (err) => console.error("error:", err.message));
}
connect();
go get github.com/gorilla/websocket
package main
import (
"encoding/json"
"log"
"math"
"net/http"
"time"
"github.com/gorilla/websocket"
)
const (
apiKey = "dsk_your_key_here"
wsURL = "wss://cryptolisting.ws"
maxRetries = 20
)
type Msg struct {
Type string `json:"type"`
Title string `json:"title,omitempty"`
Ticker string `json:"ticker,omitempty"`
Publisher string `json:"publisher,omitempty"`
ListingType string `json:"listingType,omitempty"`
DetectedTimestampUs uint64 `json:"detectedTimestampUs,omitempty"`
DispatchTimestampUs uint64 `json:"dispatchTimestampUs,omitempty"`
AbnormalDetectionLatency bool `json:"abnormalDetectionLatency,omitempty"`
Tier string `json:"tier,omitempty"`
AllowedCex string `json:"allowedCex,omitempty"`
}
func onAnnouncement(m Msg) {
nowUs := uint64(time.Now().UnixMicro())
networkMs := float64(nowUs-m.DispatchTimestampUs) / 1000
log.Printf("[%s] %s on %s | %s | network=%.2fms",
m.ListingType, m.Ticker, m.Publisher, m.Title, networkMs)
}
func main() {
headers := http.Header{"X-API-Key": {apiKey}}
for attempt := 0; attempt < maxRetries; attempt++ {
conn, resp, err := websocket.DefaultDialer.Dial(wsURL, headers)
if err != nil {
if resp != nil && resp.StatusCode == 403 {
log.Println("key invalid"); return
}
log.Printf("dial: %v", err)
backoff := time.Duration(math.Min(math.Pow(2, float64(attempt)), 300)) * time.Second
log.Printf("reconnect in %s", backoff)
time.Sleep(backoff)
continue
}
log.Println("connected")
attempt = 0
stop := false
for {
_, raw, err := conn.ReadMessage()
if err != nil {
if ce, ok := err.(*websocket.CloseError); ok {
if ce.Text == "key_expired" || ce.Text == "key_invalidated" {
log.Printf("key invalid: %s", ce.Text); stop = true
} else {
log.Printf("closed: %s", ce.Text)
}
} else {
log.Printf("read: %v", err)
}
break
}
var m Msg
json.Unmarshal(raw, &m)
switch m.Type {
case "welcome":
log.Printf("welcome: tier=%s cex=%s", m.Tier, m.AllowedCex)
time.Sleep(15 * time.Second)
conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"test"}`))
case "announcement", "test_announcement":
onAnnouncement(m)
}
}
conn.Close()
if stop { return }
backoff := time.Duration(math.Min(math.Pow(2, float64(attempt)), 300)) * time.Second
log.Printf("reconnect in %s", backoff)
time.Sleep(backoff)
}
}
Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = { version = "0.21", features = ["native-tls"] }
futures-util = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
http = "1"
use futures_util::StreamExt;
use serde::Deserialize;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::Message as Ws;
const API_KEY: &str = "dsk_your_key_here";
const WS_URL: &str = "wss://cryptolisting.ws";
const MAX_RETRIES: u32 = 20;
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct Msg {
#[serde(rename = "type")] msg_type: String,
#[serde(default)] title: String,
#[serde(default)] ticker: String,
#[serde(default)] publisher: String,
#[serde(default)] listing_type: String,
#[serde(default)] dispatch_timestamp_us: u64,
#[serde(default)] tier: String,
#[serde(default)] allowed_cex: String,
}
fn on_announcement(m: &Msg) {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros() as u64;
let network_ms = (now - m.dispatch_timestamp_us) as f64 / 1000.0;
println!("[{}] {} on {} | {} | network={:.2}ms",
m.listing_type, m.ticker, m.publisher, m.title, network_ms);
}
#[tokio::main]
async fn main() {
for attempt in 0..MAX_RETRIES {
let req = http::Request::builder()
.uri(WS_URL)
.header("X-API-Key", API_KEY)
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Key",
tokio_tungstenite::tungstenite::handshake::client::generate_key())
.body(()).unwrap();
let ws = match connect_async(req).await {
Ok((ws, _)) => ws,
Err(e) => {
eprintln!("dial: {e}");
let backoff = Duration::from_secs((2u64.pow(attempt)).min(300));
tokio::time::sleep(backoff).await;
continue;
}
};
println!("connected");
let (_, mut rx) = ws.split();
let mut stop = false;
while let Some(frame) = rx.next().await {
match frame {
Err(e) => { eprintln!("read: {e}"); break; }
Ok(Ws::Close(Some(CloseFrame { reason, .. }))) => {
if reason == "key_expired" || reason == "key_invalidated" {
eprintln!("key invalid: {reason}"); stop = true;
} else { eprintln!("closed: {reason}"); }
break;
}
Ok(f) => {
if let Ok(text) = f.to_text() {
if let Ok(m) = serde_json::from_str::<Msg>(text) {
match m.msg_type.as_str() {
"welcome" => println!("welcome: tier={} cex={}", m.tier, m.allowed_cex),
"announcement" | "test_announcement" => on_announcement(&m),
_ => {}
}
}
}
}
}
}
if stop { return; }
let backoff = Duration::from_secs((2u64.pow(attempt)).min(300));
tokio::time::sleep(backoff).await;
}
}