rspace-online/shared/local-first/sync.ts

490 lines
13 KiB
TypeScript

/**
* Layer 4: Sync (Client) — Multi-document WebSocket sync.
*
* Multiplexes sync for multiple Automerge documents over a single
* WebSocket connection per space. Handles:
* - Subscribe/unsubscribe to individual documents
* - Automerge sync protocol per document
* - Offline queue (changes made offline replayed on reconnect)
* - Presence/awareness per document
*
* Wire protocol (JSON envelope, binary sync data as number arrays):
* { type: 'sync', docId, data: number[] } — Automerge sync message
* { type: 'subscribe', docIds: string[] } — Start syncing docs
* { type: 'unsubscribe', docIds: string[] } — Stop syncing docs
* { type: 'awareness', docId, peer, cursor? } — Presence
* { type: 'ping' } / { type: 'pong' } — Keep-alive
*/
import * as Automerge from '@automerge/automerge';
import type { DocumentId } from './document';
import { DocumentManager } from './document';
import { EncryptedDocStore } from './storage';
// ============================================================================
// TYPES
// ============================================================================
export interface SyncMessage {
type: 'sync';
docId: string;
data: number[];
}
export interface SubscribeMessage {
type: 'subscribe';
docIds: string[];
}
export interface UnsubscribeMessage {
type: 'unsubscribe';
docIds: string[];
}
export interface AwarenessMessage {
type: 'awareness';
docId: string;
peer: string;
cursor?: { x: number; y: number };
selection?: string;
username?: string;
color?: string;
}
export type WireMessage =
| SyncMessage
| SubscribeMessage
| UnsubscribeMessage
| AwarenessMessage
| { type: 'ping' }
| { type: 'pong' };
export interface DocSyncManagerOptions {
documents: DocumentManager;
store?: EncryptedDocStore;
/** Peer ID for this client (defaults to random) */
peerId?: string;
/** Auto-reconnect with exponential backoff (default: true) */
autoReconnect?: boolean;
/** Max reconnect delay in ms (default: 30000) */
maxReconnectDelay?: number;
}
type DocChangeCallback = (doc: any) => void;
type AwarenessCallback = (msg: AwarenessMessage) => void;
type ConnectionCallback = () => void;
// ============================================================================
// DocSyncManager (Client)
// ============================================================================
export class DocSyncManager {
#documents: DocumentManager;
#store: EncryptedDocStore | null;
#peerId: string;
#ws: WebSocket | null = null;
#wsUrl: string | null = null;
#space: string | null = null;
#autoReconnect: boolean;
#maxReconnectDelay: number;
#reconnectAttempts = 0;
// Per-document sync state
#syncStates = new Map<DocumentId, Automerge.SyncState>();
#subscribedDocs = new Set<DocumentId>();
// Event listeners
#changeListeners = new Map<DocumentId, Set<DocChangeCallback>>();
#awarenessListeners = new Map<DocumentId, Set<AwarenessCallback>>();
#connectListeners = new Set<ConnectionCallback>();
#disconnectListeners = new Set<ConnectionCallback>();
// Save debounce
#saveTimers = new Map<DocumentId, ReturnType<typeof setTimeout>>();
// Keep-alive
#pingInterval: ReturnType<typeof setInterval> | null = null;
constructor(opts: DocSyncManagerOptions) {
this.#documents = opts.documents;
this.#store = opts.store ?? null;
this.#peerId = opts.peerId ?? generatePeerId();
this.#autoReconnect = opts.autoReconnect ?? true;
this.#maxReconnectDelay = opts.maxReconnectDelay ?? 30000;
}
get peerId(): string { return this.#peerId; }
get isConnected(): boolean { return this.#ws?.readyState === WebSocket.OPEN; }
/**
* Connect to the sync server for a space.
*/
async connect(wsUrl: string, space: string): Promise<void> {
this.#wsUrl = wsUrl;
this.#space = space;
if (this.#ws?.readyState === WebSocket.OPEN) return;
return new Promise((resolve, reject) => {
this.#ws = new WebSocket(wsUrl);
this.#ws.binaryType = 'arraybuffer';
this.#ws.onopen = () => {
this.#reconnectAttempts = 0;
// Subscribe to all currently tracked docs
if (this.#subscribedDocs.size > 0) {
this.#send({
type: 'subscribe',
docIds: Array.from(this.#subscribedDocs),
});
// Initiate sync for each document
for (const docId of this.#subscribedDocs) {
this.#sendSyncMessage(docId);
}
}
// Start keep-alive
this.#startPing();
for (const cb of this.#connectListeners) {
try { cb(); } catch { /* ignore */ }
}
resolve();
};
this.#ws.onmessage = (event) => {
this.#handleMessage(event.data);
};
this.#ws.onclose = () => {
this.#stopPing();
for (const cb of this.#disconnectListeners) {
try { cb(); } catch { /* ignore */ }
}
this.#attemptReconnect();
};
this.#ws.onerror = () => {
// onclose will fire after onerror
if (this.#ws?.readyState !== WebSocket.OPEN) {
reject(new Error('WebSocket connection failed'));
}
};
});
}
/**
* Subscribe to sync for one or more documents.
*/
async subscribe(docIds: DocumentId[]): Promise<void> {
const newIds: string[] = [];
for (const id of docIds) {
if (!this.#subscribedDocs.has(id)) {
this.#subscribedDocs.add(id);
newIds.push(id);
// Initialize sync state from store if available
if (this.#store && !this.#syncStates.has(id)) {
const saved = await this.#store.loadSyncState(id, this.#peerId);
if (saved) {
this.#syncStates.set(id, Automerge.decodeSyncState(saved));
}
}
if (!this.#syncStates.has(id)) {
this.#syncStates.set(id, Automerge.initSyncState());
}
}
}
if (newIds.length > 0 && this.isConnected) {
this.#send({ type: 'subscribe', docIds: newIds });
for (const id of newIds) {
this.#sendSyncMessage(id as DocumentId);
}
}
}
/**
* Unsubscribe from sync for one or more documents.
*/
unsubscribe(docIds: DocumentId[]): void {
const removed: string[] = [];
for (const id of docIds) {
if (this.#subscribedDocs.delete(id)) {
removed.push(id);
}
}
if (removed.length > 0 && this.isConnected) {
this.#send({ type: 'unsubscribe', docIds: removed });
}
}
/**
* Apply a local change to a document and sync to server.
*/
change<T>(docId: DocumentId, message: string, fn: (doc: T) => void): void {
this.#documents.change<T>(docId, message, fn);
this.#sendSyncMessage(docId);
this.#scheduleSave(docId);
}
/**
* Send awareness/presence update for a document.
*/
sendAwareness(docId: DocumentId, data: Partial<AwarenessMessage>): void {
this.#send({
type: 'awareness',
docId,
peer: this.#peerId,
...data,
} as AwarenessMessage);
}
/**
* Listen for changes on a specific document.
*/
onChange(docId: DocumentId, cb: DocChangeCallback): () => void {
let set = this.#changeListeners.get(docId);
if (!set) {
set = new Set();
this.#changeListeners.set(docId, set);
}
set.add(cb);
return () => { set!.delete(cb); };
}
/**
* Listen for awareness updates on a specific document.
*/
onAwareness(docId: DocumentId, cb: AwarenessCallback): () => void {
let set = this.#awarenessListeners.get(docId);
if (!set) {
set = new Set();
this.#awarenessListeners.set(docId, set);
}
set.add(cb);
return () => { set!.delete(cb); };
}
/**
* Listen for connection events.
*/
onConnect(cb: ConnectionCallback): () => void {
this.#connectListeners.add(cb);
return () => { this.#connectListeners.delete(cb); };
}
/**
* Listen for disconnection events.
*/
onDisconnect(cb: ConnectionCallback): () => void {
this.#disconnectListeners.add(cb);
return () => { this.#disconnectListeners.delete(cb); };
}
/**
* Disconnect from server.
*/
disconnect(): void {
this.#autoReconnect = false;
this.#stopPing();
if (this.#ws) {
this.#ws.close();
this.#ws = null;
}
}
/**
* Flush all pending saves and sync state to IndexedDB.
*/
async flush(): Promise<void> {
const promises: Promise<void>[] = [];
for (const [docId, timer] of this.#saveTimers) {
clearTimeout(timer);
const doc = this.#documents.get(docId);
if (doc && this.#store) {
const binary = Automerge.save(doc);
const meta = this.#documents.getMeta(docId);
promises.push(
this.#store.saveImmediate(docId, binary, meta ? {
module: meta.module,
collection: meta.collection,
version: meta.version,
} : undefined)
);
}
}
this.#saveTimers.clear();
// Persist all sync states
for (const [docId, state] of this.#syncStates) {
if (this.#store) {
const encoded = Automerge.encodeSyncState(state);
promises.push(this.#store.saveSyncState(docId, this.#peerId, encoded));
}
}
await Promise.all(promises);
}
// ---------- Private ----------
#handleMessage(raw: ArrayBuffer | string): void {
try {
const data = typeof raw === 'string' ? raw : new TextDecoder().decode(raw);
const msg: WireMessage = JSON.parse(data);
switch (msg.type) {
case 'sync':
this.#handleSyncMessage(msg as SyncMessage);
break;
case 'awareness':
this.#handleAwareness(msg as AwarenessMessage);
break;
case 'pong':
// Keep-alive acknowledged
break;
}
} catch (e) {
console.error('[DocSyncManager] Failed to handle message:', e);
}
}
#handleSyncMessage(msg: SyncMessage): void {
const docId = msg.docId as DocumentId;
const doc = this.#documents.get(docId);
if (!doc) return;
let syncState = this.#syncStates.get(docId) ?? Automerge.initSyncState();
const syncMsg = new Uint8Array(msg.data);
const [newDoc, newSyncState] = Automerge.receiveSyncMessage(
doc,
syncState,
syncMsg
);
this.#documents.set(docId, newDoc);
this.#syncStates.set(docId, newSyncState);
this.#scheduleSave(docId);
// Notify change listeners
if (newDoc !== doc) {
const listeners = this.#changeListeners.get(docId);
if (listeners) {
for (const cb of listeners) {
try { cb(newDoc); } catch { /* ignore */ }
}
}
}
// Send response sync message if needed
this.#sendSyncMessage(docId);
}
#handleAwareness(msg: AwarenessMessage): void {
const docId = msg.docId as DocumentId;
const listeners = this.#awarenessListeners.get(docId);
if (!listeners) return;
for (const cb of listeners) {
try { cb(msg); } catch { /* ignore */ }
}
}
#sendSyncMessage(docId: DocumentId): void {
const doc = this.#documents.get(docId);
if (!doc) return;
let syncState = this.#syncStates.get(docId) ?? Automerge.initSyncState();
const [newSyncState, syncMessage] = Automerge.generateSyncMessage(doc, syncState);
this.#syncStates.set(docId, newSyncState);
if (syncMessage && this.isConnected) {
this.#send({
type: 'sync',
docId,
data: Array.from(syncMessage),
});
}
// Persist sync state
if (this.#store) {
const encoded = Automerge.encodeSyncState(newSyncState);
this.#store.saveSyncState(docId, this.#peerId, encoded).catch(() => {});
}
}
#send(msg: WireMessage): void {
if (this.#ws?.readyState === WebSocket.OPEN) {
this.#ws.send(JSON.stringify(msg));
}
}
#scheduleSave(docId: DocumentId): void {
if (!this.#store) return;
const existing = this.#saveTimers.get(docId);
if (existing) clearTimeout(existing);
this.#saveTimers.set(docId, setTimeout(() => {
this.#saveTimers.delete(docId);
const doc = this.#documents.get(docId);
if (!doc) return;
const binary = Automerge.save(doc);
const meta = this.#documents.getMeta(docId);
this.#store!.save(docId, binary, meta ? {
module: meta.module,
collection: meta.collection,
version: meta.version,
} : undefined);
}, 2000));
}
#attemptReconnect(): void {
if (!this.#autoReconnect || !this.#wsUrl || !this.#space) return;
this.#reconnectAttempts++;
const delay = Math.min(
1000 * Math.pow(2, this.#reconnectAttempts - 1),
this.#maxReconnectDelay
);
setTimeout(() => {
if (this.#wsUrl && this.#space) {
this.connect(this.#wsUrl, this.#space).catch(() => {});
}
}, delay);
}
#startPing(): void {
this.#stopPing();
this.#pingInterval = setInterval(() => {
this.#send({ type: 'ping' });
}, 30000);
}
#stopPing(): void {
if (this.#pingInterval) {
clearInterval(this.#pingInterval);
this.#pingInterval = null;
}
}
}
function generatePeerId(): string {
if (typeof crypto !== 'undefined' && crypto.randomUUID) {
return crypto.randomUUID();
}
return `peer-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
}