From 3a0d376f237bf8267bea79970b13fcdfb7cd770b Mon Sep 17 00:00:00 2001 From: Jeff Emmett Date: Mon, 23 Feb 2026 01:12:06 +0000 Subject: [PATCH] Add 7-layer local-first data infrastructure Crypto (PRF/HKDF/AES-256-GCM per-doc keys), Document (schema + manager), Storage (encrypted IndexedDB), Sync (multi-doc WebSocket client + server), Compute (local/server-delegated transforms), Query (views + search), and Memory Card interchange format. 2919 lines across 10 files. Co-Authored-By: Claude Opus 4.6 --- server/local-first/compute-server.ts | 90 +++++ server/local-first/sync-server.ts | 353 +++++++++++++++++++ shared/local-first/compute.ts | 181 ++++++++++ shared/local-first/crypto.ts | 210 ++++++++++++ shared/local-first/document.ts | 289 ++++++++++++++++ shared/local-first/index.ts | 75 ++++ shared/local-first/memory-card.ts | 163 +++++++++ shared/local-first/query.ts | 348 +++++++++++++++++++ shared/local-first/storage.ts | 371 ++++++++++++++++++++ shared/local-first/sync.ts | 489 +++++++++++++++++++++++++++ 10 files changed, 2569 insertions(+) create mode 100644 server/local-first/compute-server.ts create mode 100644 server/local-first/sync-server.ts create mode 100644 shared/local-first/compute.ts create mode 100644 shared/local-first/crypto.ts create mode 100644 shared/local-first/document.ts create mode 100644 shared/local-first/index.ts create mode 100644 shared/local-first/memory-card.ts create mode 100644 shared/local-first/query.ts create mode 100644 shared/local-first/storage.ts create mode 100644 shared/local-first/sync.ts diff --git a/server/local-first/compute-server.ts b/server/local-first/compute-server.ts new file mode 100644 index 0000000..9618e8b --- /dev/null +++ b/server/local-first/compute-server.ts @@ -0,0 +1,90 @@ +/** + * Layer 5: Compute (Server) — Server-side transform runner. + * + * Handles POST /:space/api/compute/:transformId requests from clients + * that can't run a transform locally (e.g. PDF generation, image thumbnailing). + * + * Usage in server/index.ts: + * import { createComputeRouter, registerServerTransform } from './local-first/compute-server'; + * const computeRouter = createComputeRouter(); + * app.route('/:space/api/compute', computeRouter); + */ + +import { Hono } from 'hono'; + +// ============================================================================ +// TYPES +// ============================================================================ + +export interface ServerTransform { + id: string; + /** Execute the transform on the server */ + execute(input: In, context: TransformContext): Promise; +} + +export interface TransformContext { + space: string; + /** Auth claims from EncryptID JWT (null if unauthenticated) */ + claims: Record | null; +} + +// ============================================================================ +// REGISTRY +// ============================================================================ + +const transforms = new Map>(); + +export function registerServerTransform(transform: ServerTransform): void { + transforms.set(transform.id, transform); +} + +export function getServerTransform(id: string): ServerTransform | undefined { + return transforms.get(id); +} + +// ============================================================================ +// HONO ROUTER +// ============================================================================ + +/** + * Create a Hono router that handles compute requests. + * Mount at /:space/api/compute + */ +export function createComputeRouter(): Hono { + const router = new Hono(); + + // List available transforms + router.get('/', (c) => { + const list = Array.from(transforms.keys()); + return c.json({ transforms: list }); + }); + + // Execute a transform + router.post('/:transformId', async (c) => { + const transformId = c.req.param('transformId'); + const transform = transforms.get(transformId); + + if (!transform) { + return c.json({ error: `Transform "${transformId}" not found` }, 404); + } + + try { + const body = await c.req.json(); + const input = body.input; + + const context: TransformContext = { + space: c.req.param('space') || 'demo', + claims: (c as any).get?.('claims') ?? null, + }; + + const output = await transform.execute(input, context); + return c.json({ output }); + } catch (e) { + const message = e instanceof Error ? e.message : 'Transform execution failed'; + console.error(`[ComputeServer] Transform "${transformId}" failed:`, e); + return c.json({ error: message }, 500); + } + }); + + return router; +} diff --git a/server/local-first/sync-server.ts b/server/local-first/sync-server.ts new file mode 100644 index 0000000..6af2e20 --- /dev/null +++ b/server/local-first/sync-server.ts @@ -0,0 +1,353 @@ +/** + * Layer 4: Sync (Server) — Multi-document WebSocket sync relay. + * + * Acts as a sync peer for multiple Automerge documents. Can operate in two modes: + * + * 1. **Relay mode** (encrypted docs): Forwards encrypted sync messages between peers. + * Server can't read document content — just routes by docId. + * + * 2. **Participant mode** (shared/unencrypted docs): Server maintains its own copy + * of each document, can read/index/validate. Used for Intent/Claim pattern. + * + * Wire protocol matches the client (shared/local-first/sync.ts). + */ + +import * as Automerge from '@automerge/automerge'; + +// ============================================================================ +// TYPES +// ============================================================================ + +interface SyncMessage { + type: 'sync'; + docId: string; + data: number[]; +} + +interface SubscribeMessage { + type: 'subscribe'; + docIds: string[]; +} + +interface UnsubscribeMessage { + type: 'unsubscribe'; + docIds: string[]; +} + +interface AwarenessMessage { + type: 'awareness'; + docId: string; + peer: string; + cursor?: { x: number; y: number }; + selection?: string; + username?: string; + color?: string; +} + +type WireMessage = + | SyncMessage + | SubscribeMessage + | UnsubscribeMessage + | AwarenessMessage + | { type: 'ping' } + | { type: 'pong' }; + +export interface Peer { + id: string; + ws: WebSocket; + subscribedDocs: Set; + syncStates: Map; + claims?: Record; // Auth claims +} + +export interface SyncServerOptions { + /** If true, server maintains Automerge docs and participates in sync (default: true) */ + participantMode?: boolean; + /** Called when a document changes (participant mode only) */ + onDocChange?: (docId: string, doc: Automerge.Doc) => void; +} + +// ============================================================================ +// SyncServer +// ============================================================================ + +export class SyncServer { + #peers = new Map(); + #docs = new Map>(); + #docSubscribers = new Map>(); // docId → Set + #participantMode: boolean; + #onDocChange?: (docId: string, doc: Automerge.Doc) => void; + + constructor(opts: SyncServerOptions = {}) { + this.#participantMode = opts.participantMode ?? true; + this.#onDocChange = opts.onDocChange; + } + + /** + * Register a new WebSocket peer. + */ + addPeer(peerId: string, ws: WebSocket, claims?: Record): Peer { + const peer: Peer = { + id: peerId, + ws, + subscribedDocs: new Set(), + syncStates: new Map(), + claims, + }; + this.#peers.set(peerId, peer); + return peer; + } + + /** + * Remove a peer and clean up subscriptions. + */ + removePeer(peerId: string): void { + const peer = this.#peers.get(peerId); + if (!peer) return; + + // Remove from all document subscriber sets + for (const docId of peer.subscribedDocs) { + const subs = this.#docSubscribers.get(docId); + if (subs) { + subs.delete(peerId); + if (subs.size === 0) { + this.#docSubscribers.delete(docId); + } + } + } + + this.#peers.delete(peerId); + } + + /** + * Handle an incoming message from a peer. + */ + handleMessage(peerId: string, raw: string | ArrayBuffer): void { + const peer = this.#peers.get(peerId); + if (!peer) return; + + try { + const data = typeof raw === 'string' ? raw : new TextDecoder().decode(raw); + const msg: WireMessage = JSON.parse(data); + + switch (msg.type) { + case 'subscribe': + this.#handleSubscribe(peer, msg as SubscribeMessage); + break; + case 'unsubscribe': + this.#handleUnsubscribe(peer, msg as UnsubscribeMessage); + break; + case 'sync': + this.#handleSync(peer, msg as SyncMessage); + break; + case 'awareness': + this.#handleAwareness(peer, msg as AwarenessMessage); + break; + case 'ping': + this.#sendToPeer(peer, { type: 'pong' }); + break; + } + } catch (e) { + console.error(`[SyncServer] Error handling message from ${peerId}:`, e); + } + } + + /** + * Get a server-side document (participant mode). + */ + getDoc(docId: string): Automerge.Doc | undefined { + return this.#docs.get(docId); + } + + /** + * Set/replace a server-side document and sync to all subscribed peers. + */ + setDoc(docId: string, doc: Automerge.Doc): void { + this.#docs.set(docId, doc); + this.#syncDocToAllPeers(docId); + } + + /** + * Apply a change to a server-side document (e.g. for Intent/Claim validation). + */ + changeDoc(docId: string, message: string, fn: (doc: T) => void): Automerge.Doc | null { + let doc = this.#docs.get(docId); + if (!doc) return null; + + doc = Automerge.change(doc, message, fn as any); + this.#docs.set(docId, doc); + this.#syncDocToAllPeers(docId); + + if (this.#onDocChange) { + this.#onDocChange(docId, doc); + } + + return doc; + } + + /** + * Get list of connected peer IDs. + */ + getPeerIds(): string[] { + return Array.from(this.#peers.keys()); + } + + /** + * Get subscribers for a document. + */ + getDocSubscribers(docId: string): string[] { + return Array.from(this.#docSubscribers.get(docId) ?? []); + } + + // ---------- Private ---------- + + #handleSubscribe(peer: Peer, msg: SubscribeMessage): void { + for (const docId of msg.docIds) { + peer.subscribedDocs.add(docId); + + let subs = this.#docSubscribers.get(docId); + if (!subs) { + subs = new Set(); + this.#docSubscribers.set(docId, subs); + } + subs.add(peer.id); + + // Initialize sync state for this peer-doc pair + if (!peer.syncStates.has(docId)) { + peer.syncStates.set(docId, Automerge.initSyncState()); + } + + // If participant mode and we have a doc, send initial sync + if (this.#participantMode && this.#docs.has(docId)) { + this.#sendSyncToPeer(peer, docId); + } + } + } + + #handleUnsubscribe(peer: Peer, msg: UnsubscribeMessage): void { + for (const docId of msg.docIds) { + peer.subscribedDocs.delete(docId); + peer.syncStates.delete(docId); + + const subs = this.#docSubscribers.get(docId); + if (subs) { + subs.delete(peer.id); + if (subs.size === 0) { + this.#docSubscribers.delete(docId); + } + } + } + } + + #handleSync(peer: Peer, msg: SyncMessage): void { + const { docId, data } = msg; + const syncMsg = new Uint8Array(data); + + if (this.#participantMode) { + // Server participates: apply sync message to server's doc + let doc = this.#docs.get(docId); + if (!doc) { + // Create an empty doc if this is the first time we see this docId + doc = Automerge.init(); + this.#docs.set(docId, doc); + } + + let syncState = peer.syncStates.get(docId) ?? Automerge.initSyncState(); + const [newDoc, newSyncState] = Automerge.receiveSyncMessage(doc, syncState, syncMsg); + + const changed = newDoc !== doc; + this.#docs.set(docId, newDoc); + peer.syncStates.set(docId, newSyncState); + + // Send response sync message back to this peer + this.#sendSyncToPeer(peer, docId); + + // If doc changed, sync to other subscribers + if (changed) { + this.#syncDocToOtherPeers(docId, peer.id); + if (this.#onDocChange) { + this.#onDocChange(docId, newDoc); + } + } + } else { + // Relay mode: forward sync message to all other subscribers + const subs = this.#docSubscribers.get(docId); + if (!subs) return; + + for (const subPeerId of subs) { + if (subPeerId === peer.id) continue; + const subPeer = this.#peers.get(subPeerId); + if (subPeer) { + this.#sendToPeer(subPeer, msg); + } + } + } + } + + #handleAwareness(peer: Peer, msg: AwarenessMessage): void { + // Forward awareness to all other subscribers of this document + const subs = this.#docSubscribers.get(msg.docId); + if (!subs) return; + + for (const subPeerId of subs) { + if (subPeerId === peer.id) continue; + const subPeer = this.#peers.get(subPeerId); + if (subPeer) { + this.#sendToPeer(subPeer, msg); + } + } + } + + #sendSyncToPeer(peer: Peer, docId: string): void { + const doc = this.#docs.get(docId); + if (!doc) return; + + let syncState = peer.syncStates.get(docId) ?? Automerge.initSyncState(); + const [newSyncState, syncMessage] = Automerge.generateSyncMessage(doc, syncState); + + peer.syncStates.set(docId, newSyncState); + + if (syncMessage) { + this.#sendToPeer(peer, { + type: 'sync', + docId, + data: Array.from(syncMessage), + }); + } + } + + #syncDocToAllPeers(docId: string): void { + const subs = this.#docSubscribers.get(docId); + if (!subs) return; + + for (const peerId of subs) { + const peer = this.#peers.get(peerId); + if (peer) { + this.#sendSyncToPeer(peer, docId); + } + } + } + + #syncDocToOtherPeers(docId: string, excludePeerId: string): void { + const subs = this.#docSubscribers.get(docId); + if (!subs) return; + + for (const peerId of subs) { + if (peerId === excludePeerId) continue; + const peer = this.#peers.get(peerId); + if (peer) { + this.#sendSyncToPeer(peer, docId); + } + } + } + + #sendToPeer(peer: Peer, msg: object): void { + try { + if (peer.ws.readyState === WebSocket.OPEN) { + peer.ws.send(JSON.stringify(msg)); + } + } catch (e) { + console.error(`[SyncServer] Failed to send to peer ${peer.id}:`, e); + } + } +} diff --git a/shared/local-first/compute.ts b/shared/local-first/compute.ts new file mode 100644 index 0000000..7d930aa --- /dev/null +++ b/shared/local-first/compute.ts @@ -0,0 +1,181 @@ +/** + * Layer 5: Compute — Deterministic transforms that run locally when possible, + * delegate to server when not. + * + * | Transform | Local? | Example | + * |---------------------|--------|--------------------------------| + * | Markdown → HTML | Yes | Notes rendering | + * | Search indexing | Yes | Build index from docs | + * | Vote tallies | Yes | Derived from CRDT state | + * | PDF generation | No | Server delegate (Typst) | + * | Image thumbnailing | No | Server delegate (Sharp) | + * + * Server fallback: POST /:space/api/compute/:transformId + */ + +// ============================================================================ +// TYPES +// ============================================================================ + +/** + * A transform: a named, deterministic function that converts input → output. + */ +export interface Transform { + /** Unique identifier (e.g. "markdown-to-html", "pdf-generate") */ + id: string; + /** Whether this transform can run in the browser */ + canRunLocally: boolean; + /** Execute the transform */ + execute(input: In): Promise; +} + +export interface ComputeEngineOptions { + /** Base URL for server-side compute endpoint (e.g. "/demo/api/compute") */ + serverBaseUrl?: string; + /** Auth token for server requests */ + authToken?: string; +} + +// ============================================================================ +// ComputeEngine +// ============================================================================ + +export class ComputeEngine { + #transforms = new Map>(); + #serverBaseUrl: string | null; + #authToken: string | null; + + constructor(opts: ComputeEngineOptions = {}) { + this.#serverBaseUrl = opts.serverBaseUrl ?? null; + this.#authToken = opts.authToken ?? null; + } + + /** + * Register a transform. + */ + register(transform: Transform): void { + this.#transforms.set(transform.id, transform); + } + + /** + * Run a transform. Runs locally if possible, delegates to server otherwise. + */ + async run(transformId: string, input: In): Promise { + const transform = this.#transforms.get(transformId); + + if (transform?.canRunLocally) { + return transform.execute(input) as Promise; + } + + if (transform && !transform.canRunLocally && this.#serverBaseUrl) { + return this.#delegateToServer(transformId, input); + } + + if (!transform && this.#serverBaseUrl) { + // Transform not registered locally — try server + return this.#delegateToServer(transformId, input); + } + + throw new Error( + `Transform "${transformId}" not available: ${ + transform ? 'requires server but no serverBaseUrl configured' : 'not registered' + }` + ); + } + + /** + * Check if a transform is registered and can run locally. + */ + canRunLocally(transformId: string): boolean { + const t = this.#transforms.get(transformId); + return !!t && t.canRunLocally; + } + + /** + * Check if a transform is registered. + */ + has(transformId: string): boolean { + return this.#transforms.has(transformId); + } + + /** + * List all registered transform IDs. + */ + list(): string[] { + return Array.from(this.#transforms.keys()); + } + + /** + * Update auth token (e.g. after login/refresh). + */ + setAuthToken(token: string): void { + this.#authToken = token; + } + + /** + * Update server base URL. + */ + setServerBaseUrl(url: string): void { + this.#serverBaseUrl = url; + } + + // ---------- Private ---------- + + async #delegateToServer(transformId: string, input: In): Promise { + if (!this.#serverBaseUrl) { + throw new Error('No server base URL configured for compute delegation'); + } + + const url = `${this.#serverBaseUrl}/${transformId}`; + const headers: Record = { + 'Content-Type': 'application/json', + }; + if (this.#authToken) { + headers['Authorization'] = `Bearer ${this.#authToken}`; + } + + const response = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify({ input }), + }); + + if (!response.ok) { + throw new Error(`Compute server error: ${response.status} ${response.statusText}`); + } + + const result = await response.json(); + return result.output as Out; + } +} + +// ============================================================================ +// BUILT-IN TRANSFORMS +// ============================================================================ + +/** + * Helper to create a local transform. + */ +export function localTransform( + id: string, + fn: (input: In) => Promise | Out +): Transform { + return { + id, + canRunLocally: true, + execute: async (input) => fn(input), + }; +} + +/** + * Helper to declare a server-only transform (acts as a type contract). + */ +export function serverTransform(id: string): Transform { + return { + id, + canRunLocally: false, + execute: async () => { + throw new Error(`Transform "${id}" must run on server`); + }, + }; +} diff --git a/shared/local-first/crypto.ts b/shared/local-first/crypto.ts new file mode 100644 index 0000000..e5625b7 --- /dev/null +++ b/shared/local-first/crypto.ts @@ -0,0 +1,210 @@ +/** + * Layer 1: Crypto — Document-level encryption for local-first data. + * + * Key hierarchy: + * Master Key (PRF output / passphrase) → HKDF + * → Space Key (info: "rspace:{spaceId}") → HKDF + * → Doc Key (info: "rspace:{spaceId}:{docId}") → AES-256-GCM + * + * Server never sees plaintext — only encrypted Automerge binary blobs. + * Extends the existing EncryptIDKeyManager key derivation pattern. + */ + +// ============================================================================ +// TYPES +// ============================================================================ + +export interface EncryptedBlob { + /** AES-256-GCM ciphertext (includes auth tag) */ + ciphertext: Uint8Array; + /** 12-byte random nonce */ + nonce: Uint8Array; +} + +// ============================================================================ +// DocCrypto +// ============================================================================ + +const encoder = new TextEncoder(); + +/** + * DocCrypto — derives per-space and per-document encryption keys from a + * master key (typically the EncryptIDKeyManager's AES-256-GCM key material). + * + * Usage: + * const crypto = new DocCrypto(); + * await crypto.init(masterKey); // from EncryptIDKeyManager PRF + * const spaceKey = await crypto.deriveSpaceKey('my-space'); + * const docKey = await crypto.deriveDocKey(spaceKey, 'notes:items'); + * const blob = await crypto.encrypt(docKey, automergeBytes); + * const plain = await crypto.decrypt(docKey, blob); + */ +export class DocCrypto { + #masterKeyMaterial: CryptoKey | null = null; + + /** + * Initialize from a master key. Accepts either: + * - A CryptoKey with HKDF usage (from EncryptIDKeyManager.initFromPRF) + * - Raw key bytes (Uint8Array / ArrayBuffer) that will be imported as HKDF material + */ + async init(masterKey: CryptoKey | Uint8Array | ArrayBuffer): Promise { + if (masterKey instanceof CryptoKey) { + // If the key already supports deriveBits/deriveKey, use it directly + if (masterKey.algorithm.name === 'HKDF') { + this.#masterKeyMaterial = masterKey; + } else { + // It's an AES-GCM key — export raw bits and re-import as HKDF + const raw = await crypto.subtle.exportKey('raw', masterKey); + this.#masterKeyMaterial = await crypto.subtle.importKey( + 'raw', + raw as ArrayBuffer, + { name: 'HKDF' }, + false, + ['deriveKey', 'deriveBits'] + ); + } + } else { + // Raw bytes → import as HKDF + const buf = masterKey instanceof Uint8Array ? masterKey.buffer as ArrayBuffer : masterKey; + this.#masterKeyMaterial = await crypto.subtle.importKey( + 'raw', + buf, + { name: 'HKDF' }, + false, + ['deriveKey', 'deriveBits'] + ); + } + } + + /** + * Initialize directly from WebAuthn PRF output (convenience shortcut). + */ + async initFromPRF(prfOutput: ArrayBuffer): Promise { + await this.init(new Uint8Array(prfOutput)); + } + + get isInitialized(): boolean { + return this.#masterKeyMaterial !== null; + } + + /** + * Derive a space-level HKDF key. + * info = "rspace:{spaceId}" + */ + async deriveSpaceKey(spaceId: string): Promise { + this.#assertInit(); + + // Derive 256 bits of key material for the space + const bits = await crypto.subtle.deriveBits( + { + name: 'HKDF', + hash: 'SHA-256', + salt: encoder.encode('rspace-space-key-v1'), + info: encoder.encode(`rspace:${spaceId}`), + }, + this.#masterKeyMaterial!, + 256 + ); + + // Re-import as HKDF for further derivation (space → doc) + return crypto.subtle.importKey( + 'raw', + bits, + { name: 'HKDF' }, + false, + ['deriveKey', 'deriveBits'] + ); + } + + /** + * Derive a document-level AES-256-GCM key from a space key. + * info = "rspace:{spaceId}:{docId}" + */ + async deriveDocKey(spaceKey: CryptoKey, docId: string): Promise { + return crypto.subtle.deriveKey( + { + name: 'HKDF', + hash: 'SHA-256', + salt: encoder.encode('rspace-doc-key-v1'), + info: encoder.encode(`doc:${docId}`), + }, + spaceKey, + { name: 'AES-GCM', length: 256 }, + false, // non-extractable + ['encrypt', 'decrypt'] + ); + } + + /** + * Encrypt data with a document key. + * Returns ciphertext + 12-byte random nonce. + */ + async encrypt(docKey: CryptoKey, data: Uint8Array): Promise { + const nonce = crypto.getRandomValues(new Uint8Array(12)); + const ciphertext = await crypto.subtle.encrypt( + { name: 'AES-GCM', iv: nonce }, + docKey, + data.buffer as ArrayBuffer + ); + return { + ciphertext: new Uint8Array(ciphertext), + nonce, + }; + } + + /** + * Decrypt an encrypted blob with a document key. + */ + async decrypt(docKey: CryptoKey, blob: EncryptedBlob): Promise { + const iv = new Uint8Array(blob.nonce) as unknown as ArrayBuffer; + const ct = new Uint8Array(blob.ciphertext) as unknown as ArrayBuffer; + const plaintext = await crypto.subtle.decrypt( + { name: 'AES-GCM', iv }, + docKey, + ct + ); + return new Uint8Array(plaintext); + } + + /** + * Convenience: derive doc key directly from master key (space + doc in one call). + */ + async deriveDocKeyDirect(spaceId: string, docId: string): Promise { + const spaceKey = await this.deriveSpaceKey(spaceId); + return this.deriveDocKey(spaceKey, docId); + } + + /** + * Serialize an EncryptedBlob for storage (nonce prepended to ciphertext). + * Format: [12-byte nonce][ciphertext...] + */ + static pack(blob: EncryptedBlob): Uint8Array { + const packed = new Uint8Array(12 + blob.ciphertext.length); + packed.set(blob.nonce, 0); + packed.set(blob.ciphertext, 12); + return packed; + } + + /** + * Deserialize a packed blob. + */ + static unpack(packed: Uint8Array): EncryptedBlob { + return { + nonce: packed.slice(0, 12), + ciphertext: packed.slice(12), + }; + } + + /** + * Clear master key material from memory. + */ + clear(): void { + this.#masterKeyMaterial = null; + } + + #assertInit(): void { + if (!this.#masterKeyMaterial) { + throw new Error('DocCrypto not initialized — call init() first'); + } + } +} diff --git a/shared/local-first/document.ts b/shared/local-first/document.ts new file mode 100644 index 0000000..9083c23 --- /dev/null +++ b/shared/local-first/document.ts @@ -0,0 +1,289 @@ +/** + * Layer 2: Document — Automerge document management with typed schemas. + * + * DocumentId format: "{space}:{module}:{collection}" or "{space}:{module}:{collection}:{itemId}" + * + * Granularity principle: one document per "unit of collaboration." + * Binary blobs (PDFs, .splat, images) stay in blob storage with metadata refs in Automerge. + */ + +import * as Automerge from '@automerge/automerge'; + +// ============================================================================ +// TYPES +// ============================================================================ + +/** + * Document ID — hierarchical, colon-separated. + * 3-part: space-level collection (e.g. "demo:notes:items") + * 4-part: item-level doc (e.g. "demo:work:boards:board-1") + */ +export type DocumentId = + | `${string}:${string}:${string}` + | `${string}:${string}:${string}:${string}`; + +/** + * Parse a DocumentId into its components. + */ +export interface ParsedDocumentId { + space: string; + module: string; + collection: string; + itemId?: string; +} + +export function parseDocumentId(id: DocumentId): ParsedDocumentId { + const parts = id.split(':'); + if (parts.length < 3 || parts.length > 4) { + throw new Error(`Invalid DocumentId: "${id}" — expected 3 or 4 colon-separated parts`); + } + return { + space: parts[0], + module: parts[1], + collection: parts[2], + itemId: parts[3], + }; +} + +export function makeDocumentId( + space: string, + module: string, + collection: string, + itemId?: string +): DocumentId { + if (itemId) { + return `${space}:${module}:${collection}:${itemId}` as DocumentId; + } + return `${space}:${module}:${collection}` as DocumentId; +} + +/** + * Schema definition for an Automerge document. + * Each module/collection pair defines a schema with version + initializer. + */ +export interface DocSchema { + module: string; + collection: string; + version: number; + /** Create the initial document state */ + init: () => T; + /** Migrate from an older version (called when loaded doc version < schema version) */ + migrate?: (doc: T, fromVersion: number) => T; +} + +/** + * Metadata stored alongside each document. + */ +export interface DocMeta { + docId: DocumentId; + module: string; + collection: string; + version: number; + createdAt: number; + updatedAt: number; +} + +// ============================================================================ +// DOCUMENT MANAGER +// ============================================================================ + +/** + * DocumentManager — manages multiple Automerge documents in memory. + * + * Responsibilities: + * - Open/create documents with typed schemas + * - Track open documents and their metadata + * - Apply changes with Automerge.change() + * - List documents by space/module + * + * Does NOT handle persistence (that's Layer 3) or sync (Layer 4). + */ +export class DocumentManager { + #docs = new Map>(); + #meta = new Map(); + #schemas = new Map>(); + #changeListeners = new Map void>>(); + + /** + * Register a schema so documents can be opened with type safety. + */ + registerSchema(schema: DocSchema): void { + const key = `${schema.module}:${schema.collection}`; + this.#schemas.set(key, schema); + } + + /** + * Get the registered schema for a module/collection. + */ + getSchema(module: string, collection: string): DocSchema | undefined { + return this.#schemas.get(`${module}:${collection}`); + } + + /** + * Open (or create) a document. If already open, returns the cached instance. + * If binary data is provided, loads from that; otherwise creates from schema.init(). + */ + open>( + id: DocumentId, + schema: DocSchema, + binary?: Uint8Array + ): Automerge.Doc { + // Return cached if already open + const existing = this.#docs.get(id); + if (existing) return existing as Automerge.Doc; + + let doc: Automerge.Doc; + const now = Date.now(); + + if (binary) { + // Load from persisted binary + doc = Automerge.load(binary); + + // Check if migration is needed + const meta = this.#meta.get(id); + if (meta && meta.version < schema.version && schema.migrate) { + doc = Automerge.change(doc, `Migrate ${id} to v${schema.version}`, (d) => { + schema.migrate!(d, meta.version); + }); + } + } else { + // Create fresh document from schema + doc = Automerge.init(); + doc = Automerge.change(doc, `Initialize ${id}`, (d) => { + Object.assign(d, schema.init()); + }); + } + + this.#docs.set(id, doc); + this.#meta.set(id, { + docId: id, + module: schema.module, + collection: schema.collection, + version: schema.version, + createdAt: now, + updatedAt: now, + }); + + // Register schema if not already + this.registerSchema(schema); + + return doc; + } + + /** + * Get an already-open document. + */ + get(id: DocumentId): Automerge.Doc | undefined { + return this.#docs.get(id) as Automerge.Doc | undefined; + } + + /** + * Apply a change to a document (Automerge.change wrapper). + * Notifies change listeners. + */ + change( + id: DocumentId, + message: string, + fn: (doc: T) => void + ): Automerge.Doc { + const doc = this.#docs.get(id); + if (!doc) { + throw new Error(`Document not open: ${id}`); + } + + const updated = Automerge.change(doc, message, fn as any); + this.#docs.set(id, updated); + + // Update metadata timestamp + const meta = this.#meta.get(id); + if (meta) { + meta.updatedAt = Date.now(); + } + + // Notify listeners + this.#notifyChange(id, updated); + + return updated as Automerge.Doc; + } + + /** + * Replace a document (e.g. after receiving sync data). + */ + set(id: DocumentId, doc: Automerge.Doc): void { + this.#docs.set(id, doc); + const meta = this.#meta.get(id); + if (meta) { + meta.updatedAt = Date.now(); + } + this.#notifyChange(id, doc); + } + + /** + * Close a document — remove from in-memory cache. + */ + close(id: DocumentId): void { + this.#docs.delete(id); + this.#meta.delete(id); + this.#changeListeners.delete(id); + } + + /** + * Get document binary for persistence. + */ + save(id: DocumentId): Uint8Array | null { + const doc = this.#docs.get(id); + if (!doc) return null; + return Automerge.save(doc); + } + + /** + * Get metadata for a document. + */ + getMeta(id: DocumentId): DocMeta | undefined { + return this.#meta.get(id); + } + + /** + * List all open document IDs for a given space and module. + */ + list(space: string, module?: string): DocumentId[] { + const results: DocumentId[] = []; + for (const [id, meta] of this.#meta) { + const parsed = parseDocumentId(id); + if (parsed.space !== space) continue; + if (module && parsed.module !== module) continue; + results.push(id); + } + return results; + } + + /** + * List all open document IDs. + */ + listAll(): DocumentId[] { + return Array.from(this.#docs.keys()); + } + + /** + * Subscribe to changes on a document. + */ + onChange(id: DocumentId, cb: (doc: Automerge.Doc) => void): () => void { + let listeners = this.#changeListeners.get(id); + if (!listeners) { + listeners = new Set(); + this.#changeListeners.set(id, listeners); + } + listeners.add(cb); + return () => { listeners!.delete(cb); }; + } + + #notifyChange(id: DocumentId, doc: any): void { + const listeners = this.#changeListeners.get(id); + if (!listeners) return; + for (const cb of listeners) { + try { cb(doc); } catch (e) { + console.error(`[DocumentManager] Change listener error for ${id}:`, e); + } + } + } +} diff --git a/shared/local-first/index.ts b/shared/local-first/index.ts new file mode 100644 index 0000000..7f5a98d --- /dev/null +++ b/shared/local-first/index.ts @@ -0,0 +1,75 @@ +/** + * Local-First Data Infrastructure for rSpace + * + * 7-layer architecture: + * 1. Crypto — PRF key derivation, AES-256-GCM per-document encryption + * 2. Document — Automerge documents with typed schemas + * 3. Storage — Encrypted multi-document IndexedDB + * 4. Sync — Multi-doc WebSocket sync (client-side) + * 5. Compute — Deterministic transforms (local or server-delegated) + * 6. Query — Materialized views + full-text search + * 7. Application (module-specific, not in this package) + * + * Plus: Memory Card interchange format for cross-module data exchange. + */ + +// Layer 1: Crypto +export { DocCrypto, type EncryptedBlob } from './crypto'; + +// Layer 2: Document +export { + type DocumentId, + type ParsedDocumentId, + type DocSchema, + type DocMeta, + parseDocumentId, + makeDocumentId, + DocumentManager, +} from './document'; + +// Layer 3: Storage +export { EncryptedDocStore } from './storage'; + +// Layer 4: Sync (client) +export { + DocSyncManager, + type DocSyncManagerOptions, + type SyncMessage, + type SubscribeMessage, + type UnsubscribeMessage, + type AwarenessMessage, + type WireMessage, +} from './sync'; + +// Layer 5: Compute +export { + type Transform, + ComputeEngine, + type ComputeEngineOptions, + localTransform, + serverTransform, +} from './compute'; + +// Layer 6: Query +export { + type MaterializedView, + type SearchResult, + ViewEngine, + LocalSearchEngine, +} from './query'; + +// Memory Card interchange +export { + type MemoryCard, + type MemoryCardType, + type CardExporter, + createCard, + filterByType, + filterByTag, + filterByModule, + sortByNewest, + searchCards, + registerExporter, + getExporter, + getAllExporters, +} from './memory-card'; diff --git a/shared/local-first/memory-card.ts b/shared/local-first/memory-card.ts new file mode 100644 index 0000000..24f6f47 --- /dev/null +++ b/shared/local-first/memory-card.ts @@ -0,0 +1,163 @@ +/** + * Memory Card — Cross-module data interchange format. + * + * Any module can export items as Memory Cards; any module can import/reference them. + * Think of it as a universal "clip" format for data flowing between rSpace modules. + */ + +import type { DocumentId } from './document'; + +// ============================================================================ +// TYPES +// ============================================================================ + +/** + * Core Memory Card — the universal interchange unit. + */ +export interface MemoryCard { + /** Unique ID (UUID v4 or module-specific) */ + id: string; + /** Semantic type */ + type: MemoryCardType | string; + /** Human-readable title */ + title: string; + /** Optional body text (markdown) */ + body?: string; + /** Source provenance */ + source: { + module: string; + docId: DocumentId; + itemId?: string; + }; + /** Freeform tags for filtering/grouping */ + tags: string[]; + /** Unix timestamp (ms) */ + createdAt: number; + /** Module-specific structured data */ + data?: Record; +} + +/** + * Well-known card types. Modules can extend with custom strings. + */ +export type MemoryCardType = + | 'note' + | 'task' + | 'event' + | 'link' + | 'file' + | 'vote' + | 'transaction' + | 'trip' + | 'contact' + | 'shape' + | 'book' + | 'product' + | 'post'; + +// ============================================================================ +// HELPERS +// ============================================================================ + +/** + * Create a Memory Card with defaults. + */ +export function createCard( + fields: Pick & Partial +): MemoryCard { + return { + id: fields.id ?? generateId(), + type: fields.type, + title: fields.title, + body: fields.body, + source: fields.source, + tags: fields.tags ?? [], + createdAt: fields.createdAt ?? Date.now(), + data: fields.data, + }; +} + +/** + * Filter cards by type. + */ +export function filterByType(cards: MemoryCard[], type: string): MemoryCard[] { + return cards.filter((c) => c.type === type); +} + +/** + * Filter cards by tag (any match). + */ +export function filterByTag(cards: MemoryCard[], tag: string): MemoryCard[] { + return cards.filter((c) => c.tags.includes(tag)); +} + +/** + * Filter cards by source module. + */ +export function filterByModule(cards: MemoryCard[], module: string): MemoryCard[] { + return cards.filter((c) => c.source.module === module); +} + +/** + * Sort cards by creation time (newest first). + */ +export function sortByNewest(cards: MemoryCard[]): MemoryCard[] { + return [...cards].sort((a, b) => b.createdAt - a.createdAt); +} + +/** + * Search cards by title/body text (case-insensitive substring match). + */ +export function searchCards(cards: MemoryCard[], query: string): MemoryCard[] { + const q = query.toLowerCase(); + return cards.filter( + (c) => + c.title.toLowerCase().includes(q) || + (c.body && c.body.toLowerCase().includes(q)) + ); +} + +// ============================================================================ +// CARD EXPORTER INTERFACE +// ============================================================================ + +/** + * Modules implement this to export their data as Memory Cards. + */ +export interface CardExporter { + module: string; + /** Export all items from a document as cards */ + exportCards(docId: DocumentId, doc: any): MemoryCard[]; + /** Export a single item as a card */ + exportCard?(docId: DocumentId, doc: any, itemId: string): MemoryCard | null; +} + +/** + * Registry of card exporters. + */ +const exporters = new Map(); + +export function registerExporter(exporter: CardExporter): void { + exporters.set(exporter.module, exporter); +} + +export function getExporter(module: string): CardExporter | undefined { + return exporters.get(module); +} + +export function getAllExporters(): CardExporter[] { + return Array.from(exporters.values()); +} + +// ============================================================================ +// UTILITIES +// ============================================================================ + +function generateId(): string { + // crypto.randomUUID() is available in all modern browsers and Bun + if (typeof crypto !== 'undefined' && crypto.randomUUID) { + return crypto.randomUUID(); + } + // Fallback + return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`; +} diff --git a/shared/local-first/query.ts b/shared/local-first/query.ts new file mode 100644 index 0000000..a965f6e --- /dev/null +++ b/shared/local-first/query.ts @@ -0,0 +1,348 @@ +/** + * Layer 6: Query — Materialized views + full-text search over Automerge documents. + * + * All computation is client-side. Views are automatically recomputed when their + * source documents change (via DocumentManager change subscriptions). + */ + +import type { DocumentId } from './document'; +import type { DocumentManager } from './document'; + +// ============================================================================ +// TYPES +// ============================================================================ + +/** + * A materialized view: takes a document and projects it into a view shape. + * Views are cached and recomputed lazily on document change. + */ +export interface MaterializedView { + /** Unique view identifier */ + id: string; + /** Which document this view is derived from */ + docId: DocumentId; + /** Project the document into the view */ + compute(doc: T): V; +} + +export interface SearchResult { + docId: DocumentId; + field: string; + /** The matched text snippet */ + snippet: string; + /** Relevance score (higher = better match) */ + score: number; +} + +interface IndexEntry { + docId: DocumentId; + field: string; + text: string; + /** Lowercase tokens for matching */ + tokens: string[]; +} + +// ============================================================================ +// ViewEngine +// ============================================================================ + +export class ViewEngine { + #views = new Map(); + #cache = new Map(); + #documents: DocumentManager; + #unsubs = new Map void>(); + #subscribers = new Map void>>(); + + constructor(documents: DocumentManager) { + this.#documents = documents; + } + + /** + * Register a materialized view. Immediately computes it if the source doc is open. + */ + register(view: MaterializedView): void { + this.#views.set(view.id, view); + + // Compute initial value if doc is available + const doc = this.#documents.get(view.docId); + if (doc) { + this.#recompute(view); + } + + // Subscribe to document changes + const unsub = this.#documents.onChange(view.docId, () => { + this.#recompute(view); + }); + this.#unsubs.set(view.id, unsub); + } + + /** + * Unregister a view. + */ + unregister(viewId: string): void { + this.#views.delete(viewId); + this.#cache.delete(viewId); + this.#subscribers.delete(viewId); + const unsub = this.#unsubs.get(viewId); + if (unsub) { unsub(); this.#unsubs.delete(viewId); } + } + + /** + * Get the current value of a view (cached). + */ + get(viewId: string): V | undefined { + return this.#cache.get(viewId) as V | undefined; + } + + /** + * Subscribe to view changes. Returns unsubscribe function. + */ + subscribe(viewId: string, cb: (v: V) => void): () => void { + let set = this.#subscribers.get(viewId); + if (!set) { + set = new Set(); + this.#subscribers.set(viewId, set); + } + set.add(cb); + + // Immediately call with current value if available + const current = this.#cache.get(viewId); + if (current !== undefined) { + cb(current as V); + } + + return () => { set!.delete(cb); }; + } + + /** + * Force recompute a view. + */ + refresh(viewId: string): void { + const view = this.#views.get(viewId); + if (view) this.#recompute(view); + } + + /** + * Destroy all views and clean up subscriptions. + */ + destroy(): void { + for (const unsub of this.#unsubs.values()) { + unsub(); + } + this.#views.clear(); + this.#cache.clear(); + this.#unsubs.clear(); + this.#subscribers.clear(); + } + + #recompute(view: MaterializedView): void { + const doc = this.#documents.get(view.docId); + if (!doc) return; + + try { + const value = view.compute(doc); + this.#cache.set(view.id, value); + + const subs = this.#subscribers.get(view.id); + if (subs) { + for (const cb of subs) { + try { cb(value); } catch { /* ignore */ } + } + } + } catch (e) { + console.error(`[ViewEngine] Error computing view "${view.id}":`, e); + } + } +} + +// ============================================================================ +// LocalSearchEngine +// ============================================================================ + +/** + * Client-side full-text search over Automerge documents. + * Simple token-based matching — not a full inverted index, but fast enough + * for the expected data sizes (hundreds, not millions of documents). + */ +export class LocalSearchEngine { + #index: IndexEntry[] = []; + #documents: DocumentManager; + #indexedDocs = new Set(); // "docId:field" set for dedup + + constructor(documents: DocumentManager) { + this.#documents = documents; + } + + /** + * Index specific fields of a document for searching. + * Call this when a document is opened or changes. + */ + index(docId: DocumentId, fields: string[]): void { + const doc = this.#documents.get(docId); + if (!doc) return; + + for (const field of fields) { + const key = `${docId}:${field}`; + + // Remove old entries for this doc+field + this.#index = this.#index.filter((e) => !(e.docId === docId && e.field === field)); + this.#indexedDocs.delete(key); + + const text = extractText(doc, field); + if (!text) continue; + + this.#index.push({ + docId, + field, + text, + tokens: tokenize(text), + }); + this.#indexedDocs.add(key); + } + } + + /** + * Index all text fields from a map/object structure. + * Walks one level of keys, indexes any string values. + */ + indexMap(docId: DocumentId, mapField: string): void { + const doc = this.#documents.get(docId); + if (!doc) return; + + const map = (doc as any)[mapField]; + if (!map || typeof map !== 'object') return; + + for (const [itemId, item] of Object.entries(map)) { + if (!item || typeof item !== 'object') continue; + for (const [key, value] of Object.entries(item as Record)) { + if (typeof value !== 'string') continue; + const fullField = `${mapField}.${itemId}.${key}`; + const compositeKey = `${docId}:${fullField}`; + + this.#index = this.#index.filter((e) => !(e.docId === docId && e.field === fullField)); + this.#indexedDocs.delete(compositeKey); + + this.#index.push({ + docId, + field: fullField, + text: value, + tokens: tokenize(value), + }); + this.#indexedDocs.add(compositeKey); + } + } + } + + /** + * Remove all index entries for a document. + */ + removeDoc(docId: DocumentId): void { + this.#index = this.#index.filter((e) => e.docId !== docId); + // Clean up indexedDocs set + for (const key of this.#indexedDocs) { + if (key.startsWith(`${docId}:`)) { + this.#indexedDocs.delete(key); + } + } + } + + /** + * Search across all indexed documents. + */ + search(query: string, opts?: { module?: string; maxResults?: number }): SearchResult[] { + const queryTokens = tokenize(query); + if (queryTokens.length === 0) return []; + + const results: SearchResult[] = []; + const moduleFilter = opts?.module; + const maxResults = opts?.maxResults ?? 50; + + for (const entry of this.#index) { + // Optional module filter + if (moduleFilter) { + const parts = entry.docId.split(':'); + if (parts[1] !== moduleFilter) continue; + } + + const score = computeScore(queryTokens, entry.tokens); + if (score > 0) { + results.push({ + docId: entry.docId, + field: entry.field, + snippet: createSnippet(entry.text, query), + score, + }); + } + } + + // Sort by score descending + results.sort((a, b) => b.score - a.score); + return results.slice(0, maxResults); + } + + /** + * Clear the entire index. + */ + clear(): void { + this.#index = []; + this.#indexedDocs.clear(); + } +} + +// ============================================================================ +// UTILITIES +// ============================================================================ + +function tokenize(text: string): string[] { + return text + .toLowerCase() + .replace(/[^\w\s]/g, ' ') + .split(/\s+/) + .filter((t) => t.length > 1); +} + +function extractText(doc: any, fieldPath: string): string | null { + const parts = fieldPath.split('.'); + let value: any = doc; + for (const part of parts) { + if (value == null || typeof value !== 'object') return null; + value = value[part]; + } + return typeof value === 'string' ? value : null; +} + +function computeScore(queryTokens: string[], docTokens: string[]): number { + let matches = 0; + for (const qt of queryTokens) { + for (const dt of docTokens) { + if (dt.includes(qt)) { + matches++; + break; + } + } + } + // Score: fraction of query tokens matched + return matches / queryTokens.length; +} + +function createSnippet(text: string, query: string, contextChars = 60): string { + const lowerText = text.toLowerCase(); + const lowerQuery = query.toLowerCase(); + const idx = lowerText.indexOf(lowerQuery); + + if (idx === -1) { + // No exact match; return beginning of text + return text.length > contextChars * 2 + ? text.slice(0, contextChars * 2) + '...' + : text; + } + + const start = Math.max(0, idx - contextChars); + const end = Math.min(text.length, idx + query.length + contextChars); + let snippet = text.slice(start, end); + + if (start > 0) snippet = '...' + snippet; + if (end < text.length) snippet = snippet + '...'; + + return snippet; +} diff --git a/shared/local-first/storage.ts b/shared/local-first/storage.ts new file mode 100644 index 0000000..e0205ef --- /dev/null +++ b/shared/local-first/storage.ts @@ -0,0 +1,371 @@ +/** + * Layer 3: Storage — Encrypted multi-document IndexedDB store. + * + * Extends the single-document OfflineStore pattern into a multi-document, + * per-document encrypted store. Each document is AES-256-GCM encrypted + * at rest using keys derived from the user's passkey (DocCrypto). + * + * IndexedDB database: "rspace-docs" + * Object store "docs": { docId, data (EncryptedBlob packed), updatedAt } + * Object store "meta": { docId, module, collection, version, createdAt, updatedAt } + * Object store "sync": { key: "{docId}:{peerId}", state: Uint8Array } + * Index on "meta" by [module] for listByModule queries + */ + +import type { DocumentId } from './document'; +import { DocCrypto, type EncryptedBlob } from './crypto'; + +// ============================================================================ +// TYPES +// ============================================================================ + +interface StoredDoc { + docId: string; + /** Packed encrypted blob (nonce + ciphertext) — or raw bytes if encryption disabled */ + data: Uint8Array; + updatedAt: number; + encrypted: boolean; +} + +interface StoredMeta { + docId: string; + module: string; + collection: string; + version: number; + createdAt: number; + updatedAt: number; +} + +interface StoredSyncState { + key: string; // "{docId}\0{peerId}" + state: Uint8Array; +} + +// ============================================================================ +// EncryptedDocStore +// ============================================================================ + +export class EncryptedDocStore { + #db: IDBDatabase | null = null; + #dbName = 'rspace-docs'; + #version = 1; + #crypto: DocCrypto | null = null; + #spaceId: string; + + // Debounce infrastructure (same pattern as OfflineStore) + #saveTimers = new Map>(); + #pendingSaves = new Map(); + #saveDebounceMs = 2000; + + constructor(spaceId: string, docCrypto?: DocCrypto) { + this.#spaceId = spaceId; + this.#crypto = docCrypto ?? null; + } + + /** + * Open the IndexedDB database. Must be called before any other method. + */ + async open(): Promise { + if (this.#db) return; + + return new Promise((resolve, reject) => { + const request = indexedDB.open(this.#dbName, this.#version); + + request.onupgradeneeded = () => { + const db = request.result; + + if (!db.objectStoreNames.contains('docs')) { + db.createObjectStore('docs', { keyPath: 'docId' }); + } + + if (!db.objectStoreNames.contains('meta')) { + const metaStore = db.createObjectStore('meta', { keyPath: 'docId' }); + metaStore.createIndex('by_module', 'module', { unique: false }); + metaStore.createIndex('by_module_collection', ['module', 'collection'], { unique: false }); + } + + if (!db.objectStoreNames.contains('sync')) { + db.createObjectStore('sync', { keyPath: 'key' }); + } + }; + + request.onsuccess = () => { + this.#db = request.result; + resolve(); + }; + + request.onerror = () => { + console.error('[EncryptedDocStore] Failed to open IndexedDB:', request.error); + reject(request.error); + }; + }); + } + + /** + * Save a document (debounced). Encrypts if DocCrypto is configured. + */ + save(docId: DocumentId, plaintext: Uint8Array, meta?: { module: string; collection: string; version: number }): void { + this.#pendingSaves.set(docId, { docId, data: plaintext }); + + const existing = this.#saveTimers.get(docId); + if (existing) clearTimeout(existing); + + this.#saveTimers.set( + docId, + setTimeout(() => { + this.#saveTimers.delete(docId); + this.#pendingSaves.delete(docId); + this.#writeDoc(docId, plaintext, meta).catch((e) => { + console.error('[EncryptedDocStore] Failed to save doc:', e); + }); + }, this.#saveDebounceMs) + ); + } + + /** + * Save a document immediately (bypasses debounce). Use before page unload. + */ + async saveImmediate(docId: DocumentId, plaintext: Uint8Array, meta?: { module: string; collection: string; version: number }): Promise { + const existing = this.#saveTimers.get(docId); + if (existing) { + clearTimeout(existing); + this.#saveTimers.delete(docId); + } + this.#pendingSaves.delete(docId); + + await this.#writeDoc(docId, plaintext, meta); + } + + /** + * Load a document. Decrypts if encrypted. + */ + async load(docId: DocumentId): Promise { + if (!this.#db) return null; + + const stored = await this.#getDoc(docId); + if (!stored) return null; + + if (stored.encrypted && this.#crypto) { + const docKey = await this.#crypto.deriveDocKeyDirect(this.#spaceId, docId); + const blob = DocCrypto.unpack(stored.data); + return this.#crypto.decrypt(docKey, blob); + } + + return stored.data; + } + + /** + * Delete a document and its metadata. + */ + async delete(docId: DocumentId): Promise { + if (!this.#db) return; + + const tx = this.#db.transaction(['docs', 'meta'], 'readwrite'); + tx.objectStore('docs').delete(docId); + tx.objectStore('meta').delete(docId); + + await this.#txComplete(tx); + } + + /** + * List all document IDs for a space and module. + */ + async listByModule(module: string, collection?: string): Promise { + if (!this.#db) return []; + + return new Promise((resolve, reject) => { + const tx = this.#db!.transaction('meta', 'readonly'); + const store = tx.objectStore('meta'); + + let request: IDBRequest; + if (collection) { + const index = store.index('by_module_collection'); + request = index.getAllKeys(IDBKeyRange.only([module, collection])); + } else { + const index = store.index('by_module'); + request = index.getAllKeys(IDBKeyRange.only(module)); + } + + request.onsuccess = () => { + // Filter to only docs in this space + const all = request.result as string[]; + const filtered = all.filter((id) => id.startsWith(`${this.#spaceId}:`)); + resolve(filtered as DocumentId[]); + }; + request.onerror = () => reject(request.error); + }); + } + + /** + * List all stored document IDs. + */ + async listAll(): Promise { + if (!this.#db) return []; + + return new Promise((resolve, reject) => { + const tx = this.#db!.transaction('meta', 'readonly'); + const request = tx.objectStore('meta').getAllKeys(); + + request.onsuccess = () => { + const all = request.result as string[]; + const filtered = all.filter((id) => id.startsWith(`${this.#spaceId}:`)); + resolve(filtered as DocumentId[]); + }; + request.onerror = () => reject(request.error); + }); + } + + /** + * Save sync state for a (docId, peerId) pair. + */ + async saveSyncState(docId: DocumentId, peerId: string, state: Uint8Array): Promise { + if (!this.#db) return; + + const entry: StoredSyncState = { + key: `${docId}\0${peerId}`, + state, + }; + + const tx = this.#db.transaction('sync', 'readwrite'); + tx.objectStore('sync').put(entry); + await this.#txComplete(tx); + } + + /** + * Load sync state for a (docId, peerId) pair. + */ + async loadSyncState(docId: DocumentId, peerId: string): Promise { + if (!this.#db) return null; + + return new Promise((resolve, reject) => { + const tx = this.#db!.transaction('sync', 'readonly'); + const request = tx.objectStore('sync').get(`${docId}\0${peerId}`); + + request.onsuccess = () => { + const entry = request.result as StoredSyncState | undefined; + resolve(entry?.state ?? null); + }; + request.onerror = () => reject(request.error); + }); + } + + /** + * Clear all sync state for a document. + */ + async clearSyncState(docId: DocumentId): Promise { + if (!this.#db) return; + + // Have to iterate since there's no compound index + const tx = this.#db.transaction('sync', 'readwrite'); + const store = tx.objectStore('sync'); + const request = store.openCursor(); + + return new Promise((resolve, reject) => { + request.onsuccess = () => { + const cursor = request.result; + if (!cursor) { resolve(); return; } + + const key = cursor.key as string; + if (key.startsWith(`${docId}\0`)) { + cursor.delete(); + } + cursor.continue(); + }; + request.onerror = () => reject(request.error); + }); + } + + /** + * Get metadata for a document. + */ + async getMeta(docId: DocumentId): Promise { + if (!this.#db) return null; + + return new Promise((resolve, reject) => { + const tx = this.#db!.transaction('meta', 'readonly'); + const request = tx.objectStore('meta').get(docId); + + request.onsuccess = () => resolve(request.result ?? null); + request.onerror = () => reject(request.error); + }); + } + + /** + * Flush all pending debounced saves immediately. + */ + async flush(): Promise { + const promises: Promise[] = []; + + for (const [docId, { data }] of this.#pendingSaves) { + const timer = this.#saveTimers.get(docId); + if (timer) clearTimeout(timer); + this.#saveTimers.delete(docId); + promises.push(this.#writeDoc(docId as DocumentId, data)); + } + this.#pendingSaves.clear(); + + await Promise.all(promises); + } + + // ---------- Private helpers ---------- + + async #writeDoc( + docId: DocumentId, + plaintext: Uint8Array, + meta?: { module: string; collection: string; version: number } + ): Promise { + if (!this.#db) return; + + const now = Date.now(); + let data: Uint8Array; + let encrypted = false; + + if (this.#crypto?.isInitialized) { + const docKey = await this.#crypto.deriveDocKeyDirect(this.#spaceId, docId); + const blob = await this.#crypto.encrypt(docKey, plaintext); + data = DocCrypto.pack(blob); + encrypted = true; + } else { + data = plaintext; + } + + const tx = this.#db.transaction(['docs', 'meta'], 'readwrite'); + + const storedDoc: StoredDoc = { docId, data, updatedAt: now, encrypted }; + tx.objectStore('docs').put(storedDoc); + + if (meta) { + const existingMeta = await this.getMeta(docId); + const storedMeta: StoredMeta = { + docId, + module: meta.module, + collection: meta.collection, + version: meta.version, + createdAt: existingMeta?.createdAt ?? now, + updatedAt: now, + }; + tx.objectStore('meta').put(storedMeta); + } + + await this.#txComplete(tx); + } + + #getDoc(docId: DocumentId): Promise { + if (!this.#db) return Promise.resolve(null); + + return new Promise((resolve, reject) => { + const tx = this.#db!.transaction('docs', 'readonly'); + const request = tx.objectStore('docs').get(docId); + + request.onsuccess = () => resolve(request.result ?? null); + request.onerror = () => reject(request.error); + }); + } + + #txComplete(tx: IDBTransaction): Promise { + return new Promise((resolve, reject) => { + tx.oncomplete = () => resolve(); + tx.onerror = () => reject(tx.error); + }); + } +} diff --git a/shared/local-first/sync.ts b/shared/local-first/sync.ts new file mode 100644 index 0000000..10b455d --- /dev/null +++ b/shared/local-first/sync.ts @@ -0,0 +1,489 @@ +/** + * Layer 4: Sync (Client) — Multi-document WebSocket sync. + * + * Multiplexes sync for multiple Automerge documents over a single + * WebSocket connection per space. Handles: + * - Subscribe/unsubscribe to individual documents + * - Automerge sync protocol per document + * - Offline queue (changes made offline replayed on reconnect) + * - Presence/awareness per document + * + * Wire protocol (JSON envelope, binary sync data as number arrays): + * { type: 'sync', docId, data: number[] } — Automerge sync message + * { type: 'subscribe', docIds: string[] } — Start syncing docs + * { type: 'unsubscribe', docIds: string[] } — Stop syncing docs + * { type: 'awareness', docId, peer, cursor? } — Presence + * { type: 'ping' } / { type: 'pong' } — Keep-alive + */ + +import * as Automerge from '@automerge/automerge'; +import type { DocumentId } from './document'; +import { DocumentManager } from './document'; +import { EncryptedDocStore } from './storage'; + +// ============================================================================ +// TYPES +// ============================================================================ + +export interface SyncMessage { + type: 'sync'; + docId: string; + data: number[]; +} + +export interface SubscribeMessage { + type: 'subscribe'; + docIds: string[]; +} + +export interface UnsubscribeMessage { + type: 'unsubscribe'; + docIds: string[]; +} + +export interface AwarenessMessage { + type: 'awareness'; + docId: string; + peer: string; + cursor?: { x: number; y: number }; + selection?: string; + username?: string; + color?: string; +} + +export type WireMessage = + | SyncMessage + | SubscribeMessage + | UnsubscribeMessage + | AwarenessMessage + | { type: 'ping' } + | { type: 'pong' }; + +export interface DocSyncManagerOptions { + documents: DocumentManager; + store?: EncryptedDocStore; + /** Peer ID for this client (defaults to random) */ + peerId?: string; + /** Auto-reconnect with exponential backoff (default: true) */ + autoReconnect?: boolean; + /** Max reconnect delay in ms (default: 30000) */ + maxReconnectDelay?: number; +} + +type DocChangeCallback = (doc: any) => void; +type AwarenessCallback = (msg: AwarenessMessage) => void; +type ConnectionCallback = () => void; + +// ============================================================================ +// DocSyncManager (Client) +// ============================================================================ + +export class DocSyncManager { + #documents: DocumentManager; + #store: EncryptedDocStore | null; + #peerId: string; + #ws: WebSocket | null = null; + #wsUrl: string | null = null; + #space: string | null = null; + #autoReconnect: boolean; + #maxReconnectDelay: number; + #reconnectAttempts = 0; + + // Per-document sync state + #syncStates = new Map(); + #subscribedDocs = new Set(); + + // Event listeners + #changeListeners = new Map>(); + #awarenessListeners = new Map>(); + #connectListeners = new Set(); + #disconnectListeners = new Set(); + + // Save debounce + #saveTimers = new Map>(); + + // Keep-alive + #pingInterval: ReturnType | null = null; + + constructor(opts: DocSyncManagerOptions) { + this.#documents = opts.documents; + this.#store = opts.store ?? null; + this.#peerId = opts.peerId ?? generatePeerId(); + this.#autoReconnect = opts.autoReconnect ?? true; + this.#maxReconnectDelay = opts.maxReconnectDelay ?? 30000; + } + + get peerId(): string { return this.#peerId; } + get isConnected(): boolean { return this.#ws?.readyState === WebSocket.OPEN; } + + /** + * Connect to the sync server for a space. + */ + async connect(wsUrl: string, space: string): Promise { + this.#wsUrl = wsUrl; + this.#space = space; + + if (this.#ws?.readyState === WebSocket.OPEN) return; + + return new Promise((resolve, reject) => { + this.#ws = new WebSocket(wsUrl); + this.#ws.binaryType = 'arraybuffer'; + + this.#ws.onopen = () => { + this.#reconnectAttempts = 0; + + // Subscribe to all currently tracked docs + if (this.#subscribedDocs.size > 0) { + this.#send({ + type: 'subscribe', + docIds: Array.from(this.#subscribedDocs), + }); + + // Initiate sync for each document + for (const docId of this.#subscribedDocs) { + this.#sendSyncMessage(docId); + } + } + + // Start keep-alive + this.#startPing(); + + for (const cb of this.#connectListeners) { + try { cb(); } catch { /* ignore */ } + } + + resolve(); + }; + + this.#ws.onmessage = (event) => { + this.#handleMessage(event.data); + }; + + this.#ws.onclose = () => { + this.#stopPing(); + for (const cb of this.#disconnectListeners) { + try { cb(); } catch { /* ignore */ } + } + this.#attemptReconnect(); + }; + + this.#ws.onerror = () => { + // onclose will fire after onerror + if (this.#ws?.readyState !== WebSocket.OPEN) { + reject(new Error('WebSocket connection failed')); + } + }; + }); + } + + /** + * Subscribe to sync for one or more documents. + */ + async subscribe(docIds: DocumentId[]): Promise { + const newIds: string[] = []; + + for (const id of docIds) { + if (!this.#subscribedDocs.has(id)) { + this.#subscribedDocs.add(id); + newIds.push(id); + + // Initialize sync state from store if available + if (this.#store && !this.#syncStates.has(id)) { + const saved = await this.#store.loadSyncState(id, this.#peerId); + if (saved) { + this.#syncStates.set(id, Automerge.decodeSyncState(saved)); + } + } + + if (!this.#syncStates.has(id)) { + this.#syncStates.set(id, Automerge.initSyncState()); + } + } + } + + if (newIds.length > 0 && this.isConnected) { + this.#send({ type: 'subscribe', docIds: newIds }); + for (const id of newIds) { + this.#sendSyncMessage(id as DocumentId); + } + } + } + + /** + * Unsubscribe from sync for one or more documents. + */ + unsubscribe(docIds: DocumentId[]): void { + const removed: string[] = []; + + for (const id of docIds) { + if (this.#subscribedDocs.delete(id)) { + removed.push(id); + } + } + + if (removed.length > 0 && this.isConnected) { + this.#send({ type: 'unsubscribe', docIds: removed }); + } + } + + /** + * Apply a local change to a document and sync to server. + */ + change(docId: DocumentId, message: string, fn: (doc: T) => void): void { + this.#documents.change(docId, message, fn); + this.#sendSyncMessage(docId); + this.#scheduleSave(docId); + } + + /** + * Send awareness/presence update for a document. + */ + sendAwareness(docId: DocumentId, data: Partial): void { + this.#send({ + type: 'awareness', + docId, + peer: this.#peerId, + ...data, + } as AwarenessMessage); + } + + /** + * Listen for changes on a specific document. + */ + onChange(docId: DocumentId, cb: DocChangeCallback): () => void { + let set = this.#changeListeners.get(docId); + if (!set) { + set = new Set(); + this.#changeListeners.set(docId, set); + } + set.add(cb); + return () => { set!.delete(cb); }; + } + + /** + * Listen for awareness updates on a specific document. + */ + onAwareness(docId: DocumentId, cb: AwarenessCallback): () => void { + let set = this.#awarenessListeners.get(docId); + if (!set) { + set = new Set(); + this.#awarenessListeners.set(docId, set); + } + set.add(cb); + return () => { set!.delete(cb); }; + } + + /** + * Listen for connection events. + */ + onConnect(cb: ConnectionCallback): () => void { + this.#connectListeners.add(cb); + return () => { this.#connectListeners.delete(cb); }; + } + + /** + * Listen for disconnection events. + */ + onDisconnect(cb: ConnectionCallback): () => void { + this.#disconnectListeners.add(cb); + return () => { this.#disconnectListeners.delete(cb); }; + } + + /** + * Disconnect from server. + */ + disconnect(): void { + this.#autoReconnect = false; + this.#stopPing(); + if (this.#ws) { + this.#ws.close(); + this.#ws = null; + } + } + + /** + * Flush all pending saves and sync state to IndexedDB. + */ + async flush(): Promise { + const promises: Promise[] = []; + + for (const [docId, timer] of this.#saveTimers) { + clearTimeout(timer); + const doc = this.#documents.get(docId); + if (doc && this.#store) { + const binary = Automerge.save(doc); + const meta = this.#documents.getMeta(docId); + promises.push( + this.#store.saveImmediate(docId, binary, meta ? { + module: meta.module, + collection: meta.collection, + version: meta.version, + } : undefined) + ); + } + } + this.#saveTimers.clear(); + + // Persist all sync states + for (const [docId, state] of this.#syncStates) { + if (this.#store) { + const encoded = Automerge.encodeSyncState(state); + promises.push(this.#store.saveSyncState(docId, this.#peerId, encoded)); + } + } + + await Promise.all(promises); + } + + // ---------- Private ---------- + + #handleMessage(raw: ArrayBuffer | string): void { + try { + const data = typeof raw === 'string' ? raw : new TextDecoder().decode(raw); + const msg: WireMessage = JSON.parse(data); + + switch (msg.type) { + case 'sync': + this.#handleSyncMessage(msg as SyncMessage); + break; + case 'awareness': + this.#handleAwareness(msg as AwarenessMessage); + break; + case 'pong': + // Keep-alive acknowledged + break; + } + } catch (e) { + console.error('[DocSyncManager] Failed to handle message:', e); + } + } + + #handleSyncMessage(msg: SyncMessage): void { + const docId = msg.docId as DocumentId; + const doc = this.#documents.get(docId); + if (!doc) return; + + let syncState = this.#syncStates.get(docId) ?? Automerge.initSyncState(); + const syncMsg = new Uint8Array(msg.data); + + const [newDoc, newSyncState] = Automerge.receiveSyncMessage( + doc, + syncState, + syncMsg + ); + + this.#documents.set(docId, newDoc); + this.#syncStates.set(docId, newSyncState); + this.#scheduleSave(docId); + + // Notify change listeners + if (newDoc !== doc) { + const listeners = this.#changeListeners.get(docId); + if (listeners) { + for (const cb of listeners) { + try { cb(newDoc); } catch { /* ignore */ } + } + } + } + + // Send response sync message if needed + this.#sendSyncMessage(docId); + } + + #handleAwareness(msg: AwarenessMessage): void { + const docId = msg.docId as DocumentId; + const listeners = this.#awarenessListeners.get(docId); + if (!listeners) return; + for (const cb of listeners) { + try { cb(msg); } catch { /* ignore */ } + } + } + + #sendSyncMessage(docId: DocumentId): void { + const doc = this.#documents.get(docId); + if (!doc) return; + + let syncState = this.#syncStates.get(docId) ?? Automerge.initSyncState(); + const [newSyncState, syncMessage] = Automerge.generateSyncMessage(doc, syncState); + + this.#syncStates.set(docId, newSyncState); + + if (syncMessage && this.isConnected) { + this.#send({ + type: 'sync', + docId, + data: Array.from(syncMessage), + }); + } + + // Persist sync state + if (this.#store) { + const encoded = Automerge.encodeSyncState(newSyncState); + this.#store.saveSyncState(docId, this.#peerId, encoded).catch(() => {}); + } + } + + #send(msg: WireMessage): void { + if (this.#ws?.readyState === WebSocket.OPEN) { + this.#ws.send(JSON.stringify(msg)); + } + } + + #scheduleSave(docId: DocumentId): void { + if (!this.#store) return; + + const existing = this.#saveTimers.get(docId); + if (existing) clearTimeout(existing); + + this.#saveTimers.set(docId, setTimeout(() => { + this.#saveTimers.delete(docId); + const doc = this.#documents.get(docId); + if (!doc) return; + + const binary = Automerge.save(doc); + const meta = this.#documents.getMeta(docId); + this.#store!.save(docId, binary, meta ? { + module: meta.module, + collection: meta.collection, + version: meta.version, + } : undefined); + }, 2000)); + } + + #attemptReconnect(): void { + if (!this.#autoReconnect || !this.#wsUrl || !this.#space) return; + + this.#reconnectAttempts++; + const delay = Math.min( + 1000 * Math.pow(2, this.#reconnectAttempts - 1), + this.#maxReconnectDelay + ); + + setTimeout(() => { + if (this.#wsUrl && this.#space) { + this.connect(this.#wsUrl, this.#space).catch(() => {}); + } + }, delay); + } + + #startPing(): void { + this.#stopPing(); + this.#pingInterval = setInterval(() => { + this.#send({ type: 'ping' }); + }, 30000); + } + + #stopPing(): void { + if (this.#pingInterval) { + clearInterval(this.#pingInterval); + this.#pingInterval = null; + } + } +} + +function generatePeerId(): string { + if (typeof crypto !== 'undefined' && crypto.randomUUID) { + return crypto.randomUUID(); + } + return `peer-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; +}