/** * Layer 4: Sync (Server) — Multi-document WebSocket sync relay. * * Acts as a sync peer for multiple Automerge documents. Can operate in two modes: * * 1. **Relay mode** (encrypted docs): Forwards encrypted sync messages between peers. * Server can't read document content — just routes by docId. * * 2. **Participant mode** (shared/unencrypted docs): Server maintains its own copy * of each document, can read/index/validate. Used for Intent/Claim pattern. * * Wire protocol matches the client (shared/local-first/sync.ts). */ import * as Automerge from '@automerge/automerge'; // ============================================================================ // TYPES // ============================================================================ /** Minimal WebSocket interface — works with both browser WebSocket and Bun ServerWebSocket */ export interface SyncWebSocket { send(data: string): void; readyState: number; } interface SyncMessage { type: 'sync'; docId: string; data: number[]; } interface SubscribeMessage { type: 'subscribe'; docIds: string[]; } interface UnsubscribeMessage { type: 'unsubscribe'; docIds: string[]; } interface AwarenessMessage { type: 'awareness'; docId: string; peer: string; cursor?: { x: number; y: number }; selection?: string; username?: string; color?: string; } type WireMessage = | SyncMessage | SubscribeMessage | UnsubscribeMessage | AwarenessMessage | { type: 'ping' } | { type: 'pong' }; export interface Peer { id: string; ws: SyncWebSocket; subscribedDocs: Set; syncStates: Map; claims?: Record; // Auth claims } export interface SyncServerOptions { /** If true, server maintains Automerge docs and participates in sync (default: true) */ participantMode?: boolean; /** Called when a document changes (participant mode only) */ onDocChange?: (docId: string, doc: Automerge.Doc) => void; } // ============================================================================ // SyncServer // ============================================================================ export class SyncServer { #peers = new Map(); #docs = new Map>(); #docSubscribers = new Map>(); // docId → Set #participantMode: boolean; #onDocChange?: (docId: string, doc: Automerge.Doc) => void; constructor(opts: SyncServerOptions = {}) { this.#participantMode = opts.participantMode ?? true; this.#onDocChange = opts.onDocChange; } /** * Register a new WebSocket peer. */ addPeer(peerId: string, ws: SyncWebSocket, claims?: Record): Peer { const peer: Peer = { id: peerId, ws, subscribedDocs: new Set(), syncStates: new Map(), claims, }; this.#peers.set(peerId, peer); return peer; } /** * Remove a peer and clean up subscriptions. */ removePeer(peerId: string): void { const peer = this.#peers.get(peerId); if (!peer) return; // Remove from all document subscriber sets for (const docId of peer.subscribedDocs) { const subs = this.#docSubscribers.get(docId); if (subs) { subs.delete(peerId); if (subs.size === 0) { this.#docSubscribers.delete(docId); } } } this.#peers.delete(peerId); } /** * Handle an incoming message from a peer. */ handleMessage(peerId: string, raw: string | ArrayBuffer): void { const peer = this.#peers.get(peerId); if (!peer) return; try { const data = typeof raw === 'string' ? raw : new TextDecoder().decode(raw); const msg: WireMessage = JSON.parse(data); switch (msg.type) { case 'subscribe': this.#handleSubscribe(peer, msg as SubscribeMessage); break; case 'unsubscribe': this.#handleUnsubscribe(peer, msg as UnsubscribeMessage); break; case 'sync': this.#handleSync(peer, msg as SyncMessage); break; case 'awareness': this.#handleAwareness(peer, msg as AwarenessMessage); break; case 'ping': this.#sendToPeer(peer, { type: 'pong' }); break; } } catch (e) { console.error(`[SyncServer] Error handling message from ${peerId}:`, e); } } /** * Get a server-side document (participant mode). */ getDoc(docId: string): Automerge.Doc | undefined { return this.#docs.get(docId); } /** * Set/replace a server-side document and sync to all subscribed peers. */ setDoc(docId: string, doc: Automerge.Doc): void { this.#docs.set(docId, doc); this.#syncDocToAllPeers(docId); } /** * Apply a change to a server-side document (e.g. for Intent/Claim validation). */ changeDoc(docId: string, message: string, fn: (doc: T) => void): Automerge.Doc | null { let doc = this.#docs.get(docId); if (!doc) return null; doc = Automerge.change(doc, message, fn as any); this.#docs.set(docId, doc); this.#syncDocToAllPeers(docId); if (this.#onDocChange) { this.#onDocChange(docId, doc); } return doc; } /** * Get list of connected peer IDs. */ getPeerIds(): string[] { return Array.from(this.#peers.keys()); } /** * Get subscribers for a document. */ getDocSubscribers(docId: string): string[] { return Array.from(this.#docSubscribers.get(docId) ?? []); } // ---------- Private ---------- #handleSubscribe(peer: Peer, msg: SubscribeMessage): void { for (const docId of msg.docIds) { peer.subscribedDocs.add(docId); let subs = this.#docSubscribers.get(docId); if (!subs) { subs = new Set(); this.#docSubscribers.set(docId, subs); } subs.add(peer.id); // Initialize sync state for this peer-doc pair if (!peer.syncStates.has(docId)) { peer.syncStates.set(docId, Automerge.initSyncState()); } // If participant mode and we have a doc, send initial sync if (this.#participantMode && this.#docs.has(docId)) { this.#sendSyncToPeer(peer, docId); } } } #handleUnsubscribe(peer: Peer, msg: UnsubscribeMessage): void { for (const docId of msg.docIds) { peer.subscribedDocs.delete(docId); peer.syncStates.delete(docId); const subs = this.#docSubscribers.get(docId); if (subs) { subs.delete(peer.id); if (subs.size === 0) { this.#docSubscribers.delete(docId); } } } } #handleSync(peer: Peer, msg: SyncMessage): void { const { docId, data } = msg; const syncMsg = new Uint8Array(data); if (this.#participantMode) { // Server participates: apply sync message to server's doc let doc = this.#docs.get(docId); if (!doc) { // Create an empty doc if this is the first time we see this docId doc = Automerge.init(); this.#docs.set(docId, doc); } let syncState = peer.syncStates.get(docId) ?? Automerge.initSyncState(); const [newDoc, newSyncState] = Automerge.receiveSyncMessage(doc, syncState, syncMsg); const changed = newDoc !== doc; this.#docs.set(docId, newDoc); peer.syncStates.set(docId, newSyncState); // Send response sync message back to this peer this.#sendSyncToPeer(peer, docId); // If doc changed, sync to other subscribers if (changed) { this.#syncDocToOtherPeers(docId, peer.id); if (this.#onDocChange) { this.#onDocChange(docId, newDoc); } } } else { // Relay mode: forward sync message to all other subscribers const subs = this.#docSubscribers.get(docId); if (!subs) return; for (const subPeerId of subs) { if (subPeerId === peer.id) continue; const subPeer = this.#peers.get(subPeerId); if (subPeer) { this.#sendToPeer(subPeer, msg); } } } } #handleAwareness(peer: Peer, msg: AwarenessMessage): void { // Forward awareness to all other subscribers of this document const subs = this.#docSubscribers.get(msg.docId); if (!subs) return; for (const subPeerId of subs) { if (subPeerId === peer.id) continue; const subPeer = this.#peers.get(subPeerId); if (subPeer) { this.#sendToPeer(subPeer, msg); } } } #sendSyncToPeer(peer: Peer, docId: string): void { const doc = this.#docs.get(docId); if (!doc) return; let syncState = peer.syncStates.get(docId) ?? Automerge.initSyncState(); const [newSyncState, syncMessage] = Automerge.generateSyncMessage(doc, syncState); peer.syncStates.set(docId, newSyncState); if (syncMessage) { this.#sendToPeer(peer, { type: 'sync', docId, data: Array.from(syncMessage), }); } } #syncDocToAllPeers(docId: string): void { const subs = this.#docSubscribers.get(docId); if (!subs) return; for (const peerId of subs) { const peer = this.#peers.get(peerId); if (peer) { this.#sendSyncToPeer(peer, docId); } } } #syncDocToOtherPeers(docId: string, excludePeerId: string): void { const subs = this.#docSubscribers.get(docId); if (!subs) return; for (const peerId of subs) { if (peerId === excludePeerId) continue; const peer = this.#peers.get(peerId); if (peer) { this.#sendSyncToPeer(peer, docId); } } } #sendToPeer(peer: Peer, msg: object): void { try { if (peer.ws.readyState === 1) { peer.ws.send(JSON.stringify(msg)); } } catch (e) { console.error(`[SyncServer] Failed to send to peer ${peer.id}:`, e); } } }