/**
 * Custom Yjs WebSocket provider that bridges over the existing rSpace WebSocket.
 *
 * Instead of opening a separate y-websocket connection, this provider sends
 * Yjs sync/awareness messages as JSON payloads through the rSpace runtime's
 * WebSocket, using `yjs-sync` and `yjs-awareness` message types.
 *
 * The server simply relays these messages to all other peers in the same space.
 */

import * as Y from 'yjs';
import {
  writeSyncStep1,
  writeUpdate,
  readSyncMessage,
  messageYjsSyncStep2,
} from 'y-protocols/sync';
import {
  Awareness,
  encodeAwarenessUpdate,
  applyAwarenessUpdate,
  removeAwarenessStates,
} from 'y-protocols/awareness';
import {
  createEncoder,
  toUint8Array,
  length as encoderLength,
} from 'lib0/encoding';
import { createDecoder } from 'lib0/decoding';

/** Minimal interface for the rSpace runtime's custom message API. */
interface RuntimeBridge {
  sendCustom(msg: Record<string, unknown>): void;
  onCustomMessage(type: string, cb: (msg: any) => void): () => void;
  onConnect(cb: () => void): () => void;
  onDisconnect(cb: () => void): () => void;
  isOnline: boolean;
}

/** Message `type` values this provider sends and listens for on the runtime bridge. */
type WireMessageType = 'yjs-sync' | 'yjs-awareness';

/** Shape of the change set passed to the awareness `update` listener. */
interface AwarenessChanges {
  added: number[];
  updated: number[];
  removed: number[];
}

/**
 * Transaction/awareness origin used when applying data received from peers.
 * Local listeners compare against it so remote changes are not echoed back.
 */
const REMOTE_ORIGIN = 'remote';

/**
 * Yjs provider for a single note, transported over a {@link RuntimeBridge}.
 *
 * Every outgoing message is a plain object `{ type, noteId, data }` where
 * `data` is the encoded Yjs payload as an array of byte values. Incoming
 * messages whose `noteId` differs from this provider's are ignored.
 */
export class RSpaceYjsProvider {
  readonly doc: Y.Doc;
  readonly awareness: Awareness;
  readonly noteId: string;

  private readonly runtime: RuntimeBridge;
  private unsubs: (() => void)[] = [];
  /** Local document updates produced while disconnected, flushed on the next connect. */
  private pendingUpdates: Uint8Array[] = [];
  private connected = false;
  private synced = false;

  /**
   * Creates the provider, subscribes to the runtime and the document, and
   * starts the sync handshake immediately when the runtime is already online.
   *
   * @param noteId  Identifier used to tag outgoing and filter incoming messages.
   * @param ydoc    Document to keep in sync with peers.
   * @param runtime Bridge used to send and receive custom messages.
   */
  constructor(noteId: string, ydoc: Y.Doc, runtime: RuntimeBridge) {
    this.noteId = noteId;
    this.doc = ydoc;
    this.runtime = runtime;
    this.awareness = new Awareness(ydoc);

    this.setupListeners();

    if (runtime.isOnline) {
      this.onConnect();
    }
  }

  /** True once a sync step 2 message has been received since the last connect. */
  get isSynced(): boolean {
    return this.synced;
  }

  /**
   * Announces removal of the local awareness state, detaches every listener
   * registered by this provider and releases the awareness instance it created.
   */
  destroy(): void {
    // Runs while our awareness listener is still attached so that, when
    // connected, peers are told that this client has left.
    removeAwarenessStates(this.awareness, [this.doc.clientID], this);

    for (const unsub of this.unsubs) {
      try {
        unsub();
      } catch (err: unknown) {
        // Best effort: one failing unsubscribe must not keep the others attached.
        console.warn('[RSpaceYjsProvider] failed to remove a listener', err);
      }
    }
    this.unsubs = [];
    this.pendingUpdates = [];
    this.connected = false;
    this.synced = false;

    // The provider created this Awareness instance, so it is responsible for
    // releasing it; otherwise its internal timer (see the y-protocols docs)
    // would keep running for as long as the document itself is alive.
    this.awareness.destroy();
  }

  /** Registers runtime, document and awareness listeners and records their disposers. */
  private setupListeners(): void {
    this.unsubs.push(
      // Sync messages from other peers.
      this.runtime.onCustomMessage('yjs-sync', (msg: unknown) => {
        const payload = this.payloadFor(msg);
        if (payload !== null) this.handleSyncMessage(payload);
      }),
      // Awareness messages from other peers.
      this.runtime.onCustomMessage('yjs-awareness', (msg: unknown) => {
        const payload = this.payloadFor(msg);
        if (payload !== null) this.handleAwarenessMessage(payload);
      }),
      // Connection lifecycle.
      this.runtime.onConnect(() => this.onConnect()),
      this.runtime.onDisconnect(() => this.onDisconnect()),
    );

    // Forward local document changes to peers.
    const updateHandler = (update: Uint8Array, origin: unknown): void => {
      if (origin === REMOTE_ORIGIN) return; // Don't echo back remote updates
      this.sendDocUpdate(update);
    };
    this.doc.on('update', updateHandler);
    this.unsubs.push(() => this.doc.off('update', updateHandler));

    // Broadcast local awareness changes.
    const awarenessHandler = (changes: AwarenessChanges, origin: unknown): void => {
      // Skip remote changes, and skip everything while offline: onDisconnect()
      // removes remote states with this provider as origin, which must not be
      // sent over a closed link. The full local state is re-sent on connect.
      if (origin === REMOTE_ORIGIN || !this.connected) return;
      const changedClients = [
        ...changes.added,
        ...changes.updated,
        ...changes.removed,
      ];
      this.send(
        'yjs-awareness',
        encodeAwarenessUpdate(this.awareness, changedClients),
      );
    };
    this.awareness.on('update', awarenessHandler);
    this.unsubs.push(() => this.awareness.off('update', awarenessHandler));
  }

  /**
   * Starts a session: sends sync step 1, flushes updates buffered while
   * offline, then sends the local awareness state. No-op if already connected.
   */
  private onConnect(): void {
    if (this.connected) return;
    this.connected = true;

    // Sync step 1 carries our state vector so peers can reply with what we lack.
    const encoder = createEncoder();
    writeSyncStep1(encoder, this.doc);
    this.send('yjs-sync', toUint8Array(encoder));

    // Step 1 only pulls changes towards us; edits made while offline have to
    // be pushed explicitly or peers would never receive them.
    this.flushPendingUpdates();

    this.send(
      'yjs-awareness',
      encodeAwarenessUpdate(this.awareness, [this.doc.clientID]),
    );
  }

  /** Marks the provider offline and drops the awareness states of all other clients. */
  private onDisconnect(): void {
    this.connected = false;
    this.synced = false;

    const remoteClients = Array.from(this.awareness.getStates().keys())
      .filter((client) => client !== this.doc.clientID);
    removeAwarenessStates(this.awareness, remoteClients, this);
  }

  /**
   * Applies an incoming sync message to the document and sends back whatever
   * reply the sync protocol wrote. Messages that fail to decode are logged and
   * dropped.
   */
  private handleSyncMessage(data: Uint8Array): void {
    const decoder = createDecoder(data);
    const reply = createEncoder();

    let messageType: number;
    try {
      messageType = readSyncMessage(decoder, reply, this.doc, REMOTE_ORIGIN);
    } catch (err: unknown) {
      // The payload comes from another peer; a malformed one must not throw
      // into the runtime's message dispatch.
      console.warn('[RSpaceYjsProvider] dropping malformed yjs-sync message', err);
      return;
    }

    // If the response encoder has content, send it back.
    if (encoderLength(reply) > 0) {
      this.send('yjs-sync', toUint8Array(reply));
    }

    // After receiving sync step 2, we're synced.
    if (messageType === messageYjsSyncStep2) {
      this.synced = true;
    }
  }

  /** Applies an incoming awareness update; malformed payloads are logged and dropped. */
  private handleAwarenessMessage(data: Uint8Array): void {
    try {
      applyAwarenessUpdate(this.awareness, data, REMOTE_ORIGIN);
    } catch (err: unknown) {
      console.warn(
        '[RSpaceYjsProvider] dropping malformed yjs-awareness message',
        err,
      );
    }
  }

  /** Sends a local document update to peers, or buffers it while disconnected. */
  private sendDocUpdate(update: Uint8Array): void {
    if (!this.connected) {
      this.pendingUpdates.push(update);
      return;
    }
    const encoder = createEncoder();
    writeUpdate(encoder, update);
    this.send('yjs-sync', toUint8Array(encoder));
  }

  /** Sends every update buffered while offline as one merged update message. */
  private flushPendingUpdates(): void {
    if (this.pendingUpdates.length === 0) return;
    const merged = Y.mergeUpdates(this.pendingUpdates);
    this.pendingUpdates = [];
    this.sendDocUpdate(merged);
  }

  /**
   * Extracts the binary payload of a message addressed to this note.
   *
   * @returns The payload bytes, or `null` when the message is for another
   *          note or does not carry a byte array in `data`.
   */
  private payloadFor(msg: unknown): Uint8Array | null {
    if (typeof msg !== 'object' || msg === null) return null;
    const { noteId, data } = msg as { noteId?: unknown; data?: unknown };
    if (noteId !== this.noteId) return null;
    if (!Array.isArray(data) && !(data instanceof Uint8Array)) {
      console.warn('[RSpaceYjsProvider] ignoring message without a byte-array payload');
      return null;
    }
    return new Uint8Array(data);
  }

  /** Wraps an encoded payload in the `{ type, noteId, data }` envelope and sends it. */
  private send(type: WireMessageType, payload: Uint8Array): void {
    this.runtime.sendCustom({
      type,
      noteId: this.noteId,
      data: Array.from(payload),
    });
  }
}