From 1e68150b60070665cb50e89aad4397ffcd191212 Mon Sep 17 00:00:00 2001 From: Jeff Emmett Date: Sun, 16 Nov 2025 19:45:36 -0700 Subject: [PATCH] fix: implement real-time Automerge sync across clients MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add document ID coordination via server to ensure all clients sync to same document - Add new endpoints GET/POST /room/:roomId/documentId for document ID management - Store automergeDocumentId in Durable Object storage - Add enhanced logging to CloudflareAdapter send() method for debugging - Add sharePolicy to Automerge Repo to enable document sharing - Fix TypeScript errors in useAutomergeSyncRepo This fixes the issue where each client was creating its own Automerge document with a unique ID, preventing real-time sync. Now all clients in a room use the same document ID, enabling proper real-time collaboration. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/automerge/CloudflareAdapter.ts | 19 +++++-- src/automerge/useAutomergeSyncRepo.ts | 77 ++++++++++++++++++++++----- worker/AutomergeDurableObject.ts | 58 +++++++++++++++++++- worker/worker.ts | 21 ++++++++ 4 files changed, 157 insertions(+), 18 deletions(-) diff --git a/src/automerge/CloudflareAdapter.ts b/src/automerge/CloudflareAdapter.ts index bfa5275..811a5e1 100644 --- a/src/automerge/CloudflareAdapter.ts +++ b/src/automerge/CloudflareAdapter.ts @@ -379,16 +379,24 @@ export class CloudflareNetworkAdapter extends NetworkAdapter { if (this.websocket && this.websocket.readyState === WebSocket.OPEN) { // Check if this is a binary sync message from Automerge Repo if (message.type === 'sync' && (message as any).data instanceof ArrayBuffer) { - console.log('🔌 CloudflareAdapter: Sending binary sync message (Automerge protocol)') + console.log('📤 CloudflareAdapter: Sending binary sync message (Automerge protocol)', { + dataLength: (message as any).data.byteLength, + documentId: (message as any).documentId, + targetId: message.targetId + }) // Send binary data directly for Automerge's native sync protocol this.websocket.send((message as any).data) } else if (message.type === 'sync' && (message as any).data instanceof Uint8Array) { - console.log('🔌 CloudflareAdapter: Sending Uint8Array sync message (Automerge protocol)') + console.log('📤 CloudflareAdapter: Sending Uint8Array sync message (Automerge protocol)', { + dataLength: (message as any).data.length, + documentId: (message as any).documentId, + targetId: message.targetId + }) // Convert Uint8Array to ArrayBuffer and send this.websocket.send((message as any).data.buffer) } else { // Handle text-based messages (backward compatibility and control messages) - console.log('Sending WebSocket message:', message.type) + console.log('📤 Sending WebSocket message:', message.type) // Debug: Log patch content if it's a patch message if (message.type === 'patch' && (message as any).patches) { console.log('🔍 Sending patches:', (message as any).patches.length, 'patches') @@ -402,6 +410,11 @@ export class CloudflareNetworkAdapter extends NetworkAdapter { } this.websocket.send(JSON.stringify(message)) } + } else { + console.warn('⚠️ CloudflareAdapter: Cannot send message - WebSocket not open', { + messageType: message.type, + readyState: this.websocket?.readyState + }) } } diff --git a/src/automerge/useAutomergeSyncRepo.ts b/src/automerge/useAutomergeSyncRepo.ts index 516a8b8..4a8c3db 100644 --- a/src/automerge/useAutomergeSyncRepo.ts +++ b/src/automerge/useAutomergeSyncRepo.ts @@ -102,9 +102,18 @@ export function useAutomergeSync(config: AutomergeSyncConfig): TLStoreWithStatus const [repo] = useState(() => { const adapter = new CloudflareNetworkAdapter(workerUrl, roomId, applyJsonSyncData) - return new Repo({ - network: [adapter] + const repo = new Repo({ + network: [adapter], + // Enable sharing of all documents with all peers + sharePolicy: async () => true }) + + // Log when sync messages are sent/received + adapter.on('message', (msg: any) => { + console.log('🔄 CloudflareAdapter received message from network:', msg.type) + }) + + return repo }) // Initialize Automerge document handle @@ -114,16 +123,56 @@ export function useAutomergeSync(config: AutomergeSyncConfig): TLStoreWithStatus const initializeHandle = async () => { try { console.log("🔌 Initializing Automerge Repo with NetworkAdapter for room:", roomId) - - if (mounted) { - // CRITICAL: Create or find the document for this room - // We use repo.create() which generates a proper Automerge document ID - // The document will be shared across clients via the WebSocket sync protocol - console.log(`🔌 Creating Automerge document for room: ${roomId}`) - // Create a new document - Automerge will generate a proper document ID - // All clients connecting to the same room will sync via the WebSocket - const handle = repo.create() + if (mounted) { + // CRITICAL FIX: Get or create a consistent document ID for this room + // All clients in the same room MUST use the same document ID for sync to work + let documentId: string | null = null + + try { + // First, try to get the document ID from the server + const response = await fetch(`${workerUrl}/room/${roomId}/documentId`) + if (response.ok) { + const data = await response.json() as { documentId: string } + documentId = data.documentId + console.log(`📥 Got existing document ID from server: ${documentId}`) + } else if (response.status === 404) { + console.log(`📝 No document ID found on server for room ${roomId}, will create new one`) + } + } catch (error) { + console.warn(`⚠️ Could not fetch document ID from server:`, error) + } + + let handle: DocHandle + + if (documentId) { + // Try to find the existing document + const foundHandle = await repo.find(documentId as any) + if (!foundHandle) { + console.log(`📝 Document ${documentId} not in local repo, creating handle`) + handle = repo.create() + } else { + console.log(`✅ Found existing document in local repo: ${documentId}`) + handle = foundHandle + } + } else { + // Create a new document and register its ID with the server + handle = repo.create() + documentId = handle.documentId + console.log(`📝 Created new document with ID: ${documentId}`) + + // Register this document ID with the server so other clients use the same one + try { + await fetch(`${workerUrl}/room/${roomId}/documentId`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ documentId }) + }) + console.log(`✅ Registered document ID with server: ${documentId}`) + } catch (error) { + console.error(`❌ Failed to register document ID with server:`, error) + } + } console.log("Found/Created Automerge handle via Repo:", { handleId: handle.documentId, @@ -133,19 +182,19 @@ export function useAutomergeSync(config: AutomergeSyncConfig): TLStoreWithStatus // Wait for the handle to be ready await handle.whenReady() - + // Initialize document with default store if it's new/empty const currentDoc = handle.doc() as any if (!currentDoc || !currentDoc.store || Object.keys(currentDoc.store).length === 0) { console.log("📝 Document is new/empty - initializing with default store") - + // Try to load initial data from server for new documents try { const response = await fetch(`${workerUrl}/room/${roomId}`) if (response.ok) { const serverDoc = await response.json() as TLStoreSnapshot const serverRecordCount = Object.keys(serverDoc.store || {}).length - + if (serverDoc.store && serverRecordCount > 0) { console.log(`📥 Loading ${serverRecordCount} records from server into new document`) handle.change((doc: any) => { diff --git a/worker/AutomergeDurableObject.ts b/worker/AutomergeDurableObject.ts index ab33126..dcc7fc0 100644 --- a/worker/AutomergeDurableObject.ts +++ b/worker/AutomergeDurableObject.ts @@ -27,6 +27,8 @@ export class AutomergeDurableObject { // Cache R2 document hash to avoid reloading when unchanged private cachedR2Hash: string | null = null private cachedR2Doc: any = null + // Store the Automerge document ID for this room + private automergeDocumentId: string | null = null constructor(private readonly ctx: DurableObjectState, env: Environment) { this.r2 = env.TLDRAW_BUCKET @@ -35,6 +37,9 @@ export class AutomergeDurableObject { this.roomId = ((await this.ctx.storage.get("roomId")) ?? null) as | string | null + this.automergeDocumentId = ((await this.ctx.storage.get("automergeDocumentId")) ?? null) as + | string + | null }) } @@ -82,7 +87,7 @@ export class AutomergeDurableObject { this.roomId = request.params.roomId }) } - + const doc = (await request.json()) as any await this.updateDocument(doc) @@ -96,6 +101,57 @@ export class AutomergeDurableObject { }, }) }) + .get("/room/:roomId/documentId", async (request) => { + // Initialize roomId if not already set + if (!this.roomId) { + await this.ctx.blockConcurrencyWhile(async () => { + await this.ctx.storage.put("roomId", request.params.roomId) + this.roomId = request.params.roomId + }) + } + + if (!this.automergeDocumentId) { + return new Response(JSON.stringify({ error: "No document ID found" }), { + status: 404, + headers: { + "Content-Type": "application/json", + "Access-Control-Allow-Origin": request.headers.get("Origin") || "*", + }, + }) + } + + return new Response(JSON.stringify({ documentId: this.automergeDocumentId }), { + headers: { + "Content-Type": "application/json", + "Access-Control-Allow-Origin": request.headers.get("Origin") || "*", + }, + }) + }) + .post("/room/:roomId/documentId", async (request) => { + // Initialize roomId if not already set + if (!this.roomId) { + await this.ctx.blockConcurrencyWhile(async () => { + await this.ctx.storage.put("roomId", request.params.roomId) + this.roomId = request.params.roomId + }) + } + + const { documentId } = (await request.json()) as { documentId: string } + + await this.ctx.blockConcurrencyWhile(async () => { + await this.ctx.storage.put("automergeDocumentId", documentId) + this.automergeDocumentId = documentId + }) + + console.log(`📝 Stored document ID ${documentId} for room ${this.roomId}`) + + return new Response(JSON.stringify({ success: true, documentId }), { + headers: { + "Content-Type": "application/json", + "Access-Control-Allow-Origin": request.headers.get("Origin") || "*", + }, + }) + }) // `fetch` is the entry point for all requests to the Durable Object fetch(request: Request): Response | Promise { diff --git a/worker/worker.ts b/worker/worker.ts index 86bf6bd..0a5ed77 100644 --- a/worker/worker.ts +++ b/worker/worker.ts @@ -212,6 +212,27 @@ const router = AutoRouter({ }) }) + // Get the Automerge document ID for a room + .get("/room/:roomId/documentId", async (request, env) => { + const id = env.AUTOMERGE_DURABLE_OBJECT.idFromName(request.params.roomId) + const room = env.AUTOMERGE_DURABLE_OBJECT.get(id) + return room.fetch(request.url, { + headers: request.headers, + method: "GET", + }) + }) + + // Set the Automerge document ID for a room + .post("/room/:roomId/documentId", async (request, env) => { + const id = env.AUTOMERGE_DURABLE_OBJECT.idFromName(request.params.roomId) + const room = env.AUTOMERGE_DURABLE_OBJECT.get(id) + return room.fetch(request.url, { + method: "POST", + body: request.body, + headers: request.headers, + }) + }) + .post("/daily/rooms", async (req) => { const apiKey = req.headers.get('Authorization')?.split('Bearer ')[1]