rnotes-online/sync-server/src/index.ts

179 lines
5.0 KiB
TypeScript

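/**
* Yjs WebSocket sync server for rNotes.
*
* Each note document is identified by its yjsDocId.
* The server holds documents in memory and broadcasts changes to all
* connected clients.
*
* NOTE: LevelDB persistence is mentioned as a goal, but nothing in this file
* reads from or writes to LevelDB. A document lives only in memory and is
* destroyed 30 seconds after its last client disconnects.
*/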
import http from "http";
import { WebSocketServer, WebSocket } from "ws";
import * as Y from "yjs";
import { encoding, decoding, mutex } from "lib0";
/**
 * TCP port the HTTP/WebSocket server listens on.
 * Read from the SYNC_SERVER_PORT environment variable; falls back to 4444
 * when the variable is unset or empty.
 */
const PORT = Number.parseInt(process.env.SYNC_SERVER_PORT || "4444", 10);
// ─── In-memory document store ────────────────────────────
/** One awareness record: a logical clock plus an opaque client state. */
interface AwarenessEntry {
  clock: number;
  state: unknown;
}

/** Everything the server keeps for a single shared note document. */
interface SharedDoc {
  /** The Yjs document holding the note's content. */
  doc: Y.Doc;
  /**
   * Open sockets attached to this document. The per-socket number set is
   * created empty and is not populated anywhere in the visible code
   * (presumably reserved for awareness client ids — TODO confirm).
   */
  conns: Map<WebSocket, Set<number>>;
  /** Awareness records by numeric key; not read or written in the visible code. */
  awareness: Map<number, AwarenessEntry>;
  /** lib0 mutex that incoming-message handling is wrapped in. */
  mux: mutex.mutex;
}

/** Registry of live documents, keyed by document name. */
const docs: Map<string, SharedDoc> = new Map();
/**
 * Looks up the shared state for a document, creating and registering a
 * fresh, empty one on first access.
 *
 * @param name - Document name used as the registry key.
 * @returns The existing or newly created shared document state.
 */
function getDoc(name: string): SharedDoc {
  const existing = docs.get(name);
  if (existing !== undefined) {
    return existing;
  }

  const created: SharedDoc = {
    doc: new Y.Doc(),
    conns: new Map(),
    awareness: new Map(),
    mux: mutex.createMutex(),
  };
  docs.set(name, created);
  return created;
}
// ─── Yjs sync protocol (simplified) ─────────────────────
// Top-level message type: the first varuint of every frame.
const MSG_SYNC = 0,
  MSG_AWARENESS = 1;

// Sync sub-type: the second varuint of a MSG_SYNC frame.
const SYNC_STEP1 = 0, // payload is a state vector
  SYNC_STEP2 = 1, // payload is the update answering a state vector
  SYNC_UPDATE = 2; // payload is an incremental update
/**
 * Sends a frame to every open socket attached to a document.
 *
 * @param shared - Document whose connections receive the frame.
 * @param message - Encoded frame to send as-is.
 * @param exclude - Optional socket to skip (typically the sender).
 */
function sendToAll(shared: SharedDoc, message: Uint8Array, exclude?: WebSocket) {
  shared.conns.forEach((_clientIds, peer) => {
    if (peer === exclude) return;
    if (peer.readyState !== WebSocket.OPEN) return;
    peer.send(message);
  });
}
/**
 * Decodes one binary frame received from `conn` and acts on it.
 *
 * Frame layout (lib0 variable-length encoding): a message type, then — for
 * sync messages — a sync sub-type followed by a length-prefixed payload.
 *
 * - `SYNC_STEP1`: the payload is the client's state vector; the server
 *   answers `conn` with a `SYNC_STEP2` frame carrying the diff.
 * - `SYNC_STEP2` / `SYNC_UPDATE`: the payload is a Yjs update; it is applied
 *   to the shared document and relayed to every other connection.
 * - `MSG_AWARENESS`: the raw frame is relayed unchanged to every other
 *   connection.
 *
 * Frames with any other message type or sync sub-type are ignored.
 * Errors thrown by the decoder or by `Y.applyUpdate` are not caught here and
 * propagate to the caller.
 *
 * @param shared - Document state the connection is attached to.
 * @param conn - Socket the frame arrived on; it is excluded from relays.
 * @param buf - Raw frame bytes.
 */
function handleSyncMessage(
  shared: SharedDoc,
  conn: WebSocket,
  buf: Uint8Array
): void {
  const decoder = decoding.createDecoder(buf);
  const msgType = decoding.readVarUint(decoder);

  if (msgType === MSG_AWARENESS) {
    // Broadcast awareness (cursors, selections) to all peers
    sendToAll(shared, buf, conn);
    return;
  }
  if (msgType !== MSG_SYNC) {
    return;
  }

  const syncType = decoding.readVarUint(decoder);

  if (syncType === SYNC_STEP1) {
    // Client sends state vector, server responds with diff
    const sv = decoding.readVarUint8Array(decoder);
    const reply = encoding.createEncoder();
    encoding.writeVarUint(reply, MSG_SYNC);
    encoding.writeVarUint(reply, SYNC_STEP2);
    encoding.writeVarUint8Array(reply, Y.encodeStateAsUpdate(shared.doc, sv));
    if (conn.readyState === WebSocket.OPEN) {
      conn.send(encoding.toUint8Array(reply));
    }
    return;
  }

  if (syncType === SYNC_STEP2 || syncType === SYNC_UPDATE) {
    const update = decoding.readVarUint8Array(decoder);
    Y.applyUpdate(shared.doc, update);

    // Relay to the other clients for BOTH sub-types. A step-2 payload holds
    // whatever the client had that the server lacked (e.g. offline edits);
    // without relaying it, peers that are already connected would not see
    // those changes until they reconnect. It is forwarded as SYNC_UPDATE so
    // receivers treat it as an ordinary incremental update; re-applying an
    // already-known Yjs update is a no-op.
    const relay = encoding.createEncoder();
    encoding.writeVarUint(relay, MSG_SYNC);
    encoding.writeVarUint(relay, SYNC_UPDATE);
    encoding.writeVarUint8Array(relay, update);
    sendToAll(shared, encoding.toUint8Array(relay), conn);
  }
}
/**
 * Sends the server's state vector to a client as a `SYNC_STEP1` frame.
 * Nothing is sent if the socket is not open.
 *
 * @param shared - Document whose state vector is advertised.
 * @param conn - Socket to send the frame to.
 */
function sendSyncStep1(shared: SharedDoc, conn: WebSocket) {
  const stateVector = Y.encodeStateVector(shared.doc);

  const out = encoding.createEncoder();
  encoding.writeVarUint(out, MSG_SYNC);
  encoding.writeVarUint(out, SYNC_STEP1);
  encoding.writeVarUint8Array(out, stateVector);

  if (conn.readyState !== WebSocket.OPEN) return;
  conn.send(encoding.toUint8Array(out));
}
// ─── WebSocket server ────────────────────────────────────
/**
 * HTTP server that the WebSocket server is attached to.
 * Every request is answered with status 200: `/health` returns a JSON body
 * with the number of documents currently in memory, anything else returns a
 * short plain banner.
 */
const server = http.createServer((req, res) => {
  const isHealthCheck = req.url === "/health";

  if (!isHealthCheck) {
    res.writeHead(200);
    res.end("rNotes sync server");
    return;
  }

  const body = JSON.stringify({ status: "ok", docs: docs.size });
  res.writeHead(200, { "Content-Type": "application/json" });
  res.end(body);
});
/** How long an unused document stays in memory after its last client leaves. */
const EMPTY_DOC_TTL_MS = 30000;

/**
 * Extracts the document name from a request target of the form `/ws/<docId>`
 * (a bare `/<docId>` is accepted too).
 *
 * Only the pathname matters, so a fixed base URL is used rather than the
 * client-supplied Host header. `new URL` throws on malformed targets; those
 * yield an empty name so the caller can reject the connection.
 */
function docNameFromRequestUrl(rawUrl: string | undefined): string {
  try {
    const { pathname } = new URL(rawUrl || "/", "http://localhost");
    return pathname.replace(/^\/ws\//, "").replace(/^\//, "");
  } catch {
    return "";
  }
}

/** WebSocket endpoint sharing the HTTP server's port. */
const wss = new WebSocketServer({ server });

wss.on("connection", (conn, req) => {
  // Document name from URL path: /ws/<docId>
  const docName = docNameFromRequestUrl(req.url);
  if (!docName) {
    conn.close(4000, "Missing document name");
    return;
  }

  const shared = getDoc(docName);
  shared.conns.set(conn, new Set());

  // Send initial sync step 1 to the new client
  sendSyncStep1(shared, conn);

  conn.on("message", (data: Buffer | ArrayBuffer | Buffer[]) => {
    // A fragmented payload arrives as Buffer[]; join the pieces first so the
    // bytes are not mangled by constructing a typed array from an array of
    // Buffers.
    const bytes = Array.isArray(data) ? Buffer.concat(data) : data;
    const buf = new Uint8Array(bytes);

    shared.mux(() => {
      try {
        handleSyncMessage(shared, conn, buf);
      } catch (err: unknown) {
        // A truncated or corrupt frame makes the decoder / Yjs throw. Left
        // uncaught, the exception would escape this event listener and could
        // bring down the whole server, so log it and drop the frame instead.
        console.error(`Dropping malformed message for doc "${docName}":`, err);
      }
    });
  });

  conn.on("close", () => {
    shared.conns.delete(conn);
    // Clean up empty docs after a delay
    if (shared.conns.size === 0) {
      setTimeout(() => {
        const current = docs.get(docName);
        // Only tear down the instance this connection belonged to, and only
        // if nobody reconnected in the meantime. A different instance under
        // the same name means this one was already removed by an earlier timer.
        if (current === shared && current.conns.size === 0) {
          current.doc.destroy();
          docs.delete(docName);
        }
      }, EMPTY_DOC_TTL_MS);
    }
  });

  conn.on("error", (err: Error) => {
    console.error(`WebSocket error for doc "${docName}":`, err.message);
    shared.conns.delete(conn);
  });
});
// Start accepting connections and print where the server can be reached.
server.listen(PORT, () => {
  const banner = [
    `rNotes sync server listening on port ${PORT}`,
    `WebSocket endpoint: ws://0.0.0.0:${PORT}/ws/<docId>`,
  ];
  banner.forEach((line) => {
    console.log(line);
  });
});