// rspace-online/shared/local-first/runtime.ts
//
// 273 lines
// 7.7 KiB
// TypeScript

/**
* Offline Runtime — singleton coordinator for all modules.
*
* Composes DocumentManager, EncryptedDocStore, and DocSyncManager into a
* single entry point that web components use via `window.__rspaceOfflineRuntime`.
*
* Lifecycle:
* 1. Shell creates runtime with space slug
* 2. runtime.init() opens IndexedDB, connects WebSocket
* 3. Components call runtime.subscribe(docId, schema) to get a live doc
* 4. Components call runtime.change(docId, msg, fn) for mutations
* 5. Shell calls runtime.flush() on beforeunload
*/
import * as Automerge from '@automerge/automerge';
import {
type DocumentId,
type DocSchema,
makeDocumentId,
DocumentManager,
} from './document';
import { EncryptedDocStore } from './storage';
import { DocSyncManager } from './sync';
import { DocCrypto } from './crypto';
import {
getStorageInfo,
evictStaleDocs,
requestPersistentStorage,
QUOTA_WARNING_PERCENT,
} from './storage-quota';
// ============================================================================
// TYPES
// ============================================================================
/**
 * Coarse lifecycle/connectivity state of the runtime.
 *
 * - `idle`         — initial value, and the value set again by `destroy()`.
 * - `initializing` — set at the start of `init()`.
 * - `online`       — set when the sync connection succeeds or reconnects.
 * - `offline`      — set when the sync connection fails or drops.
 * - `error`        — set when `init()` itself throws.
 */
export type RuntimeStatus = 'idle' | 'initializing' | 'online' | 'offline' | 'error';
/** Listener signature for `onStatusChange`; receives the new status. */
export type StatusCallback = (status: RuntimeStatus) => void;
// ============================================================================
// RSpaceOfflineRuntime
// ============================================================================
/**
 * Coordinator that wires a DocumentManager, an EncryptedDocStore and a
 * DocSyncManager together for one space and exposes a small facade
 * (subscribe / change / onChange / flush) on top of them.
 */
export class RSpaceOfflineRuntime {
  #documents: DocumentManager;
  #store: EncryptedDocStore;
  #sync: DocSyncManager;
  // Constructed here but not read anywhere else inside this class.
  #crypto: DocCrypto;
  #space: string;
  #status: RuntimeStatus = 'idle';
  #statusListeners = new Set<StatusCallback>();
  #initialized = false;
  /**
   * Promise of the init() run currently in flight, if any. Overlapping
   * init() calls share it so the store is opened and the socket connected
   * only once. Reset when the run settles so a failed init can be retried.
   */
  #initPromise: Promise<void> | undefined;

  /**
   * @param space Space slug; used for the store, the WebSocket URL and
   *              document-id construction.
   */
  constructor(space: string) {
    this.#space = space;
    this.#crypto = new DocCrypto();
    this.#documents = new DocumentManager();
    this.#store = new EncryptedDocStore(space);
    this.#sync = new DocSyncManager({
      documents: this.#documents,
      store: this.#store,
    });
  }

  // ── Getters ──

  /** Space slug this runtime was created for. */
  get space(): string { return this.#space; }
  /** True once init() has completed (with or without a live connection). */
  get isInitialized(): boolean { return this.#initialized; }
  /** Mirrors the sync manager's `isConnected` flag. */
  get isOnline(): boolean { return this.#sync.isConnected; }
  /** Last status passed to the status listeners. */
  get status(): RuntimeStatus { return this.#status; }

  // ── Lifecycle ──

  /**
   * Open the local store and connect to the sync server.
   *
   * Safe to call multiple times: it is a no-op once initialized, and calls
   * that overlap with a run still in progress wait for that same run instead
   * of starting a second one.
   *
   * @throws Re-throws whatever made initialization fail (status becomes
   *         `error`). A WebSocket connection failure is NOT an error; the
   *         runtime then reports `offline` and init resolves normally.
   */
  async init(): Promise<void> {
    if (this.#initialized) return;
    this.#initPromise ??= this.#doInit().finally(() => {
      // Allow a retry after failure; after success #initialized short-circuits.
      this.#initPromise = undefined;
    });
    await this.#initPromise;
  }

  /** Single initialization run; only ever started through init(). */
  async #doInit(): Promise<void> {
    this.#setStatus('initializing');
    try {
      // 1. Open the local store.
      await this.#store.open();

      // 2. Connect the WebSocket, matching the page's scheme (https → wss).
      const proto = location.protocol === 'https:' ? 'wss:' : 'ws:';
      const wsUrl = `${proto}//${location.host}/ws/${this.#space}`;
      this.#sync.onConnect(() => this.#setStatus('online'));
      this.#sync.onDisconnect(() => this.#setStatus('offline'));
      try {
        await this.#sync.connect(wsUrl, this.#space);
        this.#setStatus('online');
      } catch {
        // WebSocket failed — the runtime is still usable offline.
        this.#setStatus('offline');
      }
      this.#initialized = true;

      // 3. Storage housekeeping: deliberately not awaited. The helper
      //    catches its own errors, so `void` cannot hide a rejection.
      void this.#runStorageHousekeeping();
    } catch (e) {
      this.#setStatus('error');
      console.error('[OfflineRuntime] Init failed:', e);
      throw e;
    }
  }

  /**
   * Subscribe to a document: register its schema, load any stored copy,
   * open it in the DocumentManager, then ask the sync manager to subscribe.
   *
   * @param docId  Document to open.
   * @param schema Schema registered with, and passed to, the DocumentManager.
   * @returns The doc as returned by `DocumentManager.open`.
   */
  async subscribe<T extends Record<string, any>>(
    docId: DocumentId,
    schema: DocSchema<T>,
  ): Promise<Automerge.Doc<T>> {
    // Register schema for future use.
    this.#documents.registerSchema(schema);
    // Local copy first; a nullish result is normalized to `undefined`.
    const cached = await this.#store.load(docId);
    const doc = this.#documents.open(docId, schema, cached ?? undefined);
    await this.#sync.subscribe([docId]);
    return doc;
  }

  /**
   * Unsubscribe from a document: hand its current binary (if any) to the
   * store, stop syncing it, and close it in the DocumentManager.
   */
  unsubscribe(docId: DocumentId): void {
    const binary = this.#documents.save(docId);
    if (binary) {
      const meta = this.#documents.getMeta(docId);
      // NOTE(review): the result of store.save is not awaited here; if it
      // returns a promise, a rejection would go unobserved — confirm.
      this.#store.save(docId, binary, meta ? {
        module: meta.module,
        collection: meta.collection,
        version: meta.version,
      } : undefined);
    }
    this.#sync.unsubscribe([docId]);
    this.#documents.close(docId);
  }

  /**
   * Mutate a document. Delegates to `DocSyncManager.change`; persistence and
   * server sync are presumably handled there (not visible in this file).
   */
  change<T>(docId: DocumentId, message: string, fn: (doc: T) => void): void {
    this.#sync.change<T>(docId, message, fn);
  }

  /** Read the current state of a document, or `undefined` if not available. */
  get<T>(docId: DocumentId): Automerge.Doc<T> | undefined {
    return this.#documents.get<T>(docId);
  }

  /**
   * Listen for changes on a document. The callback is registered with both
   * the DocumentManager and the DocSyncManager.
   *
   * @returns Function that removes both registrations.
   */
  onChange<T>(docId: DocumentId, cb: (doc: Automerge.Doc<T>) => void): () => void {
    const unsub1 = this.#documents.onChange<T>(docId, cb);
    const unsub2 = this.#sync.onChange(docId, cb as any);
    return () => { unsub1(); unsub2(); };
  }

  /** Build a DocumentId for a module using the current space. */
  makeDocId(module: string, collection: string, itemId?: string): DocumentId {
    return makeDocumentId(this.#space, module, collection, itemId);
  }

  /** List stored doc IDs for a module (optionally one collection). */
  async listByModule(module: string, collection?: string): Promise<DocumentId[]> {
    return this.#store.listByModule(module, collection);
  }

  /**
   * Subscribe to every document of a multi-doc module.
   *
   * Asks the sync manager for the doc IDs under `space:module:collection`
   * and subscribes to each one in turn.
   *
   * NOTE(review): only `requestDocList` is consulted here; whether that also
   * covers locally cached IDs (e.g. while offline) is not visible in this
   * file — confirm against DocSyncManager.
   *
   * @returns Map of doc ID to opened doc, in the order the IDs were listed.
   */
  async subscribeModule<T extends Record<string, any>>(
    module: string,
    collection: string,
    schema: DocSchema<T>,
  ): Promise<Map<DocumentId, Automerge.Doc<T>>> {
    this.#documents.registerSchema(schema);
    const prefix = `${this.#space}:${module}:${collection}`;
    const docIds = await this.#sync.requestDocList(prefix);
    const results = new Map<DocumentId, Automerge.Doc<T>>();
    for (const id of docIds) {
      const docId = id as DocumentId;
      const doc = await this.subscribe<T>(docId, schema);
      results.set(docId, doc);
    }
    return results;
  }

  /**
   * Listen for runtime status changes
   * (idle / initializing / online / offline / error).
   *
   * @returns Function that removes the listener.
   */
  onStatusChange(cb: StatusCallback): () => void {
    this.#statusListeners.add(cb);
    return () => { this.#statusListeners.delete(cb); };
  }

  /** Flush the sync manager and the store in parallel. Call on beforeunload. */
  async flush(): Promise<void> {
    await Promise.all([
      this.#sync.flush(),
      this.#store.flush(),
    ]);
  }

  /**
   * Tear down: disconnect, drop all status listeners, reset to `idle`.
   *
   * This does not flush; await `flush()` first if pending writes matter.
   * Listeners are cleared before the status is reset, so they are not
   * notified of the final `idle` transition.
   */
  destroy(): void {
    this.#sync.disconnect();
    this.#statusListeners.clear();
    this.#initialized = false;
    this.#setStatus('idle');
  }

  // ── Private ──

  /**
   * Best-effort storage maintenance: request persistent storage, then evict
   * stale docs when usage is at or above QUOTA_WARNING_PERCENT. Never
   * rejects — failures are logged as warnings.
   */
  async #runStorageHousekeeping(): Promise<void> {
    try {
      await requestPersistentStorage();
      const info = await getStorageInfo();
      if (info.percent >= QUOTA_WARNING_PERCENT) {
        console.warn(
          `[OfflineRuntime] Storage usage at ${info.percent}% ` +
          `(${(info.usage / 1e6).toFixed(1)}MB / ${(info.quota / 1e6).toFixed(1)}MB). ` +
          `Evicting stale documents...`
        );
        await evictStaleDocs();
      }
    } catch (e) {
      console.warn('[OfflineRuntime] Storage housekeeping failed:', e);
    }
  }

  /**
   * Record a new status and notify listeners. No-op when unchanged. A
   * throwing listener is logged and does not stop the remaining ones.
   */
  #setStatus(status: RuntimeStatus): void {
    if (this.#status === status) return;
    this.#status = status;
    for (const cb of this.#statusListeners) {
      try {
        cb(status);
      } catch (e) {
        console.error('[OfflineRuntime] Status listener threw:', e);
      }
    }
  }
}