205 lines
5.3 KiB
TypeScript
205 lines
5.3 KiB
TypeScript
/**
|
|
* Custom Yjs WebSocket provider that bridges over the existing rSpace WebSocket.
|
|
*
|
|
* Instead of opening a separate y-websocket connection, this provider sends
|
|
* Yjs sync/awareness messages as JSON payloads through the rSpace runtime's
|
|
* WebSocket, using `yjs-sync` and `yjs-awareness` message types.
|
|
*
|
|
* The server simply relays these messages to all other peers in the same space.
|
|
*/
|
|
|
|
import * as Y from 'yjs';
|
|
import { writeSyncStep1, writeUpdate, readSyncMessage } from 'y-protocols/sync';
|
|
import {
|
|
Awareness,
|
|
encodeAwarenessUpdate,
|
|
applyAwarenessUpdate,
|
|
removeAwarenessStates,
|
|
} from 'y-protocols/awareness';
|
|
import {
|
|
createEncoder,
|
|
toUint8Array,
|
|
length as encoderLength,
|
|
} from 'lib0/encoding';
|
|
import { createDecoder } from 'lib0/decoding';
|
|
|
|
/** Minimal interface for the rSpace runtime's custom message API. */
|
|
interface RuntimeBridge {
|
|
sendCustom(msg: Record<string, any>): void;
|
|
onCustomMessage(type: string, cb: (msg: any) => void): () => void;
|
|
onConnect(cb: () => void): () => void;
|
|
onDisconnect(cb: () => void): () => void;
|
|
isOnline: boolean;
|
|
}
|
|
|
|
export class RSpaceYjsProvider {
|
|
readonly doc: Y.Doc;
|
|
readonly awareness: Awareness;
|
|
readonly noteId: string;
|
|
|
|
private runtime: RuntimeBridge;
|
|
private unsubs: (() => void)[] = [];
|
|
private connected = false;
|
|
private synced = false;
|
|
|
|
constructor(noteId: string, ydoc: Y.Doc, runtime: RuntimeBridge) {
|
|
this.noteId = noteId;
|
|
this.doc = ydoc;
|
|
this.runtime = runtime;
|
|
this.awareness = new Awareness(ydoc);
|
|
|
|
this.setupListeners();
|
|
|
|
if (runtime.isOnline) {
|
|
this.onConnect();
|
|
}
|
|
}
|
|
|
|
private setupListeners(): void {
|
|
// Listen for Yjs sync messages from other peers
|
|
this.unsubs.push(
|
|
this.runtime.onCustomMessage('yjs-sync', (msg: any) => {
|
|
if (msg.noteId !== this.noteId) return;
|
|
this.handleSyncMessage(msg.data);
|
|
})
|
|
);
|
|
|
|
// Listen for awareness messages from other peers
|
|
this.unsubs.push(
|
|
this.runtime.onCustomMessage('yjs-awareness', (msg: any) => {
|
|
if (msg.noteId !== this.noteId) return;
|
|
this.handleAwarenessMessage(msg.data);
|
|
})
|
|
);
|
|
|
|
// Listen for connect/disconnect
|
|
this.unsubs.push(
|
|
this.runtime.onConnect(() => this.onConnect())
|
|
);
|
|
this.unsubs.push(
|
|
this.runtime.onDisconnect(() => this.onDisconnect())
|
|
);
|
|
|
|
// When local doc changes, send update to peers
|
|
const updateHandler = (update: Uint8Array, origin: any) => {
|
|
if (origin === 'remote') return; // Don't echo back remote updates
|
|
this.sendDocUpdate(update);
|
|
};
|
|
this.doc.on('update', updateHandler);
|
|
this.unsubs.push(() => this.doc.off('update', updateHandler));
|
|
|
|
// When local awareness changes, broadcast
|
|
const awarenessHandler = (changes: {
|
|
added: number[];
|
|
updated: number[];
|
|
removed: number[];
|
|
}, origin: string | null) => {
|
|
if (origin === 'remote') return;
|
|
const changedClients = changes.added.concat(changes.updated).concat(changes.removed);
|
|
const update = encodeAwarenessUpdate(this.awareness, changedClients);
|
|
this.runtime.sendCustom({
|
|
type: 'yjs-awareness',
|
|
noteId: this.noteId,
|
|
data: Array.from(update),
|
|
});
|
|
};
|
|
this.awareness.on('update', awarenessHandler);
|
|
this.unsubs.push(() => this.awareness.off('update', awarenessHandler));
|
|
}
|
|
|
|
private onConnect(): void {
|
|
if (this.connected) return;
|
|
this.connected = true;
|
|
|
|
// Send initial sync step 1 (state vector)
|
|
const encoder = createEncoder();
|
|
writeSyncStep1(encoder, this.doc);
|
|
this.runtime.sendCustom({
|
|
type: 'yjs-sync',
|
|
noteId: this.noteId,
|
|
data: Array.from(toUint8Array(encoder)),
|
|
});
|
|
|
|
// Send full awareness state
|
|
const awarenessUpdate = encodeAwarenessUpdate(
|
|
this.awareness,
|
|
[this.doc.clientID],
|
|
);
|
|
this.runtime.sendCustom({
|
|
type: 'yjs-awareness',
|
|
noteId: this.noteId,
|
|
data: Array.from(awarenessUpdate),
|
|
});
|
|
}
|
|
|
|
private onDisconnect(): void {
|
|
this.connected = false;
|
|
this.synced = false;
|
|
|
|
// Remove all remote awareness states on disconnect
|
|
const states = Array.from(this.awareness.getStates().keys())
|
|
.filter(client => client !== this.doc.clientID);
|
|
removeAwarenessStates(this.awareness, states, this);
|
|
}
|
|
|
|
private handleSyncMessage(data: number[]): void {
|
|
const decoder = createDecoder(new Uint8Array(data));
|
|
const encoder = createEncoder();
|
|
const messageType = readSyncMessage(
|
|
decoder, encoder, this.doc, 'remote'
|
|
);
|
|
|
|
// If the response encoder has content, send it back
|
|
if (encoderLength(encoder) > 0) {
|
|
this.runtime.sendCustom({
|
|
type: 'yjs-sync',
|
|
noteId: this.noteId,
|
|
data: Array.from(toUint8Array(encoder)),
|
|
});
|
|
}
|
|
|
|
// After receiving sync step 2, we're synced
|
|
if (messageType === 1) { // syncStep2
|
|
this.synced = true;
|
|
}
|
|
}
|
|
|
|
private handleAwarenessMessage(data: number[]): void {
|
|
applyAwarenessUpdate(
|
|
this.awareness,
|
|
new Uint8Array(data),
|
|
'remote',
|
|
);
|
|
}
|
|
|
|
private sendDocUpdate(update: Uint8Array): void {
|
|
if (!this.connected) return;
|
|
const encoder = createEncoder();
|
|
writeUpdate(encoder, update);
|
|
this.runtime.sendCustom({
|
|
type: 'yjs-sync',
|
|
noteId: this.noteId,
|
|
data: Array.from(toUint8Array(encoder)),
|
|
});
|
|
}
|
|
|
|
get isSynced(): boolean {
|
|
return this.synced;
|
|
}
|
|
|
|
destroy(): void {
|
|
// Remove local awareness state
|
|
removeAwarenessStates(
|
|
this.awareness,
|
|
[this.doc.clientID],
|
|
this,
|
|
);
|
|
// Clean up all listeners
|
|
for (const unsub of this.unsubs) {
|
|
try { unsub(); } catch { /* ignore */ }
|
|
}
|
|
this.unsubs = [];
|
|
this.connected = false;
|
|
}
|
|
}
|