From 3d5a14206238e9b54fc6b7079cd53df1475fb1ad Mon Sep 17 00:00:00 2001 From: Jeff Emmett Date: Wed, 25 Feb 2026 05:19:21 +0000 Subject: [PATCH] feat: wire DocSyncManager into rSpace WebSocket server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Protocol multiplexing on existing /ws/{slug} endpoint: - Messages with docId (subscribe/unsubscribe/sync/awareness) → SyncServer - Messages without docId → legacy canvas handlers (unchanged) New files: doc-persistence.ts (debounced Automerge save/load), sync-instance.ts (SyncServer singleton with participant mode). Co-Authored-By: Claude Opus 4.6 --- docker-compose.yml | 3 + server/index.ts | 23 ++++++- server/local-first/doc-persistence.ts | 97 +++++++++++++++++++++++++++ server/local-first/sync-server.ts | 12 +++- server/sync-instance.ts | 14 ++++ 5 files changed, 145 insertions(+), 4 deletions(-) create mode 100644 server/local-first/doc-persistence.ts create mode 100644 server/sync-instance.ts diff --git a/docker-compose.yml b/docker-compose.yml index aa21c5e..15d26fd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,6 +12,7 @@ services: - rspace-swag:/data/swag-artifacts - rspace-files:/data/files - rspace-splats:/data/splats + - rspace-docs:/data/docs environment: - NODE_ENV=production - STORAGE_DIR=/data/communities @@ -20,6 +21,7 @@ services: - PORT=3000 - FILES_DIR=/data/files - SPLATS_DIR=/data/splats + - DOCS_STORAGE_DIR=/data/docs - INFISICAL_CLIENT_ID=${INFISICAL_CLIENT_ID} - INFISICAL_CLIENT_SECRET=${INFISICAL_CLIENT_SECRET} - INFISICAL_PROJECT_SLUG=rspace @@ -141,6 +143,7 @@ volumes: rspace-swag: rspace-files: rspace-splats: + rspace-docs: rspace-pgdata: networks: diff --git a/server/index.ts b/server/index.ts index 3c7dd6b..2dfa71c 100644 --- a/server/index.ts +++ b/server/index.ts @@ -64,6 +64,8 @@ import { dataModule } from "../modules/data/mod"; import { splatModule } from "../modules/splat/mod"; import { spaces } from "./spaces"; import { 
renderShell } from "./shell"; +import { syncServer } from "./sync-instance"; +import { loadAllDocs } from "./local-first/doc-persistence"; // Register modules registerModule(canvasModule); @@ -639,13 +641,16 @@ const server = Bun.serve({ // ── WebSocket handlers (unchanged) ── websocket: { open(ws: ServerWebSocket) { - const { communitySlug, peerId, mode, nestFrom, nestFilter } = ws.data; + const { communitySlug, peerId, mode, nestFrom, nestFilter, claims } = ws.data; if (!communityClients.has(communitySlug)) { communityClients.set(communitySlug, new Map()); } communityClients.get(communitySlug)!.set(peerId, ws); + // Register with DocSyncManager for multi-doc sync + syncServer.addPeer(peerId, ws, claims ? { sub: claims.sub, username: claims.username } : undefined); + const nestLabel = nestFrom ? ` (nested from ${nestFrom})` : ""; console.log(`[WS] Client ${peerId} connected to ${communitySlug} (mode: ${mode})${nestLabel}`); @@ -685,6 +690,20 @@ const server = Bun.serve({ try { const msg = JSON.parse(message.toString()); + // ── DocSyncManager protocol (messages with docId) ── + if (msg.type === "subscribe" || msg.type === "unsubscribe" || msg.type === "awareness") { + if (msg.docIds || msg.docId) { + syncServer.handleMessage(peerId, message.toString()); + return; + } + } + if (msg.type === "sync" && msg.docId) { + // New protocol: sync with docId → route to SyncServer + syncServer.handleMessage(peerId, message.toString()); + return; + } + + // ── Legacy canvas protocol (no docId) ── if (msg.type === "sync" && Array.isArray(msg.data)) { if (ws.data.readOnly) { ws.send(JSON.stringify({ type: "error", message: "Authentication required to edit this space" })); @@ -782,6 +801,7 @@ const server = Bun.serve({ if (clients.size === 0) communityClients.delete(communitySlug); } removePeerSyncState(communitySlug, peerId); + syncServer.removePeer(peerId); console.log(`[WS] Client ${peerId} disconnected from ${communitySlug}`); }, }, @@ -790,6 +810,7 @@ const server = 
Bun.serve({ // ── Startup ── ensureDemoCommunity().then(() => console.log("[Demo] Demo community ready")).catch((e) => console.error("[Demo] Failed:", e)); ensureCampaignDemo().then(() => console.log("[Campaign] Campaign demo ready")).catch((e) => console.error("[Campaign] Failed:", e)); +loadAllDocs(syncServer).catch((e) => console.error("[DocStore] Startup load failed:", e)); console.log(`rSpace unified server running on http://localhost:${PORT}`); console.log(`Modules: ${getAllModules().map((m) => `${m.icon} ${m.name}`).join(", ")}`); diff --git a/server/local-first/doc-persistence.ts b/server/local-first/doc-persistence.ts new file mode 100644 index 0000000..8f1e347 --- /dev/null +++ b/server/local-first/doc-persistence.ts @@ -0,0 +1,97 @@ +/** + * Doc Persistence — Maps docIds to filesystem paths and provides debounced save/load. + * + * Storage layout: {DOCS_STORAGE_DIR}/{space}/{module}/{collection}[/{itemId}].automerge + * Example: /data/docs/demo/notes/notebooks/abc.automerge + */ + +import { resolve, dirname } from "node:path"; +import { mkdir, readdir, readFile, writeFile, stat } from "node:fs/promises"; +import * as Automerge from "@automerge/automerge"; +import type { SyncServer } from "./sync-server"; + +const DOCS_DIR = process.env.DOCS_STORAGE_DIR || "/data/docs"; +const SAVE_DEBOUNCE_MS = 2000; + +/** Convert a docId like "demo:notes:notebooks:abc" to a filesystem path. */ +export function docIdToPath(docId: string): string { + const parts = docId.split(":"); + if (parts.length < 3) throw new Error(`Invalid docId: ${docId}`); + // Last part becomes the filename, rest become directories + const filename = parts.pop()! + ".automerge"; + return resolve(DOCS_DIR, ...parts, filename); +} + +/** Convert a filesystem path back to a docId. 
*/ +function pathToDocId(filePath: string): string { + const rel = filePath.slice(DOCS_DIR.length + 1); // strip leading dir + / + const withoutExt = rel.replace(/\.automerge$/, ""); + return withoutExt.split("/").join(":"); +} + +// Debounce timers per docId +const saveTimers = new Map<string, ReturnType<typeof setTimeout>>(); + +/** Debounced save — writes Automerge binary to disk after SAVE_DEBOUNCE_MS. */ +export function saveDoc(docId: string, doc: Automerge.Doc<unknown>): void { + const existing = saveTimers.get(docId); + if (existing) clearTimeout(existing); + + saveTimers.set( + docId, + setTimeout(async () => { + saveTimers.delete(docId); + try { + const filePath = docIdToPath(docId); + await mkdir(dirname(filePath), { recursive: true }); + const binary = Automerge.save(doc); + await writeFile(filePath, binary); + console.log(`[DocStore] Saved ${docId} (${binary.byteLength} bytes)`); + } catch (e) { + console.error(`[DocStore] Failed to save ${docId}:`, e); + } + }, SAVE_DEBOUNCE_MS) + ); +} + +/** Recursively scan DOCS_DIR and load all .automerge files into the SyncServer. 
*/ +export async function loadAllDocs(syncServer: SyncServer): Promise<number> { + let count = 0; + try { + await mkdir(DOCS_DIR, { recursive: true }); + count = await scanDir(DOCS_DIR, syncServer); + } catch (e) { + console.error("[DocStore] Failed to scan docs directory:", e); + } + console.log(`[DocStore] Loaded ${count} documents from ${DOCS_DIR}`); + return count; +} + +async function scanDir(dir: string, syncServer: SyncServer): Promise<number> { + let count = 0; + let entries; + try { + entries = await readdir(dir, { withFileTypes: true }); + } catch { + return 0; + } + + for (const entry of entries) { + const fullPath = resolve(dir, entry.name); + if (entry.isDirectory()) { + count += await scanDir(fullPath, syncServer); + } else if (entry.name.endsWith(".automerge")) { + try { + const binary = await readFile(fullPath); + const doc = Automerge.load(new Uint8Array(binary)); + const docId = pathToDocId(fullPath); + syncServer.setDoc(docId, doc); + count++; + } catch (e) { + console.error(`[DocStore] Failed to load ${fullPath}:`, e); + } + } + } + + return count; +} diff --git a/server/local-first/sync-server.ts b/server/local-first/sync-server.ts index 6af2e20..417d5e0 100644 --- a/server/local-first/sync-server.ts +++ b/server/local-first/sync-server.ts @@ -18,6 +18,12 @@ import * as Automerge from '@automerge/automerge'; // ============================================================================ // TYPES // ============================================================================ +/** Minimal WebSocket interface — works with both browser WebSocket and Bun ServerWebSocket */ +export interface SyncWebSocket { + send(data: string): void; + readyState: number; +} + interface SyncMessage { type: 'sync'; docId: string; @@ -54,7 +60,7 @@ type WireMessage = export interface Peer { id: string; - ws: WebSocket; + ws: SyncWebSocket; subscribedDocs: Set<string>; syncStates: Map<string, Automerge.SyncState>; claims?: Record<string, unknown>; // Auth claims @@ -86,7 +92,7 @@ export class SyncServer { /** * Register a new WebSocket peer. 
*/ - addPeer(peerId: string, ws: WebSocket, claims?: Record<string, unknown>): Peer { + addPeer(peerId: string, ws: SyncWebSocket, claims?: Record<string, unknown>): Peer { const peer: Peer = { id: peerId, ws, @@ -343,7 +349,7 @@ export class SyncServer { #sendToPeer(peer: Peer, msg: object): void { try { - if (peer.ws.readyState === WebSocket.OPEN) { + if (peer.ws.readyState === 1) { peer.ws.send(JSON.stringify(msg)); } } catch (e) { diff --git a/server/sync-instance.ts b/server/sync-instance.ts new file mode 100644 index 0000000..09b236f --- /dev/null +++ b/server/sync-instance.ts @@ -0,0 +1,14 @@ +/** + * SyncServer singleton — shared across server/index.ts and modules. + * + * Participant mode: server maintains its own Automerge docs. + * On any doc change, debounced-save to disk via doc-persistence. + */ + +import { SyncServer } from "./local-first/sync-server"; +import { saveDoc } from "./local-first/doc-persistence"; + +export const syncServer = new SyncServer({ + participantMode: true, + onDocChange: (docId, doc) => saveDoc(docId, doc), });