/** * Automerge CRDT Sync Manager * * This is the core component that implements proper CRDT sync semantics: * - Maintains the authoritative Automerge document on the server * - Tracks sync states per connected peer * - Processes incoming sync messages with proper CRDT merge * - Generates outgoing sync messages (only deltas, not full documents) * - Ensures deletions are preserved across offline/reconnect scenarios * * @see https://automerge.org/docs/cookbook/real-time/ */ import { Automerge, initializeAutomerge } from './automerge-init' import { AutomergeR2Storage, TLStoreSnapshot } from './automerge-r2-storage' interface SyncPeerState { syncState: Automerge.SyncState lastActivity: number } /** * Manages Automerge CRDT sync for a single room */ export class AutomergeSyncManager { private doc: Automerge.Doc | null = null private peerSyncStates: Map = new Map() private storage: AutomergeR2Storage private roomId: string private isInitialized: boolean = false private initPromise: Promise | null = null private pendingSave: boolean = false private saveTimeout: ReturnType | null = null // Throttle saves to avoid excessive R2 writes private readonly SAVE_DEBOUNCE_MS = 2000 constructor(r2: R2Bucket, roomId: string) { this.storage = new AutomergeR2Storage(r2) this.roomId = roomId } /** * Initialize the sync manager * Loads document from R2 or creates new one */ async initialize(): Promise { if (this.isInitialized) { return } if (this.initPromise) { return this.initPromise } this.initPromise = this._initialize() return this.initPromise } private async _initialize(): Promise { await initializeAutomerge() console.log(`๐Ÿ”ง Initializing AutomergeSyncManager for room ${this.roomId}`) // Try to load existing document from R2 let doc = await this.storage.loadDocument(this.roomId) if (!doc) { // Check if there's a legacy JSON document to migrate const legacyDoc = await this.loadLegacyJsonDocument() if (legacyDoc) { console.log(`๐Ÿ”„ Found legacy JSON document, migrating to Automerge format`) doc = await this.storage.migrateFromJson(this.roomId, legacyDoc) } } if (!doc) { // Create new empty document console.log(`๐Ÿ“ Creating new Automerge document for room ${this.roomId}`) doc = Automerge.init() doc = Automerge.change(doc, 'Initialize empty store', (d) => { d.store = {} }) } this.doc = doc this.isInitialized = true const shapeCount = this.getShapeCount() console.log(`โœ… AutomergeSyncManager initialized: ${shapeCount} shapes`) } /** * Load legacy JSON document from R2 * Used for migration from old format */ private async loadLegacyJsonDocument(): Promise { try { const key = this.storage.getLegacyJsonKey(this.roomId) const object = await (this.storage as any).r2?.get(key) if (object) { const json = await object.json() if (json?.store) { return json as TLStoreSnapshot } } return null } catch (error) { console.log(`No legacy JSON document found for room ${this.roomId}`) return null } } /** * Handle incoming binary sync message from a peer * This is the core CRDT merge operation * * @returns Response message to send back to the peer (or null if no response needed) */ async receiveSyncMessage(peerId: string, message: Uint8Array): Promise { await this.initialize() if (!this.doc) { throw new Error('Document not initialized') } // Get or create sync state for this peer let peerState = this.peerSyncStates.get(peerId) if (!peerState) { peerState = { syncState: Automerge.initSyncState(), lastActivity: Date.now() } this.peerSyncStates.set(peerId, peerState) console.log(`๐Ÿค New peer connected: ${peerId}`) } peerState.lastActivity = Date.now() const shapeCountBefore = this.getShapeCount() try { // CRITICAL: This is where CRDT merge happens! // Automerge.receiveSyncMessage properly merges changes from the peer // including deletions (tracked as operations, not absence) const [newDoc, newSyncState, _patch] = Automerge.receiveSyncMessage( this.doc, peerState.syncState, message ) this.doc = newDoc peerState.syncState = newSyncState this.peerSyncStates.set(peerId, peerState) const shapeCountAfter = this.getShapeCount() if (shapeCountBefore !== shapeCountAfter) { console.log(`๐Ÿ“Š Document changed: ${shapeCountBefore} โ†’ ${shapeCountAfter} shapes (peer: ${peerId})`) } // Schedule save to R2 (debounced) this.scheduleSave() // Generate response message (if we have changes to send back) const [nextSyncState, responseMessage] = Automerge.generateSyncMessage( this.doc, peerState.syncState ) if (responseMessage) { peerState.syncState = nextSyncState this.peerSyncStates.set(peerId, peerState) console.log(`๐Ÿ“ค Sending sync response to ${peerId}: ${responseMessage.byteLength} bytes`) return responseMessage } return null } catch (error) { console.error(`โŒ Error processing sync message from ${peerId}:`, error) // Reset sync state for this peer on error this.peerSyncStates.delete(peerId) throw error } } /** * Generate initial sync message for a newly connected peer * This sends our current document state to bring them up to date */ async generateSyncMessageForPeer(peerId: string): Promise { await this.initialize() if (!this.doc) { return null } // Get or create sync state for this peer let peerState = this.peerSyncStates.get(peerId) if (!peerState) { peerState = { syncState: Automerge.initSyncState(), lastActivity: Date.now() } this.peerSyncStates.set(peerId, peerState) } peerState.lastActivity = Date.now() // Generate sync message const [nextSyncState, message] = Automerge.generateSyncMessage( this.doc, peerState.syncState ) if (message) { peerState.syncState = nextSyncState this.peerSyncStates.set(peerId, peerState) console.log(`๐Ÿ“ค Generated initial sync message for ${peerId}: ${message.byteLength} bytes`) return message } return null } /** * Apply a local change to the document * Used when receiving JSON data from legacy clients */ async applyLocalChange( description: string, changeFn: (doc: TLStoreSnapshot) => void ): Promise { await this.initialize() if (!this.doc) { throw new Error('Document not initialized') } const shapeCountBefore = this.getShapeCount() this.doc = Automerge.change(this.doc, description, changeFn) const shapeCountAfter = this.getShapeCount() console.log(`๐Ÿ“ Applied local change: "${description}" (shapes: ${shapeCountBefore} โ†’ ${shapeCountAfter})`) this.scheduleSave() } /** * Get the current document as JSON * Used for legacy compatibility and debugging */ async getDocumentJson(): Promise { await this.initialize() if (!this.doc) { return null } // Convert Automerge document to plain JSON return JSON.parse(JSON.stringify(this.doc)) as TLStoreSnapshot } /** * Handle peer disconnection * Clean up sync state and flush any pending saves */ async handlePeerDisconnect(peerId: string): Promise { if (this.peerSyncStates.has(peerId)) { this.peerSyncStates.delete(peerId) console.log(`๐Ÿ‘‹ Peer disconnected: ${peerId}`) // If there's a pending save, flush it immediately to prevent data loss if (this.pendingSave) { console.log(`๐Ÿ’พ Flushing pending save on peer disconnect`) await this.forceSave() } } } /** * Get the number of shapes in the document */ getShapeCount(): number { if (!this.doc?.store) return 0 return Object.values(this.doc.store).filter((r: any) => r?.typeName === 'shape').length } /** * Get the number of records in the document */ getRecordCount(): number { if (!this.doc?.store) return 0 return Object.keys(this.doc.store).length } /** * Schedule a save to R2 (debounced) */ private scheduleSave(): void { this.pendingSave = true if (this.saveTimeout) { clearTimeout(this.saveTimeout) } this.saveTimeout = setTimeout(async () => { if (this.pendingSave && this.doc) { this.pendingSave = false await this.storage.saveDocument(this.roomId, this.doc) } }, this.SAVE_DEBOUNCE_MS) } /** * Force immediate save to R2 * Call this before shutting down */ async forceSave(): Promise { if (this.saveTimeout) { clearTimeout(this.saveTimeout) this.saveTimeout = null } if (this.doc) { this.pendingSave = false await this.storage.saveDocument(this.roomId, this.doc) } } /** * Broadcast changes to all connected peers except the sender * Returns map of peerId -> sync message */ async generateBroadcastMessages(excludePeerId?: string): Promise> { const messages = new Map() for (const [peerId, peerState] of this.peerSyncStates) { if (peerId === excludePeerId) continue const [nextSyncState, message] = Automerge.generateSyncMessage( this.doc!, peerState.syncState ) if (message) { peerState.syncState = nextSyncState this.peerSyncStates.set(peerId, peerState) messages.set(peerId, message) } } return messages } /** * Get list of connected peer IDs */ getConnectedPeers(): string[] { return Array.from(this.peerSyncStates.keys()) } /** * Clean up stale peer connections (inactive for > 5 minutes) */ cleanupStalePeers(): void { const STALE_THRESHOLD = 5 * 60 * 1000 // 5 minutes const now = Date.now() for (const [peerId, peerState] of this.peerSyncStates) { if (now - peerState.lastActivity > STALE_THRESHOLD) { console.log(`๐Ÿงน Cleaning up stale peer: ${peerId}`) this.peerSyncStates.delete(peerId) } } } }