/// import { RoomSnapshot, TLSocketRoom } from "@tldraw/sync-core" import { TLRecord, TLShape, createTLSchema, defaultBindingSchemas, defaultShapeSchemas, } from "@tldraw/tlschema" import { AutoRouter, IRequest, error } from "itty-router" import throttle from "lodash.throttle" import { Environment } from "./types" import { ChatBoxShape } from "@/shapes/ChatBoxShapeUtil" import { VideoChatShape } from "@/shapes/VideoChatShapeUtil" import { EmbedShape } from "@/shapes/EmbedShapeUtil" import { MarkdownShape } from "@/shapes/MarkdownShapeUtil" import { MycrozineTemplateShape } from "@/shapes/MycrozineTemplateShapeUtil" // add custom shapes and bindings here if needed: export const customSchema = createTLSchema({ shapes: { ...defaultShapeSchemas, ChatBox: { props: ChatBoxShape.props, migrations: ChatBoxShape.migrations, }, VideoChat: { props: VideoChatShape.props, migrations: VideoChatShape.migrations, }, Embed: { props: EmbedShape.props, migrations: EmbedShape.migrations, }, Markdown: { props: MarkdownShape.props, migrations: MarkdownShape.migrations, }, MycrozineTemplate: { props: MycrozineTemplateShape.props, migrations: MycrozineTemplateShape.migrations, }, }, bindings: defaultBindingSchemas, }) // each whiteboard room is hosted in a DurableObject: // https://developers.cloudflare.com/durable-objects/ // there's only ever one durable object instance per room. it keeps all the room state in memory and // handles websocket connections. periodically, it persists the room state to the R2 bucket. export class TldrawDurableObject { private r2: R2Bucket private backupsR2: R2Bucket private lastBackupDate: string | null = null // the room ID will be missing whilst the room is being initialized private roomId: string | null = null // when we load the room from the R2 bucket, we keep it here. it's a promise so we only ever // load it once. private roomPromise: Promise> | null = null private isDevelopment: boolean // Modify to use a more strict throttle with a flag to prevent multiple calls private isPersisting = false constructor(private readonly ctx: DurableObjectState, env: Environment) { this.r2 = env.TLDRAW_BUCKET this.backupsR2 = env.TLDRAW_BACKUPS_BUCKET this.isDevelopment = env.IS_DEVELOPMENT ctx.blockConcurrencyWhile(async () => { this.roomId = ((await this.ctx.storage.get("roomId")) ?? null) as | string | null this.lastBackupDate = ((await this.ctx.storage.get("lastBackupDate")) ?? null) as | string | null }) } private readonly router = AutoRouter({ catch: (e) => { console.log(e) return error(e) }, }) // when we get a connection request, we stash the room id if needed and handle the connection .get("/connect/:roomId", async (request) => { if (!this.roomId) { await this.ctx.blockConcurrencyWhile(async () => { await this.ctx.storage.put("roomId", request.params.roomId) this.roomId = request.params.roomId }) } return this.handleConnect(request) }) .get("/room/:roomId", async (request) => { const room = await this.getRoom() const snapshot = room.getCurrentSnapshot() return new Response(JSON.stringify(snapshot.documents), { headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": request.headers.get("Origin") || "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS", "Access-Control-Allow-Headers": "Content-Type", "Access-Control-Max-Age": "86400", }, }) }) .post("/room/:roomId", async (request) => { console.log('POST /room/:roomId called') const records = (await request.json()) as TLRecord[] console.log('Received records:', records) return new Response(JSON.stringify(Array.from(records)), { headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": request.headers.get("Origin") || "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS", "Access-Control-Allow-Headers": "Content-Type", "Access-Control-Max-Age": "86400", }, }) }) .get("/test-logs", () => { console.log("Test endpoint hit") return new Response("Test endpoint hit") }) // `fetch` is the entry point for all requests to the Durable Object fetch(request: Request): Response | Promise { try { console.log('DO fetch called:', request.url, request.method) // Handle CORS preflight requests if (request.method === 'OPTIONS') { return new Response(null, { headers: { 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'GET, POST, OPTIONS, UPGRADE', 'Access-Control-Allow-Headers': 'Content-Type, Authorization, Upgrade, Connection', 'Access-Control-Max-Age': '86400', 'Access-Control-Allow-Credentials': 'true', }, }) } return this.router.fetch(request) } catch (err) { console.error("Error in DO fetch:", err) return new Response( JSON.stringify({ error: "Internal Server Error", message: (err as Error).message, }), { status: 500, headers: { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS, UPGRADE", "Access-Control-Allow-Headers": "Content-Type, Authorization, Upgrade, Connection", "Access-Control-Max-Age": "86400", "Access-Control-Allow-Credentials": "true", }, } ) } } // what happens when someone tries to connect to this room? async handleConnect(request: IRequest): Promise { if (!this.roomId) { return new Response("Room not initialized", { status: 400 }) } const sessionId = request.query.sessionId as string if (!sessionId) { return new Response("Missing sessionId", { status: 400 }) } // Check if this is a websocket upgrade request const upgradeHeader = request.headers.get("Upgrade") if (!upgradeHeader || upgradeHeader.toLowerCase() !== "websocket") { return new Response("Expected Upgrade: websocket", { status: 426, headers: { "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS, UPGRADE", "Access-Control-Allow-Headers": "*", "Access-Control-Allow-Credentials": "true", "Upgrade": "websocket" } }) } // Create WebSocket pair and handle connection const webSocketPair = new WebSocketPair() const [client, server] = Object.values(webSocketPair) try { server.accept() console.log(`Connected to room: ${this.roomId}, session: ${sessionId}`) const room = await this.getRoom() // Add error handling for the websocket server.addEventListener('error', (error) => { console.error(`WebSocket error in room ${this.roomId}, session ${sessionId}:`, error) }) // Handle close event server.addEventListener('close', async () => { console.log(`Disconnected from room: ${this.roomId}, session: ${sessionId}`) if (this.roomPromise) { try { const room = await this.getRoom() if (room) { await this.schedulePersistToR2() } } catch (error) { console.error(`Failed to persist data on disconnect for room ${this.roomId}:`, error) } } }) // Handle message event server.addEventListener('message', async (event) => { try { if (typeof event.data === 'string') { const data = JSON.parse(event.data) //console.log('Received message:', data) } } catch (error) { console.error('Error handling message:', error) } }) room.handleSocketConnect({ sessionId, socket: { send: (data: string) => { try { if (server.readyState === WebSocket.OPEN) { server.send(data) } } catch (error) { console.error(`Failed to send websocket data in room ${this.roomId}:`, error) } }, close: (code?: number, reason?: string) => { try { server.close(code, reason) } catch (error) { console.error(`Error closing websocket in room ${this.roomId}:`, error) } }, addEventListener: server.addEventListener.bind(server), removeEventListener: server.removeEventListener.bind(server), readyState: server.readyState, }, }) return new Response(null, { status: 101, webSocket: client, headers: { "Upgrade": "websocket", "Connection": "Upgrade", "Sec-WebSocket-Accept": "true", "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS, UPGRADE", "Access-Control-Allow-Headers": "*", "Access-Control-Allow-Credentials": "true" } }) } catch (error) { console.error("WebSocket connection error:", error) server.close(1011, "Failed to initialize connection") return new Response("Failed to establish WebSocket connection", { status: 500, headers: { "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS, UPGRADE", "Access-Control-Allow-Headers": "*", "Access-Control-Allow-Credentials": "true" } }) } } getRoom() { const roomId = this.roomId if (!roomId) throw new Error("Missing roomId") console.log('getRoom called with roomId:', roomId) if (!this.roomPromise) { this.roomPromise = (async () => { // fetch the room from R2 console.log(`Attempting to fetch room ${roomId} from R2...`) const roomFromBucket = await this.r2.get(`rooms/${roomId}`) if (roomFromBucket) { console.log(`Found existing room in R2 for ${roomId}`) } else { console.log(`No existing room found in R2 for ${roomId}, creating new room`) } // if it doesn't exist, we'll just create a new empty room const initialSnapshot = roomFromBucket ? ((await roomFromBucket.json()) as RoomSnapshot) : undefined if (initialSnapshot) { console.log(`Loaded snapshot for room ${roomId}`) } // create a new TLSocketRoom return new TLSocketRoom({ schema: customSchema, initialSnapshot, onDataChange: async () => { console.log('onDataChange triggered - scheduling persist to R2') await this.schedulePersistToR2() console.log('persist to R2 completed') }, }) })() } return this.roomPromise } private async checkAndCreateDailyBackup() { if (!this.roomId) return const now = new Date() const currentDate = now.toISOString().split('T')[0] // In development, check if 10 seconds have passed since last backup // In production, check if it's a different day const shouldBackup = this.isDevelopment ? !this.lastBackupDate || (now.getTime() - new Date(this.lastBackupDate).getTime() > 10000) : this.lastBackupDate !== currentDate if (!shouldBackup) return const room = await this.getRoom() const snapshot = room.getCurrentSnapshot() const timestamp = now.toISOString() const backupKey = `backups/${this.roomId}/${timestamp}.json` try { await this.backupsR2.put(backupKey, JSON.stringify(snapshot)) // Store the full ISO timestamp in development, just the date in production await this.ctx.storage.put("lastBackupDate", this.isDevelopment ? timestamp : currentDate) this.lastBackupDate = this.isDevelopment ? timestamp : currentDate console.log(`Created backup for room ${this.roomId} at ${timestamp}`) } catch (error) { console.error(`Failed to create backup for room ${this.roomId}:`, error) } } // Modify to use a more strict throttle with a flag to prevent multiple calls schedulePersistToR2 = throttle(async () => { if (!this.roomPromise || !this.roomId || this.isPersisting) { console.log('Skipping persist - room not ready or persist already in progress') return } try { this.isPersisting = true console.log(`Persisting room ${this.roomId} to storage...`) const room = await this.getRoom() if (!room) { console.log(`Room ${this.roomId} no longer exists, skipping persist`) return } await this.r2.put(`rooms/${this.roomId}`, JSON.stringify(room.getCurrentSnapshot())) console.log(`Successfully persisted room ${this.roomId}`) await this.checkAndCreateDailyBackup() } catch (error) { console.error(`Failed to persist room ${this.roomId}:`, error) } finally { this.isPersisting = false } }, 10_000, { leading: false, // Don't execute on the leading edge of the timeout trailing: true, // Execute on the trailing edge of the timeout }) // Add CORS headers for WebSocket upgrade handleWebSocket(request: Request) { const upgradeHeader = request.headers.get("Upgrade") if (!upgradeHeader || upgradeHeader !== "websocket") { return new Response("Expected Upgrade: websocket", { status: 426 }) } const webSocketPair = new WebSocketPair() const [client, server] = Object.values(webSocketPair) server.accept() // Add error handling and reconnection logic server.addEventListener("error", (err) => { console.error("WebSocket error:", err) }) server.addEventListener("close", () => { if (this.roomPromise) { this.getRoom().then((room) => { // Update store to ensure all changes are persisted room.updateStore(() => {}) }) } }) return new Response(null, { status: 101, webSocket: client, headers: { "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Headers": "*", }, }) } }