rspace-online/server/local-first/doc-persistence.ts

199 lines
5.9 KiB
TypeScript

/**
* Doc Persistence — Maps docIds to filesystem paths and provides debounced save/load.
*
* Storage layout: {DOCS_STORAGE_DIR}/{space}/{module}/{collection}[/{itemId}].automerge
* Example: /data/docs/demo/notes/notebooks/abc.automerge
*
* Encrypted docs: Same path but content is rSEN-encrypted (server-side at-rest encryption).
* Opaque blobs: {path}.automerge.enc — relay-mode encrypted blobs the server can't decrypt.
*/
import { dirname, relative, resolve, sep } from "node:path";
import { mkdir, readdir, readFile, writeFile, stat, unlink } from "node:fs/promises";
import * as Automerge from "@automerge/automerge";
import type { SyncServer } from "./sync-server";
import {
deriveSpaceKey,
encryptBinary,
decryptBinary,
isEncryptedFile,
packEncrypted,
unpackEncrypted,
} from "./encryption-utils";
// Root directory for persisted docs; overridable for tests/deployments.
const DOCS_DIR = process.env.DOCS_STORAGE_DIR || "/data/docs";
// Quiet period before a pending save is flushed to disk (see saveDoc).
const SAVE_DEBOUNCE_MS = 2000;
/**
 * Convert a docId like "demo:notes:notebooks:abc" to a filesystem path.
 * All colon-separated segments except the last become directories; the
 * last segment becomes the filename with an ".automerge" extension.
 *
 * @throws Error when the docId has fewer than three segments.
 */
export function docIdToPath(docId: string): string {
  const segments = docId.split(":");
  if (segments.length < 3) throw new Error(`Invalid docId: ${docId}`);
  const dirs = segments.slice(0, -1);
  const filename = `${segments[segments.length - 1]}.automerge`;
  return resolve(DOCS_DIR, ...dirs, filename);
}
/**
 * Convert a filesystem path back to a docId (inverse of docIdToPath).
 * Strips the ".automerge" / ".automerge.enc" extension and rejoins the
 * path segments with ":".
 *
 * Uses relative() against the resolved DOCS_DIR rather than naive string
 * slicing, so it works when DOCS_STORAGE_DIR is a relative path (docIdToPath
 * resolves to absolute paths, which a raw `.slice(DOCS_DIR.length + 1)`
 * would mis-trim), and splits on the platform separator instead of a
 * hard-coded "/".
 */
function pathToDocId(filePath: string): string {
  const rel = relative(resolve(DOCS_DIR), filePath);
  const withoutExt = rel.replace(/\.automerge(\.enc)?$/, "");
  return withoutExt.split(sep).join(":");
}
// Debounce timers per docId — a pending entry means a save is scheduled
// but not yet flushed; saveDoc replaces it on each call.
const saveTimers = new Map<string, ReturnType<typeof setTimeout>>();
/**
 * Debounced save — serializes the Automerge doc and writes it to disk after
 * SAVE_DEBOUNCE_MS of quiet time. Repeated calls for the same docId reset
 * the timer, so only the latest doc snapshot is written.
 *
 * @param docId Document identifier ("space:module:collection:item").
 * @param doc Automerge document to persist (serialized at flush time).
 * @param encryptionKeyId When provided, the binary is rSEN-encrypted with
 *   the space key derived from this id before writing.
 */
export function saveDoc(
  docId: string,
  doc: Automerge.Doc<any>,
  encryptionKeyId?: string,
): void {
  const pending = saveTimers.get(docId);
  if (pending !== undefined) clearTimeout(pending);

  const flush = async (): Promise<void> => {
    saveTimers.delete(docId);
    try {
      const filePath = docIdToPath(docId);
      await mkdir(dirname(filePath), { recursive: true });
      const binary = Automerge.save(doc);

      // Plaintext path: write the raw Automerge binary and return early.
      if (!encryptionKeyId) {
        await writeFile(filePath, binary);
        console.log(
          `[DocStore] Saved ${docId} (${binary.byteLength} bytes)`,
        );
        return;
      }

      // Encrypted path: wrap the binary in an rSEN header before writing.
      const key = await deriveSpaceKey(encryptionKeyId);
      const ciphertext = await encryptBinary(binary, key);
      const packed = packEncrypted(encryptionKeyId, ciphertext);
      await writeFile(filePath, packed);
      console.log(
        `[DocStore] Saved ${docId} encrypted (${packed.byteLength} bytes)`,
      );
    } catch (e) {
      // Persistence is best-effort; log and keep the server running.
      console.error(`[DocStore] Failed to save ${docId}:`, e);
    }
  };

  saveTimers.set(docId, setTimeout(flush, SAVE_DEBOUNCE_MS));
}
/**
 * Save an opaque encrypted blob for relay-mode docs.
 * These are client-encrypted blobs the server cannot decrypt, stored next
 * to regular docs as {docIdPath}.automerge.enc. Best-effort: failures are
 * logged rather than thrown.
 *
 * @param docId Document identifier to derive the storage path from.
 * @param blob Client-encrypted payload, written verbatim.
 */
export async function saveEncryptedBlob(
  docId: string,
  blob: Uint8Array,
): Promise<void> {
  try {
    const encPath = docIdToPath(docId).replace(
      /\.automerge$/,
      ".automerge.enc",
    );
    await mkdir(dirname(encPath), { recursive: true });
    await writeFile(encPath, blob);
    console.log(
      `[DocStore] Saved encrypted blob ${docId} (${blob.byteLength} bytes)`,
    );
  } catch (e) {
    console.error(`[DocStore] Failed to save encrypted blob ${docId}:`, e);
  }
}
/**
 * Load an opaque encrypted blob for relay-mode docs.
 * Returns null if no blob exists.
 *
 * Uses node:fs/promises like the rest of this module (the previous
 * Bun.file-based version was Bun-only and broke under plain Node), and
 * only treats ENOENT as the silent "no blob" case — any other failure
 * (permissions, I/O error) is logged so it isn't mistaken for absence.
 *
 * @param docId Document identifier to derive the storage path from.
 * @returns The raw encrypted bytes, or null when no blob is stored.
 */
export async function loadEncryptedBlob(
  docId: string,
): Promise<Uint8Array | null> {
  try {
    const basePath = docIdToPath(docId);
    const encPath = basePath.replace(/\.automerge$/, ".automerge.enc");
    const raw = await readFile(encPath);
    return new Uint8Array(raw);
  } catch (e) {
    // Missing file is the expected "no data" outcome; surface anything else.
    if ((e as { code?: string })?.code !== "ENOENT") {
      console.error(`[DocStore] Failed to read encrypted blob ${docId}:`, e);
    }
    return null;
  }
}
/**
 * Recursively scan DOCS_DIR and load all .automerge files into the SyncServer.
 * rSEN-encrypted files are detected and decrypted before loading; opaque
 * .automerge.enc relay blobs are skipped (they are not Automerge docs).
 *
 * @param syncServer Target server that receives each loaded document.
 * @returns Number of documents successfully loaded.
 */
export async function loadAllDocs(syncServer: SyncServer): Promise<number> {
  let total = 0;
  try {
    // Ensure the root exists so a fresh deployment scans cleanly.
    await mkdir(DOCS_DIR, { recursive: true });
    total = await scanDir(DOCS_DIR, syncServer);
  } catch (e) {
    console.error("[DocStore] Failed to scan docs directory:", e);
  }
  console.log(`[DocStore] Loaded ${total} documents from ${DOCS_DIR}`);
  return total;
}
/**
 * Recursively walk one directory, loading every .automerge file into the
 * SyncServer. Returns the count of docs loaded from this subtree.
 * Unreadable directories and unloadable files are logged and skipped.
 */
async function scanDir(dir: string, syncServer: SyncServer): Promise<number> {
  let loaded = 0;

  let entries;
  try {
    entries = await readdir(dir, { withFileTypes: true });
  } catch {
    // Unreadable directory — treat it as empty rather than aborting the scan.
    return 0;
  }

  for (const entry of entries) {
    const fullPath = resolve(dir, entry.name);

    if (entry.isDirectory()) {
      loaded += await scanDir(fullPath, syncServer);
      continue;
    }
    // Opaque relay blobs (.automerge.enc) are not loadable Automerge docs;
    // the .enc check must come first since both suffixes end in ".automerge".
    if (
      entry.name.endsWith(".automerge.enc") ||
      !entry.name.endsWith(".automerge")
    ) {
      continue;
    }

    try {
      const raw = await readFile(fullPath);
      let bytes = new Uint8Array(raw);

      // Server-side at-rest encryption: unwrap the rSEN header if present.
      if (isEncryptedFile(bytes)) {
        try {
          const { keyId, ciphertext } = unpackEncrypted(bytes);
          const key = await deriveSpaceKey(keyId);
          bytes = new Uint8Array(await decryptBinary(ciphertext, key));
          console.log(
            `[DocStore] Decrypted ${entry.name} (keyId: ${keyId})`,
          );
        } catch (e) {
          console.error(
            `[DocStore] Failed to decrypt ${fullPath}:`,
            e,
          );
          continue;
        }
      }

      syncServer.setDoc(pathToDocId(fullPath), Automerge.load(bytes));
      loaded++;
    } catch (e) {
      console.error(`[DocStore] Failed to load ${fullPath}:`, e);
    }
  }

  return loaded;
}