import { AutoRouter, IRequest, error } from "itty-router"
import throttle from "lodash.throttle"
import { Environment } from "./types"
import { AutomergeSyncManager } from "./automerge-sync-manager"
// each whiteboard room is hosted in a DurableObject:
// https://developers.cloudflare.com/durable-objects/
// there's only ever one durable object instance per room. it keeps all the room state in memory and
// handles websocket connections. periodically, it persists the room state to the R2 bucket.
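//
// A minimal sketch of how a Worker routes a request to this per-room instance
// (the `AUTOMERGE_DO` DurableObjectNamespace binding name is illustrative, not from this file):
//
//   const id = env.AUTOMERGE_DO.idFromName(roomId) // same room name always maps to the same instance
//   const stub = env.AUTOMERGE_DO.get(id)
//   return stub.fetch(request)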
export class AutomergeDurableObject {
private r2: R2Bucket
// the room ID will be missing whilst the room is being initialized
private roomId: string | null = null
// when we load the room from the R2 bucket, we keep it here. it's a promise so we only ever
// load it once.
private roomPromise: Promise<any> | null = null
// Store the current Automerge document state
private currentDoc: any = null
// Track connected WebSocket clients
private clients: Map<string, WebSocket> = new Map()
// Track last persisted state to detect changes
private lastPersistedHash: string | null = null
// Track if document was converted from old format (for JSON sync decision)
private wasConvertedFromOldFormat: boolean = false
// Cache R2 document hash to avoid reloading when unchanged
private cachedR2Hash: string | null = null
private cachedR2Doc: any = null
// Store the Automerge document ID for this room
private automergeDocumentId: string | null = null
// CRDT Sync Manager - handles proper Automerge sync protocol
private syncManager: AutomergeSyncManager | null = null
// Flag to enable/disable CRDT sync (for gradual rollout)
// ENABLED: Automerge WASM now works with fixed import path
private useCrdtSync: boolean = true
// Maximum shape count for CRDT sync - documents larger than this use JSON sync
// to avoid CPU timeout during Automerge binary conversion
// With Automerge.from() optimization, testing higher threshold
// 7,495 shapes caused CPU timeout with init()+change(), trying 5000 with from()
private static readonly CRDT_SYNC_MAX_SHAPES = 5000
// Tombstone tracking - keeps track of deleted shape IDs to prevent resurrection
// When a shape is deleted, its ID is added here and persisted to R2
// This prevents offline clients from resurrecting deleted shapes
private deletedShapeIds: Set<string> = new Set()
// Flag to track if tombstones have been loaded from R2
private tombstonesLoaded: boolean = false
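// Illustrative tombstone flow (hypothetical shape ID):
//   1. client A deletes "shape:abc" -> detectDeletions() adds it to deletedShapeIds, saveTombstones() persists to R2
//   2. offline client B reconnects still holding "shape:abc" -> filterTombstonedShapes() drops it from the update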
constructor(private readonly ctx: DurableObjectState, env: Environment) {
this.r2 = env.TLDRAW_BUCKET
ctx.blockConcurrencyWhile(async () => {
this.roomId = ((await this.ctx.storage.get("roomId")) ?? null) as
| string
| null
this.automergeDocumentId = ((await this.ctx.storage.get("automergeDocumentId")) ?? null) as
| string
| null
})
}
private readonly router = AutoRouter({
catch: (e) => {
console.error(e)
return error(e)
},
})
// when we get a connection request, we stash the room id if needed and handle the connection
.get("/connect/:roomId", async (request) => {
if (!this.roomId) {
await this.ctx.blockConcurrencyWhile(async () => {
await this.ctx.storage.put("roomId", request.params.roomId)
this.roomId = request.params.roomId
})
}
return this.handleConnect(request)
})
.get("/room/:roomId", async (request) => {
// Initialize roomId if not already set
if (!this.roomId) {
await this.ctx.blockConcurrencyWhile(async () => {
await this.ctx.storage.put("roomId", request.params.roomId)
this.roomId = request.params.roomId
})
}
const doc = await this.getDocument()
return new Response(JSON.stringify(doc), {
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
"Access-Control-Max-Age": "86400",
},
})
})
.post("/room/:roomId", async (request) => {
// Initialize roomId if not already set
if (!this.roomId) {
await this.ctx.blockConcurrencyWhile(async () => {
await this.ctx.storage.put("roomId", request.params.roomId)
this.roomId = request.params.roomId
})
}
const doc = (await request.json()) as any
await this.updateDocument(doc)
return new Response(JSON.stringify(doc), {
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
"Access-Control-Max-Age": "86400",
},
})
})
.get("/room/:roomId/documentId", async (request) => {
// Initialize roomId if not already set
if (!this.roomId) {
await this.ctx.blockConcurrencyWhile(async () => {
await this.ctx.storage.put("roomId", request.params.roomId)
this.roomId = request.params.roomId
})
}
if (!this.automergeDocumentId) {
return new Response(JSON.stringify({ error: "No document ID found" }), {
status: 404,
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
},
})
}
return new Response(JSON.stringify({ documentId: this.automergeDocumentId }), {
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
},
})
})
.post("/room/:roomId/documentId", async (request) => {
// Initialize roomId if not already set
if (!this.roomId) {
await this.ctx.blockConcurrencyWhile(async () => {
await this.ctx.storage.put("roomId", request.params.roomId)
this.roomId = request.params.roomId
})
}
const { documentId } = (await request.json()) as { documentId: string }
// CRITICAL: Only set the document ID if one doesn't already exist
// This prevents race conditions where multiple clients try to set different document IDs
let actualDocumentId: string = documentId
await this.ctx.blockConcurrencyWhile(async () => {
if (!this.automergeDocumentId) {
// No document ID exists yet, use the one provided by the client
await this.ctx.storage.put("automergeDocumentId", documentId)
this.automergeDocumentId = documentId
actualDocumentId = documentId
console.log(`📝 Stored NEW document ID ${documentId} for room ${this.roomId}`)
} else {
// Document ID already exists, return the existing one
actualDocumentId = this.automergeDocumentId
console.log(`⚠️ Document ID already exists for room ${this.roomId}, returning existing: ${actualDocumentId}`)
}
})
return new Response(JSON.stringify({ success: true, documentId: actualDocumentId }), {
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
},
})
})
// =============================================================================
// Version History API
// =============================================================================
.get("/room/:roomId/history", async (request) => {
// Initialize roomId if not already set
if (!this.roomId) {
await this.ctx.blockConcurrencyWhile(async () => {
await this.ctx.storage.put("roomId", request.params.roomId)
this.roomId = request.params.roomId
})
}
// Ensure sync manager is initialized
if (!this.syncManager) {
this.syncManager = new AutomergeSyncManager(this.r2, this.roomId!)
await this.syncManager.initialize()
}
const history = await this.syncManager.getHistory()
return new Response(JSON.stringify({ history }), {
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
},
})
})
.get("/room/:roomId/snapshot/:hash", async (request) => {
// Initialize roomId if not already set
if (!this.roomId) {
await this.ctx.blockConcurrencyWhile(async () => {
await this.ctx.storage.put("roomId", request.params.roomId)
this.roomId = request.params.roomId
})
}
// Ensure sync manager is initialized
if (!this.syncManager) {
this.syncManager = new AutomergeSyncManager(this.r2, this.roomId!)
await this.syncManager.initialize()
}
const hash = request.params.hash
const snapshot = await this.syncManager.getSnapshotAtHash(hash)
if (!snapshot) {
return new Response(JSON.stringify({ error: "Snapshot not found" }), {
status: 404,
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
},
})
}
return new Response(JSON.stringify({ snapshot }), {
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
},
})
})
.post("/room/:roomId/diff", async (request) => {
// Initialize roomId if not already set
if (!this.roomId) {
await this.ctx.blockConcurrencyWhile(async () => {
await this.ctx.storage.put("roomId", request.params.roomId)
this.roomId = request.params.roomId
})
}
// Ensure sync manager is initialized
if (!this.syncManager) {
this.syncManager = new AutomergeSyncManager(this.r2, this.roomId!)
await this.syncManager.initialize()
}
const { fromHash, toHash } = (await request.json()) as { fromHash: string | null; toHash: string | null }
const diff = await this.syncManager.getDiff(fromHash, toHash)
if (!diff) {
return new Response(JSON.stringify({ error: "Could not compute diff" }), {
status: 500,
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
},
})
}
return new Response(JSON.stringify({ diff }), {
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
},
})
})
.post("/room/:roomId/revert", async (request) => {
// Initialize roomId if not already set
if (!this.roomId) {
await this.ctx.blockConcurrencyWhile(async () => {
await this.ctx.storage.put("roomId", request.params.roomId)
this.roomId = request.params.roomId
})
}
// Ensure sync manager is initialized
if (!this.syncManager) {
this.syncManager = new AutomergeSyncManager(this.r2, this.roomId!)
await this.syncManager.initialize()
}
const { hash } = (await request.json()) as { hash: string }
const success = await this.syncManager.revertToHash(hash)
if (!success) {
return new Response(JSON.stringify({ error: "Could not revert to version" }), {
status: 500,
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
},
})
}
// Broadcast the revert to all connected clients
const snapshot = await this.syncManager.getDocumentJson()
this.broadcastToAll({ type: "full_sync", store: snapshot?.store || {} })
return new Response(JSON.stringify({ success: true }), {
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
},
})
})
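// Illustrative calls against the version history API (paths as registered above):
//   GET  /room/:roomId/history          -> { history: [...] }
//   GET  /room/:roomId/snapshot/:hash   -> { snapshot: {...} } or 404
//   POST /room/:roomId/diff    with { fromHash, toHash } -> { diff }
//   POST /room/:roomId/revert  with { hash } -> { success: true } plus a "full_sync" broadcast to connected clients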
// `fetch` is the entry point for all requests to the Durable Object
async fetch(request: Request): Promise<Response> {
try {
return await this.router.fetch(request)
} catch (err) {
console.error("Error in DO fetch:", err)
return new Response(
JSON.stringify({
error: "Internal Server Error",
message: (err as Error).message,
}),
{
status: 500,
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS, UPGRADE",
"Access-Control-Allow-Headers":
"Content-Type, Authorization, Upgrade, Connection",
"Access-Control-Max-Age": "86400",
"Access-Control-Allow-Credentials": "true",
},
},
)
}
}
// what happens when someone tries to connect to this room?
async handleConnect(request: IRequest): Promise<Response> {
console.log(`🔌 AutomergeDurableObject: Received connection request for room ${this.roomId}`)
console.log(`🔌 AutomergeDurableObject: CRDT state: useCrdtSync=${this.useCrdtSync}, hasSyncManager=${!!this.syncManager}`)
if (!this.roomId) {
console.error(`❌ AutomergeDurableObject: Room not initialized`)
return new Response("Room not initialized", { status: 400 })
}
// Initialize CRDT sync manager if not already done
// First, check document size - large documents use JSON sync to avoid CPU timeout
if (this.useCrdtSync && !this.syncManager) {
// Quick check: estimate document size from legacy JSON before initializing CRDT
const docEstimate = await this.estimateDocumentSize()
if (docEstimate.shapeCount > AutomergeDurableObject.CRDT_SYNC_MAX_SHAPES) {
console.log(`⚠️ Document too large for CRDT sync (${docEstimate.shapeCount} shapes > ${AutomergeDurableObject.CRDT_SYNC_MAX_SHAPES} max), using JSON sync`)
this.useCrdtSync = false
} else {
console.log(`🔧 Initializing CRDT sync manager for room ${this.roomId} (${docEstimate.shapeCount} shapes)`)
this.syncManager = new AutomergeSyncManager(this.r2, this.roomId)
try {
await this.syncManager.initialize()
console.log(`✅ CRDT sync manager initialized (${this.syncManager.getShapeCount()} shapes)`)
} catch (error) {
console.error(`❌ Failed to initialize CRDT sync manager:`, error)
// Disable CRDT sync on initialization failure
this.useCrdtSync = false
this.syncManager = null
}
}
}
const sessionId = request.query.sessionId as string
console.log(`🔌 AutomergeDurableObject: Session ID: ${sessionId}`)
if (!sessionId) {
console.error(`❌ AutomergeDurableObject: Missing sessionId`)
return new Response("Missing sessionId", { status: 400 })
}
// Check if this is a WebSocket upgrade request
const upgradeHeader = request.headers.get("Upgrade")
console.log(`🔌 AutomergeDurableObject: Upgrade header: ${upgradeHeader}`)
if (!upgradeHeader || upgradeHeader !== "websocket") {
console.error(`❌ AutomergeDurableObject: Expected Upgrade: websocket, got: ${upgradeHeader}`)
return new Response("Expected Upgrade: websocket", { status: 426 })
}
const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair()
try {
console.log(`🔌 AutomergeDurableObject: Accepting WebSocket connection for session: ${sessionId}`)
serverWebSocket.accept()
// Store the client connection
this.clients.set(sessionId, serverWebSocket)
console.log(`🔌 AutomergeDurableObject: Stored client connection for session: ${sessionId}`)
// Set up message handling
serverWebSocket.addEventListener("message", async (event) => {
try {
// Handle binary messages (Automerge's native sync protocol)
if (event.data instanceof ArrayBuffer) {
console.log(`📨 AutomergeDurableObject: Received binary message from ${sessionId}`)
// Pass binary data directly to other clients for Automerge sync
await this.handleBinaryMessage(sessionId, event.data)
} else if (event.data instanceof Blob) {
// Convert Blob to ArrayBuffer
const buffer = await event.data.arrayBuffer()
console.log(`📨 AutomergeDurableObject: Received Blob message, converted to ArrayBuffer`)
await this.handleBinaryMessage(sessionId, buffer)
} else {
// Handle text messages (JSON for backward compatibility and control messages)
const message = JSON.parse(event.data)
console.log(`📨 AutomergeDurableObject: Received message from ${sessionId}:`, message.type)
this.handleMessage(sessionId, message)
}
} catch (error) {
console.error("â AutomergeDurableObject: Error parsing WebSocket message:", error)
}
})
// Handle disconnection
serverWebSocket.addEventListener("close", (event) => {
console.log(`🔌 AutomergeDurableObject: Client disconnected: ${sessionId}, code: ${event.code}, reason: ${event.reason}`)
this.clients.delete(sessionId)
// Broadcast leave message to all other clients so they can remove presence
this.broadcastToOthers(sessionId, {
type: 'leave',
sessionId: sessionId
})
// Clean up sync manager state for this peer and flush pending saves
if (this.syncManager) {
this.syncManager.handlePeerDisconnect(sessionId).catch((error) => {
console.error(`❌ Error handling peer disconnect:`, error)
})
}
})
// Handle WebSocket errors
serverWebSocket.addEventListener("error", (error) => {
console.error(`❌ AutomergeDurableObject: WebSocket error for session ${sessionId}:`, error)
})
// Diagnostic check shortly after accept to confirm the WebSocket ended up open
setTimeout(() => {
if (serverWebSocket.readyState === WebSocket.OPEN) {
console.log(`🔌 AutomergeDurableObject: WebSocket connection confirmed open for session ${sessionId}`)
} else {
console.error(`❌ AutomergeDurableObject: WebSocket connection failed for session ${sessionId}, state: ${serverWebSocket.readyState}`)
}
}, 100)
// Send a simple test message first to ensure WebSocket is working
console.log(`📤 AutomergeDurableObject: Sending test message to client: ${sessionId}`)
try {
// Send a simple test message first
serverWebSocket.send(JSON.stringify({
type: "test",
message: "WebSocket connection established",
timestamp: Date.now()
}))
console.log(`📤 AutomergeDurableObject: Test message sent to client: ${sessionId}`)
// CRITICAL: No JSON sync - all data flows through Automerge sync protocol
// Old format content is converted to Automerge format server-side during getDocument()
// and saved back to R2, then Automerge sync loads it normally
console.log(`📄 AutomergeDurableObject: Document ready for Automerge sync (was converted: ${this.wasConvertedFromOldFormat})`)
const doc = await this.getDocument()
const shapeCount = doc.store ? Object.values(doc.store).filter((record: any) => record.typeName === 'shape').length : 0
console.log(`📄 AutomergeDurableObject: Document loaded:`, {
hasStore: !!doc.store,
storeKeys: doc.store ? Object.keys(doc.store).length : 0,
shapes: shapeCount,
wasConvertedFromOldFormat: this.wasConvertedFromOldFormat
})
// CRITICAL: Send initial sync message to client to bring them up to date
// This kicks off the Automerge sync protocol
if (this.useCrdtSync && this.syncManager) {
try {
const initialSyncMessage = await this.syncManager.generateSyncMessageForPeer(sessionId)
if (initialSyncMessage) {
serverWebSocket.send(initialSyncMessage)
console.log(`📤 Sent initial CRDT sync message to ${sessionId}: ${initialSyncMessage.byteLength} bytes`)
} else {
console.log(`ℹ️ No initial sync message needed for ${sessionId} (client may be up to date)`)
}
} catch (syncError) {
console.error(`❌ Error sending initial sync message to ${sessionId}:`, syncError)
}
}
} catch (error) {
console.error(`❌ AutomergeDurableObject: Error sending document to client ${sessionId}:`, error)
console.error(`❌ AutomergeDurableObject: Error stack:`, error instanceof Error ? error.stack : 'No stack trace')
// Don't close the WebSocket on document send errors, just log them
}
console.log(`🔌 AutomergeDurableObject: Returning WebSocket response for session ${sessionId}`)
return new Response(null, {
status: 101,
webSocket: clientWebSocket,
headers: {
"Access-Control-Allow-Origin": request.headers.get("Origin") || "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS, UPGRADE",
"Access-Control-Allow-Headers": "*",
"Access-Control-Allow-Credentials": "true",
Upgrade: "websocket",
Connection: "Upgrade",
},
})
} catch (error) {
console.error("â AutomergeDurableObject: WebSocket connection error:", error)
if (error instanceof Error) {
console.error("â AutomergeDurableObject: Error stack:", error.stack)
console.error("â AutomergeDurableObject: Error details:", {
message: error.message,
name: error.name
})
}
// Only close the WebSocket if it's still open
if (serverWebSocket.readyState === WebSocket.OPEN) {
console.log("â AutomergeDurableObject: Closing WebSocket due to error")
serverWebSocket.close(1011, "Failed to initialize connection")
}
return new Response("Failed to establish WebSocket connection", {
status: 500,
})
}
}
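// A minimal client-side counterpart to handleConnect (a sketch; the host and the two handler
// functions are assumptions, while the sessionId query param and the binary/JSON framing match the server above):
//
//   const ws = new WebSocket(`wss://example.workers.dev/connect/${roomId}?sessionId=${crypto.randomUUID()}`)
//   ws.binaryType = "arraybuffer"
//   ws.onmessage = (e) => {
//     if (e.data instanceof ArrayBuffer) applyAutomergeSyncMessage(e.data) // binary = CRDT sync protocol
//     else handleControlMessage(JSON.parse(e.data))                        // JSON = test/presence/leave/etc.
//   }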
private async handleMessage(sessionId: string, message: any) {
console.log(`Handling message from ${sessionId}:`, message.type)
switch (message.type) {
case "ping":
// Handle keep-alive ping
const client = this.clients.get(sessionId)
if (client && client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify({
type: "pong",
timestamp: Date.now()
}))
}
break
case "pong":
// Handle keep-alive pong (just acknowledge)
console.log(`Received pong from ${sessionId}`)
break
case "sync":
// Handle Automerge sync message
if (message.data) {
// This is a sync message with data - broadcast to other clients
// CRITICAL: Don't require documentId - JSON sync messages might not have it
// but they still need to be broadcast for real-time collaboration
await this.handleSyncMessage(sessionId, message)
} else {
// This is a sync request (no data) - send current document state
const doc = await this.getDocument()
const client = this.clients.get(sessionId)
if (client) {
// Use consistent document ID format: automerge:${roomId}
// This matches what the client uses when calling repo.find()
const documentId = message.documentId || `automerge:${this.roomId}`
// Send the document as a sync message
client.send(JSON.stringify({
type: "sync",
senderId: "server",
targetId: sessionId,
documentId: documentId,
data: doc
}))
}
}
break
case "request":
// Handle document request
const doc = await this.getDocument()
const requestClient = this.clients.get(sessionId)
if (requestClient) {
// Use consistent document ID format: automerge:${roomId}
// This matches what the client uses when calling repo.find()
const documentId = message.documentId || `automerge:${this.roomId}`
requestClient.send(JSON.stringify({
type: "sync",
senderId: "server",
targetId: sessionId,
documentId: documentId,
data: doc
}))
}
break
case "request-document-state":
// Handle document state request from worker (for persistence)
await this.handleDocumentStateRequest(sessionId)
break
case "presence":
// Handle presence updates (cursors, selections)
// Broadcast to all other clients but don't persist
console.log(`👋 Received presence update from ${sessionId}, user: ${message.userId}`)
// Add senderId so clients can filter out echoes
const presenceMessage = {
...message,
senderId: sessionId
}
this.broadcastToOthers(sessionId, presenceMessage)
break
case "leave":
// Handle explicit leave message (client is about to disconnect)
// Broadcast to all other clients so they can remove presence
console.log(`👋 Received leave message from ${sessionId}`)
this.broadcastToOthers(sessionId, {
type: 'leave',
sessionId: message.sessionId || sessionId
})
break
default:
console.log("Unknown message type:", message.type)
}
}
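// Illustrative JSON control frames handled above (field sets beyond `type` are whatever the client sends):
//   { type: "ping" } -> { type: "pong", timestamp }
//   { type: "presence", userId, ... } -> relayed to peers with senderId added so echoes can be filtered
//   { type: "leave", sessionId? } -> relayed so peers can drop that session's presence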
private async handleBinaryMessage(sessionId: string, data: ArrayBuffer) {
// Handle incoming binary Automerge sync data from client
console.log(`📨 Worker: Handling binary sync message from ${sessionId}, size: ${data.byteLength} bytes`)
// Check if CRDT sync is enabled
if (this.useCrdtSync && this.syncManager) {
try {
// CRITICAL: Use proper CRDT sync protocol
// This ensures deletions and concurrent edits are merged correctly
const uint8Data = new Uint8Array(data)
const response = await this.syncManager.receiveSyncMessage(sessionId, uint8Data)
// Send response back to the client (if any)
if (response) {
const client = this.clients.get(sessionId)
if (client && client.readyState === WebSocket.OPEN) {
client.send(response)
console.log(`📤 Sent sync response to ${sessionId}: ${response.byteLength} bytes`)
}
}
// Broadcast changes to other connected clients
const broadcastMessages = await this.syncManager.generateBroadcastMessages(sessionId)
for (const [peerId, message] of broadcastMessages) {
const client = this.clients.get(peerId)
if (client && client.readyState === WebSocket.OPEN) {
client.send(message)
console.log(`📤 Broadcast sync to ${peerId}: ${message.byteLength} bytes`)
}
}
// CRITICAL: Keep currentDoc in sync with the CRDT document
// This ensures HTTP endpoints and other code paths see the latest state
const crdtDoc = await this.syncManager.getDocumentJson()
if (crdtDoc) {
this.currentDoc = crdtDoc
// Clear R2 cache since document has been updated via CRDT
this.cachedR2Doc = null
this.cachedR2Hash = null
}
console.log(`✅ CRDT sync processed for ${sessionId} (${this.syncManager.getShapeCount()} shapes)`)
} catch (error) {
console.error(`❌ CRDT sync error for ${sessionId}:`, error)
// Fall back to relay mode on error
this.broadcastBinaryToOthers(sessionId, data)
}
} else {
// Legacy mode: Broadcast binary data directly to other clients
// This is the old behavior that doesn't handle CRDT properly
console.log(`⚠️ Using legacy relay mode (CRDT sync disabled)`)
this.broadcastBinaryToOthers(sessionId, data)
}
}
private async handleSyncMessage(sessionId: string, message: any) {
// Handle incoming Automerge sync data from client (JSON format for backward compatibility)
console.log(`📨 Worker: Handling sync message from ${sessionId}:`, {
hasData: !!message.data,
dataType: typeof message.data,
isArrayBuffer: message.data instanceof ArrayBuffer,
documentId: message.documentId
})
// For Automerge's native protocol, we need to handle binary data
if (message.data instanceof ArrayBuffer) {
console.log(`📨 Worker: Processing binary Automerge sync data`)
// Handle binary data
await this.handleBinaryMessage(sessionId, message.data)
} else {
// Handle JSON sync data (backward compatibility)
console.log(`📨 Worker: Processing JSON sync data`)
this.broadcastToOthers(sessionId, message)
}
}
private broadcastBinaryToOthers(senderId: string, data: ArrayBuffer) {
// Broadcast binary Automerge sync data to all other clients
let broadcastCount = 0
for (const [sessionId, client] of this.clients) {
if (sessionId !== senderId && client.readyState === WebSocket.OPEN) {
try {
console.log(`📤 Worker: Broadcasting binary sync data (${data.byteLength} bytes) to ${sessionId}`)
client.send(data)
broadcastCount++
} catch (error) {
console.error(`❌ Worker: Error broadcasting binary data to ${sessionId}:`, error)
}
}
}
if (broadcastCount > 0) {
console.log(`📤 Worker: Broadcast binary sync data to ${broadcastCount} client(s)`)
}
}
private broadcastToOthers(senderId: string, message: any) {
// Broadcast JSON messages (backward compatibility and control messages)
let broadcastCount = 0
for (const [sessionId, client] of this.clients) {
if (sessionId !== senderId && client.readyState === WebSocket.OPEN) {
try {
if (message.data instanceof ArrayBuffer) {
// Send binary data for Automerge protocol
console.log(`📤 Worker: Broadcasting binary sync data to ${sessionId}`)
client.send(message.data)
} else {
// Send JSON data for backward compatibility
console.log(`📤 Worker: Broadcasting JSON sync data to ${sessionId}`)
client.send(JSON.stringify(message))
}
broadcastCount++
} catch (error) {
console.error(`❌ Worker: Error broadcasting to ${sessionId}:`, error)
}
}
}
if (broadcastCount > 0) {
console.log(`📤 Worker: Broadcast message to ${broadcastCount} client(s)`)
}
}
private broadcastToAll(message: any) {
// Broadcast JSON messages to ALL connected clients (for system events like revert)
let broadcastCount = 0
for (const [sessionId, client] of this.clients) {
if (client.readyState === WebSocket.OPEN) {
try {
console.log(`📤 Worker: Broadcasting to ${sessionId}`)
client.send(JSON.stringify(message))
broadcastCount++
} catch (error) {
console.error(`❌ Worker: Error broadcasting to ${sessionId}:`, error)
}
}
}
console.log(`📤 Worker: Broadcast message to ALL ${broadcastCount} client(s)`)
}
// Generate a fast hash of the document state for change detection
// OPTIMIZED: Instead of JSON.stringify on entire document (expensive for large docs),
// we hash based on record IDs, types, and metadata only
private generateDocHash(doc: any): string {
const storeData = doc.store || {}
const storeKeys = Object.keys(storeData).sort()
// Fast hash: combine record count + sorted key fingerprint + metadata
let hash = storeKeys.length // Start with record count
// Hash the record IDs and key metadata (much faster than stringifying full records)
for (let i = 0; i < storeKeys.length; i++) {
const key = storeKeys[i]
const record = storeData[key]
// Hash the record ID
for (let j = 0; j < key.length; j++) {
const char = key.charCodeAt(j)
hash = ((hash << 5) - hash) + char
hash = hash & hash // Convert to 32-bit integer
}
// Include record type and metadata for better change detection
if (record) {
// Hash typeName if available
if (record.typeName) {
for (let j = 0; j < record.typeName.length; j++) {
hash = ((hash << 5) - hash) + record.typeName.charCodeAt(j)
hash = hash & hash
}
}
// Hash key properties for better collision resistance
// Use index, x/y for shapes, parentId for common records
if (record.index !== undefined) {
hash = ((hash << 5) - hash) + (typeof record.index === 'string' ? record.index.length : record.index)
hash = hash & hash
}
if (record.x !== undefined && record.y !== undefined) {
hash = ((hash << 5) - hash) + Math.floor(record.x + record.y)
hash = hash & hash
}
// CRITICAL: Include text content in hash for Markdown and similar shapes
// This ensures text changes are detected for R2 persistence
if (record.props?.text !== undefined && typeof record.props.text === 'string') {
hash = ((hash << 5) - hash) + record.props.text.length
hash = hash & hash
// Include first 50 chars for better collision resistance
const textSample = record.props.text.substring(0, 50)
for (let j = 0; j < textSample.length; j++) {
hash = ((hash << 5) - hash) + textSample.charCodeAt(j)
hash = hash & hash
}
}
// Also include content for ObsNote shapes
if (record.props?.content !== undefined && typeof record.props.content === 'string') {
hash = ((hash << 5) - hash) + record.props.content.length
hash = hash & hash
const contentSample = record.props.content.substring(0, 50)
for (let j = 0; j < contentSample.length; j++) {
hash = ((hash << 5) - hash) + contentSample.charCodeAt(j)
hash = hash & hash
}
}
}
}
const hashString = hash.toString()
console.log(`Server generated hash (optimized):`, {
hash: hashString,
storeKeys: storeKeys.length,
sampleKeys: storeKeys.slice(0, 3)
})
return hashString
}
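// The per-character mixing step above is the classic djb2-style update:
//   hash = ((hash << 5) - hash) + charCode   // equivalent to hash * 31 + charCode
//   hash = hash & hash                       // coerce back into a signed 32-bit integer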
private async applyPatch(patch: any) {
// For now, we'll store patches and apply them to the document
// In a full implementation, you'd want to use Automerge's patch application
console.log("Applying patch:", patch)
this.schedulePersistToR2()
}
/**
* Quick estimate of document size without full CRDT conversion
* Used to decide whether to use CRDT sync or fall back to JSON sync
*/
private async estimateDocumentSize(): Promise<{ shapeCount: number; recordCount: number }> {
if (!this.roomId) {
return { shapeCount: 0, recordCount: 0 }
}
try {
// Try legacy JSON first (faster to check)
const legacyObject = await this.r2.get(`rooms/${this.roomId}`)
if (legacyObject) {
const text = await legacyObject.text()
const doc = JSON.parse(text)
// Handle different document formats
let store: Record = {}
if (doc.store) {
store = doc.store
} else if (Array.isArray(doc) && doc[0]?.type === 'store') {
// Array format with store in first element
store = doc[0]?.value || {}
}
const recordCount = Object.keys(store).length
const shapeCount = Object.values(store).filter((r: any) => r?.typeName === 'shape').length
console.log(`📊 Document size estimate: ${shapeCount} shapes, ${recordCount} records`)
return { shapeCount, recordCount }
}
// If no legacy JSON, check automerge metadata without loading full binary
const metadataObject = await this.r2.get(`rooms/${this.roomId}/metadata.json`)
if (metadataObject) {
const text = await metadataObject.text()
const metadata = JSON.parse(text)
const shapeCount = parseInt(metadata.shapeCount || '0', 10)
const recordCount = parseInt(metadata.recordCount || '0', 10)
console.log(`📊 Document size from metadata: ${shapeCount} shapes, ${recordCount} records`)
return { shapeCount, recordCount }
}
} catch (error) {
console.error(`❌ Error estimating document size:`, error)
}
return { shapeCount: 0, recordCount: 0 }
}
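// The metadata sidecar read above is assumed to look roughly like:
//   rooms/<roomId>/metadata.json -> { "shapeCount": "123", "recordCount": "456" }
// (counts may arrive as strings, hence the parseInt calls)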
async getDocument() {
if (!this.roomId) throw new Error("Missing roomId")
// CRDT MODE: If sync manager is active, return its document
// This ensures HTTP endpoints return the authoritative CRDT state
if (this.useCrdtSync && this.syncManager) {
try {
const crdtDoc = await this.syncManager.getDocumentJson()
if (crdtDoc && crdtDoc.store && Object.keys(crdtDoc.store).length > 0) {
const shapeCount = Object.values(crdtDoc.store).filter((r: any) => r?.typeName === 'shape').length
console.log(`📥 getDocument: Returning CRDT document (${shapeCount} shapes)`)
// Keep currentDoc in sync with CRDT state
this.currentDoc = crdtDoc
return crdtDoc
}
console.log(`⚠️ getDocument: CRDT document is empty, falling back to R2`)
} catch (error) {
console.error(`❌ getDocument: Error getting CRDT document:`, error)
}
}
// FALLBACK: Load from R2 JSON if CRDT is not available
// If R2 load is in progress or completed, wait for it and return the result
if (this.roomPromise) {
const doc = await this.roomPromise
// After R2 load, merge any client updates that happened during load
if (this.currentDoc && this.currentDoc !== doc) {
// Merge client updates into R2-loaded document
if (doc.store && this.currentDoc.store) {
Object.entries(this.currentDoc.store).forEach(([id, record]) => {
doc.store[id] = record
})
}
this.currentDoc = doc
}
return this.currentDoc || doc
}
// Otherwise, start loading from R2 (only once)
if (!this.roomPromise) {
this.roomPromise = (async () => {
let initialDoc: any
let wasConverted = false
try {
// fetch the document from R2
const docFromBucket = await this.r2.get(`rooms/${this.roomId}`)
if (docFromBucket) {
try {
const rawDoc = await docFromBucket.json()
const r2ShapeCount = (rawDoc as any).store ?
Object.values((rawDoc as any).store).filter((r: any) => r?.typeName === 'shape').length :
(Array.isArray(rawDoc) ? rawDoc.filter((r: any) => r?.state?.typeName === 'shape').length : 0)
console.log(`Loaded raw document from R2 for room ${this.roomId}:`, {
isArray: Array.isArray(rawDoc),
length: Array.isArray(rawDoc) ? rawDoc.length : 'not array',
hasStore: !!(rawDoc as any).store,
hasDocuments: !!(rawDoc as any).documents,
shapeCount: r2ShapeCount,
storeKeys: (rawDoc as any).store ? Object.keys((rawDoc as any).store).length : 0,
sampleKeys: Array.isArray(rawDoc) ? rawDoc.slice(0, 3).map((r: any) => r.state?.id) : []
})
// Convert Automerge document format to TLStoreSnapshot format
if (Array.isArray(rawDoc)) {
// This is the raw Automerge document format - convert to store format
console.log(`Converting Automerge document format to store format for room ${this.roomId}`)
initialDoc = this.convertAutomergeToStore(rawDoc)
wasConverted = true
const customRecords = Object.values(initialDoc.store).filter((r: any) =>
r.id && typeof r.id === 'string' && r.id.startsWith('obsidian_vault:')
)
console.log(`Conversion completed:`, {
storeKeys: Object.keys(initialDoc.store).length,
shapeCount: Object.values(initialDoc.store).filter((r: any) => r.typeName === 'shape').length,
customRecordCount: customRecords.length,
customRecordIds: customRecords.map((r: any) => r.id).slice(0, 5)
})
} else if ((rawDoc as any).store) {
// This is already in store format
initialDoc = rawDoc
const customRecords = Object.values(initialDoc.store).filter((r: any) =>
r.id && typeof r.id === 'string' && r.id.startsWith('obsidian_vault:')
)
console.log(`Document already in store format:`, {
storeKeys: Object.keys(initialDoc.store).length,
shapeCount: Object.values(initialDoc.store).filter((r: any) => r.typeName === 'shape').length,
customRecordCount: customRecords.length,
customRecordIds: customRecords.map((r: any) => r.id).slice(0, 5)
})
} else if ((rawDoc as any).documents && !((rawDoc as any).store)) {
// Migrate old format (documents array) to new format (store object)
console.log(`Migrating old documents format to new store format for room ${this.roomId}`)
initialDoc = this.migrateDocumentsToStore(rawDoc)
wasConverted = true
const customRecords = Object.values(initialDoc.store).filter((r: any) =>
r.id && typeof r.id === 'string' && r.id.startsWith('obsidian_vault:')
)
console.log(`Migration completed:`, {
storeKeys: Object.keys(initialDoc.store).length,
shapeCount: Object.values(initialDoc.store).filter((r: any) => r.typeName === 'shape').length,
customRecordCount: customRecords.length,
customRecordIds: customRecords.map((r: any) => r.id).slice(0, 5)
})
} else {
console.log(`Unknown document format, creating new document`)
initialDoc = this.createEmptyDocument()
}
// Migrate shapes to ensure they have required properties
if (initialDoc.store) {
console.log(`🔧 Server-side: Starting shape migration for room ${this.roomId}`)
initialDoc = this.migrateShapeProperties(initialDoc)
console.log(`✅ Server-side: Shape migration completed for room ${this.roomId}`)
}
} catch (jsonError) {
console.error(`Error parsing JSON from R2 for room ${this.roomId}:`, jsonError)
// If JSON parsing fails, create a new document
initialDoc = this.createEmptyDocument()
}
} else {
console.log(`No document found in R2 for room ${this.roomId}, creating new one`)
initialDoc = this.createEmptyDocument()
}
} catch (r2Error) {
console.error(`Error loading from R2 for room ${this.roomId}:`, r2Error)
// If R2 loading fails, create a new document
initialDoc = this.createEmptyDocument()
}
this.currentDoc = initialDoc
// Store conversion flag for JSON sync decision
this.wasConvertedFromOldFormat = wasConverted
// Load tombstones to prevent resurrection of deleted shapes
await this.loadTombstones()
console.log(`🪦 Tombstone state after load: ${this.deletedShapeIds.size} tombstones for room ${this.roomId}`)
// Initialize the last persisted hash with the loaded document
this.lastPersistedHash = this.generateDocHash(initialDoc)
// If document was converted/migrated, persist it immediately to save in new format
if (wasConverted && initialDoc.store && Object.keys(initialDoc.store).length > 0) {
const shapeCount = Object.values(initialDoc.store).filter((r: any) => r.typeName === 'shape').length
console.log(`📦 Persisting converted document to R2 in new format for room ${this.roomId} (${shapeCount} shapes)`)
// Persist immediately without throttling for converted documents
try {
const docJson = JSON.stringify(initialDoc)
await this.r2.put(`rooms/${this.roomId}`, docJson, {
httpMetadata: {
contentType: 'application/json'
}
})
this.lastPersistedHash = this.generateDocHash(initialDoc)
console.log(`✅ Successfully persisted converted document for room ${this.roomId} with ${shapeCount} shapes`)
} catch (persistError) {
console.error(`❌ Error persisting converted document for room ${this.roomId}:`, persistError)
}
}
return initialDoc
})()
}
return this.roomPromise
}
/**
* Assign sequential indices to shapes to preserve layer order during format conversion.
* Uses tldraw's fractional indexing format: a1, a2, a3, etc.
* Shapes are sorted by their original array index to maintain the order they were stored in.
*/
private assignSequentialIndices(store: any, shapesNeedingIndex: { id: string, originalIndex: number }[]): void {
if (shapesNeedingIndex.length === 0) return
// Sort shapes by their original array index to preserve layer order
shapesNeedingIndex.sort((a, b) => a.originalIndex - b.originalIndex)
// Check if shapes already have valid indices we should preserve
// Valid tldraw fractional index: starts with a lowercase letter followed by alphanumeric characters
// Examples: a1, a2, b1, c10, a1V, a1Lz, etc. (the letter increments as indices grow)
const isValidIndex = (idx: any): boolean => {
if (!idx || typeof idx !== 'string' || idx.length === 0) return false
// Valid fractional index format: lowercase letter followed by alphanumeric (a1, b1, c10, a1V, etc.)
if (/^[a-z][a-zA-Z0-9]+$/.test(idx)) return true
// Also allow 'Z' prefix for very high indices
if (/^Z[a-z]/i.test(idx)) return true
return false
}
// Count how many shapes have valid indices
let validIndexCount = 0
let invalidIndexCount = 0
const existingIndices: string[] = []
for (const { id } of shapesNeedingIndex) {
const shape = store[id]
if (shape && isValidIndex(shape.index)) {
validIndexCount++
existingIndices.push(shape.index)
} else {
invalidIndexCount++
}
}
console.log(`📊 Index assignment check: ${validIndexCount} valid, ${invalidIndexCount} invalid out of ${shapesNeedingIndex.length} shapes`)
// If all shapes have valid indices, preserve them
if (invalidIndexCount === 0) {
console.log(`✅ All shapes have valid indices, preserving existing layer order`)
return
}
// If some have valid indices and some don't, we need to be careful
// Assign new indices only to shapes that need them, fitting them into the existing sequence
if (validIndexCount > 0 && invalidIndexCount > 0) {
console.log(`⚠️ Mixed valid/invalid indices detected. Assigning new indices to ${invalidIndexCount} shapes while preserving ${validIndexCount} valid indices.`)
// For simplicity, if we have a mix, reassign all shapes to ensure proper ordering
// This is safer than trying to interleave new indices between existing ones
console.log(`🔄 Reassigning all shape indices to ensure consistent layer order`)
}
// Assign sequential indices: a1, a2, a3, etc.
// Using simple integer increments provides clear layer ordering
let indexCounter = 1
const assignedIndices: string[] = []
for (const { id } of shapesNeedingIndex) {
const shape = store[id]
if (!shape) continue
const newIndex = `a${indexCounter}`
const oldIndex = shape.index
if (oldIndex !== newIndex) {
shape.index = newIndex
assignedIndices.push(`${id}: ${oldIndex || 'undefined'} -> ${newIndex}`)
}
indexCounter++
}
if (assignedIndices.length > 0) {
console.log(`🔢 Assigned sequential indices to ${assignedIndices.length} shapes:`)
// Log first 10 assignments for debugging
assignedIndices.slice(0, 10).forEach(msg => console.log(` ${msg}`))
if (assignedIndices.length > 10) {
console.log(` ... and ${assignedIndices.length - 10} more`)
}
}
}
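// Example: three shapes stored in array order [background, middle, foreground] come out as
//   background.index = "a1", middle.index = "a2", foreground.index = "a3"
// so tldraw renders earlier array entries behind later ones.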
private convertAutomergeToStore(automergeDoc: any[]): any {
const store: any = {}
const conversionStats = {
total: automergeDoc.length,
converted: 0,
skipped: 0,
errors: 0,
errorDetails: [] as string[],
customRecords: [] as string[] // Track custom record IDs (obsidian_vault, etc.)
}
// Track shapes that need index assignment for layer order preservation
const shapesNeedingIndex: { id: string, originalIndex: number }[] = []
// Convert each Automerge record to store format
automergeDoc.forEach((record: any, arrayIndex: number) => {
try {
// Validate record structure
if (!record) {
conversionStats.skipped++
conversionStats.errorDetails.push(`Record at index ${arrayIndex} is null or undefined`)
return
}
if (!record.state) {
conversionStats.skipped++
conversionStats.errorDetails.push(`Record at index ${arrayIndex} missing state property`)
return
}
if (!record.state.id) {
conversionStats.skipped++
conversionStats.errorDetails.push(`Record at index ${arrayIndex} missing state.id`)
return
}
// Validate ID is a string
if (typeof record.state.id !== 'string') {
conversionStats.skipped++
conversionStats.errorDetails.push(`Record at index ${arrayIndex} has invalid state.id type: ${typeof record.state.id}`)
return
}
// Track custom records (obsidian_vault, etc.)
if (record.state.id.startsWith('obsidian_vault:')) {
conversionStats.customRecords.push(record.state.id)
}
// Extract the state and use it as the store record
store[record.state.id] = record.state
// Track shapes that need index assignment (preserve array order for layer order)
if (record.state.typeName === 'shape') {
shapesNeedingIndex.push({ id: record.state.id, originalIndex: arrayIndex })
}
conversionStats.converted++
} catch (error) {
conversionStats.errors++
const errorMsg = `Error converting record at index ${arrayIndex}: ${error instanceof Error ? error.message : String(error)}`
conversionStats.errorDetails.push(errorMsg)
console.error(`❌ Conversion error:`, errorMsg)
}
})
// CRITICAL: Assign sequential indices to shapes to preserve layer order
// Shapes earlier in the array should have lower indices (rendered first/behind)
// Use fractional indexing format: a1, a2, a3, etc.
this.assignSequentialIndices(store, shapesNeedingIndex)
console.log(`📊 Automerge to Store conversion statistics:`, {
total: conversionStats.total,
converted: conversionStats.converted,
skipped: conversionStats.skipped,
errors: conversionStats.errors,
storeKeys: Object.keys(store).length,
customRecordCount: conversionStats.customRecords.length,
customRecordIds: conversionStats.customRecords.slice(0, 10),
errorCount: conversionStats.errorDetails.length
})
// Verify custom records are preserved
if (conversionStats.customRecords.length > 0) {
console.log(`✅ Verified ${conversionStats.customRecords.length} custom records preserved during conversion`)
}
if (conversionStats.errorDetails.length > 0 && conversionStats.errorDetails.length <= 10) {
console.warn(`⚠️ Conversion warnings (showing first 10):`, conversionStats.errorDetails.slice(0, 10))
} else if (conversionStats.errorDetails.length > 10) {
console.warn(`⚠️ Conversion warnings (${conversionStats.errorDetails.length} total, showing first 10):`, conversionStats.errorDetails.slice(0, 10))
}
return {
store,
schema: {
version: 1,
recordVersions: {}
}
}
}
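// Illustrative conversion (hypothetical record):
//   [{ state: { id: "shape:x", typeName: "shape", type: "geo", x: 0, y: 0 } }]
//     -> { store: { "shape:x": { id: "shape:x", typeName: "shape", type: "geo", x: 0, y: 0, index: "a1" } },
//          schema: { version: 1, recordVersions: {} } }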
private createEmptyDocument() {
return {
store: {
"document:document": {
gridSize: 10,
name: "",
meta: {},
id: "document:document",
typeName: "document",
},
"pointer:pointer": {
id: "pointer:pointer",
typeName: "pointer",
x: 0,
y: 0,
lastActivityTimestamp: 0,
meta: {},
},
"page:page": {
meta: {},
id: "page:page",
name: "Page 1",
index: "a1",
typeName: "page",
},
"camera:page:page": {
x: 0,
y: 0,
z: 1,
meta: {},
id: "camera:page:page",
typeName: "camera",
},
"instance_page_state:page:page": {
editingShapeId: null,
croppingShapeId: null,
selectedShapeIds: [],
hoveredShapeId: null,
erasingShapeIds: [],
hintingShapeIds: [],
focusedGroupId: null,
meta: {},
id: "instance_page_state:page:page",
pageId: "page:page",
typeName: "instance_page_state",
},
"instance:instance": {
followingUserId: null,
opacityForNextShape: 1,
stylesForNextShape: {},
brush: { x: 0, y: 0, w: 0, h: 0 },
zoomBrush: { x: 0, y: 0, w: 0, h: 0 },
scribbles: [],
cursor: {
type: "default",
rotation: 0,
},
isFocusMode: false,
exportBackground: true,
isDebugMode: false,
isToolLocked: false,
screenBounds: {
x: 0,
y: 0,
w: 720,
h: 400,
},
isGridMode: false,
isPenMode: false,
chatMessage: "",
isChatting: false,
highlightedUserIds: [],
isFocused: true,
devicePixelRatio: 2,
insets: [false, false, false, false],
isCoarsePointer: false,
isHoveringCanvas: false,
openMenus: [],
isChangingStyle: false,
isReadonly: false,
meta: {},
id: "instance:instance",
currentPageId: "page:page",
typeName: "instance",
},
},
schema: {
schemaVersion: 2,
sequences: {
"com.tldraw.store": 4,
"com.tldraw.asset": 1,
"com.tldraw.camera": 1,
"com.tldraw.document": 2,
"com.tldraw.instance": 25,
"com.tldraw.instance_page_state": 5,
"com.tldraw.page": 1,
"com.tldraw.instance_presence": 5,
"com.tldraw.pointer": 1,
"com.tldraw.shape": 4,
"com.tldraw.asset.bookmark": 2,
"com.tldraw.asset.image": 4,
"com.tldraw.asset.video": 4,
"com.tldraw.shape.group": 0,
"com.tldraw.shape.text": 2,
"com.tldraw.shape.bookmark": 2,
"com.tldraw.shape.draw": 2,
"com.tldraw.shape.geo": 9,
"com.tldraw.shape.note": 7,
"com.tldraw.shape.line": 5,
"com.tldraw.shape.frame": 0,
"com.tldraw.shape.arrow": 5,
"com.tldraw.shape.highlight": 1,
"com.tldraw.shape.embed": 4,
"com.tldraw.shape.image": 3,
"com.tldraw.shape.video": 2,
"com.tldraw.shape.container": 0,
"com.tldraw.shape.element": 0,
"com.tldraw.binding.arrow": 0,
"com.tldraw.binding.layout": 0,
"obsidian_vault": 1
}
}
}
}
private async updateDocument(newDoc: any) {
// CRITICAL: Wait for R2 load to complete before processing updates
// This ensures we have all shapes from R2 before merging client updates
if (this.roomPromise) {
try {
await this.roomPromise
} catch (e) {
// R2 load might have failed, continue anyway
console.warn(`⚠️ R2 load failed, continuing with client update:`, e)
}
}
// TOMBSTONE HANDLING: Load tombstones if not yet loaded
if (!this.tombstonesLoaded) {
await this.loadTombstones()
}
// Filter out tombstoned shapes from incoming document to prevent resurrection
let processedNewStore = newDoc?.store || {}
if (newDoc?.store && this.deletedShapeIds.size > 0) {
const { filteredStore, removedCount } = this.filterTombstonedShapes(newDoc.store)
if (removedCount > 0) {
console.log(`🪦 Filtered ${removedCount} tombstoned shapes from incoming update (preventing resurrection)`)
processedNewStore = filteredStore
}
}
const oldShapeCount = this.currentDoc?.store ? Object.values(this.currentDoc.store).filter((r: any) => r?.typeName === 'shape').length : 0
const newShapeCount = processedNewStore ? Object.values(processedNewStore).filter((r: any) => r?.typeName === 'shape').length : 0
// Get list of old shape IDs to check if we're losing any
const oldShapeIds = this.currentDoc?.store ?
Object.values(this.currentDoc.store)
.filter((r: any) => r?.typeName === 'shape')
.map((r: any) => r.id) : []
const newShapeIds = processedNewStore ?
Object.values(processedNewStore)
.filter((r: any) => r?.typeName === 'shape')
.map((r: any) => r.id) : []
// TOMBSTONE HANDLING: Detect deletions from current doc
// Shapes in current doc that aren't in the incoming doc are being deleted
let newDeletions = 0
if (this.currentDoc?.store && processedNewStore) {
newDeletions = this.detectDeletions(this.currentDoc.store, processedNewStore)
if (newDeletions > 0) {
console.log(`🪦 Detected ${newDeletions} new shape deletions, saving tombstones`)
// Save tombstones immediately to persist deletion tracking
await this.saveTombstones()
}
}
// CRITICAL: Replace the entire store with the processed client document
// The client's document is authoritative and includes deletions
// Tombstoned shapes have already been filtered out to prevent resurrection
// Clear R2 cache since document has been updated
this.cachedR2Doc = null
this.cachedR2Hash = null
if (this.currentDoc && processedNewStore) {
// Count records before update
const recordsBefore = Object.keys(this.currentDoc.store || {}).length
// Replace the entire store with the processed client's version
this.currentDoc.store = { ...processedNewStore }
// Count records after update
const recordsAfter = Object.keys(this.currentDoc.store).length
// Update schema if provided
if (newDoc.schema) {
this.currentDoc.schema = newDoc.schema
}
console.log(`📝 updateDocument: Replaced store with client document: ${recordsBefore} -> ${recordsAfter} records (client sent ${Object.keys(newDoc.store || {}).length}, after tombstone filter: ${Object.keys(processedNewStore).length})`)
} else {
// If no current doc yet, set it (R2 load should have completed by now)
// Use processed store which has tombstoned shapes filtered out
console.log(`📝 updateDocument: No current doc, setting to new doc (${newShapeCount} shapes after tombstone filter)`)
this.currentDoc = { ...newDoc, store: processedNewStore }
}
const finalShapeCount = this.currentDoc?.store ? Object.values(this.currentDoc.store).filter((r: any) => r?.typeName === 'shape').length : 0
const finalShapeIds = this.currentDoc?.store ?
Object.values(this.currentDoc.store)
.filter((r: any) => r?.typeName === 'shape')
.map((r: any) => r.id) : []
// Check for lost shapes (excluding intentional deletions tracked as tombstones)
const lostShapes = oldShapeIds.filter(id => !finalShapeIds.includes(id) && !this.deletedShapeIds.has(id))
if (lostShapes.length > 0) {
console.error(`❌ CRITICAL: Lost ${lostShapes.length} shapes during merge (not tracked as deletions)! Lost IDs:`, lostShapes)
}
// Log intentional deletions separately (for debugging)
const intentionallyDeleted = oldShapeIds.filter(id => !finalShapeIds.includes(id) && this.deletedShapeIds.has(id))
if (intentionallyDeleted.length > 0) {
console.log(`🪦 ${intentionallyDeleted.length} shapes intentionally deleted (tracked as tombstones)`)
}
if (finalShapeCount !== oldShapeCount) {
console.log(`📝 Document updated: shape count changed from ${oldShapeCount} to ${finalShapeCount} (merged from client with ${newShapeCount} shapes)`)
// CRITICAL: Always persist when shape count changes
console.log(`📤 Triggering R2 persistence due to shape count change`)
this.schedulePersistToR2()
} else if (newShapeCount < oldShapeCount) {
console.log(`⚠️ Client sent ${newShapeCount} shapes but server has ${oldShapeCount}. Merged to preserve all shapes (final: ${finalShapeCount})`)
// Persist to ensure we save the merged state
console.log(`📤 Triggering R2 persistence to save merged state`)
this.schedulePersistToR2()
} else if (newShapeCount === oldShapeCount && oldShapeCount > 0) {
// OPTIMIZED: Fast comparison without expensive JSON.stringify
// Check if any records were actually added/updated using lightweight comparison
let recordsChanged = false
const newStore = newDoc.store || {}
const currentStore = this.currentDoc?.store || {}
// Quick check: compare record counts and IDs first
const newKeys = Object.keys(newStore)
const currentKeys = Object.keys(currentStore)
if (newKeys.length !== currentKeys.length) {
recordsChanged = true
} else {
// Check for new or removed records
for (const id of newKeys) {
if (!currentStore[id]) {
recordsChanged = true
break
}
}
if (!recordsChanged) {
for (const id of currentKeys) {
if (!newStore[id]) {
recordsChanged = true
break
}
}
}
// Only do deep comparison if structure matches (avoid expensive JSON.stringify)
if (!recordsChanged) {
// Lightweight comparison: check if record types or key properties changed
for (const id of newKeys) {
const newRecord = newStore[id]
const currentRecord = currentStore[id]
if (!currentRecord) continue
// Quick checks: typeName, type, x, y (most common changes)
if (newRecord.typeName !== currentRecord.typeName ||
newRecord.type !== currentRecord.type ||
(newRecord.x !== currentRecord.x) ||
(newRecord.y !== currentRecord.y)) {
recordsChanged = true
break
}
}
}
}
if (recordsChanged) {
console.log(`ℹ️ Client sent ${newShapeCount} shapes, server had ${oldShapeCount}. Records were updated. Merge complete (final: ${finalShapeCount})`)
// Persist if records were updated
console.log(`📤 Triggering R2 persistence due to record updates`)
this.schedulePersistToR2()
} else {
console.log(`ℹ️ Client sent ${newShapeCount} shapes, server had ${oldShapeCount}. No changes detected, skipping persistence.`)
}
} else {
// New shapes or other changes - always persist
console.log(`📝 Document updated: scheduling persistence (old: ${oldShapeCount}, new: ${newShapeCount}, final: ${finalShapeCount})`)
console.log(`📤 Triggering R2 persistence for new shapes/changes`)
this.schedulePersistToR2()
}
}
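// Illustrative update with tombstones: server store = { A, B }, deletedShapeIds = { B },
// client sends { A, B, C }. filterTombstonedShapes drops B, so the store becomes { A, C }:
// the deletion sticks and the new shape is kept.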
// Migrate old documents format to new store format
private migrateDocumentsToStore(oldDoc: any): any {
const newDoc = {
store: {},
schema: oldDoc.schema || this.createEmptyDocument().schema
}
const migrationStats = {
total: 0,
converted: 0,
skipped: 0,
errors: 0,
errorDetails: [] as string[],
recordTypes: {} as Record<string, number>,
customRecords: [] as string[] // Track custom record IDs (obsidian_vault, etc.)
}
// Track shapes for layer order preservation
const shapesNeedingIndex: { id: string, originalIndex: number }[] = []
// Convert documents array to store object
if (oldDoc.documents && Array.isArray(oldDoc.documents)) {
migrationStats.total = oldDoc.documents.length
oldDoc.documents.forEach((doc: any, arrayIndex: number) => {
try {
// Validate document structure
if (!doc) {
migrationStats.skipped++
migrationStats.errorDetails.push(`Document at index ${arrayIndex} is null or undefined`)
return
}
if (!doc.state) {
migrationStats.skipped++
migrationStats.errorDetails.push(`Document at index ${arrayIndex} missing state property`)
return
}
if (!doc.state.id) {
migrationStats.skipped++
migrationStats.errorDetails.push(`Document at index ${arrayIndex} missing state.id`)
return
}
if (!doc.state.typeName) {
migrationStats.skipped++
migrationStats.errorDetails.push(`Document at index ${arrayIndex} missing state.typeName (id: ${doc.state.id})`)
return
}
// Validate ID is a string
if (typeof doc.state.id !== 'string') {
migrationStats.skipped++
migrationStats.errorDetails.push(`Document at index ${arrayIndex} has invalid state.id type: ${typeof doc.state.id}`)
return
}
// Track record types
const typeName = doc.state.typeName
migrationStats.recordTypes[typeName] = (migrationStats.recordTypes[typeName] || 0) + 1
// Track custom records (obsidian_vault, etc.)
if (doc.state.id.startsWith('obsidian_vault:')) {
migrationStats.customRecords.push(doc.state.id)
}
// Extract the state and use it as the store record
(newDoc.store as any)[doc.state.id] = doc.state
// Track shapes for layer order preservation
if (doc.state.typeName === 'shape') {
shapesNeedingIndex.push({ id: doc.state.id, originalIndex: arrayIndex })
}
migrationStats.converted++
} catch (error) {
migrationStats.errors++
const errorMsg = `Error migrating document at index ${arrayIndex}: ${error instanceof Error ? error.message : String(error)}`
migrationStats.errorDetails.push(errorMsg)
console.error(`❌ Migration error:`, errorMsg)
}
})
} else {
console.warn(`⚠️ migrateDocumentsToStore: oldDoc.documents is not an array or doesn't exist`)
}
// CRITICAL: Assign sequential indices to shapes to preserve layer order
this.assignSequentialIndices(newDoc.store, shapesNeedingIndex)
// Count shapes after migration
const shapeCount = Object.values(newDoc.store).filter((r: any) => r?.typeName === 'shape').length
console.log(`📊 Documents to Store migration statistics:`, {
total: migrationStats.total,
converted: migrationStats.converted,
skipped: migrationStats.skipped,
errors: migrationStats.errors,
storeKeys: Object.keys(newDoc.store).length,
recordTypes: migrationStats.recordTypes,
shapeCount: shapeCount,
customRecordCount: migrationStats.customRecords.length,
customRecordIds: migrationStats.customRecords.slice(0, 10),
errorCount: migrationStats.errorDetails.length
})
// CRITICAL: Log if shapes are missing after migration
if (shapeCount === 0 && migrationStats.recordTypes['shape'] === undefined) {
console.warn(`⚠️ Migration completed but NO shapes found! This might indicate the old format didn't have shapes or they were filtered out.`)
} else if (migrationStats.recordTypes['shape'] && shapeCount !== migrationStats.recordTypes['shape']) {
console.warn(`⚠️ Shape count mismatch: Expected ${migrationStats.recordTypes['shape']} shapes but found ${shapeCount} after migration`)
} else if (shapeCount > 0) {
console.log(`✅ Migration successfully converted ${shapeCount} shapes from old format to new format`)
}
// Verify custom records are preserved
if (migrationStats.customRecords.length > 0) {
console.log(`✅ Verified ${migrationStats.customRecords.length} custom records preserved during migration`)
}
if (migrationStats.errorDetails.length > 0 && migrationStats.errorDetails.length <= 10) {
console.warn(`⚠️ Migration warnings (showing first 10):`, migrationStats.errorDetails.slice(0, 10))
} else if (migrationStats.errorDetails.length > 10) {
console.warn(`⚠️ Migration warnings (${migrationStats.errorDetails.length} total, showing first 10):`, migrationStats.errorDetails.slice(0, 10))
}
return newDoc
}
// Migrate shape properties to ensure they have required fields
private migrateShapeProperties(doc: any): any {
if (!doc.store) return doc
const migrationStats = {
total: 0,
migrated: 0,
skipped: 0,
errors: 0,
errorDetails: [] as string[],
shapeTypes: {} as Record<string, number>,
customShapes: [] as string[] // Track custom shape IDs
}
const store = { ...doc.store }
// Fix all shape records to ensure they have required properties
Object.keys(store).forEach(key => {
const record = store[key]
if (record && record.typeName === 'shape') {
migrationStats.total++
// Track shape types
const shapeType = record.type || 'unknown'
migrationStats.shapeTypes[shapeType] = (migrationStats.shapeTypes[shapeType] || 0) + 1
// Track custom shapes (non-standard TLDraw shapes)
const customShapeTypes = ['ObsNote', 'Holon', 'FathomMeetingsBrowser', 'FathomNote', 'HolonBrowser', 'ObsidianBrowser', 'ImageGen', 'VideoGen', 'Multmux']
if (customShapeTypes.includes(shapeType)) {
migrationStats.customShapes.push(record.id)
}
try {
const originalRecord = { ...record }
let needsUpdate = false
// Ensure isLocked property exists and is a boolean
if (record.isLocked === undefined || typeof record.isLocked !== 'boolean') {
record.isLocked = false
needsUpdate = true
}
// Ensure other required shape properties exist
// CRITICAL: Preserve original coordinates - only reset if truly missing or invalid
// Log when coordinates are being reset to help debug frame children coordinate issues
const originalX = record.x
const originalY = record.y
if (record.x === undefined || record.x === null || typeof record.x !== 'number' || isNaN(record.x)) {
console.log(`🔧 Server: Resetting X coordinate for shape ${record.id} (type: ${record.type}, parentId: ${record.parentId}). Original value:`, originalX)
record.x = 0
needsUpdate = true
}
if (record.y === undefined || record.y === null || typeof record.y !== 'number' || isNaN(record.y)) {
console.log(`🔧 Server: Resetting Y coordinate for shape ${record.id} (type: ${record.type}, parentId: ${record.parentId}). Original value:`, originalY)
record.y = 0
needsUpdate = true
}
if (record.rotation === undefined) {
record.rotation = 0
needsUpdate = true
}
if (record.opacity === undefined) {
record.opacity = 1
needsUpdate = true
}
if (!record.meta || typeof record.meta !== 'object') {
record.meta = {}
needsUpdate = true
}
// NOTE: Index assignment is now handled by assignSequentialIndices() during format conversion
// We only need to ensure index exists, not validate the format here
// This preserves layer order that was established during conversion
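// Added note (hedged): tldraw `index` values are fractional-index strings such as 'a1',
// 'a2', or 'a1V'. The 'a1' fallback below is a valid index but not a unique one, so shapes
// that hit it will sort adjacently; assignSequentialIndices() is assumed to have produced
// distinct indices on the normal path.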
if (!record.index || typeof record.index !== 'string') {
// Only assign a default if truly missing - the conversion functions should have handled this
console.log(`⚠️ Server: Shape ${record.id} missing index after conversion, assigning fallback`)
record.index = 'a1'
needsUpdate = true
}
// Special handling for geo shapes - ensure w and h are in props, not top level
const isGeoShape = record.type === 'geo' ||
(record.typeName === 'shape' && 'w' in record && 'h' in record)
if (isGeoShape) {
// If we don't have a type but have w/h, assume it's a geo shape
if (!record.type) {
record.type = 'geo'
console.log(`Setting type to 'geo' for shape with w/h properties:`, {
id: record.id,
w: record.w,
h: record.h
})
}
// CRITICAL: Move w and h from top level to props to prevent validation errors
if ('w' in record) {
console.log(`🔧 Server: Moving w from top level to props for geo shape ${record.id}`)
if (!record.props) record.props = {}
if (record.props.w === undefined) {
record.props.w = record.w
}
delete record.w
needsUpdate = true
}
if ('h' in record) {
console.log(`🔧 Server: Moving h from top level to props for geo shape ${record.id}`)
if (!record.props) record.props = {}
if (record.props.h === undefined) {
record.props.h = record.h
}
delete record.h
needsUpdate = true
}
// Ensure props property exists with defaults
if (!record.props || typeof record.props !== 'object') {
record.props = {
w: 100,
h: 100,
geo: 'rectangle',
dash: 'draw',
growY: 0,
url: '',
scale: 1,
color: 'black',
labelColor: 'black',
fill: 'none',
size: 'm',
font: 'draw',
align: 'middle',
verticalAlign: 'middle'
}
needsUpdate = true
}
// Handle richText property for geo shapes - only fix structure, don't add if missing
if ('richText' in record.props) {
if (record.props.richText === undefined || record.props.richText === null) {
console.log(`🔧 Worker: Replacing null/undefined richText for geo shape:`, record.id)
record.props.richText = { content: [], type: 'doc' }
needsUpdate = true
} else if (Array.isArray(record.props.richText)) {
console.log(`🔧 Worker: Converting richText array to object for geo shape:`, record.id)
record.props.richText = {
content: record.props.richText,
type: 'doc'
}
needsUpdate = true
} else if (typeof record.props.richText !== 'object' || !record.props.richText.content) {
console.log(`🔧 Worker: Fixing invalid richText structure for geo shape:`, record.id)
record.props.richText = { content: [], type: 'doc' }
needsUpdate = true
}
}
// Don't add richText if it doesn't exist - let TLDraw handle it naturally
// Only move w and h from top level to props if they're not already in props
// This preserves the original data structure
if ('w' in record && typeof record.w === 'number' && record.props.w === undefined) {
record.props.w = record.w
needsUpdate = true
}
if ('h' in record && typeof record.h === 'number' && record.props.h === undefined) {
record.props.h = record.h
needsUpdate = true
}
}
// Special handling for text shapes - ensure required properties exist
if (record.type === 'text') {
if (!record.props || typeof record.props !== 'object') {
record.props = {}
needsUpdate = true
}
// CRITICAL: color is REQUIRED for text shapes and must be a valid color value
const validColors = ['black', 'grey', 'light-violet', 'violet', 'blue', 'light-blue', 'yellow', 'orange', 'green', 'light-green', 'light-red', 'red', 'white']
if (!record.props.color || typeof record.props.color !== 'string' || !validColors.includes(record.props.color)) {
record.props.color = 'black'
needsUpdate = true
}
// Ensure other required text shape properties have defaults
if (typeof record.props.w !== 'number') {
record.props.w = 300
needsUpdate = true
}
if (!record.props.size || typeof record.props.size !== 'string') {
record.props.size = 'm'
needsUpdate = true
}
if (!record.props.font || typeof record.props.font !== 'string') {
record.props.font = 'draw'
needsUpdate = true
}
if (!record.props.textAlign || typeof record.props.textAlign !== 'string') {
record.props.textAlign = 'start'
needsUpdate = true
}
if (typeof record.props.autoSize !== 'boolean') {
record.props.autoSize = false
needsUpdate = true
}
if (typeof record.props.scale !== 'number') {
record.props.scale = 1
needsUpdate = true
}
// Ensure richText structure is correct
if (record.props.richText) {
if (Array.isArray(record.props.richText)) {
record.props.richText = { content: record.props.richText, type: 'doc' }
needsUpdate = true
} else if (typeof record.props.richText === 'object' && record.props.richText !== null) {
if (!record.props.richText.type) {
record.props.richText = { ...record.props.richText, type: 'doc' }
needsUpdate = true
}
if (!record.props.richText.content) {
record.props.richText = { ...record.props.richText, content: [] }
needsUpdate = true
}
}
}
// Remove invalid properties for text shapes (these cause validation errors)
// Remove properties that are only valid for custom shapes, not standard TLDraw text shapes
const invalidTextProps = ['h', 'geo', 'text', 'isEditing', 'editingContent', 'isTranscribing', 'isPaused', 'fixedHeight', 'pinnedToView', 'isModified', 'originalContent', 'editingName', 'editingDescription', 'isConnected', 'holonId', 'noteId', 'title', 'content', 'tags', 'showPreview', 'backgroundColor', 'textColor']
invalidTextProps.forEach(prop => {
if (prop in record.props) {
delete record.props[prop]
needsUpdate = true
}
})
}
// Special handling for Multmux shapes - ensure all required props exist
// Old shapes may have wsUrl (removed) or undefined values
// CRITICAL: Every prop must be explicitly defined - undefined values cause ValidationError
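// Illustrative example (added; values are hypothetical): a legacy record such as
//   { props: { wsUrl: 'ws://old-host', sessionId: undefined, w: 800 } }
// is normalized by the code below to
//   { props: { w: 800, h: 600, sessionId: '', sessionName: '', token: '',
//     serverUrl: 'http://localhost:3000', pinnedToView: false, tags: ['terminal', 'multmux'] } }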
if (record.type === 'Multmux') {
if (!record.props || typeof record.props !== 'object') {
record.props = {}
needsUpdate = true
}
// Remove deprecated wsUrl prop
if ('wsUrl' in record.props) {
delete record.props.wsUrl
needsUpdate = true
}
// CRITICAL: Create clean props with all required values - no undefined allowed
const w = (typeof record.props.w === 'number' && !isNaN(record.props.w)) ? record.props.w : 800
const h = (typeof record.props.h === 'number' && !isNaN(record.props.h)) ? record.props.h : 600
const sessionId = (typeof record.props.sessionId === 'string') ? record.props.sessionId : ''
const sessionName = (typeof record.props.sessionName === 'string') ? record.props.sessionName : ''
const token = (typeof record.props.token === 'string') ? record.props.token : ''
const serverUrl = (typeof record.props.serverUrl === 'string') ? record.props.serverUrl : 'http://localhost:3000'
const pinnedToView = record.props.pinnedToView === true
// Filter out any undefined or non-string elements from tags array
let tags: string[] = ['terminal', 'multmux']
if (Array.isArray(record.props.tags)) {
const filteredTags = record.props.tags.filter((t: any) => typeof t === 'string' && t !== '')
if (filteredTags.length > 0) {
tags = filteredTags
}
}
// Check if any prop needs updating
if (record.props.w !== w || record.props.h !== h ||
record.props.sessionId !== sessionId || record.props.sessionName !== sessionName ||
record.props.token !== token || record.props.serverUrl !== serverUrl ||
record.props.pinnedToView !== pinnedToView ||
JSON.stringify(record.props.tags) !== JSON.stringify(tags)) {
record.props.w = w
record.props.h = h
record.props.sessionId = sessionId
record.props.sessionName = sessionName
record.props.token = token
record.props.serverUrl = serverUrl
record.props.pinnedToView = pinnedToView
record.props.tags = tags
needsUpdate = true
}
}
if (needsUpdate) {
migrationStats.migrated++
// Only log detailed migration info for first few shapes to avoid spam
if (migrationStats.migrated <= 5) {
console.log(`Migrating shape ${record.id}:`, {
id: record.id,
type: record.type,
originalIsLocked: originalRecord.isLocked,
newIsLocked: record.isLocked,
hadX: 'x' in originalRecord,
hadY: 'y' in originalRecord,
hadRotation: 'rotation' in originalRecord,
hadOpacity: 'opacity' in originalRecord,
hadMeta: 'meta' in originalRecord,
hadW: 'w' in originalRecord,
hadH: 'h' in originalRecord,
propsW: record.props?.w,
propsH: record.props?.h,
// Verify custom shape props are preserved
hasCustomProps: customShapeTypes.includes(shapeType) ? Object.keys(record.props || {}).length : undefined
})
}
} else {
// Count non-migrated shapes
migrationStats.skipped++
}
} catch (error) {
migrationStats.errors++
const errorMsg = `Error migrating shape ${record.id}: ${error instanceof Error ? error.message : String(error)}`
migrationStats.errorDetails.push(errorMsg)
console.error(`❌ Shape migration error:`, errorMsg)
}
}
})
console.log(`📊 Shape migration statistics:`, {
total: migrationStats.total,
migrated: migrationStats.migrated,
skipped: migrationStats.skipped,
errors: migrationStats.errors,
shapeTypes: migrationStats.shapeTypes,
customShapesCount: migrationStats.customShapes.length,
customShapeIds: migrationStats.customShapes.slice(0, 10), // Show first 10
errorCount: migrationStats.errorDetails.length
})
if (migrationStats.errorDetails.length > 0 && migrationStats.errorDetails.length <= 10) {
console.warn(`⚠️ Shape migration warnings (showing first 10):`, migrationStats.errorDetails.slice(0, 10))
} else if (migrationStats.errorDetails.length > 10) {
console.warn(`⚠️ Shape migration warnings (${migrationStats.errorDetails.length} total, showing first 10):`, migrationStats.errorDetails.slice(0, 10))
}
// Verify custom shapes are preserved
if (migrationStats.customShapes.length > 0) {
console.log(`✅ Verified ${migrationStats.customShapes.length} custom shapes preserved during migration`)
}
return {
...doc,
store
}
}
// we throttle persistence so it only happens every 2 seconds, batching all updates
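// Added note: with lodash.throttle's default options the first call runs immediately
// (leading edge), and any calls made during the 2s window collapse into a single trailing
// invocation, so a burst of updates yields at most one extra persist.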
schedulePersistToR2 = throttle(async () => {
console.log(`📤 schedulePersistToR2 called for room ${this.roomId}`)
// CRDT MODE: Sync manager handles all persistence to automerge.bin
// Skip JSON persistence when CRDT is active to avoid dual storage
if (this.useCrdtSync && this.syncManager) {
console.log(`📤 CRDT mode active - sync manager handles persistence to automerge.bin`)
// Force sync manager to save immediately
try {
await this.syncManager.forceSave()
console.log(`✅ CRDT document saved via sync manager`)
} catch (error) {
console.error(`❌ Error saving CRDT document:`, error)
}
return
}
if (!this.roomId || !this.currentDoc) {
console.log(`⚠️ Cannot persist to R2: roomId=${this.roomId}, currentDoc=${!!this.currentDoc}`)
return
}
// CRITICAL: Load the current R2 state so we can compare it against this.currentDoc before saving.
// this.currentDoc is authoritative (it includes deletions); R2 is read for diagnostics and
// caching rather than merged back in, so deleted shapes are not resurrected from R2.
let mergedDoc = { ...this.currentDoc }
let r2ShapeCount = 0
let mergedShapeCount = 0
try {
// OPTIMIZATION: Only reload R2 if we don't have a cached version or if it might have changed
// Since currentDoc is authoritative (includes deletions), we can skip R2 merge in most cases
// Only merge if we suspect there might be data in R2 that's not in currentDoc
let r2Doc: any = null
// Check if we need to reload R2 (only if cache is invalid or missing)
if (!this.cachedR2Doc) {
const docFromBucket = await this.r2.get(`rooms/${this.roomId}`)
if (docFromBucket) {
try {
r2Doc = await docFromBucket.json() as any
// Cache the R2 document
this.cachedR2Doc = r2Doc
this.cachedR2Hash = this.generateDocHash(r2Doc)
} catch (r2ParseError) {
console.warn(`⚠️ Error parsing R2 document, using current document:`, r2ParseError)
r2Doc = null
}
}
} else {
// Use cached R2 document
r2Doc = this.cachedR2Doc
}
if (r2Doc) {
r2ShapeCount = r2Doc.store ?
Object.values(r2Doc.store).filter((r: any) => r?.typeName === 'shape').length : 0
// CRITICAL: Use currentDoc as the source of truth (has the latest state including deletions)
// Don't merge in old records from R2 - currentDoc is authoritative
mergedDoc = { ...this.currentDoc }
mergedDoc.store = { ...this.currentDoc.store }
// Update schema from currentDoc if it exists
if (this.currentDoc.schema) {
mergedDoc.schema = this.currentDoc.schema
}
mergedShapeCount = Object.values(mergedDoc.store).filter((r: any) => r?.typeName === 'shape').length
// Only log merge details if the shape counts differ
if (mergedShapeCount !== r2ShapeCount) {
const mergedShapeTypeCounts = Object.values(mergedDoc.store)
.filter((r: any) => r?.typeName === 'shape')
.reduce((acc: any, r: any) => {
const type = r?.type || 'unknown'
acc[type] = (acc[type] || 0) + 1
return acc
}, {})
console.log(`🔄 Merging R2 state with current state before persistence:`, {
r2Shapes: r2ShapeCount,
currentShapes: this.currentDoc.store ? Object.values(this.currentDoc.store).filter((r: any) => r?.typeName === 'shape').length : 0,
mergedShapes: mergedShapeCount,
r2Records: Object.keys(r2Doc.store || {}).length,
currentRecords: Object.keys(this.currentDoc.store || {}).length,
mergedRecords: Object.keys(mergedDoc.store || {}).length
})
console.log(`📊 Merged shape type breakdown:`, mergedShapeTypeCounts)
// Log merge results
if (mergedShapeCount < r2ShapeCount) {
// This is expected when shapes are deleted - currentDoc has fewer shapes than R2
console.log(`✅ Merged document has ${r2ShapeCount - mergedShapeCount} fewer shapes than R2 (deletions preserved)`)
} else if (mergedShapeCount > r2ShapeCount) {
console.log(`✅ Merged document has ${mergedShapeCount - r2ShapeCount} new shapes added to R2's ${r2ShapeCount} shapes`)
}
}
} else {
// No R2 document exists yet - use currentDoc
mergedDoc = this.currentDoc
mergedShapeCount = this.currentDoc.store ? Object.values(this.currentDoc.store).filter((r: any) => r?.typeName === 'shape').length : 0
console.log(`ℹ️ No existing R2 document, using current document (${mergedShapeCount} shapes)`)
}
} catch (r2LoadError) {
// If R2 load fails, use currentDoc (better than losing data)
console.warn(`⚠️ Error loading from R2, using current document:`, r2LoadError)
mergedDoc = this.currentDoc
mergedShapeCount = this.currentDoc.store ? Object.values(this.currentDoc.store).filter((r: any) => r?.typeName === 'shape').length : 0
// Clear cache on error
this.cachedR2Doc = null
this.cachedR2Hash = null
}
// Generate hash of merged document state
const currentHash = this.generateDocHash(mergedDoc)
console.log(`🔍 Server checking R2 persistence for room ${this.roomId}:`, {
currentHash: currentHash.substring(0, 8) + '...',
lastHash: this.lastPersistedHash ? this.lastPersistedHash.substring(0, 8) + '...' : 'none',
hasStore: !!mergedDoc.store,
storeKeys: mergedDoc.store ? Object.keys(mergedDoc.store).length : 0,
shapeCount: mergedShapeCount,
hashesMatch: currentHash === this.lastPersistedHash
})
// Skip persistence if document hasn't changed
if (currentHash === this.lastPersistedHash) {
console.log(`⏭️ Skipping R2 persistence for room ${this.roomId} - no changes detected (hash matches)`)
return
}
console.log(`💾 Attempting to persist room ${this.roomId} to R2...`)
try {
// Update currentDoc to the merged version
this.currentDoc = mergedDoc
// Serialize the merged document once; R2 handles large payloads well.
// For very large documents, consider compression or chunking in the future.
const docJson = JSON.stringify(mergedDoc)
const docSize = docJson.length
console.log(`💾 Uploading to R2: ${docSize} bytes, ${mergedShapeCount} shapes`)
const putResult = await this.r2.put(`rooms/${this.roomId}`, docJson, {
httpMetadata: {
contentType: 'application/json'
}
})
// Track shape types in final persisted document
const persistedShapeTypeCounts = Object.values(mergedDoc.store || {})
.filter((r: any) => r?.typeName === 'shape')
.reduce((acc: any, r: any) => {
const type = r?.type || 'unknown'
acc[type] = (acc[type] || 0) + 1
return acc
}, {})
// Update last persisted hash only after successful save
this.lastPersistedHash = currentHash
// Update cached R2 document to match what we just saved
this.cachedR2Doc = mergedDoc
this.cachedR2Hash = currentHash
console.log(`✅ Successfully persisted room ${this.roomId} to R2 (merged):`, {
storeKeys: mergedDoc.store ? Object.keys(mergedDoc.store).length : 0,
shapeCount: mergedShapeCount,
docSize: docSize,
preservedR2Shapes: r2ShapeCount > 0 ? `${r2ShapeCount} from R2` : 'none',
r2PutResult: putResult ? 'success' : 'unknown'
})
console.log(`✅ Persisted shape type breakdown:`, persistedShapeTypeCounts)
} catch (error) {
// Enhanced error logging for R2 persistence failures
const errorDetails = {
roomId: this.roomId,
errorMessage: error instanceof Error ? error.message : String(error),
errorName: error instanceof Error ? error.name : 'Unknown',
errorStack: error instanceof Error ? error.stack : undefined,
shapeCount: mergedShapeCount,
storeKeys: mergedDoc.store ? Object.keys(mergedDoc.store).length : 0,
docSize: mergedDoc.store ? JSON.stringify(mergedDoc).length : 0
}
console.error(`❌ Error persisting room ${this.roomId} to R2:`, errorDetails)
console.error(`❌ Full error object:`, error)
}
}, 2_000)
// Handle request-document-state message from worker
// This allows the worker to request current document state from clients for persistence
private async handleDocumentStateRequest(sessionId: string) {
// When worker requests document state, we'll respond via the existing POST endpoint
// Clients should periodically send their document state, so this is mainly for logging
console.log(`📡 Worker: Document state requested from ${sessionId} (clients should send via POST /room/:roomId)`)
}
// ==================== TOMBSTONE MANAGEMENT ====================
// These methods handle tracking deleted shapes to prevent resurrection
// when offline clients reconnect with stale data
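// Illustrative sketch (added; mirrors saveTombstones() below): tombstones persist at
// `rooms/${roomId}/tombstones.json` as, e.g.:
//   { "deletedShapeIds": ["shape:abc", "shape:def"],
//     "lastUpdated": "2024-01-01T00:00:00.000Z", "count": 2 }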
/**
* Load tombstones from R2 storage
* Called during initialization to restore deleted shape tracking
*/
private async loadTombstones(): Promise<void> {
if (this.tombstonesLoaded || !this.roomId) return
try {
const tombstoneKey = `rooms/${this.roomId}/tombstones.json`
const object = await this.r2.get(tombstoneKey)
if (object) {
const data = await object.json() as { deletedShapeIds: string[], lastUpdated: string }
this.deletedShapeIds = new Set(data.deletedShapeIds || [])
console.log(`🪦 Loaded ${this.deletedShapeIds.size} tombstones for room ${this.roomId}`)
} else {
console.log(`🪦 No tombstones found for room ${this.roomId}, starting fresh`)
this.deletedShapeIds = new Set()
}
this.tombstonesLoaded = true
} catch (error) {
console.error(`❌ Error loading tombstones for room ${this.roomId}:`, error)
this.deletedShapeIds = new Set()
this.tombstonesLoaded = true
}
}
/**
* Save tombstones to R2 storage
* Called after detecting deletions to persist the tombstone list
*/
private async saveTombstones(): Promise<void> {
if (!this.roomId) return
try {
const tombstoneKey = `rooms/${this.roomId}/tombstones.json`
const data = {
deletedShapeIds: Array.from(this.deletedShapeIds),
lastUpdated: new Date().toISOString(),
count: this.deletedShapeIds.size
}
await this.r2.put(tombstoneKey, JSON.stringify(data), {
httpMetadata: { contentType: 'application/json' }
})
console.log(`🪦 Saved ${this.deletedShapeIds.size} tombstones for room ${this.roomId}`)
} catch (error) {
console.error(`❌ Error saving tombstones for room ${this.roomId}:`, error)
}
}
/**
* Detect deleted shapes by comparing old and new stores
* Adds newly deleted shape IDs to the tombstone set
* @returns Number of new deletions detected
*/
private detectDeletions(oldStore: Record<string, any>, newStore: Record<string, any>): number {
let newDeletions = 0
// Find shapes that existed in oldStore but not in newStore
for (const id of Object.keys(oldStore)) {
const record = oldStore[id]
// Only track shape deletions (not camera, instance, etc.)
if (record?.typeName === 'shape' && !newStore[id]) {
if (!this.deletedShapeIds.has(id)) {
this.deletedShapeIds.add(id)
newDeletions++
console.log(`🪦 Detected deletion of shape: ${id}`)
}
}
}
return newDeletions
}
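// Hypothetical usage sketch (added; `incomingStore` is an assumed name for a
// client-supplied store object):
//   const newDeletions = this.detectDeletions(this.currentDoc.store, incomingStore)
//   if (newDeletions > 0) await this.saveTombstones()
// keeps the persisted tombstone list in sync with observed deletions.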
/**
* Filter out tombstoned shapes from a store
* Prevents resurrection of deleted shapes
* @returns Filtered store and count of shapes removed
*/
private filterTombstonedShapes(store: Record<string, any>): {
filteredStore: Record<string, any>,
removedCount: number
} {
const filteredStore: Record<string, any> = {}
let removedCount = 0
for (const [id, record] of Object.entries(store)) {
// Check if this is a tombstoned shape
if (record?.typeName === 'shape' && this.deletedShapeIds.has(id)) {
removedCount++
console.log(`🪦 Blocking resurrection of tombstoned shape: ${id}`)
} else {
filteredStore[id] = record
}
}
return { filteredStore, removedCount }
}
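// Hypothetical usage sketch (added; `incomingStore` is an assumed name): run incoming
// client state through the filter before merging it into the document:
//   const { filteredStore, removedCount } = this.filterTombstonedShapes(incomingStore)
//   if (removedCount > 0) console.log(`🪦 Blocked ${removedCount} tombstoned shapes`)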
/**
* Clear old tombstones that are older than the retention period
* Called periodically to prevent unbounded tombstone growth
* For now, we keep all tombstones - can add expiry logic later
*/
private cleanupOldTombstones(): void {
// TODO: Implement tombstone expiry if needed
// For now, tombstones are permanent to ensure deleted shapes never return
// This is the safest approach for collaborative editing
if (this.deletedShapeIds.size > 10000) {
console.warn(`⚠️ Large tombstone count (${this.deletedShapeIds.size}) for room ${this.roomId}. Consider implementing expiry.`)
}
}
}