/** * Layer 4: Sync (Client) — Multi-document WebSocket sync. * * Multiplexes sync for multiple Automerge documents over a single * WebSocket connection per space. Handles: * - Subscribe/unsubscribe to individual documents * - Automerge sync protocol per document * - Offline queue (changes made offline replayed on reconnect) * - Presence/awareness per document * * Wire protocol (JSON envelope, binary sync data as number arrays): * { type: 'sync', docId, data: number[] } — Automerge sync message * { type: 'subscribe', docIds: string[] } — Start syncing docs * { type: 'unsubscribe', docIds: string[] } — Stop syncing docs * { type: 'awareness', docId, peer, cursor? } — Presence * { type: 'ping' } / { type: 'pong' } — Keep-alive */ import * as Automerge from '@automerge/automerge'; import type { DocumentId } from './document'; import { DocumentManager } from './document'; import { EncryptedDocStore } from './storage'; // ============================================================================ // TYPES // ============================================================================ export interface SyncMessage { type: 'sync'; docId: string; data: number[]; } export interface SubscribeMessage { type: 'subscribe'; docIds: string[]; } export interface UnsubscribeMessage { type: 'unsubscribe'; docIds: string[]; } export interface AwarenessMessage { type: 'awareness'; docId: string; peer: string; cursor?: { x: number; y: number }; selection?: string; username?: string; color?: string; } export type WireMessage = | SyncMessage | SubscribeMessage | UnsubscribeMessage | AwarenessMessage | { type: 'ping' } | { type: 'pong' }; export interface DocSyncManagerOptions { documents: DocumentManager; store?: EncryptedDocStore; /** Peer ID for this client (defaults to random) */ peerId?: string; /** Auto-reconnect with exponential backoff (default: true) */ autoReconnect?: boolean; /** Max reconnect delay in ms (default: 30000) */ maxReconnectDelay?: number; } type DocChangeCallback = (doc: any) => void; type AwarenessCallback = (msg: AwarenessMessage) => void; type ConnectionCallback = () => void; // ============================================================================ // DocSyncManager (Client) // ============================================================================ export class DocSyncManager { #documents: DocumentManager; #store: EncryptedDocStore | null; #peerId: string; #ws: WebSocket | null = null; #wsUrl: string | null = null; #space: string | null = null; #autoReconnect: boolean; #maxReconnectDelay: number; #reconnectAttempts = 0; // Per-document sync state #syncStates = new Map(); #subscribedDocs = new Set(); // Event listeners #changeListeners = new Map>(); #awarenessListeners = new Map>(); #connectListeners = new Set(); #disconnectListeners = new Set(); // Save debounce #saveTimers = new Map>(); // Keep-alive #pingInterval: ReturnType | null = null; constructor(opts: DocSyncManagerOptions) { this.#documents = opts.documents; this.#store = opts.store ?? null; this.#peerId = opts.peerId ?? generatePeerId(); this.#autoReconnect = opts.autoReconnect ?? true; this.#maxReconnectDelay = opts.maxReconnectDelay ?? 30000; } get peerId(): string { return this.#peerId; } get isConnected(): boolean { return this.#ws?.readyState === WebSocket.OPEN; } /** * Connect to the sync server for a space. */ async connect(wsUrl: string, space: string): Promise { this.#wsUrl = wsUrl; this.#space = space; if (this.#ws?.readyState === WebSocket.OPEN) return; return new Promise((resolve, reject) => { this.#ws = new WebSocket(wsUrl); this.#ws.binaryType = 'arraybuffer'; this.#ws.onopen = () => { this.#reconnectAttempts = 0; // Subscribe to all currently tracked docs if (this.#subscribedDocs.size > 0) { this.#send({ type: 'subscribe', docIds: Array.from(this.#subscribedDocs), }); // Initiate sync for each document for (const docId of this.#subscribedDocs) { this.#sendSyncMessage(docId); } } // Start keep-alive this.#startPing(); for (const cb of this.#connectListeners) { try { cb(); } catch { /* ignore */ } } resolve(); }; this.#ws.onmessage = (event) => { this.#handleMessage(event.data); }; this.#ws.onclose = () => { this.#stopPing(); for (const cb of this.#disconnectListeners) { try { cb(); } catch { /* ignore */ } } this.#attemptReconnect(); }; this.#ws.onerror = () => { // onclose will fire after onerror if (this.#ws?.readyState !== WebSocket.OPEN) { reject(new Error('WebSocket connection failed')); } }; }); } /** * Subscribe to sync for one or more documents. */ async subscribe(docIds: DocumentId[]): Promise { const newIds: string[] = []; for (const id of docIds) { if (!this.#subscribedDocs.has(id)) { this.#subscribedDocs.add(id); newIds.push(id); // Initialize sync state from store if available if (this.#store && !this.#syncStates.has(id)) { const saved = await this.#store.loadSyncState(id, this.#peerId); if (saved) { this.#syncStates.set(id, Automerge.decodeSyncState(saved)); } } if (!this.#syncStates.has(id)) { this.#syncStates.set(id, Automerge.initSyncState()); } } } if (newIds.length > 0 && this.isConnected) { this.#send({ type: 'subscribe', docIds: newIds }); for (const id of newIds) { this.#sendSyncMessage(id as DocumentId); } } } /** * Unsubscribe from sync for one or more documents. */ unsubscribe(docIds: DocumentId[]): void { const removed: string[] = []; for (const id of docIds) { if (this.#subscribedDocs.delete(id)) { removed.push(id); } } if (removed.length > 0 && this.isConnected) { this.#send({ type: 'unsubscribe', docIds: removed }); } } /** * Apply a local change to a document and sync to server. */ change(docId: DocumentId, message: string, fn: (doc: T) => void): void { this.#documents.change(docId, message, fn); this.#sendSyncMessage(docId); this.#scheduleSave(docId); } /** * Send awareness/presence update for a document. */ sendAwareness(docId: DocumentId, data: Partial): void { this.#send({ type: 'awareness', docId, peer: this.#peerId, ...data, } as AwarenessMessage); } /** * Listen for changes on a specific document. */ onChange(docId: DocumentId, cb: DocChangeCallback): () => void { let set = this.#changeListeners.get(docId); if (!set) { set = new Set(); this.#changeListeners.set(docId, set); } set.add(cb); return () => { set!.delete(cb); }; } /** * Listen for awareness updates on a specific document. */ onAwareness(docId: DocumentId, cb: AwarenessCallback): () => void { let set = this.#awarenessListeners.get(docId); if (!set) { set = new Set(); this.#awarenessListeners.set(docId, set); } set.add(cb); return () => { set!.delete(cb); }; } /** * Listen for connection events. */ onConnect(cb: ConnectionCallback): () => void { this.#connectListeners.add(cb); return () => { this.#connectListeners.delete(cb); }; } /** * Listen for disconnection events. */ onDisconnect(cb: ConnectionCallback): () => void { this.#disconnectListeners.add(cb); return () => { this.#disconnectListeners.delete(cb); }; } /** * Disconnect from server. */ disconnect(): void { this.#autoReconnect = false; this.#stopPing(); if (this.#ws) { this.#ws.close(); this.#ws = null; } } /** * Flush all pending saves and sync state to IndexedDB. */ async flush(): Promise { const promises: Promise[] = []; for (const [docId, timer] of this.#saveTimers) { clearTimeout(timer); const doc = this.#documents.get(docId); if (doc && this.#store) { const binary = Automerge.save(doc); const meta = this.#documents.getMeta(docId); promises.push( this.#store.saveImmediate(docId, binary, meta ? { module: meta.module, collection: meta.collection, version: meta.version, } : undefined) ); } } this.#saveTimers.clear(); // Persist all sync states for (const [docId, state] of this.#syncStates) { if (this.#store) { const encoded = Automerge.encodeSyncState(state); promises.push(this.#store.saveSyncState(docId, this.#peerId, encoded)); } } await Promise.all(promises); } // ---------- Private ---------- #handleMessage(raw: ArrayBuffer | string): void { try { const data = typeof raw === 'string' ? raw : new TextDecoder().decode(raw); const msg: WireMessage = JSON.parse(data); switch (msg.type) { case 'sync': this.#handleSyncMessage(msg as SyncMessage); break; case 'awareness': this.#handleAwareness(msg as AwarenessMessage); break; case 'pong': // Keep-alive acknowledged break; } } catch (e) { console.error('[DocSyncManager] Failed to handle message:', e); } } #handleSyncMessage(msg: SyncMessage): void { const docId = msg.docId as DocumentId; const doc = this.#documents.get(docId); if (!doc) return; let syncState = this.#syncStates.get(docId) ?? Automerge.initSyncState(); const syncMsg = new Uint8Array(msg.data); const [newDoc, newSyncState] = Automerge.receiveSyncMessage( doc, syncState, syncMsg ); this.#documents.set(docId, newDoc); this.#syncStates.set(docId, newSyncState); this.#scheduleSave(docId); // Notify change listeners if (newDoc !== doc) { const listeners = this.#changeListeners.get(docId); if (listeners) { for (const cb of listeners) { try { cb(newDoc); } catch { /* ignore */ } } } } // Send response sync message if needed this.#sendSyncMessage(docId); } #handleAwareness(msg: AwarenessMessage): void { const docId = msg.docId as DocumentId; const listeners = this.#awarenessListeners.get(docId); if (!listeners) return; for (const cb of listeners) { try { cb(msg); } catch { /* ignore */ } } } #sendSyncMessage(docId: DocumentId): void { const doc = this.#documents.get(docId); if (!doc) return; let syncState = this.#syncStates.get(docId) ?? Automerge.initSyncState(); const [newSyncState, syncMessage] = Automerge.generateSyncMessage(doc, syncState); this.#syncStates.set(docId, newSyncState); if (syncMessage && this.isConnected) { this.#send({ type: 'sync', docId, data: Array.from(syncMessage), }); } // Persist sync state if (this.#store) { const encoded = Automerge.encodeSyncState(newSyncState); this.#store.saveSyncState(docId, this.#peerId, encoded).catch(() => {}); } } #send(msg: WireMessage): void { if (this.#ws?.readyState === WebSocket.OPEN) { this.#ws.send(JSON.stringify(msg)); } } #scheduleSave(docId: DocumentId): void { if (!this.#store) return; const existing = this.#saveTimers.get(docId); if (existing) clearTimeout(existing); this.#saveTimers.set(docId, setTimeout(() => { this.#saveTimers.delete(docId); const doc = this.#documents.get(docId); if (!doc) return; const binary = Automerge.save(doc); const meta = this.#documents.getMeta(docId); this.#store!.save(docId, binary, meta ? { module: meta.module, collection: meta.collection, version: meta.version, } : undefined); }, 2000)); } #attemptReconnect(): void { if (!this.#autoReconnect || !this.#wsUrl || !this.#space) return; this.#reconnectAttempts++; const delay = Math.min( 1000 * Math.pow(2, this.#reconnectAttempts - 1), this.#maxReconnectDelay ); setTimeout(() => { if (this.#wsUrl && this.#space) { this.connect(this.#wsUrl, this.#space).catch(() => {}); } }, delay); } #startPing(): void { this.#stopPing(); this.#pingInterval = setInterval(() => { this.#send({ type: 'ping' }); }, 30000); } #stopPing(): void { if (this.#pingInterval) { clearInterval(this.#pingInterval); this.#pingInterval = null; } } } function generatePeerId(): string { if (typeof crypto !== 'undefined' && crypto.randomUUID) { return crypto.randomUUID(); } return `peer-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; }