From d6042fcfe7d9c1636e3e8ecbecc24023e7ccc94a Mon Sep 17 00:00:00 2001 From: Jeff Emmett Date: Thu, 1 Jan 2026 22:59:32 +0100 Subject: [PATCH] feat: Add Automerge CRDT sync for real-time collaboration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CommunitySync class bridges FolkJS shapes with Automerge documents - Server stores Automerge binary format with debounced persistence - Per-peer sync state for efficient delta synchronization - WebSocket messages carry Automerge sync protocol - Automatic migration from JSON to Automerge format - WASM plugin for Vite to handle Automerge bundle Enables true CRDT-based collaboration with: - Conflict-free concurrent editing - Efficient delta sync (only changed data) - Offline-capable local documents - Automatic peer reconnection 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- bun.lock | 40 +++- lib/community-sync.ts | 484 ++++++++++++++++++++++++++++++++++++++ lib/index.ts | 3 + package.json | 4 +- server/community-store.ts | 361 +++++++++++++++++++++++----- server/index.ts | 184 +++++++++++---- vite.config.ts | 5 + website/canvas.html | 246 ++++++++++--------- 8 files changed, 1112 insertions(+), 215 deletions(-) create mode 100644 lib/community-sync.ts diff --git a/bun.lock b/bun.lock index 6fa75ab..184c65b 100644 --- a/bun.lock +++ b/bun.lock @@ -15,6 +15,8 @@ "bun-types": "^1.1.38", "typescript": "^5.7.2", "vite": "^6.0.3", + "vite-plugin-top-level-await": "^1.6.0", + "vite-plugin-wasm": "^3.5.0", }, }, }, @@ -77,6 +79,8 @@ "@lit/reactive-element": ["@lit/reactive-element@2.1.2", "", { "dependencies": { "@lit-labs/ssr-dom-shim": "^1.5.0" } }, "sha512-pbCDiVMnne1lYUIaYNN5wrwQXDtHaYtg7YEFPeW+hws6U47WeFvISGUWekPGKWOP1ygrs0ef0o1VJMk1exos5A=="], + "@rollup/plugin-virtual": ["@rollup/plugin-virtual@3.0.2", "", { "peerDependencies": { "rollup": "^1.20.0||^2.0.0||^3.0.0||^4.0.0" }, "optionalPeers": ["rollup"] }, "sha512-10monEYsBp3scM4/ND4LNH5Rxvh3e/cVeL3jWTgZ2SrQ+BmUoQcopVQvnaMcOnykb1VkxUFuDAN+0FnpTFRy2A=="], + "@rollup/rollup-android-arm-eabi": ["@rollup/rollup-android-arm-eabi@4.54.0", "", { "os": "android", "cpu": "arm" }, "sha512-OywsdRHrFvCdvsewAInDKCNyR3laPA2mc9bRYJ6LBp5IyvF3fvXbbNR0bSzHlZVFtn6E0xw2oZlyjg4rKCVcng=="], "@rollup/rollup-android-arm64": ["@rollup/rollup-android-arm64@4.54.0", "", { "os": "android", "cpu": "arm64" }, "sha512-Skx39Uv+u7H224Af+bDgNinitlmHyQX1K/atIA32JP3JQw6hVODX5tkbi2zof/E69M1qH2UoN3Xdxgs90mmNYw=="], @@ -121,6 +125,34 @@ "@rollup/rollup-win32-x64-msvc": ["@rollup/rollup-win32-x64-msvc@4.54.0", "", { "os": "win32", "cpu": "x64" }, "sha512-hYT5d3YNdSh3mbCU1gwQyPgQd3T2ne0A3KG8KSBdav5TiBg6eInVmV+TeR5uHufiIgSFg0XsOWGW5/RhNcSvPg=="], + "@swc/core": ["@swc/core@1.15.8", "", { "dependencies": { "@swc/counter": "^0.1.3", "@swc/types": "^0.1.25" }, "optionalDependencies": { "@swc/core-darwin-arm64": "1.15.8", "@swc/core-darwin-x64": "1.15.8", "@swc/core-linux-arm-gnueabihf": "1.15.8", "@swc/core-linux-arm64-gnu": "1.15.8", "@swc/core-linux-arm64-musl": "1.15.8", "@swc/core-linux-x64-gnu": "1.15.8", "@swc/core-linux-x64-musl": "1.15.8", "@swc/core-win32-arm64-msvc": "1.15.8", "@swc/core-win32-ia32-msvc": "1.15.8", "@swc/core-win32-x64-msvc": "1.15.8" }, "peerDependencies": { "@swc/helpers": ">=0.5.17" }, "optionalPeers": ["@swc/helpers"] }, "sha512-T8keoJjXaSUoVBCIjgL6wAnhADIb09GOELzKg10CjNg+vLX48P93SME6jTfte9MZIm5m+Il57H3rTSk/0kzDUw=="], + + "@swc/core-darwin-arm64": ["@swc/core-darwin-arm64@1.15.8", "", { "os": "darwin", "cpu": "arm64" }, "sha512-M9cK5GwyWWRkRGwwCbREuj6r8jKdES/haCZ3Xckgkl8MUQJZA3XB7IXXK1IXRNeLjg6m7cnoMICpXv1v1hlJOg=="], + + "@swc/core-darwin-x64": ["@swc/core-darwin-x64@1.15.8", "", { "os": "darwin", "cpu": "x64" }, "sha512-j47DasuOvXl80sKJHSi2X25l44CMc3VDhlJwA7oewC1nV1VsSzwX+KOwE5tLnfORvVJJyeiXgJORNYg4jeIjYQ=="], + + "@swc/core-linux-arm-gnueabihf": ["@swc/core-linux-arm-gnueabihf@1.15.8", "", { "os": "linux", "cpu": "arm" }, "sha512-siAzDENu2rUbwr9+fayWa26r5A9fol1iORG53HWxQL1J8ym4k7xt9eME0dMPXlYZDytK5r9sW8zEA10F2U3Xwg=="], + + "@swc/core-linux-arm64-gnu": ["@swc/core-linux-arm64-gnu@1.15.8", "", { "os": "linux", "cpu": "arm64" }, "sha512-o+1y5u6k2FfPYbTRUPvurwzNt5qd0NTumCTFscCNuBksycloXY16J8L+SMW5QRX59n4Hp9EmFa3vpvNHRVv1+Q=="], + + "@swc/core-linux-arm64-musl": ["@swc/core-linux-arm64-musl@1.15.8", "", { "os": "linux", "cpu": "arm64" }, "sha512-koiCqL09EwOP1S2RShCI7NbsQuG6r2brTqUYE7pV7kZm9O17wZ0LSz22m6gVibpwEnw8jI3IE1yYsQTVpluALw=="], + + "@swc/core-linux-x64-gnu": ["@swc/core-linux-x64-gnu@1.15.8", "", { "os": "linux", "cpu": "x64" }, "sha512-4p6lOMU3bC+Vd5ARtKJ/FxpIC5G8v3XLoPEZ5s7mLR8h7411HWC/LmTXDHcrSXRC55zvAVia1eldy6zDLz8iFQ=="], + + "@swc/core-linux-x64-musl": ["@swc/core-linux-x64-musl@1.15.8", "", { "os": "linux", "cpu": "x64" }, "sha512-z3XBnbrZAL+6xDGAhJoN4lOueIxC/8rGrJ9tg+fEaeqLEuAtHSW2QHDHxDwkxZMjuF/pZ6MUTjHjbp8wLbuRLA=="], + + "@swc/core-win32-arm64-msvc": ["@swc/core-win32-arm64-msvc@1.15.8", "", { "os": "win32", "cpu": "arm64" }, "sha512-djQPJ9Rh9vP8GTS/Df3hcc6XP6xnG5c8qsngWId/BLA9oX6C7UzCPAn74BG/wGb9a6j4w3RINuoaieJB3t+7iQ=="], + + "@swc/core-win32-ia32-msvc": ["@swc/core-win32-ia32-msvc@1.15.8", "", { "os": "win32", "cpu": "ia32" }, "sha512-/wfAgxORg2VBaUoFdytcVBVCgf1isWZIEXB9MZEUty4wwK93M/PxAkjifOho9RN3WrM3inPLabICRCEgdHpKKQ=="], + + "@swc/core-win32-x64-msvc": ["@swc/core-win32-x64-msvc@1.15.8", "", { "os": "win32", "cpu": "x64" }, "sha512-GpMePrh9Sl4d61o4KAHOOv5is5+zt6BEXCOCgs/H0FLGeii7j9bWDE8ExvKFy2GRRZVNR1ugsnzaGWHKM6kuzA=="], + + "@swc/counter": ["@swc/counter@0.1.3", "", {}, "sha512-e2BR4lsJkkRlKZ/qCHPw9ZaSxc0MVUd7gtbtaB7aMvHeJVYe8sOB8DBZkP2DtISHGSku9sCK6T6cnY0CtXrOCQ=="], + + "@swc/types": ["@swc/types@0.1.25", "", { "dependencies": { "@swc/counter": "^0.1.3" } }, "sha512-iAoY/qRhNH8a/hBvm3zKj9qQ4oc2+3w1unPJa2XvTK3XjeLXtzcCingVPw/9e5mn1+0yPqxcBGp9Jf0pkfMb1g=="], + + "@swc/wasm": ["@swc/wasm@1.15.8", "", {}, "sha512-RG2BxGbbsjtddFCo1ghKH6A/BMXbY1eMBfpysV0lJMCpI4DZOjW1BNBnxvBt7YsYmlJtmy5UXIg9/4ekBTFFaQ=="], + "@types/estree": ["@types/estree@1.0.8", "", {}, "sha512-dWHzHa2WqEXI/O1E9OjrocMTKJl2mSrEolh1Iomrv6U+JuNwaHXsXx9bLu5gG7BUWFIN0skIQJQ/L1rIex4X6w=="], "@types/node": ["@types/node@22.19.3", "", { "dependencies": { "undici-types": "~6.21.0" } }, "sha512-1N9SBnWYOJTrNZCdh/yJE+t910Y128BoyY+zBLWhL3r0TYzlTmFdXrPwHL9DyFZmlEXNQQolTZh3KHV31QDhyA=="], @@ -155,8 +187,14 @@ "undici-types": ["undici-types@6.21.0", "", {}, "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ=="], - "uuid": ["uuid@9.0.1", "", { "bin": { "uuid": "dist/bin/uuid" } }, "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA=="], + "uuid": ["uuid@10.0.0", "", { "bin": { "uuid": "dist/bin/uuid" } }, "sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ=="], "vite": ["vite@6.4.1", "", { "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.4.4", "picomatch": "^4.0.2", "postcss": "^8.5.3", "rollup": "^4.34.9", "tinyglobby": "^0.2.13" }, "optionalDependencies": { "fsevents": "~2.3.3" }, "peerDependencies": { "@types/node": "^18.0.0 || ^20.0.0 || >=22.0.0", "jiti": ">=1.21.0", "less": "*", "lightningcss": "^1.21.0", "sass": "*", "sass-embedded": "*", "stylus": "*", "sugarss": "*", "terser": "^5.16.0", "tsx": "^4.8.1", "yaml": "^2.4.2" }, "optionalPeers": ["@types/node", "jiti", "less", "lightningcss", "sass", "sass-embedded", "stylus", "sugarss", "terser", "tsx", "yaml"], "bin": { "vite": "bin/vite.js" } }, "sha512-+Oxm7q9hDoLMyJOYfUYBuHQo+dkAloi33apOPP56pzj+vsdJDzr+j1NISE5pyaAuKL4A3UD34qd0lx5+kfKp2g=="], + + "vite-plugin-top-level-await": ["vite-plugin-top-level-await@1.6.0", "", { "dependencies": { "@rollup/plugin-virtual": "^3.0.2", "@swc/core": "^1.12.14", "@swc/wasm": "^1.12.14", "uuid": "10.0.0" }, "peerDependencies": { "vite": ">=2.8" } }, "sha512-bNhUreLamTIkoulCR9aDXbTbhLk6n1YE8NJUTTxl5RYskNRtzOR0ASzSjBVRtNdjIfngDXo11qOsybGLNsrdww=="], + + "vite-plugin-wasm": ["vite-plugin-wasm@3.5.0", "", { "peerDependencies": { "vite": "^2 || ^3 || ^4 || ^5 || ^6 || ^7" } }, "sha512-X5VWgCnqiQEGb+omhlBVsvTfxikKtoOgAzQ95+BZ8gQ+VfMHIjSHr0wyvXFQCa0eKQ0fKyaL0kWcEnYqBac4lQ=="], + + "@automerge/automerge/uuid": ["uuid@9.0.1", "", { "bin": { "uuid": "dist/bin/uuid" } }, "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA=="], } } diff --git a/lib/community-sync.ts b/lib/community-sync.ts new file mode 100644 index 0000000..634abd7 --- /dev/null +++ b/lib/community-sync.ts @@ -0,0 +1,484 @@ +import * as Automerge from "@automerge/automerge"; +import type { FolkShape } from "./folk-shape"; + +// Shape data stored in Automerge document +export interface ShapeData { + type: string; + id: string; + x: number; + y: number; + width: number; + height: number; + rotation: number; + content?: string; + // Arrow-specific + sourceId?: string; + targetId?: string; +} + +// Automerge document structure +export interface CommunityDoc { + meta: { + name: string; + slug: string; + createdAt: string; + }; + shapes: { + [id: string]: ShapeData; + }; +} + +type SyncState = Automerge.SyncState; + +/** + * CommunitySync - Bridges FolkJS shapes with Automerge CRDT sync + * + * Handles: + * - Local shape changes → Automerge document → WebSocket broadcast + * - Remote Automerge sync messages → Local document → DOM updates + */ +export class CommunitySync extends EventTarget { + #doc: Automerge.Doc; + #syncState: SyncState; + #ws: WebSocket | null = null; + #communitySlug: string; + #shapes: Map = new Map(); + #pendingChanges: boolean = false; + #reconnectAttempts = 0; + #maxReconnectAttempts = 5; + #reconnectDelay = 1000; + + constructor(communitySlug: string) { + super(); + this.#communitySlug = communitySlug; + + // Initialize empty Automerge document + this.#doc = Automerge.init(); + this.#doc = Automerge.change(this.#doc, "Initialize community", (doc) => { + doc.meta = { + name: communitySlug, + slug: communitySlug, + createdAt: new Date().toISOString(), + }; + doc.shapes = {}; + }); + + this.#syncState = Automerge.initSyncState(); + } + + get doc(): Automerge.Doc { + return this.#doc; + } + + get shapes(): Map { + return this.#shapes; + } + + /** + * Connect to WebSocket server for real-time sync + */ + connect(wsUrl: string): void { + if (this.#ws?.readyState === WebSocket.OPEN) { + return; + } + + this.#ws = new WebSocket(wsUrl); + this.#ws.binaryType = "arraybuffer"; + + this.#ws.onopen = () => { + console.log(`[CommunitySync] Connected to ${this.#communitySlug}`); + this.#reconnectAttempts = 0; + + // Request initial sync + this.#requestSync(); + + this.dispatchEvent(new CustomEvent("connected")); + }; + + this.#ws.onmessage = (event) => { + this.#handleMessage(event.data); + }; + + this.#ws.onclose = () => { + console.log(`[CommunitySync] Disconnected from ${this.#communitySlug}`); + this.dispatchEvent(new CustomEvent("disconnected")); + + // Attempt reconnect + this.#attemptReconnect(wsUrl); + }; + + this.#ws.onerror = (error) => { + console.error("[CommunitySync] WebSocket error:", error); + this.dispatchEvent(new CustomEvent("error", { detail: error })); + }; + } + + #attemptReconnect(wsUrl: string): void { + if (this.#reconnectAttempts >= this.#maxReconnectAttempts) { + console.error("[CommunitySync] Max reconnect attempts reached"); + return; + } + + this.#reconnectAttempts++; + const delay = this.#reconnectDelay * Math.pow(2, this.#reconnectAttempts - 1); + + console.log(`[CommunitySync] Reconnecting in ${delay}ms (attempt ${this.#reconnectAttempts})`); + + setTimeout(() => { + this.connect(wsUrl); + }, delay); + } + + /** + * Request sync from server (sends our sync state) + */ + #requestSync(): void { + const [nextSyncState, syncMessage] = Automerge.generateSyncMessage( + this.#doc, + this.#syncState + ); + + this.#syncState = nextSyncState; + + if (syncMessage) { + this.#send({ + type: "sync", + data: Array.from(syncMessage), + }); + } + } + + /** + * Handle incoming WebSocket messages + */ + #handleMessage(data: ArrayBuffer | string): void { + try { + // Handle binary Automerge sync messages + if (data instanceof ArrayBuffer) { + const message = new Uint8Array(data); + this.#applySyncMessage(message); + return; + } + + // Handle JSON messages + const msg = JSON.parse(data as string); + + switch (msg.type) { + case "sync": + // Server sending sync message as JSON array + if (Array.isArray(msg.data)) { + const syncMessage = new Uint8Array(msg.data); + this.#applySyncMessage(syncMessage); + } + break; + + case "full-sync": + // Server sending full document (for initial load) + if (msg.doc) { + const binary = new Uint8Array(msg.doc); + this.#doc = Automerge.load(binary); + this.#syncState = Automerge.initSyncState(); + this.#applyDocToDOM(); + } + break; + + case "presence": + // Handle presence updates (cursors, selections) + this.dispatchEvent(new CustomEvent("presence", { detail: msg })); + break; + } + } catch (e) { + console.error("[CommunitySync] Failed to handle message:", e); + } + } + + /** + * Apply incoming Automerge sync message + */ + #applySyncMessage(message: Uint8Array): void { + const result = Automerge.receiveSyncMessage( + this.#doc, + this.#syncState, + message + ); + + this.#doc = result[0]; + this.#syncState = result[1]; + + // Apply changes to DOM if we received new patches + const patch = result[2] as { patches: Automerge.Patch[] } | null; + if (patch && patch.patches && patch.patches.length > 0) { + this.#applyPatchesToDOM(patch.patches); + } + + // Generate response if needed + const [nextSyncState, responseMessage] = Automerge.generateSyncMessage( + this.#doc, + this.#syncState + ); + + this.#syncState = nextSyncState; + + if (responseMessage) { + this.#send({ + type: "sync", + data: Array.from(responseMessage), + }); + } + } + + /** + * Send message over WebSocket + */ + #send(message: object): void { + if (this.#ws?.readyState === WebSocket.OPEN) { + this.#ws.send(JSON.stringify(message)); + } + } + + /** + * Register a shape element for syncing + */ + registerShape(shape: FolkShape): void { + this.#shapes.set(shape.id, shape); + + // Listen for transform events + shape.addEventListener("folk-transform", ((e: CustomEvent) => { + this.#handleShapeChange(shape); + }) as EventListener); + + // Listen for content changes (for markdown shapes) + shape.addEventListener("content-change", ((e: CustomEvent) => { + this.#handleShapeChange(shape); + }) as EventListener); + + // Add to document if not exists + if (!this.#doc.shapes[shape.id]) { + this.#updateShapeInDoc(shape); + } + } + + /** + * Unregister a shape + */ + unregisterShape(shapeId: string): void { + this.#shapes.delete(shapeId); + } + + /** + * Handle local shape change - update Automerge doc and sync + */ + #handleShapeChange(shape: FolkShape): void { + this.#updateShapeInDoc(shape); + this.#syncToServer(); + } + + /** + * Update shape data in Automerge document + */ + #updateShapeInDoc(shape: FolkShape): void { + const shapeData = this.#shapeToData(shape); + + this.#doc = Automerge.change(this.#doc, `Update shape ${shape.id}`, (doc) => { + if (!doc.shapes) doc.shapes = {}; + doc.shapes[shape.id] = shapeData; + }); + } + + /** + * Convert FolkShape to serializable data + */ + #shapeToData(shape: FolkShape): ShapeData { + const data: ShapeData = { + type: shape.tagName.toLowerCase(), + id: shape.id, + x: shape.x, + y: shape.y, + width: shape.width, + height: shape.height, + rotation: shape.rotation, + }; + + // Add content for markdown shapes + if ("content" in shape && typeof (shape as any).content === "string") { + data.content = (shape as any).content; + } + + // Add arrow connections + if ("sourceId" in shape) { + data.sourceId = (shape as any).sourceId; + } + if ("targetId" in shape) { + data.targetId = (shape as any).targetId; + } + + return data; + } + + /** + * Sync local changes to server + */ + #syncToServer(): void { + const [nextSyncState, syncMessage] = Automerge.generateSyncMessage( + this.#doc, + this.#syncState + ); + + this.#syncState = nextSyncState; + + if (syncMessage) { + this.#send({ + type: "sync", + data: Array.from(syncMessage), + }); + } + } + + /** + * Delete a shape from the document + */ + deleteShape(shapeId: string): void { + this.#doc = Automerge.change(this.#doc, `Delete shape ${shapeId}`, (doc) => { + if (doc.shapes && doc.shapes[shapeId]) { + delete doc.shapes[shapeId]; + } + }); + + this.#shapes.delete(shapeId); + this.#syncToServer(); + } + + /** + * Apply full document to DOM (for initial load) + */ + #applyDocToDOM(): void { + const shapes = this.#doc.shapes || {}; + + for (const [id, shapeData] of Object.entries(shapes)) { + this.#applyShapeToDOM(shapeData); + } + + this.dispatchEvent(new CustomEvent("synced", { detail: { shapes } })); + } + + /** + * Apply Automerge patches to DOM + */ + #applyPatchesToDOM(patches: Automerge.Patch[]): void { + for (const patch of patches) { + const path = patch.path; + + // Handle shape updates: ["shapes", shapeId, ...] + if (path[0] === "shapes" && typeof path[1] === "string") { + const shapeId = path[1]; + const shapeData = this.#doc.shapes?.[shapeId]; + + if (patch.action === "del" && path.length === 2) { + // Shape deleted + this.#removeShapeFromDOM(shapeId); + } else if (shapeData) { + // Shape created or updated + this.#applyShapeToDOM(shapeData); + } + } + } + } + + /** + * Apply shape data to DOM element + */ + #applyShapeToDOM(shapeData: ShapeData): void { + let shape = this.#shapes.get(shapeData.id); + + if (!shape) { + // Create new shape element + shape = this.#createShapeElement(shapeData); + if (shape) { + this.#shapes.set(shapeData.id, shape); + this.dispatchEvent(new CustomEvent("shape-created", { detail: { shape, data: shapeData } })); + } + return; + } + + // Update existing shape (avoid triggering our own change events) + this.#updateShapeElement(shape, shapeData); + } + + /** + * Create a new shape element from data + */ + #createShapeElement(data: ShapeData): FolkShape | undefined { + // This will be handled by the canvas - emit event for canvas to create + this.dispatchEvent(new CustomEvent("create-shape", { detail: data })); + return undefined; + } + + /** + * Update shape element without triggering change events + */ + #updateShapeElement(shape: FolkShape, data: ShapeData): void { + // Temporarily remove event listeners to avoid feedback loop + const isOurChange = + shape.x === data.x && + shape.y === data.y && + shape.width === data.width && + shape.height === data.height && + shape.rotation === data.rotation; + + if (isOurChange && !("content" in data)) { + return; // No change needed + } + + // Update position and size + if (shape.x !== data.x) shape.x = data.x; + if (shape.y !== data.y) shape.y = data.y; + if (shape.width !== data.width) shape.width = data.width; + if (shape.height !== data.height) shape.height = data.height; + if (shape.rotation !== data.rotation) shape.rotation = data.rotation; + + // Update content for markdown shapes + if ("content" in data && "content" in shape) { + const shapeWithContent = shape as any; + if (shapeWithContent.content !== data.content) { + shapeWithContent.content = data.content; + } + } + } + + /** + * Remove shape from DOM + */ + #removeShapeFromDOM(shapeId: string): void { + const shape = this.#shapes.get(shapeId); + if (shape) { + this.#shapes.delete(shapeId); + this.dispatchEvent(new CustomEvent("shape-deleted", { detail: { shapeId, shape } })); + } + } + + /** + * Disconnect from server + */ + disconnect(): void { + if (this.#ws) { + this.#ws.close(); + this.#ws = null; + } + } + + /** + * Get document as binary for storage + */ + getDocumentBinary(): Uint8Array { + return Automerge.save(this.#doc); + } + + /** + * Load document from binary + */ + loadDocumentBinary(binary: Uint8Array): void { + this.#doc = Automerge.load(binary); + this.#syncState = Automerge.initSyncState(); + this.#applyDocToDOM(); + } +} diff --git a/lib/index.ts b/lib/index.ts index 8813697..0d1b5e4 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -21,3 +21,6 @@ export * from "./tags"; // Components export * from "./folk-shape"; export * from "./folk-markdown"; + +// Sync +export * from "./community-sync"; diff --git a/package.json b/package.json index a894195..f453a7f 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,8 @@ "@types/node": "^22.10.1", "bun-types": "^1.1.38", "typescript": "^5.7.2", - "vite": "^6.0.3" + "vite": "^6.0.3", + "vite-plugin-top-level-await": "^1.6.0", + "vite-plugin-wasm": "^3.5.0" } } diff --git a/server/community-store.ts b/server/community-store.ts index efa587c..5c2c867 100644 --- a/server/community-store.ts +++ b/server/community-store.ts @@ -1,4 +1,5 @@ import { mkdir, readdir } from "node:fs/promises"; +import * as Automerge from "@automerge/automerge"; const STORAGE_DIR = process.env.STORAGE_DIR || "./data/communities"; @@ -8,46 +9,83 @@ export interface CommunityMeta { createdAt: string; } -export interface CommunityDoc { - meta: CommunityMeta; - shapes: Record< - string, - { - type: string; - id: string; - x: number; - y: number; - width: number; - height: number; - rotation?: number; - content?: string; - } - >; +export interface ShapeData { + type: string; + id: string; + x: number; + y: number; + width: number; + height: number; + rotation: number; + content?: string; + sourceId?: string; + targetId?: string; } -// In-memory cache of community docs -const communities = new Map(); +export interface CommunityDoc { + meta: CommunityMeta; + shapes: { + [id: string]: ShapeData; + }; +} + +// Per-peer sync state for Automerge +interface PeerState { + syncState: Automerge.SyncState; + lastActivity: number; +} + +// In-memory cache of Automerge documents +const communities = new Map>(); + +// Track sync state per peer (WebSocket connection) +const peerSyncStates = new Map>(); + +// Debounce save timers +const saveTimers = new Map(); // Ensure storage directory exists await mkdir(STORAGE_DIR, { recursive: true }); -export async function loadCommunity(slug: string): Promise { +/** + * Load community document from disk + */ +export async function loadCommunity(slug: string): Promise | null> { // Check cache first if (communities.has(slug)) { return communities.get(slug)!; } - // Try to load from disk - const path = `${STORAGE_DIR}/${slug}.json`; - const file = Bun.file(path); + // Try to load Automerge binary first + const binaryPath = `${STORAGE_DIR}/${slug}.automerge`; + const binaryFile = Bun.file(binaryPath); - if (await file.exists()) { + if (await binaryFile.exists()) { try { - const data = (await file.json()) as CommunityDoc; - communities.set(slug, data); - return data; + const buffer = await binaryFile.arrayBuffer(); + const doc = Automerge.load(new Uint8Array(buffer)); + communities.set(slug, doc); + return doc; } catch (e) { - console.error(`Failed to load community ${slug}:`, e); + console.error(`Failed to load Automerge doc for ${slug}:`, e); + } + } + + // Fallback: try JSON format and migrate + const jsonPath = `${STORAGE_DIR}/${slug}.json`; + const jsonFile = Bun.file(jsonPath); + + if (await jsonFile.exists()) { + try { + const data = (await jsonFile.json()) as CommunityDoc; + // Migrate JSON to Automerge + const doc = jsonToAutomerge(data); + communities.set(slug, doc); + // Save as Automerge binary + await saveCommunity(slug); + return doc; + } catch (e) { + console.error(`Failed to migrate JSON for ${slug}:`, e); return null; } } @@ -55,60 +93,277 @@ export async function loadCommunity(slug: string): Promise return null; } -export async function saveCommunity(slug: string, doc: CommunityDoc): Promise { - communities.set(slug, doc); - const path = `${STORAGE_DIR}/${slug}.json`; - await Bun.write(path, JSON.stringify(doc, null, 2)); -} - -export async function createCommunity(name: string, slug: string): Promise { - const doc: CommunityDoc = { - meta: { - name, - slug, - createdAt: new Date().toISOString(), - }, - shapes: {}, - }; - - await saveCommunity(slug, doc); +/** + * Convert JSON document to Automerge document + */ +function jsonToAutomerge(data: CommunityDoc): Automerge.Doc { + let doc = Automerge.init(); + doc = Automerge.change(doc, "Import from JSON", (d) => { + d.meta = { ...data.meta }; + d.shapes = {}; + for (const [id, shape] of Object.entries(data.shapes || {})) { + d.shapes[id] = { ...shape }; + } + }); return doc; } +/** + * Save community document to disk (debounced) + */ +export async function saveCommunity(slug: string): Promise { + const doc = communities.get(slug); + if (!doc) return; + + // Clear existing timer + const existingTimer = saveTimers.get(slug); + if (existingTimer) { + clearTimeout(existingTimer); + } + + // Debounce saves to avoid excessive disk writes + const timer = setTimeout(async () => { + const currentDoc = communities.get(slug); + if (!currentDoc) return; + + const binary = Automerge.save(currentDoc); + const path = `${STORAGE_DIR}/${slug}.automerge`; + await Bun.write(path, binary); + console.log(`[Store] Saved ${slug} (${binary.length} bytes)`); + }, 2000); + + saveTimers.set(slug, timer); +} + +/** + * Create a new community + */ +export async function createCommunity(name: string, slug: string): Promise> { + let doc = Automerge.init(); + doc = Automerge.change(doc, "Create community", (d) => { + d.meta = { + name, + slug, + createdAt: new Date().toISOString(), + }; + d.shapes = {}; + }); + + communities.set(slug, doc); + await saveCommunity(slug); + return doc; +} + +/** + * Check if community exists + */ export async function communityExists(slug: string): Promise { if (communities.has(slug)) return true; - const path = `${STORAGE_DIR}/${slug}.json`; - const file = Bun.file(path); - return file.exists(); + const binaryPath = `${STORAGE_DIR}/${slug}.automerge`; + const jsonPath = `${STORAGE_DIR}/${slug}.json`; + + const binaryFile = Bun.file(binaryPath); + const jsonFile = Bun.file(jsonPath); + + return (await binaryFile.exists()) || (await jsonFile.exists()); } +/** + * List all communities + */ export async function listCommunities(): Promise { try { const files = await readdir(STORAGE_DIR); - return files.filter((f) => f.endsWith(".json")).map((f) => f.replace(".json", "")); + const slugs = new Set(); + + for (const f of files) { + if (f.endsWith(".automerge")) { + slugs.add(f.replace(".automerge", "")); + } else if (f.endsWith(".json")) { + slugs.add(f.replace(".json", "")); + } + } + + return Array.from(slugs); } catch { return []; } } +/** + * Get or create sync state for a peer + */ +export function getPeerSyncState(slug: string, peerId: string): PeerState { + if (!peerSyncStates.has(slug)) { + peerSyncStates.set(slug, new Map()); + } + + const communityPeers = peerSyncStates.get(slug)!; + + if (!communityPeers.has(peerId)) { + communityPeers.set(peerId, { + syncState: Automerge.initSyncState(), + lastActivity: Date.now(), + }); + } + + const peerState = communityPeers.get(peerId)!; + peerState.lastActivity = Date.now(); + return peerState; +} + +/** + * Remove peer sync state (on disconnect) + */ +export function removePeerSyncState(slug: string, peerId: string): void { + const communityPeers = peerSyncStates.get(slug); + if (communityPeers) { + communityPeers.delete(peerId); + if (communityPeers.size === 0) { + peerSyncStates.delete(slug); + } + } +} + +/** + * Get all peer IDs for a community + */ +export function getCommunityPeers(slug: string): string[] { + const communityPeers = peerSyncStates.get(slug); + return communityPeers ? Array.from(communityPeers.keys()) : []; +} + +/** + * Process incoming sync message from a peer + * Returns response message and messages for other peers + */ +export function receiveSyncMessage( + slug: string, + peerId: string, + message: Uint8Array, +): { + response: Uint8Array | null; + broadcastToPeers: Map; +} { + const doc = communities.get(slug); + if (!doc) { + return { response: null, broadcastToPeers: new Map() }; + } + + const peerState = getPeerSyncState(slug, peerId); + + // Apply incoming sync message + const result = Automerge.receiveSyncMessage( + doc, + peerState.syncState, + message + ); + + const newDoc = result[0]; + const newSyncState = result[1]; + const patch = result[2] as { patches: Automerge.Patch[] } | null; + + communities.set(slug, newDoc); + peerState.syncState = newSyncState; + + // Schedule save if changes were made + const hasPatches = patch && patch.patches && patch.patches.length > 0; + if (hasPatches) { + saveCommunity(slug); + } + + // Generate response for this peer + const [nextSyncState, responseMessage] = Automerge.generateSyncMessage( + newDoc, + peerState.syncState + ); + peerState.syncState = nextSyncState; + + // Generate messages for other peers + const broadcastToPeers = new Map(); + const communityPeers = peerSyncStates.get(slug); + + if (communityPeers && hasPatches) { + for (const [otherPeerId, otherPeerState] of communityPeers) { + if (otherPeerId !== peerId) { + const [newOtherSyncState, otherMessage] = Automerge.generateSyncMessage( + newDoc, + otherPeerState.syncState + ); + otherPeerState.syncState = newOtherSyncState; + + if (otherMessage) { + broadcastToPeers.set(otherPeerId, otherMessage); + } + } + } + } + + return { + response: responseMessage || null, + broadcastToPeers, + }; +} + +/** + * Generate initial sync message for a new peer + */ +export function generateSyncMessageForPeer( + slug: string, + peerId: string, +): Uint8Array | null { + const doc = communities.get(slug); + if (!doc) return null; + + const peerState = getPeerSyncState(slug, peerId); + const [newSyncState, message] = Automerge.generateSyncMessage( + doc, + peerState.syncState + ); + peerState.syncState = newSyncState; + + return message || null; +} + +/** + * Get document as plain object (for API responses) + */ +export function getDocumentData(slug: string): CommunityDoc | null { + const doc = communities.get(slug); + if (!doc) return null; + + // Convert Automerge doc to plain object + return JSON.parse(JSON.stringify(doc)); +} + +// Legacy functions for backward compatibility + export function updateShape( slug: string, shapeId: string, - data: CommunityDoc["shapes"][string], + data: ShapeData, ): void { const doc = communities.get(slug); if (doc) { - doc.shapes[shapeId] = data; - // Save async without blocking - saveCommunity(slug, doc); + const newDoc = Automerge.change(doc, `Update shape ${shapeId}`, (d) => { + if (!d.shapes) d.shapes = {}; + d.shapes[shapeId] = data; + }); + communities.set(slug, newDoc); + saveCommunity(slug); } } export function deleteShape(slug: string, shapeId: string): void { const doc = communities.get(slug); if (doc) { - delete doc.shapes[shapeId]; - saveCommunity(slug, doc); + const newDoc = Automerge.change(doc, `Delete shape ${shapeId}`, (d) => { + if (d.shapes && d.shapes[shapeId]) { + delete d.shapes[shapeId]; + } + }); + communities.set(slug, newDoc); + saveCommunity(slug); } } diff --git a/server/index.ts b/server/index.ts index 9020101..938d711 100644 --- a/server/index.ts +++ b/server/index.ts @@ -4,9 +4,12 @@ import { communityExists, createCommunity, deleteShape, + generateSyncMessageForPeer, + getDocumentData, loadCommunity, + receiveSyncMessage, + removePeerSyncState, updateShape, - type CommunityDoc, } from "./community-store"; const PORT = Number(process.env.PORT) || 3000; @@ -15,22 +18,20 @@ const DIST_DIR = resolve(import.meta.dir, "../dist"); // WebSocket data type interface WSData { communitySlug: string; + peerId: string; } -// Track connected clients per community -const communityClients = new Map>>(); +// Track connected clients per community (for broadcasting) +const communityClients = new Map>>(); -// Helper to broadcast to all clients in a community -function broadcastToCommunity(slug: string, message: object, excludeWs?: ServerWebSocket) { - const clients = communityClients.get(slug); - if (!clients) return; +// Generate unique peer ID +function generatePeerId(): string { + return `peer-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; +} - const data = JSON.stringify(message); - for (const client of clients) { - if (client !== excludeWs && client.readyState === WebSocket.OPEN) { - client.send(data); - } - } +// Helper to get client by peer ID +function getClient(slug: string, peerId: string): ServerWebSocket | undefined { + return communityClients.get(slug)?.get(peerId); } // Parse subdomain from host header @@ -91,7 +92,10 @@ const server = Bun.serve({ if (url.pathname.startsWith("/ws/")) { const communitySlug = url.pathname.split("/")[2]; if (communitySlug) { - const upgraded = server.upgrade(req, { data: { communitySlug } }); + const peerId = generatePeerId(); + const upgraded = server.upgrade(req, { + data: { communitySlug, peerId }, + }); if (upgraded) return undefined; } return new Response("WebSocket upgrade failed", { status: 400 }); @@ -134,61 +138,131 @@ const server = Bun.serve({ websocket: { open(ws: ServerWebSocket) { - const { communitySlug } = ws.data; + const { communitySlug, peerId } = ws.data; - // Add to clients set + // Add to clients map if (!communityClients.has(communitySlug)) { - communityClients.set(communitySlug, new Set()); + communityClients.set(communitySlug, new Map()); } - communityClients.get(communitySlug)!.add(ws); + communityClients.get(communitySlug)!.set(peerId, ws); - console.log(`Client connected to ${communitySlug}`); + console.log(`[WS] Client ${peerId} connected to ${communitySlug}`); - // Send current state + // Load community and send initial sync message loadCommunity(communitySlug).then((doc) => { if (doc) { - ws.send(JSON.stringify({ type: "sync", shapes: doc.shapes })); + const syncMessage = generateSyncMessageForPeer(communitySlug, peerId); + if (syncMessage) { + ws.send( + JSON.stringify({ + type: "sync", + data: Array.from(syncMessage), + }) + ); + } } }); }, message(ws: ServerWebSocket, message: string | Buffer) { - const { communitySlug } = ws.data; + const { communitySlug, peerId } = ws.data; try { - const data = JSON.parse(message.toString()); + const msg = JSON.parse(message.toString()); - if (data.type === "update" && data.id && data.data) { - // Update local store - updateShape(communitySlug, data.id, data.data); + if (msg.type === "sync" && Array.isArray(msg.data)) { + // Handle Automerge sync message + const syncMessage = new Uint8Array(msg.data); + const result = receiveSyncMessage(communitySlug, peerId, syncMessage); + // Send response to this peer + if (result.response) { + ws.send( + JSON.stringify({ + type: "sync", + data: Array.from(result.response), + }) + ); + } + + // Broadcast to other peers + for (const [targetPeerId, targetMessage] of result.broadcastToPeers) { + const targetClient = getClient(communitySlug, targetPeerId); + if (targetClient && targetClient.readyState === WebSocket.OPEN) { + targetClient.send( + JSON.stringify({ + type: "sync", + data: Array.from(targetMessage), + }) + ); + } + } + } else if (msg.type === "ping") { + // Handle keep-alive ping + ws.send(JSON.stringify({ type: "pong", timestamp: msg.timestamp })); + } else if (msg.type === "presence") { + // Broadcast presence to other clients + const clients = communityClients.get(communitySlug); + if (clients) { + const presenceMsg = JSON.stringify({ + type: "presence", + peerId, + ...msg, + }); + for (const [clientPeerId, client] of clients) { + if (clientPeerId !== peerId && client.readyState === WebSocket.OPEN) { + client.send(presenceMsg); + } + } + } + } + // Legacy message handling for backward compatibility + else if (msg.type === "update" && msg.id && msg.data) { + updateShape(communitySlug, msg.id, msg.data); // Broadcast to other clients - broadcastToCommunity(communitySlug, data, ws); - } else if (data.type === "delete" && data.id) { - // Delete from store - deleteShape(communitySlug, data.id); - + const clients = communityClients.get(communitySlug); + if (clients) { + const updateMsg = JSON.stringify(msg); + for (const [clientPeerId, client] of clients) { + if (clientPeerId !== peerId && client.readyState === WebSocket.OPEN) { + client.send(updateMsg); + } + } + } + } else if (msg.type === "delete" && msg.id) { + deleteShape(communitySlug, msg.id); // Broadcast to other clients - broadcastToCommunity(communitySlug, data, ws); + const clients = communityClients.get(communitySlug); + if (clients) { + const deleteMsg = JSON.stringify(msg); + for (const [clientPeerId, client] of clients) { + if (clientPeerId !== peerId && client.readyState === WebSocket.OPEN) { + client.send(deleteMsg); + } + } + } } } catch (e) { - console.error("Failed to parse WebSocket message:", e); + console.error("[WS] Failed to parse message:", e); } }, close(ws: ServerWebSocket) { - const { communitySlug } = ws.data; + const { communitySlug, peerId } = ws.data; - // Remove from clients set + // Remove from clients map const clients = communityClients.get(communitySlug); if (clients) { - clients.delete(ws); + clients.delete(peerId); if (clients.size === 0) { communityClients.delete(communitySlug); } } - console.log(`Client disconnected from ${communitySlug}`); + // Clean up peer sync state + removePeerSyncState(communitySlug, peerId); + + console.log(`[WS] Client ${peerId} disconnected from ${communitySlug}`); }, }, }); @@ -212,20 +286,26 @@ async function handleAPI(req: Request, url: URL): Promise { const { name, slug } = body; if (!name || !slug) { - return Response.json({ error: "Name and slug are required" }, { status: 400, headers: corsHeaders }); + return Response.json( + { error: "Name and slug are required" }, + { status: 400, headers: corsHeaders } + ); } // Validate slug format if (!/^[a-z0-9-]+$/.test(slug)) { return Response.json( { error: "Slug must contain only lowercase letters, numbers, and hyphens" }, - { status: 400, headers: corsHeaders }, + { status: 400, headers: corsHeaders } ); } // Check if exists if (await communityExists(slug)) { - return Response.json({ error: "Community already exists" }, { status: 409, headers: corsHeaders }); + return Response.json( + { error: "Community already exists" }, + { status: 409, headers: corsHeaders } + ); } // Create community @@ -234,24 +314,36 @@ async function handleAPI(req: Request, url: URL): Promise { // Return URL to new community return Response.json( { url: `https://${slug}.rspace.online`, slug, name }, - { headers: corsHeaders }, + { headers: corsHeaders } ); } catch (e) { console.error("Failed to create community:", e); - return Response.json({ error: "Failed to create community" }, { status: 500, headers: corsHeaders }); + return Response.json( + { error: "Failed to create community" }, + { status: 500, headers: corsHeaders } + ); } } // GET /api/communities/:slug - Get community info if (url.pathname.startsWith("/api/communities/") && req.method === "GET") { const slug = url.pathname.split("/")[3]; - const community = await loadCommunity(slug); + const data = getDocumentData(slug); - if (!community) { - return Response.json({ error: "Community not found" }, { status: 404, headers: corsHeaders }); + if (!data) { + // Try loading from disk + await loadCommunity(slug); + const loadedData = getDocumentData(slug); + if (!loadedData) { + return Response.json( + { error: "Community not found" }, + { status: 404, headers: corsHeaders } + ); + } + return Response.json({ meta: loadedData.meta }, { headers: corsHeaders }); } - return Response.json({ meta: community.meta }, { headers: corsHeaders }); + return Response.json({ meta: data.meta }, { headers: corsHeaders }); } return Response.json({ error: "Not found" }, { status: 404, headers: corsHeaders }); diff --git a/vite.config.ts b/vite.config.ts index 0d064ee..67d8b42 100644 --- a/vite.config.ts +++ b/vite.config.ts @@ -1,8 +1,10 @@ import { resolve } from "node:path"; import { defineConfig } from "vite"; +import wasm from "vite-plugin-wasm"; export default defineConfig({ root: "website", + plugins: [wasm()], resolve: { alias: { "@lib": resolve(__dirname, "./lib"), @@ -25,4 +27,7 @@ export default defineConfig({ server: { port: 5173, }, + optimizeDeps: { + exclude: ["@automerge/automerge"], + }, }); diff --git a/website/canvas.html b/website/canvas.html index 476654c..2cf0d26 100644 --- a/website/canvas.html +++ b/website/canvas.html @@ -84,14 +84,34 @@ font-size: 12px; color: #64748b; z-index: 1000; + display: flex; + align-items: center; + gap: 8px; } - #status.connected { - color: #22c55e; + #status .indicator { + width: 8px; + height: 8px; + border-radius: 50%; + background: #64748b; } - #status.disconnected { - color: #ef4444; + #status.connected .indicator { + background: #22c55e; + } + + #status.disconnected .indicator { + background: #ef4444; + } + + #status.syncing .indicator { + background: #f59e0b; + animation: pulse 1s infinite; + } + + @keyframes pulse { + 0%, 100% { opacity: 1; } + 50% { opacity: 0.5; } } #canvas { @@ -117,18 +137,21 @@
- + - +
-
Connecting...
+
+ + Connecting... +