feat: wire DocSyncManager into rSpace WebSocket server

Protocol multiplexing on existing /ws/{slug} endpoint:
- Messages with docId (subscribe/unsubscribe/sync/awareness) → SyncServer
- Messages without docId → legacy canvas handlers (unchanged)

New files: doc-persistence.ts (debounced Automerge save/load),
sync-instance.ts (SyncServer singleton with participant mode).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jeff Emmett 2026-02-25 05:19:21 +00:00
parent 1cb361637d
commit 3d5a142062
5 changed files with 145 additions and 4 deletions

View File

@ -12,6 +12,7 @@ services:
- rspace-swag:/data/swag-artifacts
- rspace-files:/data/files
- rspace-splats:/data/splats
- rspace-docs:/data/docs
environment:
- NODE_ENV=production
- STORAGE_DIR=/data/communities
@ -20,6 +21,7 @@ services:
- PORT=3000
- FILES_DIR=/data/files
- SPLATS_DIR=/data/splats
- DOCS_STORAGE_DIR=/data/docs
- INFISICAL_CLIENT_ID=${INFISICAL_CLIENT_ID}
- INFISICAL_CLIENT_SECRET=${INFISICAL_CLIENT_SECRET}
- INFISICAL_PROJECT_SLUG=rspace
@ -141,6 +143,7 @@ volumes:
rspace-swag:
rspace-files:
rspace-splats:
rspace-docs:
rspace-pgdata:
networks:

View File

@ -64,6 +64,8 @@ import { dataModule } from "../modules/data/mod";
import { splatModule } from "../modules/splat/mod";
import { spaces } from "./spaces";
import { renderShell } from "./shell";
import { syncServer } from "./sync-instance";
import { loadAllDocs } from "./local-first/doc-persistence";
// Register modules
registerModule(canvasModule);
@ -639,13 +641,16 @@ const server = Bun.serve<WSData>({
// ── WebSocket handlers (unchanged) ──
websocket: {
open(ws: ServerWebSocket<WSData>) {
const { communitySlug, peerId, mode, nestFrom, nestFilter } = ws.data;
const { communitySlug, peerId, mode, nestFrom, nestFilter, claims } = ws.data;
if (!communityClients.has(communitySlug)) {
communityClients.set(communitySlug, new Map());
}
communityClients.get(communitySlug)!.set(peerId, ws);
// Register with DocSyncManager for multi-doc sync
syncServer.addPeer(peerId, ws, claims ? { sub: claims.sub, username: claims.username } : undefined);
const nestLabel = nestFrom ? ` (nested from ${nestFrom})` : "";
console.log(`[WS] Client ${peerId} connected to ${communitySlug} (mode: ${mode})${nestLabel}`);
@ -685,6 +690,20 @@ const server = Bun.serve<WSData>({
try {
const msg = JSON.parse(message.toString());
// ── DocSyncManager protocol (messages with docId) ──
if (msg.type === "subscribe" || msg.type === "unsubscribe" || msg.type === "awareness") {
if (msg.docIds || msg.docId) {
syncServer.handleMessage(peerId, message.toString());
return;
}
}
if (msg.type === "sync" && msg.docId) {
// New protocol: sync with docId → route to SyncServer
syncServer.handleMessage(peerId, message.toString());
return;
}
// ── Legacy canvas protocol (no docId) ──
if (msg.type === "sync" && Array.isArray(msg.data)) {
if (ws.data.readOnly) {
ws.send(JSON.stringify({ type: "error", message: "Authentication required to edit this space" }));
@ -782,6 +801,7 @@ const server = Bun.serve<WSData>({
if (clients.size === 0) communityClients.delete(communitySlug);
}
removePeerSyncState(communitySlug, peerId);
syncServer.removePeer(peerId);
console.log(`[WS] Client ${peerId} disconnected from ${communitySlug}`);
},
},
@ -790,6 +810,7 @@ const server = Bun.serve<WSData>({
// ── Startup ──
ensureDemoCommunity().then(() => console.log("[Demo] Demo community ready")).catch((e) => console.error("[Demo] Failed:", e));
ensureCampaignDemo().then(() => console.log("[Campaign] Campaign demo ready")).catch((e) => console.error("[Campaign] Failed:", e));
loadAllDocs(syncServer).catch((e) => console.error("[DocStore] Startup load failed:", e));
console.log(`rSpace unified server running on http://localhost:${PORT}`);
console.log(`Modules: ${getAllModules().map((m) => `${m.icon} ${m.name}`).join(", ")}`);

View File

@ -0,0 +1,97 @@
/**
 * Doc Persistence — maps docIds to filesystem paths and provides debounced save/load.
*
* Storage layout: {DOCS_STORAGE_DIR}/{space}/{module}/{collection}[/{itemId}].automerge
* Example: /data/docs/demo/notes/notebooks/abc.automerge
*/
import { resolve, dirname } from "node:path";
import { mkdir, readdir, readFile, writeFile, stat } from "node:fs/promises";
import * as Automerge from "@automerge/automerge";
import type { SyncServer } from "./sync-server";
const DOCS_DIR = process.env.DOCS_STORAGE_DIR || "/data/docs";
const SAVE_DEBOUNCE_MS = 2000;
/**
 * Convert a docId like "demo:notes:notebooks:abc" to a filesystem path.
 *
 * Layout: {DOCS_DIR}/{part0}/{part1}/.../{lastPart}.automerge
 *
 * docIds ultimately originate from untrusted WebSocket clients, so every
 * segment is validated: a segment of "..", ".", "", or one containing a
 * path separator would let resolve() escape DOCS_DIR (path traversal).
 *
 * @throws Error if the docId has fewer than 3 segments or any segment is
 *         empty, ".", "..", or contains "/" or "\".
 */
export function docIdToPath(docId: string): string {
	const parts = docId.split(":");
	if (parts.length < 3) throw new Error(`Invalid docId: ${docId}`);
	for (const part of parts) {
		// Reject traversal/separator characters so e.g. "a:..:..:etc:passwd"
		// cannot resolve outside DOCS_DIR.
		if (!part || part === "." || part === ".." || part.includes("/") || part.includes("\\")) {
			throw new Error(`Invalid docId segment in: ${docId}`);
		}
	}
	// Last part becomes the filename, rest become directories
	const filename = parts.pop()! + ".automerge";
	return resolve(DOCS_DIR, ...parts, filename);
}
/**
 * Convert a filesystem path back to a docId (inverse of docIdToPath).
 *
 * Assumes filePath lies under DOCS_DIR and ends in ".automerge" — callers
 * only pass paths discovered by scanDir, which guarantees both.
 */
function pathToDocId(filePath: string): string {
	const rel = filePath.slice(DOCS_DIR.length + 1); // strip leading dir + /
	const withoutExt = rel.replace(/\.automerge$/, "");
	// Split on either separator so Windows paths from resolve() round-trip too.
	return withoutExt.split(/[/\\]/).join(":");
}
// Pending debounce timers, keyed by docId.
const saveTimers = new Map<string, ReturnType<typeof setTimeout>>();
/**
 * Debounced save — persists the Automerge binary for `docId` to disk once
 * no further saveDoc() calls arrive within SAVE_DEBOUNCE_MS. Re-invoking
 * with the same docId resets the timer and the doc snapshot to be written.
 */
export function saveDoc(docId: string, doc: Automerge.Doc<any>): void {
	const pending = saveTimers.get(docId);
	if (pending !== undefined) clearTimeout(pending);

	const timer = setTimeout(async () => {
		saveTimers.delete(docId);
		try {
			const filePath = docIdToPath(docId);
			// Parent directories may not exist yet on first save.
			await mkdir(dirname(filePath), { recursive: true });
			const binary = Automerge.save(doc);
			await writeFile(filePath, binary);
			console.log(`[DocStore] Saved ${docId} (${binary.byteLength} bytes)`);
		} catch (e) {
			// Best-effort persistence: log and keep serving from memory.
			console.error(`[DocStore] Failed to save ${docId}:`, e);
		}
	}, SAVE_DEBOUNCE_MS);

	saveTimers.set(docId, timer);
}
/**
 * Recursively scan DOCS_DIR and load every .automerge file into the
 * SyncServer. Intended to run once at startup; returns how many docs
 * were loaded. Never throws — scan failures are logged and yield 0.
 */
export async function loadAllDocs(syncServer: SyncServer): Promise<number> {
	let loaded = 0;
	try {
		// Ensure the root exists so a fresh deployment doesn't error out.
		await mkdir(DOCS_DIR, { recursive: true });
		loaded = await scanDir(DOCS_DIR, syncServer);
	} catch (e) {
		console.error("[DocStore] Failed to scan docs directory:", e);
	}
	console.log(`[DocStore] Loaded ${loaded} documents from ${DOCS_DIR}`);
	return loaded;
}
/**
 * Walk `dir` depth-first, loading each .automerge file into the SyncServer.
 * Unreadable directories and corrupt files are logged and skipped so one
 * bad entry cannot abort the whole startup scan. Returns the number loaded.
 */
async function scanDir(dir: string, syncServer: SyncServer): Promise<number> {
	let entries;
	try {
		entries = await readdir(dir, { withFileTypes: true });
	} catch {
		// Directory vanished or is unreadable — treat as empty.
		return 0;
	}

	let loaded = 0;
	for (const entry of entries) {
		const fullPath = resolve(dir, entry.name);
		if (entry.isDirectory()) {
			loaded += await scanDir(fullPath, syncServer);
			continue;
		}
		if (!entry.name.endsWith(".automerge")) continue;
		try {
			const binary = await readFile(fullPath);
			const doc = Automerge.load(new Uint8Array(binary));
			syncServer.setDoc(pathToDocId(fullPath), doc);
			loaded++;
		} catch (e) {
			console.error(`[DocStore] Failed to load ${fullPath}:`, e);
		}
	}
	return loaded;
}

View File

@ -18,6 +18,12 @@ import * as Automerge from '@automerge/automerge';
// TYPES
// ============================================================================
/**
 * Minimal WebSocket interface — works with both browser WebSocket and
 * Bun ServerWebSocket, keeping SyncServer runtime-agnostic.
 */
export interface SyncWebSocket {
	send(data: string): void;
	readyState: number; // numeric readyState; 1 === OPEN (checked before sends)
}
interface SyncMessage {
type: 'sync';
docId: string;
@ -54,7 +60,7 @@ type WireMessage =
export interface Peer {
id: string;
ws: WebSocket;
ws: SyncWebSocket;
subscribedDocs: Set<string>;
syncStates: Map<string, Automerge.SyncState>;
claims?: Record<string, unknown>; // Auth claims
@ -86,7 +92,7 @@ export class SyncServer {
/**
* Register a new WebSocket peer.
*/
addPeer(peerId: string, ws: WebSocket, claims?: Record<string, unknown>): Peer {
addPeer(peerId: string, ws: SyncWebSocket, claims?: Record<string, unknown>): Peer {
const peer: Peer = {
id: peerId,
ws,
@ -343,7 +349,7 @@ export class SyncServer {
#sendToPeer(peer: Peer, msg: object): void {
try {
if (peer.ws.readyState === WebSocket.OPEN) {
if (peer.ws.readyState === 1) {
peer.ws.send(JSON.stringify(msg));
}
} catch (e) {

14
server/sync-instance.ts Normal file
View File

@ -0,0 +1,14 @@
/**
* SyncServer singleton shared across server/index.ts and modules.
*
* Participant mode: server maintains its own Automerge docs.
* On any doc change, debounced-save to disk via doc-persistence.
*/
import { SyncServer } from "./local-first/sync-server";
import { saveDoc } from "./local-first/doc-persistence";
export const syncServer = new SyncServer({
	// Participant mode: the server keeps its own Automerge copy of each doc.
	participantMode: true,
	// Write-behind persistence: every doc change schedules a debounced
	// save to disk via doc-persistence.
	onDocChange: (docId, doc) => {
		saveDoc(docId, doc);
	},
});