rspace-online/server/local-first/sync-server.ts

354 lines
9.4 KiB
TypeScript

/**
* Layer 4: Sync (Server) — Multi-document WebSocket sync relay.
*
* Acts as a sync peer for multiple Automerge documents. Can operate in two modes:
*
* 1. **Relay mode** (encrypted docs): Forwards encrypted sync messages between peers.
* Server can't read document content — just routes by docId.
*
* 2. **Participant mode** (shared/unencrypted docs): Server maintains its own copy
* of each document, can read/index/validate. Used for Intent/Claim pattern.
*
* Wire protocol matches the client (shared/local-first/sync.ts).
*/
import * as Automerge from '@automerge/automerge';
// ============================================================================
// TYPES
// ============================================================================
interface SyncMessage {
type: 'sync';
docId: string;
data: number[];
}
interface SubscribeMessage {
type: 'subscribe';
docIds: string[];
}
interface UnsubscribeMessage {
type: 'unsubscribe';
docIds: string[];
}
interface AwarenessMessage {
type: 'awareness';
docId: string;
peer: string;
cursor?: { x: number; y: number };
selection?: string;
username?: string;
color?: string;
}
type WireMessage =
| SyncMessage
| SubscribeMessage
| UnsubscribeMessage
| AwarenessMessage
| { type: 'ping' }
| { type: 'pong' };
export interface Peer {
id: string;
ws: WebSocket;
subscribedDocs: Set<string>;
syncStates: Map<string, Automerge.SyncState>;
claims?: Record<string, unknown>; // Auth claims
}
export interface SyncServerOptions {
/** If true, server maintains Automerge docs and participates in sync (default: true) */
participantMode?: boolean;
/** Called when a document changes (participant mode only) */
onDocChange?: (docId: string, doc: Automerge.Doc<any>) => void;
}
// ============================================================================
// SyncServer
// ============================================================================
export class SyncServer {
#peers = new Map<string, Peer>();
#docs = new Map<string, Automerge.Doc<any>>();
#docSubscribers = new Map<string, Set<string>>(); // docId → Set<peerId>
#participantMode: boolean;
#onDocChange?: (docId: string, doc: Automerge.Doc<any>) => void;
constructor(opts: SyncServerOptions = {}) {
this.#participantMode = opts.participantMode ?? true;
this.#onDocChange = opts.onDocChange;
}
/**
* Register a new WebSocket peer.
*/
addPeer(peerId: string, ws: WebSocket, claims?: Record<string, unknown>): Peer {
const peer: Peer = {
id: peerId,
ws,
subscribedDocs: new Set(),
syncStates: new Map(),
claims,
};
this.#peers.set(peerId, peer);
return peer;
}
/**
* Remove a peer and clean up subscriptions.
*/
removePeer(peerId: string): void {
const peer = this.#peers.get(peerId);
if (!peer) return;
// Remove from all document subscriber sets
for (const docId of peer.subscribedDocs) {
const subs = this.#docSubscribers.get(docId);
if (subs) {
subs.delete(peerId);
if (subs.size === 0) {
this.#docSubscribers.delete(docId);
}
}
}
this.#peers.delete(peerId);
}
/**
* Handle an incoming message from a peer.
*/
handleMessage(peerId: string, raw: string | ArrayBuffer): void {
const peer = this.#peers.get(peerId);
if (!peer) return;
try {
const data = typeof raw === 'string' ? raw : new TextDecoder().decode(raw);
const msg: WireMessage = JSON.parse(data);
switch (msg.type) {
case 'subscribe':
this.#handleSubscribe(peer, msg as SubscribeMessage);
break;
case 'unsubscribe':
this.#handleUnsubscribe(peer, msg as UnsubscribeMessage);
break;
case 'sync':
this.#handleSync(peer, msg as SyncMessage);
break;
case 'awareness':
this.#handleAwareness(peer, msg as AwarenessMessage);
break;
case 'ping':
this.#sendToPeer(peer, { type: 'pong' });
break;
}
} catch (e) {
console.error(`[SyncServer] Error handling message from ${peerId}:`, e);
}
}
/**
* Get a server-side document (participant mode).
*/
getDoc<T>(docId: string): Automerge.Doc<T> | undefined {
return this.#docs.get(docId);
}
/**
* Set/replace a server-side document and sync to all subscribed peers.
*/
setDoc(docId: string, doc: Automerge.Doc<any>): void {
this.#docs.set(docId, doc);
this.#syncDocToAllPeers(docId);
}
/**
* Apply a change to a server-side document (e.g. for Intent/Claim validation).
*/
changeDoc<T>(docId: string, message: string, fn: (doc: T) => void): Automerge.Doc<T> | null {
let doc = this.#docs.get(docId);
if (!doc) return null;
doc = Automerge.change(doc, message, fn as any);
this.#docs.set(docId, doc);
this.#syncDocToAllPeers(docId);
if (this.#onDocChange) {
this.#onDocChange(docId, doc);
}
return doc;
}
/**
* Get list of connected peer IDs.
*/
getPeerIds(): string[] {
return Array.from(this.#peers.keys());
}
/**
* Get subscribers for a document.
*/
getDocSubscribers(docId: string): string[] {
return Array.from(this.#docSubscribers.get(docId) ?? []);
}
// ---------- Private ----------
#handleSubscribe(peer: Peer, msg: SubscribeMessage): void {
for (const docId of msg.docIds) {
peer.subscribedDocs.add(docId);
let subs = this.#docSubscribers.get(docId);
if (!subs) {
subs = new Set();
this.#docSubscribers.set(docId, subs);
}
subs.add(peer.id);
// Initialize sync state for this peer-doc pair
if (!peer.syncStates.has(docId)) {
peer.syncStates.set(docId, Automerge.initSyncState());
}
// If participant mode and we have a doc, send initial sync
if (this.#participantMode && this.#docs.has(docId)) {
this.#sendSyncToPeer(peer, docId);
}
}
}
#handleUnsubscribe(peer: Peer, msg: UnsubscribeMessage): void {
for (const docId of msg.docIds) {
peer.subscribedDocs.delete(docId);
peer.syncStates.delete(docId);
const subs = this.#docSubscribers.get(docId);
if (subs) {
subs.delete(peer.id);
if (subs.size === 0) {
this.#docSubscribers.delete(docId);
}
}
}
}
#handleSync(peer: Peer, msg: SyncMessage): void {
const { docId, data } = msg;
const syncMsg = new Uint8Array(data);
if (this.#participantMode) {
// Server participates: apply sync message to server's doc
let doc = this.#docs.get(docId);
if (!doc) {
// Create an empty doc if this is the first time we see this docId
doc = Automerge.init();
this.#docs.set(docId, doc);
}
let syncState = peer.syncStates.get(docId) ?? Automerge.initSyncState();
const [newDoc, newSyncState] = Automerge.receiveSyncMessage(doc, syncState, syncMsg);
const changed = newDoc !== doc;
this.#docs.set(docId, newDoc);
peer.syncStates.set(docId, newSyncState);
// Send response sync message back to this peer
this.#sendSyncToPeer(peer, docId);
// If doc changed, sync to other subscribers
if (changed) {
this.#syncDocToOtherPeers(docId, peer.id);
if (this.#onDocChange) {
this.#onDocChange(docId, newDoc);
}
}
} else {
// Relay mode: forward sync message to all other subscribers
const subs = this.#docSubscribers.get(docId);
if (!subs) return;
for (const subPeerId of subs) {
if (subPeerId === peer.id) continue;
const subPeer = this.#peers.get(subPeerId);
if (subPeer) {
this.#sendToPeer(subPeer, msg);
}
}
}
}
#handleAwareness(peer: Peer, msg: AwarenessMessage): void {
// Forward awareness to all other subscribers of this document
const subs = this.#docSubscribers.get(msg.docId);
if (!subs) return;
for (const subPeerId of subs) {
if (subPeerId === peer.id) continue;
const subPeer = this.#peers.get(subPeerId);
if (subPeer) {
this.#sendToPeer(subPeer, msg);
}
}
}
#sendSyncToPeer(peer: Peer, docId: string): void {
const doc = this.#docs.get(docId);
if (!doc) return;
let syncState = peer.syncStates.get(docId) ?? Automerge.initSyncState();
const [newSyncState, syncMessage] = Automerge.generateSyncMessage(doc, syncState);
peer.syncStates.set(docId, newSyncState);
if (syncMessage) {
this.#sendToPeer(peer, {
type: 'sync',
docId,
data: Array.from(syncMessage),
});
}
}
#syncDocToAllPeers(docId: string): void {
const subs = this.#docSubscribers.get(docId);
if (!subs) return;
for (const peerId of subs) {
const peer = this.#peers.get(peerId);
if (peer) {
this.#sendSyncToPeer(peer, docId);
}
}
}
#syncDocToOtherPeers(docId: string, excludePeerId: string): void {
const subs = this.#docSubscribers.get(docId);
if (!subs) return;
for (const peerId of subs) {
if (peerId === excludePeerId) continue;
const peer = this.#peers.get(peerId);
if (peer) {
this.#sendSyncToPeer(peer, docId);
}
}
}
#sendToPeer(peer: Peer, msg: object): void {
try {
if (peer.ws.readyState === WebSocket.OPEN) {
peer.ws.send(JSON.stringify(msg));
}
} catch (e) {
console.error(`[SyncServer] Failed to send to peer ${peer.id}:`, e);
}
}
}