rspace-online/modules/rnotes/yjs-ws-provider.ts

205 lines
5.3 KiB
TypeScript

/**
* Custom Yjs WebSocket provider that bridges over the existing rSpace WebSocket.
*
* Instead of opening a separate y-websocket connection, this provider sends
* Yjs sync/awareness messages as JSON payloads through the rSpace runtime's
* WebSocket, using `yjs-sync` and `yjs-awareness` message types.
*
* The server simply relays these messages to all other peers in the same space.
*/
import * as Y from 'yjs';
import { writeSyncStep1, writeUpdate, readSyncMessage } from 'y-protocols/sync';
import {
Awareness,
encodeAwarenessUpdate,
applyAwarenessUpdate,
removeAwarenessStates,
} from 'y-protocols/awareness';
import {
createEncoder,
toUint8Array,
length as encoderLength,
} from 'lib0/encoding';
import { createDecoder } from 'lib0/decoding';
/** Minimal interface for the rSpace runtime's custom message API. */
interface RuntimeBridge {
sendCustom(msg: Record<string, any>): void;
onCustomMessage(type: string, cb: (msg: any) => void): () => void;
onConnect(cb: () => void): () => void;
onDisconnect(cb: () => void): () => void;
isOnline: boolean;
}
export class RSpaceYjsProvider {
readonly doc: Y.Doc;
readonly awareness: Awareness;
readonly noteId: string;
private runtime: RuntimeBridge;
private unsubs: (() => void)[] = [];
private connected = false;
private synced = false;
constructor(noteId: string, ydoc: Y.Doc, runtime: RuntimeBridge) {
this.noteId = noteId;
this.doc = ydoc;
this.runtime = runtime;
this.awareness = new Awareness(ydoc);
this.setupListeners();
if (runtime.isOnline) {
this.onConnect();
}
}
private setupListeners(): void {
// Listen for Yjs sync messages from other peers
this.unsubs.push(
this.runtime.onCustomMessage('yjs-sync', (msg: any) => {
if (msg.noteId !== this.noteId) return;
this.handleSyncMessage(msg.data);
})
);
// Listen for awareness messages from other peers
this.unsubs.push(
this.runtime.onCustomMessage('yjs-awareness', (msg: any) => {
if (msg.noteId !== this.noteId) return;
this.handleAwarenessMessage(msg.data);
})
);
// Listen for connect/disconnect
this.unsubs.push(
this.runtime.onConnect(() => this.onConnect())
);
this.unsubs.push(
this.runtime.onDisconnect(() => this.onDisconnect())
);
// When local doc changes, send update to peers
const updateHandler = (update: Uint8Array, origin: any) => {
if (origin === 'remote') return; // Don't echo back remote updates
this.sendDocUpdate(update);
};
this.doc.on('update', updateHandler);
this.unsubs.push(() => this.doc.off('update', updateHandler));
// When local awareness changes, broadcast
const awarenessHandler = (changes: {
added: number[];
updated: number[];
removed: number[];
}, origin: string | null) => {
if (origin === 'remote') return;
const changedClients = changes.added.concat(changes.updated).concat(changes.removed);
const update = encodeAwarenessUpdate(this.awareness, changedClients);
this.runtime.sendCustom({
type: 'yjs-awareness',
noteId: this.noteId,
data: Array.from(update),
});
};
this.awareness.on('update', awarenessHandler);
this.unsubs.push(() => this.awareness.off('update', awarenessHandler));
}
private onConnect(): void {
if (this.connected) return;
this.connected = true;
// Send initial sync step 1 (state vector)
const encoder = createEncoder();
writeSyncStep1(encoder, this.doc);
this.runtime.sendCustom({
type: 'yjs-sync',
noteId: this.noteId,
data: Array.from(toUint8Array(encoder)),
});
// Send full awareness state
const awarenessUpdate = encodeAwarenessUpdate(
this.awareness,
[this.doc.clientID],
);
this.runtime.sendCustom({
type: 'yjs-awareness',
noteId: this.noteId,
data: Array.from(awarenessUpdate),
});
}
private onDisconnect(): void {
this.connected = false;
this.synced = false;
// Remove all remote awareness states on disconnect
const states = Array.from(this.awareness.getStates().keys())
.filter(client => client !== this.doc.clientID);
removeAwarenessStates(this.awareness, states, this);
}
private handleSyncMessage(data: number[]): void {
const decoder = createDecoder(new Uint8Array(data));
const encoder = createEncoder();
const messageType = readSyncMessage(
decoder, encoder, this.doc, 'remote'
);
// If the response encoder has content, send it back
if (encoderLength(encoder) > 0) {
this.runtime.sendCustom({
type: 'yjs-sync',
noteId: this.noteId,
data: Array.from(toUint8Array(encoder)),
});
}
// After receiving sync step 2, we're synced
if (messageType === 1) { // syncStep2
this.synced = true;
}
}
private handleAwarenessMessage(data: number[]): void {
applyAwarenessUpdate(
this.awareness,
new Uint8Array(data),
'remote',
);
}
private sendDocUpdate(update: Uint8Array): void {
if (!this.connected) return;
const encoder = createEncoder();
writeUpdate(encoder, update);
this.runtime.sendCustom({
type: 'yjs-sync',
noteId: this.noteId,
data: Array.from(toUint8Array(encoder)),
});
}
get isSynced(): boolean {
return this.synced;
}
destroy(): void {
// Remove local awareness state
removeAwarenessStates(
this.awareness,
[this.doc.clientID],
this,
);
// Clean up all listeners
for (const unsub of this.unsubs) {
try { unsub(); } catch { /* ignore */ }
}
this.unsubs = [];
this.connected = false;
}
}