Requires: pip install websockets
import asyncio
import json
import ssl
import time
import websockets
# Key sent to the server in the X-API-Key handshake header.
API_KEY = "dsk_your_key_here"
# WebSocket endpoint the client connects to.
WS_URL = "wss://cryptolisting.ws"
# Filter specific exchanges (optional):
# WS_URL = "wss://cryptolisting.ws?cex=binance,upbit"
# Longest wait for any incoming message before the connection is treated as dead.
HEARTBEAT_TIMEOUT = 35 # seconds
# Retry limit for connect(); once it is reached the client stops reconnecting.
MAX_RETRIES = 20
def on_announcement(msg: dict):
    """Print one listing announcement together with its delivery latency.

    Args:
        msg: Decoded announcement. The keys read are ``dispatchTimestampUs``,
            ``publishTimestampUs``, ``listingType``, ``ticker``, ``publisher``
            and ``title``; a missing key raises ``KeyError``.

    Both timestamps are compared against the local wall clock expressed in
    microseconds, so the reported figures include any clock skew.
    """
    received_us = int(time.time() * 1_000_000)

    def elapsed_ms(stamp_us):
        # Microsecond difference converted to milliseconds.
        return (received_us - stamp_us) / 1000

    network_ms = elapsed_ms(msg["dispatchTimestampUs"])
    total_ms = elapsed_ms(msg["publishTimestampUs"])

    print(f"[{msg['listingType']}] {msg['ticker']} on {msg['publisher']}")
    print(f" Title: {msg['title']}")
    print(f" Latency: {total_ms:.2f}ms total, {network_ms:.2f}ms network")
async def connect():
    """Connect to WS_URL and process messages, reconnecting with backoff.

    The loop ends (and the coroutine returns) when either:
      * the server closes the connection with reason ``key_expired`` or
        ``key_revoked``, or
      * MAX_RETRIES consecutive attempts fail without a successful connection.

    The retry counter is reset after every successful connection, so a
    long-running client is not penalised for disconnects that happened hours
    apart. Backoff is ``2 ** attempt`` seconds, capped at 300.
    """
    # SECURITY NOTE: certificate and hostname verification are switched off
    # here, so the API key is sent to whoever answers the TLS handshake.
    # Remove the next two assignments once the endpoint presents a
    # certificate that validates against the system trust store.
    ssl_ctx = ssl.create_default_context()
    ssl_ctx.check_hostname = False
    ssl_ctx.verify_mode = ssl.CERT_NONE
    headers = {"X-API-Key": API_KEY}

    attempt = 0
    while attempt < MAX_RETRIES:
        try:
            # NOTE(review): newer `websockets` releases name this keyword
            # `additional_headers` -- confirm against the installed version.
            async with websockets.connect(
                WS_URL, extra_headers=headers, ssl=ssl_ctx
            ) as ws:
                print("Connected!")
                attempt = 0  # reset backoff on successful connection
                while True:
                    try:
                        # Any message (including heartbeats) must arrive
                        # within HEARTBEAT_TIMEOUT or the link is presumed dead.
                        raw = await asyncio.wait_for(
                            ws.recv(), timeout=HEARTBEAT_TIMEOUT
                        )
                    except asyncio.TimeoutError:
                        print("No heartbeat -- reconnecting")
                        break

                    try:
                        msg = json.loads(raw)
                    except ValueError as e:
                        # One malformed frame should not drop a healthy connection.
                        print(f"Ignoring malformed message: {e}")
                        continue
                    if not isinstance(msg, dict):
                        continue

                    msg_type = msg.get("type")
                    if msg_type == "welcome":
                        print(f"Welcome: tier={msg.get('tier')}, "
                              f"cex={msg.get('allowedCex')}")
                        # Request a test announcement to verify integration
                        await ws.send(json.dumps({"type": "test"}))
                    elif msg_type in ("announcement", "test_announcement"):
                        prefix = "[TEST] " if msg_type == "test_announcement" else ""
                        print(f"{prefix}", end="")
                        on_announcement(msg)
                    elif msg_type == "heartbeat":
                        pass  # Connection alive
        except websockets.ConnectionClosed as e:
            # e.rcvd is None when the connection dropped without a close frame.
            reason = e.rcvd.reason if e.rcvd else ""
            if reason in ("key_expired", "key_revoked"):
                print(f"Key is no longer valid: {reason}")
                return
            print(f"Disconnected: {reason}")
        except Exception as e:
            print(f"Connection error: {e}")

        backoff = min(2 ** attempt, 300)
        print(f"Reconnecting in {backoff}s (attempt {attempt + 1})")
        await asyncio.sleep(backoff)
        attempt += 1
    print("Max retries exceeded")
# Script entry point: run the connect() coroutine to completion on a fresh
# event loop when this file is executed directly.
if __name__ == "__main__":
    asyncio.run(connect())
Requires: npm install ws
const WebSocket = require("ws");
// Key sent to the server in the X-API-Key handshake header.
const API_KEY = "dsk_your_key_here";
// WebSocket endpoint the client connects to.
const WS_URL = "wss://cryptolisting.ws";
// Filter specific exchanges (optional):
// const WS_URL = "wss://cryptolisting.ws?cex=binance,upbit";
// Longest gap (in milliseconds) between incoming messages before the socket is terminated.
const HEARTBEAT_TIMEOUT_MS = 35_000;
// Retry limit for connect(); once it is reached the client stops reconnecting.
const MAX_RETRIES = 20;
/**
 * Print one listing announcement together with its delivery latency.
 *
 * Both timestamps on the message are compared against the local clock
 * (Date.now() scaled to microseconds), so the figures include clock skew.
 *
 * @param {object} msg Decoded announcement; reads dispatchTimestampUs,
 *   publishTimestampUs, listingType, ticker, publisher and title.
 */
function onAnnouncement(msg) {
  const receivedUs = Date.now() * 1000;
  // Microsecond difference rendered as milliseconds with two decimals.
  const msSince = (stampUs) => ((receivedUs - stampUs) / 1000).toFixed(2);

  const network = msSince(msg.dispatchTimestampUs);
  const total = msSince(msg.publishTimestampUs);
  const { listingType, ticker, publisher, title } = msg;

  console.log(`[${listingType}] ${ticker} on ${publisher}`);
  console.log(` Title: ${title}`);
  console.log(` Latency: ${total}ms total, ${network}ms network`);
}
/**
 * Open the WebSocket, dispatch incoming messages, and reconnect with
 * exponential backoff (2 ** attempt seconds, capped at 300).
 *
 * Reconnection stops when the server closes with reason "key_expired" or
 * "key_revoked", or when MAX_RETRIES consecutive attempts have failed. The
 * attempt counter is reset once a connection opens successfully.
 *
 * @param {number} [attempt=0] Number of consecutive failed attempts so far.
 */
function connect(attempt = 0) {
  if (attempt >= MAX_RETRIES) {
    console.log("Max retries exceeded");
    return;
  }

  // SECURITY NOTE: rejectUnauthorized: false disables certificate
  // verification, so the API key is sent to whoever answers the TLS
  // handshake. Drop the option once the endpoint's certificate validates.
  const ws = new WebSocket(WS_URL, {
    headers: { "X-API-Key": API_KEY },
    rejectUnauthorized: false,
  });

  let heartbeatTimer = null;

  // (Re)arm the watchdog: if nothing arrives within HEARTBEAT_TIMEOUT_MS the
  // socket is terminated, which fires "close" and triggers a reconnect.
  function resetHeartbeat() {
    clearTimeout(heartbeatTimer);
    heartbeatTimer = setTimeout(() => {
      console.log("No heartbeat -- reconnecting");
      ws.terminate();
    }, HEARTBEAT_TIMEOUT_MS);
  }

  ws.on("open", () => {
    console.log("Connected!");
    attempt = 0; // reset backoff on successful connection
    resetHeartbeat();
  });

  ws.on("message", (data) => {
    resetHeartbeat();

    let msg;
    try {
      msg = JSON.parse(data.toString());
    } catch (err) {
      // An exception escaping this handler would crash the process;
      // skip the bad frame and keep the connection.
      console.error("Ignoring malformed message:", err.message);
      return;
    }
    if (msg === null || typeof msg !== "object") {
      return;
    }

    if (msg.type === "welcome") {
      console.log(`Welcome: tier=${msg.tier}, cex=${msg.allowedCex}`);
      // Request a test announcement to verify integration
      ws.send(JSON.stringify({ type: "test" }));
    } else if (msg.type === "announcement" || msg.type === "test_announcement") {
      if (msg.type === "test_announcement") {
        // Written without a newline so the marker shares the line with the
        // announcement header, and real announcements get no blank line.
        process.stdout.write("[TEST] ");
      }
      onAnnouncement(msg);
    }
    // heartbeat: no action needed
  });

  ws.on("close", (code, reason) => {
    clearTimeout(heartbeatTimer);
    const reasonStr = reason.toString();
    if (reasonStr === "key_expired" || reasonStr === "key_revoked") {
      console.log(`Key is no longer valid: ${reasonStr}`);
      return;
    }
    const backoff = Math.min(2 ** attempt, 300);
    console.log(`Disconnected (${code}). Reconnecting in ${backoff}s...`);
    setTimeout(() => connect(attempt + 1), backoff * 1000);
  });

  // Errors are only logged; reconnection is scheduled from the "close" handler.
  ws.on("error", (err) => {
    console.error("WebSocket error:", err.message);
  });
}
// Start the client with the default attempt counter of 0.
connect();
Requires: go get github.com/gorilla/websocket
package main
import (
"crypto/tls"
"encoding/json"
"fmt"
"log"
"math"
"net/http"
"time"
"github.com/gorilla/websocket"
)
const (
	// apiKey is sent to the server in the X-API-Key handshake header.
	apiKey = "dsk_your_key_here"
	// wsURL is the WebSocket endpoint the client dials.
	wsURL = "wss://cryptolisting.ws"
	// Filter specific exchanges (optional):
	// wsURL = "wss://cryptolisting.ws?cex=binance,upbit"
	// heartbeatTimeout is the read deadline applied before each message read.
	heartbeatTimeout = 35 * time.Second
	// maxRetries bounds the attempt counter of connect's retry loop.
	maxRetries = 20
)
// Message is the single decoding target for every JSON message received from
// the server. Type selects how the message is handled; the remaining fields
// are optional and stay at their zero value when absent from the payload.
type Message struct {
	// Type is the discriminator switched on by connect
	// ("welcome", "announcement", "test_announcement", "heartbeat").
	Type string `json:"type"`
	// Fields printed by onAnnouncement.
	Title string `json:"title,omitempty"`
	Ticker string `json:"ticker,omitempty"`
	Publisher string `json:"publisher,omitempty"`
	ListingType string `json:"listingType,omitempty"`
	// Timestamps; onAnnouncement compares the publish and dispatch values
	// against the local clock in microseconds.
	PublishTimestampUs uint64 `json:"publishTimestampUs,omitempty"`
	DetectedTimestampUs uint64 `json:"detectedTimestampUs,omitempty"`
	DispatchTimestampUs uint64 `json:"dispatchTimestampUs,omitempty"`
	// Tier and AllowedCex are logged when a "welcome" message arrives.
	Tier string `json:"tier,omitempty"`
	MaxConnections int `json:"maxConnections,omitempty"`
	AllowedCex string `json:"allowedCex,omitempty"`
	// NOTE(review): TimestampNs and TimeUtc are not read by the code in this
	// example; presumably they belong to heartbeat messages -- confirm.
	TimestampNs uint64 `json:"timestampNs,omitempty"`
	TimeUtc string `json:"timeUtc,omitempty"`
}
// onAnnouncement prints one listing announcement together with its delivery
// latency, measured from the message's dispatch and publish timestamps to the
// local clock.
func onAnnouncement(msg Message) {
	nowUs := time.Now().UnixMicro()
	// Subtract as signed integers: with unsigned arithmetic a server timestamp
	// slightly ahead of the local clock would wrap around to an enormous
	// positive latency instead of a small negative one.
	networkMs := float64(nowUs-int64(msg.DispatchTimestampUs)) / 1000
	totalMs := float64(nowUs-int64(msg.PublishTimestampUs)) / 1000
	fmt.Printf("[%s] %s on %s\n", msg.ListingType, msg.Ticker, msg.Publisher)
	fmt.Printf(" Title: %s\n", msg.Title)
	fmt.Printf(" Latency: %.2fms total, %.2fms network\n", totalMs, networkMs)
}
// connect dials wsURL, processes incoming messages, and reconnects with
// exponential backoff (2^attempt seconds, capped at 300).
//
// It returns when the handshake is rejected with HTTP 403, when the server
// closes the connection with reason "key_expired" or "key_revoked", or when
// maxRetries consecutive attempts have failed. The attempt counter is reset
// after every successful connection.
func connect() {
	// SECURITY NOTE: InsecureSkipVerify disables certificate verification, so
	// the API key is sent to whoever answers the TLS handshake. Remove it once
	// the endpoint presents a certificate that validates.
	dialer := websocket.Dialer{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	headers := http.Header{"X-API-Key": {apiKey}}

	// waitBeforeRetry logs and sleeps for the backoff that belongs to attempt.
	waitBeforeRetry := func(attempt int) {
		backoff := math.Min(math.Pow(2, float64(attempt)), 300)
		log.Printf("Reconnecting in %.0fs (attempt %d)", backoff, attempt+1)
		time.Sleep(time.Duration(backoff) * time.Second)
	}

	for attempt := 0; attempt < maxRetries; attempt++ {
		conn, resp, err := dialer.Dial(wsURL, headers)
		if err != nil {
			if resp != nil {
				log.Printf("Connection failed (HTTP %d): %v", resp.StatusCode, err)
				if resp.StatusCode == 403 {
					log.Println("API key is invalid, expired, or revoked. Stopping.")
					return
				}
			} else {
				log.Printf("Connection failed: %v", err)
			}
			waitBeforeRetry(attempt)
			continue
		}

		log.Println("Connected!")
		attempt = 0 // reset backoff on successful connection
		shouldStop := false

		for {
			// Every message (heartbeats included) must arrive within
			// heartbeatTimeout, so the deadline is re-armed before each read.
			if err := conn.SetReadDeadline(time.Now().Add(heartbeatTimeout)); err != nil {
				log.Printf("Read error: %v", err)
				break
			}

			_, rawMsg, err := conn.ReadMessage()
			if err != nil {
				if closeErr, ok := err.(*websocket.CloseError); ok {
					reason := closeErr.Text
					if reason == "key_expired" || reason == "key_revoked" {
						log.Printf("Key is no longer valid: %s. Stopping.", reason)
						shouldStop = true
					} else {
						log.Printf("Disconnected: %s (code %d)", reason, closeErr.Code)
					}
				} else if te, ok := err.(interface{ Timeout() bool }); ok && te.Timeout() {
					// The read deadline expired: nothing arrived in time.
					log.Println("No heartbeat -- reconnecting")
				} else {
					log.Printf("Read error: %v", err)
				}
				break
			}

			var msg Message
			if err := json.Unmarshal(rawMsg, &msg); err != nil {
				// Skip the bad frame rather than acting on a half-filled Message.
				log.Printf("Ignoring malformed message: %v", err)
				continue
			}

			switch msg.Type {
			case "welcome":
				log.Printf("Welcome: tier=%s, cex=%s", msg.Tier, msg.AllowedCex)
				// Request a test announcement to verify integration
				if err := conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"test"}`)); err != nil {
					log.Printf("Failed to request test announcement: %v", err)
				}
			case "announcement", "test_announcement":
				if msg.Type == "test_announcement" {
					fmt.Print("[TEST] ")
				}
				onAnnouncement(msg)
			case "heartbeat":
				// Connection alive
			}
		}

		conn.Close()
		if shouldStop {
			return
		}
		waitBeforeRetry(attempt)
	}
	log.Println("Max retries exceeded")
}
// main runs the client; it exits when connect returns.
func main() {
	connect()
}
Requires in Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = { version = "0.21", features = ["native-tls"] }
futures-util = "0.3"
http = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
native-tls = "0.2"
use futures_util::StreamExt;
use serde::Deserialize;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio_tungstenite::{connect_async_tls_with_config, Connector};
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::Message as WsMessage;
/// Key sent to the server in the X-API-Key handshake header.
const API_KEY: &str = "dsk_your_key_here";
/// WebSocket endpoint the client connects to.
const WS_URL: &str = "wss://cryptolisting.ws";
// Filter specific exchanges (optional):
// const WS_URL: &str = "wss://cryptolisting.ws?cex=binance,upbit";
/// Longest wait for the next incoming frame before the connection is dropped.
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(35);
/// Retry limit for the reconnect loop in `main`.
const MAX_RETRIES: u32 = 20;
/// Decoding target for every JSON message received from the server.
///
/// Field names are mapped from camelCase JSON keys. Only `type` is required;
/// every other field falls back to its default (empty string or 0) when the
/// key is absent.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct Msg {
    /// Discriminator matched on in `main`.
    r#type: String,
    // Fields printed by `on_announcement`.
    #[serde(default)] title: String,
    #[serde(default)] ticker: String,
    #[serde(default)] publisher: String,
    #[serde(default)] listing_type: String,
    // Timestamps compared against the local clock in microseconds.
    #[serde(default)] publish_timestamp_us: u64,
    #[serde(default)] dispatch_timestamp_us: u64,
    // Fields printed when a "welcome" message arrives.
    #[serde(default)] tier: String,
    #[serde(default)] allowed_cex: String,
}
/// Prints one listing announcement together with its delivery latency,
/// measured from the message's dispatch and publish timestamps to the local
/// clock.
fn on_announcement(msg: &Msg) {
    // A clock set before the Unix epoch yields 0 instead of a panic.
    let now_us = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_micros() as i128)
        .unwrap_or(0);
    // Subtract in a signed type: with u64 a server timestamp slightly ahead
    // of the local clock overflows (panic in debug builds, wrap-around to a
    // huge latency in release builds).
    let network_ms = (now_us - i128::from(msg.dispatch_timestamp_us)) as f64 / 1000.0;
    let total_ms = (now_us - i128::from(msg.publish_timestamp_us)) as f64 / 1000.0;
    println!("[{}] {} on {}", msg.listing_type, msg.ticker, msg.publisher);
    println!(" Title: {}", msg.title);
    println!(" Latency: {:.2}ms total, {:.2}ms network", total_ms, network_ms);
}
#[tokio::main]
async fn main() {
let tls = native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(true)
.build()
.unwrap();
for attempt in 0..MAX_RETRIES {
let request = http::Request::builder()
.uri(WS_URL)
.header("X-API-Key", API_KEY)
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Key",
tokio_tungstenite::tungstenite::handshake::client::generate_key())
.body(())
.unwrap();
let connector = Connector::NativeTls(tls.clone());
let ws = match connect_async_tls_with_config(
request, None, false, Some(connector)
).await {
Ok((ws, _)) => ws,
Err(e) => {
eprintln!("Connection failed: {}", e);
let backoff = Duration::from_secs((2u64.pow(attempt)).min(300));
eprintln!("Reconnecting in {}s (attempt {})", backoff.as_secs(), attempt + 1);
tokio::time::sleep(backoff).await;
continue;
}
};
println!("Connected!");
let (_, mut read) = ws.split();
let mut should_stop = false;
loop {
match tokio::time::timeout(HEARTBEAT_TIMEOUT, read.next()).await {
Err(_) => {
eprintln!("No heartbeat -- reconnecting");
break;
}
Ok(None) => {
eprintln!("Connection closed");
break;
}
Ok(Some(Err(e))) => {
eprintln!("Read error: {}", e);
break;
}
Ok(Some(Ok(WsMessage::Close(Some(CloseFrame { reason, .. }))))) => {
if reason == "key_expired" || reason == "key_revoked" {
eprintln!("Key is no longer valid: {}. Stopping.", reason);
should_stop = true;
} else {
eprintln!("Disconnected: {}", reason);
}
break;
}
Ok(Some(Ok(frame))) => {
if let Ok(text) = frame.to_text() {
if let Ok(msg) = serde_json::from_str::<Msg>(text) {
match msg.r#type.as_str() {
"welcome" => {
println!("Welcome: tier={}, cex={}", msg.tier, msg.allowed_cex);
// Request a test announcement to verify integration
// (requires access to write half)
}
"announcement" | "test_announcement" => {
if msg.r#type == "test_announcement" {
print!("[TEST] ");
}
on_announcement(&msg);
}
"heartbeat" => {} // Connection alive
_ => {}
}
}
}
}
}
}
if should_stop {
return;
}
let backoff = Duration::from_secs((2u64.pow(attempt)).min(300));
eprintln!("Reconnecting in {}s (attempt {})", backoff.as_secs(), attempt + 1);
tokio::time::sleep(backoff).await;
}
eprintln!("Max retries exceeded");
}