Realtime
Server-push — the Django Channels / Daphne counterpart — over
Centrifugo, a self-hosted real-time messaging
server. Like every optional service in the stack it is env-gated: without
the CENTRIFUGO_* variables @repo/realtime no-ops and the app falls back to
whatever polling it already does. Nothing depends on it to boot.
docker compose --profile realtime up -dThe split
The browser opens a WebSocket and subscribes; the server publishes over Centrifugo's HTTP API. The two never trust the browser to publish — messages originate server-side, which is the security model that makes a shared channel safe.
Server side
A short-lived HS256 token authenticates the socket as the signed-in user; the HMAC secret never leaves the server. Publishing is a best-effort side channel — it never throws, so callers fire-and-forget:
/**
* Mint a Centrifugo connection token: an HS256 JWT whose `sub` is the user id,
* signed with the shared HMAC secret. Centrifugo verifies it on WebSocket
* connect, so the socket is authenticated as that user. Default TTL 1 hour.
*/
export function issueConnectionToken(userId: string, ttlSeconds = 3600): string {
const secret = process.env.CENTRIFUGO_TOKEN_HMAC_SECRET;
if (!secret) throw new Error("CENTRIFUGO_TOKEN_HMAC_SECRET is not set");
const header = base64url(JSON.stringify({ alg: "HS256", typ: "JWT" }));
// `exp` would need a wall clock; the caller passes ttl and we let the
// platform clock fill it in — kept simple and stateless.
const now = Math.floor(Date.now() / 1000);
const payload = base64url(JSON.stringify({ sub: userId, exp: now + ttlSeconds }));
const signature = createHmac("sha256", secret).update(`${header}.${payload}`).digest("base64url");
return `${header}.${payload}.${signature}`;
}
/** The browser WebSocket endpoint, e.g. ws://localhost:8000/connection/websocket. */
export function realtimeWsUrl(): string {
return process.env.CENTRIFUGO_WS_URL ?? "ws://localhost:8000/connection/websocket";
}/**
* Publish a JSON message to a channel through Centrifugo's HTTP API. A no-op
* (returns false) when realtime is disabled, so callers can fire-and-forget
* without guarding. Never throws on a delivery failure — realtime is a
* best-effort side channel, not the source of truth.
*/
export async function publish(channel: string, data: unknown): Promise<boolean> {
if (!isRealtimeEnabled()) return false;
const apiUrl = process.env.CENTRIFUGO_API_URL ?? "http://localhost:8000/api";
try {
const res = await fetch(`${apiUrl}/publish`, {
method: "POST",
headers: {
"content-type": "application/json",
"X-API-Key": process.env.CENTRIFUGO_API_KEY!,
},
body: JSON.stringify({ channel, data }),
signal: AbortSignal.timeout(5000),
});
return res.ok;
} catch {
return false;
}
}Browser side
@repo/realtime/client is a thin wrapper over the centrifuge client — pure
transport, no app types or strings. The consumer fetches { url, token, channel } from a server function and subscribes:
/**
* Connect with a server-minted token and subscribe to one channel. `onMessage`
* fires for every publication; `onState` reports connect/disconnect so the UI
* can show a live/offline dot. Returns a handle whose `close()` tears it all
* down (call it on unmount).
*/
export function subscribe(opts: {
url: string;
token: string;
channel: string;
onMessage: (data: unknown) => void;
onState?: (connected: boolean) => void;
}): RealtimeHandle {
const centrifuge = new Centrifuge(opts.url, { token: opts.token });
centrifuge.on("connected", () => opts.onState?.(true));
centrifuge.on("disconnected", () => opts.onState?.(false));
const sub: Subscription = centrifuge.newSubscription(opts.channel);
sub.on("publication", (ctx) => opts.onMessage(ctx.data));
sub.subscribe();
centrifuge.connect();
return {
close: () => {
sub.unsubscribe();
centrifuge.disconnect();
},
};
}Try it
/sandbox/realtime subscribes to a per-user channel over a real WebSocket,
then a publish round-trips the message back into the page live. The block spec
(e2e/sandbox/realtime.spec.ts) waits for the socket to actually connect, then
asserts a published message arrives through the subscription — not via a local
echo.