rspace-online/shared/local-first/runtime.ts

423 lines
13 KiB
TypeScript

/**
* Offline Runtime — singleton coordinator for all modules.
*
* Composes DocumentManager, EncryptedDocStore, and DocSyncManager into a
* single entry point that web components use via `window.__rspaceOfflineRuntime`.
*
* Lifecycle:
* 1. Shell creates runtime with space slug
* 2. runtime.init() opens IndexedDB, connects WebSocket
* 3. Components call runtime.subscribe(docId, schema) to get a live doc
* 4. Components call runtime.change(docId, msg, fn) for mutations
* 5. Shell calls runtime.flush() on beforeunload
*/
import * as Automerge from '@automerge/automerge';
import {
type DocumentId,
type DocSchema,
makeDocumentId,
DocumentManager,
} from './document';
import { EncryptedDocStore } from './storage';
import { DocSyncManager } from './sync';
import { DocCrypto } from './crypto';
import {
getStorageInfo,
evictStaleDocs,
requestPersistentStorage,
QUOTA_WARNING_PERCENT,
} from './storage-quota';
// ============================================================================
// TYPES
// ============================================================================
/**
 * Lifecycle / connectivity state reported by the runtime.
 *
 * - `idle`: initial value of the runtime, and the value set again by `destroy()`.
 * - `initializing`: set at the start of `init()`.
 * - `online` / `offline`: set from the sync manager's connect / disconnect
 *   callbacks, and from the outcome of the connection attempt.
 * - `error`: set when `init()` fails; the error is rethrown to the caller.
 */
export type RuntimeStatus = 'idle' | 'initializing' | 'online' | 'offline' | 'error';
/** Listener for status transitions; it is only invoked when the status value actually changes. */
export type StatusCallback = (status: RuntimeStatus) => void;
// ============================================================================
// RSpaceOfflineRuntime
// ============================================================================
/**
 * Coordinator that wires a DocumentManager, an EncryptedDocStore and one
 * DocSyncManager per visited space into a single object.
 *
 * The runtime tracks one "active" space. Its sync manager drives the public
 * `status`; sync managers of other spaces are kept in the background and never
 * influence the reported status.
 */
export class RSpaceOfflineRuntime {
  readonly #documents: DocumentManager;
  readonly #store: EncryptedDocStore;
  /** Sync manager of the active space. */
  #sync: DocSyncManager;
  readonly #crypto: DocCrypto;
  #activeSpace: string;
  #status: RuntimeStatus = 'idle';
  #statusListeners = new Set<StatusCallback>();
  #initialized = false;
  /** In-flight `init()` promise, shared by concurrent callers. */
  #initPromise: Promise<void> | null = null;
  /** Module scope config: moduleId → 'global' | 'space'. Set from page's module list. */
  #moduleScopes = new Map<string, 'global' | 'space'>();
  /** Lazy WebSocket connections per space slug (for cross-space subscriptions). */
  #spaceConnections = new Map<string, DocSyncManager>();

  /**
   * @param space Slug of the space that is active initially.
   */
  constructor(space: string) {
    this.#activeSpace = space;
    this.#crypto = new DocCrypto();
    this.#documents = new DocumentManager();
    this.#store = new EncryptedDocStore(space);
    this.#sync = this.#createSync();
  }

  // ── Getters ──

  /** Slug of the currently active space. */
  get space(): string { return this.#activeSpace; }
  /** True once `init()` has completed successfully (reset by `destroy()`). */
  get isInitialized(): boolean { return this.#initialized; }
  /** Whether the active space's sync manager reports a live connection. */
  get isOnline(): boolean { return this.#sync.isConnected; }
  /** Last status that was broadcast to status listeners. */
  get status(): RuntimeStatus { return this.#status; }

  // ── Lifecycle ──

  /**
   * Open the local store and connect the active space to the sync server.
   *
   * Safe to call multiple times: it is a no-op once initialized, and callers
   * that arrive while an initialization is still running share that run
   * instead of opening the store and the socket a second time.
   *
   * @throws Rethrows whatever opening the store throws (status becomes `error`).
   *         A failed WebSocket connection does not throw; status becomes `offline`.
   */
  async init(): Promise<void> {
    if (this.#initialized) return;
    if (!this.#initPromise) {
      // Clear the memo when settled so a failed init can be retried.
      this.#initPromise = this.#doInit().finally(() => {
        this.#initPromise = null;
      });
    }
    return this.#initPromise;
  }

  /**
   * Subscribe to a document through the active space's sync manager.
   * Loads any cached binary from the store first, opens the document, then
   * subscribes it for sync.
   *
   * @returns The opened Automerge doc.
   */
  async subscribe<T extends Record<string, any>>(
    docId: DocumentId,
    schema: DocSchema<T>,
  ): Promise<Automerge.Doc<T>> {
    // Register schema for future use
    this.#documents.registerSchema(schema);
    return this.#subscribeVia<T>(this.#sync, docId, schema);
  }

  /**
   * Unsubscribe from a document. Saves the current state to the store (if the
   * document manager yields a binary), stops syncing and closes the document.
   * The save is not awaited; a failure is logged instead of being left as an
   * unhandled rejection.
   */
  unsubscribe(docId: DocumentId): void {
    // Save before unsubscribing
    const binary = this.#documents.save(docId);
    if (binary) {
      const meta = this.#documents.getMeta(docId);
      const saved = this.#store.save(docId, binary, meta ? {
        module: meta.module,
        collection: meta.collection,
        version: meta.version,
      } : undefined);
      // Promise.resolve() makes this safe whether or not save() is asynchronous.
      void Promise.resolve(saved).catch((e: unknown) => {
        console.warn('[OfflineRuntime] Failed to persist document on unsubscribe:', docId, e);
      });
    }
    this.#sync.unsubscribe([docId]);
    this.#documents.close(docId);
  }

  /**
   * Mutate a document via the active space's sync manager.
   */
  change<T>(docId: DocumentId, message: string, fn: (doc: T) => void): void {
    this.#sync.change<T>(docId, message, fn);
  }

  /**
   * Read the current state of a document, or `undefined` if the document
   * manager does not have it.
   */
  get<T>(docId: DocumentId): Automerge.Doc<T> | undefined {
    return this.#documents.get<T>(docId);
  }

  /**
   * Listen for changes on a document. The callback is registered with both the
   * document manager and the active sync manager.
   *
   * @returns A function that removes both registrations.
   */
  onChange<T>(docId: DocumentId, cb: (doc: Automerge.Doc<T>) => void): () => void {
    const offDocuments = this.#documents.onChange<T>(docId, cb);
    // The sync manager's callback type is not generic over T, hence the cast.
    const offSync = this.#sync.onChange(docId, cb as any);
    return () => { offDocuments(); offSync(); };
  }

  /**
   * Configure module scope information from the page's module list.
   * Replaces any previously configured scopes.
   */
  setModuleScopes(scopes: Array<{ id: string; scope: 'global' | 'space' }>): void {
    this.#moduleScopes.clear();
    for (const { id, scope } of scopes) {
      this.#moduleScopes.set(id, scope);
    }
  }

  /**
   * Resolve the data-space prefix for a module.
   * Returns 'global' for global-scoped modules, the current space otherwise
   * (including modules with no configured scope).
   */
  resolveDocSpace(moduleId: string): string {
    return this.#dataSpaceFor(moduleId, this.#activeSpace);
  }

  /**
   * Build a DocumentId for a module, respecting scope resolution.
   */
  makeDocId(module: string, collection: string, itemId?: string): DocumentId {
    const dataSpace = this.resolveDocSpace(module);
    return makeDocumentId(dataSpace, module, collection, itemId);
  }

  /**
   * List doc IDs for a module/collection as reported by the local store.
   */
  async listByModule(module: string, collection?: string): Promise<DocumentId[]> {
    return this.#store.listByModule(module, collection);
  }

  /**
   * Subscribe to all documents of a multi-doc module in the active space.
   * Asks the active sync manager for the doc IDs under
   * `<dataSpace>:<module>:<collection>` and subscribes to each, one at a time.
   *
   * @returns Map of doc ID → opened doc, in the order the IDs were listed.
   */
  async subscribeModule<T extends Record<string, any>>(
    module: string,
    collection: string,
    schema: DocSchema<T>,
  ): Promise<Map<DocumentId, Automerge.Doc<T>>> {
    this.#documents.registerSchema(schema);
    const dataSpace = this.resolveDocSpace(module);
    const prefix = `${dataSpace}:${module}:${collection}`;
    const docIds = await this.#sync.requestDocList(prefix);
    const results = new Map<DocumentId, Automerge.Doc<T>>();
    for (const id of docIds) {
      const docId = id as DocumentId;
      const doc = await this.subscribe<T>(docId, schema);
      results.set(docId, doc);
    }
    return results;
  }

  /**
   * Listen for runtime status changes (see `RuntimeStatus` for the values).
   *
   * @returns A function that removes the listener.
   */
  onStatusChange(cb: StatusCallback): () => void {
    this.#statusListeners.add(cb);
    return () => { this.#statusListeners.delete(cb); };
  }

  /**
   * Flush the active sync manager and the store. Call on beforeunload.
   */
  async flush(): Promise<void> {
    await Promise.all([
      this.#sync.flush(),
      this.#store.flush(),
    ]);
  }

  /**
   * Switch the active space context without destroying existing connections.
   * The previous space's sync manager stays registered for cross-space use,
   * but from now on only the new space's connection drives `status`.
   */
  switchSpace(newSpace: string): void {
    if (newSpace === this.#activeSpace) return;
    // Store the current connection for cross-space access
    if (!this.#spaceConnections.has(this.#activeSpace)) {
      this.#spaceConnections.set(this.#activeSpace, this.#sync);
    }
    this.#activeSpace = newSpace;

    // Re-use existing connection if we've visited this space before
    const existing = this.#spaceConnections.get(newSpace);
    if (existing) {
      this.#sync = existing;
      // No connect/disconnect event is guaranteed to follow, so bring the
      // reported status in line with the connection that is now active.
      this.#setStatus(existing.isConnected ? 'online' : 'offline');
      return;
    }

    // Create and connect a sync manager for the new space
    const newSync = this.#createSync();
    this.#watchStatus(newSync);
    this.#sync = newSync;
    this.#spaceConnections.set(newSpace, newSync);
    newSync.connect(this.#wsUrl(newSpace), newSpace).catch(() => {
      this.#setStatusFor(newSync, 'offline');
    });
  }

  /**
   * Get all spaces that have a sync manager (current + cross-space layers).
   * The active space is always the first entry.
   */
  getAllActiveSpaces(): Array<{ space: string; sync: DocSyncManager }> {
    const result: Array<{ space: string; sync: DocSyncManager }> = [
      { space: this.#activeSpace, sync: this.#sync },
    ];
    for (const [space, sync] of this.#spaceConnections) {
      if (space !== this.#activeSpace) {
        result.push({ space, sync });
      }
    }
    return result;
  }

  /**
   * Subscribe to a module's docs across all active spaces.
   * Each space's documents are listed AND subscribed through that space's own
   * sync manager. A space whose listing or subscription fails is skipped (with
   * a warning) so the remaining spaces still load.
   *
   * @returns Map of doc ID → `{ doc, space }`; a doc ID seen in several spaces
   *          (e.g. global-scoped modules) is attributed to the first one.
   */
  async subscribeModuleAcrossSpaces<T extends Record<string, any>>(
    module: string,
    collection: string,
    schema: DocSchema<T>,
  ): Promise<Map<DocumentId, { doc: Automerge.Doc<T>; space: string }>> {
    this.#documents.registerSchema(schema);
    const results = new Map<DocumentId, { doc: Automerge.Doc<T>; space: string }>();
    for (const { space, sync } of this.getAllActiveSpaces()) {
      const prefix = `${this.#dataSpaceFor(module, space)}:${module}:${collection}`;
      try {
        const docIds = await sync.requestDocList(prefix);
        for (const id of docIds) {
          const docId = id as DocumentId;
          if (results.has(docId)) continue; // global docs already seen
          const doc = await this.#subscribeVia<T>(sync, docId, schema);
          results.set(docId, { doc, space });
        }
      } catch (e) {
        // Best effort: one unreachable space must not fail the aggregate.
        console.warn(`[OfflineRuntime] Cross-space subscribe failed for "${space}":`, e);
      }
    }
    return results;
  }

  /**
   * Get a sync manager for a specific space, creating and connecting one if
   * the space has not been seen before. A failed connection is not an error:
   * the (unconnected) manager is still returned.
   */
  async connectToSpace(spaceSlug: string): Promise<DocSyncManager> {
    // Already connected?
    const existing = this.#spaceConnections.get(spaceSlug);
    if (existing) return existing;
    // Current space
    if (spaceSlug === this.#activeSpace) return this.#sync;

    // Create new connection
    const sync = this.#createSync();
    this.#spaceConnections.set(spaceSlug, sync);
    // Status handlers are inert while this space is in the background, but
    // start reporting if switchSpace() later makes it the active space.
    this.#watchStatus(sync);
    try {
      await sync.connect(this.#wsUrl(spaceSlug), spaceSlug);
    } catch {
      // Cross-space connection failed — operate offline for this space
    }
    return sync;
  }

  /**
   * Tear down: disconnect every sync manager, drop all status listeners and
   * reset to `idle`. Does NOT flush — call `flush()` first if pending writes
   * must be persisted. Listeners are cleared before the final `idle` status is
   * set, so they are not notified of it.
   */
  destroy(): void {
    // The active manager may also be registered in #spaceConnections;
    // the Set ensures each manager is disconnected exactly once.
    const managers = new Set<DocSyncManager>([this.#sync, ...this.#spaceConnections.values()]);
    for (const sync of managers) {
      sync.disconnect();
    }
    this.#spaceConnections.clear();
    this.#statusListeners.clear();
    this.#initialized = false;
    this.#setStatus('idle');
  }

  // ── Private ──

  /** Performs the actual initialization; `init()` guards against re-entry. */
  async #doInit(): Promise<void> {
    this.#setStatus('initializing');
    try {
      // 1. Open the local store
      await this.#store.open();
      // 2. Connect WebSocket (a failure here still leaves the runtime usable offline)
      const sync = this.#sync;
      const space = this.#activeSpace;
      this.#watchStatus(sync);
      try {
        await sync.connect(this.#wsUrl(space), space);
        this.#setStatusFor(sync, 'online');
      } catch {
        this.#setStatusFor(sync, 'offline');
      }
      this.#initialized = true;
      // 3. Storage housekeeping (not awaited; it handles its own errors)
      void this.#runStorageHousekeeping();
    } catch (e) {
      this.#setStatus('error');
      console.error('[OfflineRuntime] Init failed:', e);
      throw e;
    }
  }

  /** Creates a sync manager bound to the shared document manager and store. */
  #createSync(): DocSyncManager {
    return new DocSyncManager({
      documents: this.#documents,
      store: this.#store,
    });
  }

  /** Builds the sync WebSocket URL for a space from the page's location. */
  #wsUrl(space: string): string {
    const proto = location.protocol === 'https:' ? 'wss:' : 'ws:';
    return `${proto}//${location.host}/ws/${space}`;
  }

  /** Forwards a sync manager's connect/disconnect events to the runtime status. */
  #watchStatus(sync: DocSyncManager): void {
    sync.onConnect(() => this.#setStatusFor(sync, 'online'));
    sync.onDisconnect(() => this.#setStatusFor(sync, 'offline'));
  }

  /** Applies a status only if it originates from the active space's sync manager. */
  #setStatusFor(sync: DocSyncManager, status: RuntimeStatus): void {
    if (sync === this.#sync) this.#setStatus(status);
  }

  /** Data-space prefix for a module as seen from `space`. */
  #dataSpaceFor(moduleId: string, space: string): string {
    return this.#moduleScopes.get(moduleId) === 'global' ? 'global' : space;
  }

  /** Loads a doc from the store, opens it, and subscribes it through `sync`. */
  async #subscribeVia<T extends Record<string, any>>(
    sync: DocSyncManager,
    docId: DocumentId,
    schema: DocSchema<T>,
  ): Promise<Automerge.Doc<T>> {
    // Try the local store first so the doc can be opened from cache
    const cached = await this.#store.load(docId);
    const doc = this.#documents.open(docId, schema, cached ?? undefined);
    await sync.subscribe([docId]);
    return doc;
  }

  /** Requests persistent storage and evicts stale docs when usage crosses the warning threshold. */
  async #runStorageHousekeeping(): Promise<void> {
    try {
      // Request persistent storage (browser may grant silently)
      await requestPersistentStorage();
      // Check quota and evict stale docs if needed
      const info = await getStorageInfo();
      if (info.percent >= QUOTA_WARNING_PERCENT) {
        console.warn(
          `[OfflineRuntime] Storage usage at ${info.percent}% ` +
          `(${(info.usage / 1e6).toFixed(1)}MB / ${(info.quota / 1e6).toFixed(1)}MB). ` +
          `Evicting stale documents...`
        );
        await evictStaleDocs();
      }
    } catch (e) {
      console.warn('[OfflineRuntime] Storage housekeeping failed:', e);
    }
  }

  /** Records a new status and notifies listeners; no-op if the status is unchanged. */
  #setStatus(status: RuntimeStatus): void {
    if (this.#status === status) return;
    this.#status = status;
    for (const cb of this.#statusListeners) {
      // A throwing listener must not prevent the others from being notified.
      try { cb(status); } catch { /* ignore */ }
    }
  }
}