fix: implement real-time Automerge sync across clients

- Add document ID coordination via server to ensure all clients sync to same document
- Add new endpoints GET/POST /room/:roomId/documentId for document ID management
- Store automergeDocumentId in Durable Object storage
- Add enhanced logging to CloudflareAdapter send() method for debugging
- Add sharePolicy to Automerge Repo to enable document sharing
- Fix TypeScript errors in useAutomergeSyncRepo

This fixes the issue where each client was creating its own Automerge document
with a unique ID, preventing real-time sync. Now all clients in a room use the
same document ID, enabling proper real-time collaboration.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Jeff Emmett 2025-11-16 19:45:36 -07:00
parent 32e5fdb21c
commit e1f4e83383
4 changed files with 157 additions and 18 deletions

View File

@ -379,16 +379,24 @@ export class CloudflareNetworkAdapter extends NetworkAdapter {
if (this.websocket && this.websocket.readyState === WebSocket.OPEN) { if (this.websocket && this.websocket.readyState === WebSocket.OPEN) {
// Check if this is a binary sync message from Automerge Repo // Check if this is a binary sync message from Automerge Repo
if (message.type === 'sync' && (message as any).data instanceof ArrayBuffer) { if (message.type === 'sync' && (message as any).data instanceof ArrayBuffer) {
console.log('🔌 CloudflareAdapter: Sending binary sync message (Automerge protocol)') console.log('📤 CloudflareAdapter: Sending binary sync message (Automerge protocol)', {
dataLength: (message as any).data.byteLength,
documentId: (message as any).documentId,
targetId: message.targetId
})
// Send binary data directly for Automerge's native sync protocol // Send binary data directly for Automerge's native sync protocol
this.websocket.send((message as any).data) this.websocket.send((message as any).data)
} else if (message.type === 'sync' && (message as any).data instanceof Uint8Array) { } else if (message.type === 'sync' && (message as any).data instanceof Uint8Array) {
console.log('🔌 CloudflareAdapter: Sending Uint8Array sync message (Automerge protocol)') console.log('📤 CloudflareAdapter: Sending Uint8Array sync message (Automerge protocol)', {
dataLength: (message as any).data.length,
documentId: (message as any).documentId,
targetId: message.targetId
})
// Convert Uint8Array to ArrayBuffer and send // Convert Uint8Array to ArrayBuffer and send
this.websocket.send((message as any).data.buffer) this.websocket.send((message as any).data.buffer)
} else { } else {
// Handle text-based messages (backward compatibility and control messages) // Handle text-based messages (backward compatibility and control messages)
console.log('Sending WebSocket message:', message.type) console.log('📤 Sending WebSocket message:', message.type)
// Debug: Log patch content if it's a patch message // Debug: Log patch content if it's a patch message
if (message.type === 'patch' && (message as any).patches) { if (message.type === 'patch' && (message as any).patches) {
console.log('🔍 Sending patches:', (message as any).patches.length, 'patches') console.log('🔍 Sending patches:', (message as any).patches.length, 'patches')
@ -402,6 +410,11 @@ export class CloudflareNetworkAdapter extends NetworkAdapter {
} }
this.websocket.send(JSON.stringify(message)) this.websocket.send(JSON.stringify(message))
} }
} else {
console.warn('⚠️ CloudflareAdapter: Cannot send message - WebSocket not open', {
messageType: message.type,
readyState: this.websocket?.readyState
})
} }
} }

View File

@ -102,9 +102,18 @@ export function useAutomergeSync(config: AutomergeSyncConfig): TLStoreWithStatus
const [repo] = useState(() => { const [repo] = useState(() => {
const adapter = new CloudflareNetworkAdapter(workerUrl, roomId, applyJsonSyncData) const adapter = new CloudflareNetworkAdapter(workerUrl, roomId, applyJsonSyncData)
return new Repo({ const repo = new Repo({
network: [adapter] network: [adapter],
// Enable sharing of all documents with all peers
sharePolicy: async () => true
}) })
// Log when sync messages are sent/received
adapter.on('message', (msg: any) => {
console.log('🔄 CloudflareAdapter received message from network:', msg.type)
})
return repo
}) })
// Initialize Automerge document handle // Initialize Automerge document handle
@ -114,16 +123,56 @@ export function useAutomergeSync(config: AutomergeSyncConfig): TLStoreWithStatus
const initializeHandle = async () => { const initializeHandle = async () => {
try { try {
console.log("🔌 Initializing Automerge Repo with NetworkAdapter for room:", roomId) console.log("🔌 Initializing Automerge Repo with NetworkAdapter for room:", roomId)
if (mounted) {
// CRITICAL: Create or find the document for this room
// We use repo.create() which generates a proper Automerge document ID
// The document will be shared across clients via the WebSocket sync protocol
console.log(`🔌 Creating Automerge document for room: ${roomId}`)
// Create a new document - Automerge will generate a proper document ID if (mounted) {
// All clients connecting to the same room will sync via the WebSocket // CRITICAL FIX: Get or create a consistent document ID for this room
const handle = repo.create<TLStoreSnapshot>() // All clients in the same room MUST use the same document ID for sync to work
let documentId: string | null = null
try {
// First, try to get the document ID from the server
const response = await fetch(`${workerUrl}/room/${roomId}/documentId`)
if (response.ok) {
const data = await response.json() as { documentId: string }
documentId = data.documentId
console.log(`📥 Got existing document ID from server: ${documentId}`)
} else if (response.status === 404) {
console.log(`📝 No document ID found on server for room ${roomId}, will create new one`)
}
} catch (error) {
console.warn(`⚠️ Could not fetch document ID from server:`, error)
}
let handle: DocHandle<TLStoreSnapshot>
if (documentId) {
// Try to find the existing document
const foundHandle = await repo.find<TLStoreSnapshot>(documentId as any)
if (!foundHandle) {
console.log(`📝 Document ${documentId} not in local repo, creating handle`)
handle = repo.create<TLStoreSnapshot>()
} else {
console.log(`✅ Found existing document in local repo: ${documentId}`)
handle = foundHandle
}
} else {
// Create a new document and register its ID with the server
handle = repo.create<TLStoreSnapshot>()
documentId = handle.documentId
console.log(`📝 Created new document with ID: ${documentId}`)
// Register this document ID with the server so other clients use the same one
try {
await fetch(`${workerUrl}/room/${roomId}/documentId`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ documentId })
})
console.log(`✅ Registered document ID with server: ${documentId}`)
} catch (error) {
console.error(`❌ Failed to register document ID with server:`, error)
}
}
console.log("Found/Created Automerge handle via Repo:", { console.log("Found/Created Automerge handle via Repo:", {
handleId: handle.documentId, handleId: handle.documentId,
@ -133,19 +182,19 @@ export function useAutomergeSync(config: AutomergeSyncConfig): TLStoreWithStatus
// Wait for the handle to be ready // Wait for the handle to be ready
await handle.whenReady() await handle.whenReady()
// Initialize document with default store if it's new/empty // Initialize document with default store if it's new/empty
const currentDoc = handle.doc() as any const currentDoc = handle.doc() as any
if (!currentDoc || !currentDoc.store || Object.keys(currentDoc.store).length === 0) { if (!currentDoc || !currentDoc.store || Object.keys(currentDoc.store).length === 0) {
console.log("📝 Document is new/empty - initializing with default store") console.log("📝 Document is new/empty - initializing with default store")
// Try to load initial data from server for new documents // Try to load initial data from server for new documents
try { try {
const response = await fetch(`${workerUrl}/room/${roomId}`) const response = await fetch(`${workerUrl}/room/${roomId}`)
if (response.ok) { if (response.ok) {
const serverDoc = await response.json() as TLStoreSnapshot const serverDoc = await response.json() as TLStoreSnapshot
const serverRecordCount = Object.keys(serverDoc.store || {}).length const serverRecordCount = Object.keys(serverDoc.store || {}).length
if (serverDoc.store && serverRecordCount > 0) { if (serverDoc.store && serverRecordCount > 0) {
console.log(`📥 Loading ${serverRecordCount} records from server into new document`) console.log(`📥 Loading ${serverRecordCount} records from server into new document`)
handle.change((doc: any) => { handle.change((doc: any) => {

View File

@ -27,6 +27,8 @@ export class AutomergeDurableObject {
// Cache R2 document hash to avoid reloading when unchanged // Cache R2 document hash to avoid reloading when unchanged
private cachedR2Hash: string | null = null private cachedR2Hash: string | null = null
private cachedR2Doc: any = null private cachedR2Doc: any = null
// Store the Automerge document ID for this room
private automergeDocumentId: string | null = null
constructor(private readonly ctx: DurableObjectState, env: Environment) { constructor(private readonly ctx: DurableObjectState, env: Environment) {
this.r2 = env.TLDRAW_BUCKET this.r2 = env.TLDRAW_BUCKET
@ -35,6 +37,9 @@ export class AutomergeDurableObject {
this.roomId = ((await this.ctx.storage.get("roomId")) ?? null) as this.roomId = ((await this.ctx.storage.get("roomId")) ?? null) as
| string | string
| null | null
this.automergeDocumentId = ((await this.ctx.storage.get("automergeDocumentId")) ?? null) as
| string
| null
}) })
} }
@ -82,7 +87,7 @@ export class AutomergeDurableObject {
this.roomId = request.params.roomId this.roomId = request.params.roomId
}) })
} }
const doc = (await request.json()) as any const doc = (await request.json()) as any
await this.updateDocument(doc) await this.updateDocument(doc)
@ -96,6 +101,57 @@ export class AutomergeDurableObject {
}, },
}) })
}) })
.get("/room/:roomId/documentId", async (request) => {
// Initialize roomId if not already set
if (!this.roomId) {
await this.ctx.blockConcurrencyWhile(async () => {
await this.ctx.storage.put("roomId", request.params.roomId)
this.roomId = request.params.roomId
})
}
if (!this.automergeDocumentId) {
return new Response(JSON.stringify({ error: "No document ID found" }), {
status: 404,
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
},
})
}
return new Response(JSON.stringify({ documentId: this.automergeDocumentId }), {
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
},
})
})
.post("/room/:roomId/documentId", async (request) => {
// Initialize roomId if not already set
if (!this.roomId) {
await this.ctx.blockConcurrencyWhile(async () => {
await this.ctx.storage.put("roomId", request.params.roomId)
this.roomId = request.params.roomId
})
}
const { documentId } = (await request.json()) as { documentId: string }
await this.ctx.blockConcurrencyWhile(async () => {
await this.ctx.storage.put("automergeDocumentId", documentId)
this.automergeDocumentId = documentId
})
console.log(`📝 Stored document ID ${documentId} for room ${this.roomId}`)
return new Response(JSON.stringify({ success: true, documentId }), {
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
},
})
})
// `fetch` is the entry point for all requests to the Durable Object // `fetch` is the entry point for all requests to the Durable Object
fetch(request: Request): Response | Promise<Response> { fetch(request: Request): Response | Promise<Response> {

View File

@ -212,6 +212,27 @@ const router = AutoRouter<IRequest, [env: Environment, ctx: ExecutionContext]>({
}) })
}) })
// Get the Automerge document ID for a room
.get("/room/:roomId/documentId", async (request, env) => {
const id = env.AUTOMERGE_DURABLE_OBJECT.idFromName(request.params.roomId)
const room = env.AUTOMERGE_DURABLE_OBJECT.get(id)
return room.fetch(request.url, {
headers: request.headers,
method: "GET",
})
})
// Set the Automerge document ID for a room
.post("/room/:roomId/documentId", async (request, env) => {
const id = env.AUTOMERGE_DURABLE_OBJECT.idFromName(request.params.roomId)
const room = env.AUTOMERGE_DURABLE_OBJECT.get(id)
return room.fetch(request.url, {
method: "POST",
body: request.body,
headers: request.headers,
})
})
.post("/daily/rooms", async (req) => { .post("/daily/rooms", async (req) => {
const apiKey = req.headers.get('Authorization')?.split('Bearer ')[1] const apiKey = req.headers.get('Authorization')?.split('Bearer ')[1]