fix(persistence): flush docs on shutdown, fix eviction race, mobile flush

- Add saveDocImmediate() for synchronous awaitable saves (no debounce)
- Add SyncServer.flushAll() to iterate all in-memory docs
- Fix eviction race: onDocEvict now uses saveDocImmediate instead of
  debounced saveDoc (which could fire after doc deleted from memory)
- Add SIGTERM/SIGINT handlers with 10s timeout safety net
- Add visibilitychange flush on client (reliable on mobile/bfcache)
- Flush pending IDB saves in DocSyncManager.disconnect()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jeff Emmett 2026-04-16 08:45:01 -04:00
parent 63c6fcc941
commit fec934b8a3
6 changed files with 119 additions and 23 deletions

View File

@ -92,12 +92,13 @@ import { sheetsModule } from "../modules/rsheets/mod";
import { exchangeModule } from "../modules/rexchange/mod";
import { auctionsModule } from "../modules/rauctions/mod";
import { credModule } from "../modules/rcred/mod";
import { feedsModule } from "../modules/rfeeds/mod";
import { spaces, createSpace, resolveCallerRole, roleAtLeast } from "./spaces";
import type { SpaceRoleString } from "./spaces";
import { renderShell, renderSubPageInfo, renderModuleLanding, renderOnboarding, setFragmentMode } from "./shell";
import { renderOutputListPage } from "./output-list";
import { renderMainLanding, renderSpaceDashboard } from "./landing";
import { syncServer } from "./sync-instance";
import { syncServer, flushAndShutdown } from "./sync-instance";
import { loadAllDocs } from "./local-first/doc-persistence";
import { backupRouter } from "./local-first/backup-routes";
import { ipfsRouter } from "./ipfs-routes";
@ -123,6 +124,27 @@ process.on('unhandledRejection', (reason) => {
console.error('[FATAL] Unhandled rejection (swallowed):', reason);
});
// ── Graceful shutdown — flush all in-memory docs to disk ──
let shuttingDown = false;
async function gracefulShutdown(signal: string) {
if (shuttingDown) return;
shuttingDown = true;
console.log(`[Server] ${signal} received — flushing docs to disk...`);
const timeout = setTimeout(() => {
console.error("[Server] Flush timed out after 10s — forcing exit");
process.exit(1);
}, 10_000);
try {
await flushAndShutdown();
} catch (e) {
console.error("[Server] Flush error during shutdown:", e);
}
clearTimeout(timeout);
process.exit(0);
}
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
// Register modules (order determines app-switcher menu position)
registerModule(canvasModule);
registerModule(pubsModule);
@ -155,6 +177,7 @@ registerModule(govModule); // Governance decision circuits
registerModule(exchangeModule); // P2P crypto/fiat exchange
registerModule(auctionsModule); // Community auctions with USDC
registerModule(credModule); // Contribution recognition via CredRank
registerModule(feedsModule); // Community RSS dashboard
registerModule(designModule); // Scribus DTP + AI design agent
// De-emphasized modules (bottom of menu)
registerModule(forumModule);

View File

@ -43,6 +43,33 @@ function pathToDocId(filePath: string): string {
// Debounce timers per docId
const saveTimers = new Map<string, ReturnType<typeof setTimeout>>();
/** Shared write logic — saves Automerge doc to disk, optionally encrypted. */
async function writeDocToDisk(
docId: string,
doc: Automerge.Doc<any>,
encryptionKeyId?: string,
label = "Saved",
): Promise<void> {
const filePath = docIdToPath(docId);
await mkdir(dirname(filePath), { recursive: true });
const binary = Automerge.save(doc);
if (encryptionKeyId) {
const key = await deriveSpaceKey(encryptionKeyId);
const ciphertext = await encryptBinary(binary, key);
const packed = packEncrypted(encryptionKeyId, ciphertext);
await writeFile(filePath, packed);
console.log(
`[DocStore] ${label} ${docId} encrypted (${packed.byteLength} bytes)`,
);
} else {
await writeFile(filePath, binary);
console.log(
`[DocStore] ${label} ${docId} (${binary.byteLength} bytes)`,
);
}
}
/**
* Debounced save writes Automerge binary to disk after SAVE_DEBOUNCE_MS.
* If encryptionKeyId is provided, encrypts with rSEN header before writing.
@ -60,24 +87,7 @@ export function saveDoc(
setTimeout(async () => {
saveTimers.delete(docId);
try {
const filePath = docIdToPath(docId);
await mkdir(dirname(filePath), { recursive: true });
const binary = Automerge.save(doc);
if (encryptionKeyId) {
const key = await deriveSpaceKey(encryptionKeyId);
const ciphertext = await encryptBinary(binary, key);
const packed = packEncrypted(encryptionKeyId, ciphertext);
await writeFile(filePath, packed);
console.log(
`[DocStore] Saved ${docId} encrypted (${packed.byteLength} bytes)`,
);
} else {
await writeFile(filePath, binary);
console.log(
`[DocStore] Saved ${docId} (${binary.byteLength} bytes)`,
);
}
await writeDocToDisk(docId, doc, encryptionKeyId);
} catch (e) {
console.error(`[DocStore] Failed to save ${docId}:`, e);
}
@ -85,6 +95,31 @@ export function saveDoc(
);
}
/**
* Immediate save writes Automerge doc to disk synchronously (no debounce).
* Cancels any pending debounced save for this docId.
*/
export async function saveDocImmediate(
docId: string,
doc: Automerge.Doc<any>,
encryptionKeyId?: string,
): Promise<void> {
// Cancel pending debounce timer
const existing = saveTimers.get(docId);
if (existing) {
clearTimeout(existing);
saveTimers.delete(docId);
}
await writeDocToDisk(docId, doc, encryptionKeyId, "Saved immediately");
}
/**
* Returns the set of docIds that have pending (unsaved) debounce timers.
*/
export function getPendingDocIds(): Set<string> {
return new Set(saveTimers.keys());
}
/**
* Save an opaque encrypted blob for relay-mode docs.
* These are client-encrypted blobs the server cannot decrypt.

View File

@ -330,6 +330,20 @@ export class SyncServer {
return Array.from(this.#docSubscribers.get(docId) ?? []);
}
/**
* Iterate all in-memory docs and call the callback for each.
* Uses Promise.allSettled so one failure doesn't block others.
*/
async flushAll(cb: (docId: string, doc: Automerge.Doc<any>) => Promise<void>): Promise<void> {
const tasks: Promise<void>[] = [];
for (const [docId, doc] of this.#docs) {
tasks.push(cb(docId, doc).catch(e => {
console.error(`[SyncServer] flushAll failed for ${docId}:`, e);
}));
}
await Promise.allSettled(tasks);
}
// ---------- Private ----------
#handleSubscribe(peer: Peer, msg: SubscribeMessage): void {

View File

@ -12,7 +12,7 @@
*/
import { SyncServer } from "./local-first/sync-server";
import { saveDoc, saveEncryptedBlob, loadEncryptedBlob } from "./local-first/doc-persistence";
import { saveDoc, saveDocImmediate, saveEncryptedBlob, loadEncryptedBlob } from "./local-first/doc-persistence";
import { getDocumentData } from "./community-store";
import { spaceKnowledgeIndex } from "./space-knowledge";
@ -41,9 +41,11 @@ export const syncServer = new SyncServer({
saveDoc(docId, doc, encryptionKeyId);
},
onDocEvict: (docId, doc) => {
// Persist to disk before evicting from memory
// Persist to disk immediately before evicting from memory (no debounce!)
const encryptionKeyId = getEncryptionKeyId(docId);
saveDoc(docId, doc, encryptionKeyId);
saveDocImmediate(docId, doc, encryptionKeyId).catch(e => {
console.error(`[SyncInstance] Eviction save failed for ${docId}:`, e);
});
},
onRelayBackup: (docId, blob) => {
saveEncryptedBlob(docId, blob);
@ -52,3 +54,17 @@ export const syncServer = new SyncServer({
return loadEncryptedBlob(docId);
},
});
/**
* Flush all in-memory docs to disk immediately, then resolve.
* Called on SIGTERM/SIGINT before process exit.
*/
export async function flushAndShutdown(): Promise<void> {
console.log("[SyncInstance] Flushing all docs to disk before shutdown...");
const start = Date.now();
await syncServer.flushAll(async (docId, doc) => {
const encryptionKeyId = getEncryptionKeyId(docId);
await saveDocImmediate(docId, doc, encryptionKeyId);
});
console.log(`[SyncInstance] Flush complete (${Date.now() - start}ms)`);
}

View File

@ -423,6 +423,8 @@ export class DocSyncManager {
disconnect(): void {
this.#autoReconnect = false;
this.#stopPing();
// Flush pending IDB saves for cache consistency
this.flush().catch(() => {});
if (this.#ws) {
this.#ws.close();
this.#ws = null;

View File

@ -38,7 +38,13 @@ export function initOffline(spaceSlug: string) {
console.warn("[shell] Offline runtime init failed — REST fallback only:", e);
});
// Flush pending writes before the page unloads
// Flush pending writes when tab is hidden (more reliable than beforeunload on mobile/bfcache)
document.addEventListener("visibilitychange", () => {
if (document.visibilityState === "hidden") {
runtime.flush();
}
});
// Flush pending writes before the page unloads (fallback)
window.addEventListener("beforeunload", () => {
runtime.flush();
});