/**
 * Layer 4: Sync (Client) — Multi-document WebSocket sync.
 *
 * Multiplexes sync for multiple Automerge documents over a single
 * WebSocket connection per space. Handles:
 *  - Subscribe/unsubscribe to individual documents
 *  - Automerge sync protocol per document
 *  - Offline queue (changes made offline replayed on reconnect)
 *  - Presence/awareness per document
 *
 * Wire protocol (JSON envelope, binary sync data as number arrays):
 *   { type: 'sync', docId, data: number[] }        — Automerge sync message
 *   { type: 'subscribe', docIds: string[] }        — Start syncing docs
 *   { type: 'unsubscribe', docIds: string[] }      — Stop syncing docs
 *   { type: 'awareness', docId, peer, cursor? }    — Presence
 *   { type: 'ping' } / { type: 'pong' }            — Keep-alive
 */

import * as Automerge from '@automerge/automerge';
import type { DocumentId } from './document';
import { DocumentManager } from './document';
import { EncryptedDocStore } from './storage';

// ============================================================================
// TYPES
// ============================================================================

/**
 * Wire envelope carrying one Automerge sync-protocol message for a single
 * document.
 */
export interface SyncMessage {
  /** Discriminator for the wire-message union. */
  type: 'sync';
  /** Identifier of the document this sync message belongs to. */
  docId: string;
  /**
   * Binary Automerge sync message encoded as an array of byte values so it
   * survives the JSON envelope (converted to/from `Uint8Array` at the edges).
   */
  data: number[];
}

/** Wire envelope asking the server to start syncing the listed documents. */
export interface SubscribeMessage {
  /** Discriminator for the wire-message union. */
  type: 'subscribe';
  /** Identifiers of the documents to start syncing. */
  docIds: string[];
}

/** Wire envelope asking the server to stop syncing the listed documents. */
export interface UnsubscribeMessage {
  /** Discriminator for the wire-message union. */
  type: 'unsubscribe';
  /** Identifiers of the documents to stop syncing. */
  docIds: string[];
}

/**
 * Wire envelope carrying presence/awareness information about one peer on
 * one document. Only `type`, `docId` and `peer` are required; the remaining
 * fields are optional presence details.
 */
export interface AwarenessMessage {
  /** Discriminator for the wire-message union. */
  type: 'awareness';
  /** Identifier of the document the presence information refers to. */
  docId: string;
  /** Peer ID of the client the presence information describes. */
  peer: string;
  /** Optional cursor position of the peer. */
  cursor?: { x: number; y: number };
  /** Optional selection descriptor of the peer. */
  selection?: string;
  /** Optional display name of the peer. */
  username?: string;
  /** Optional display color of the peer. */
  color?: string;
}

/**
 * Every message that can travel over the sync WebSocket, discriminated by
 * its `type` field.
 */
export type WireMessage =
  | SyncMessage
  | SubscribeMessage
  | UnsubscribeMessage
  | AwarenessMessage
  | { type: 'ping' }
  | { type: 'pong' };

/** Construction options for {@link DocSyncManager}. */
export interface DocSyncManagerOptions {
  /** Document manager holding the Automerge documents that are synced. */
  documents: DocumentManager;
  /**
   * Optional store used to persist documents and per-document sync state.
   * When omitted, nothing is persisted.
   */
  store?: EncryptedDocStore;
  /** Peer ID for this client (defaults to random) */
  peerId?: string;
  /** Auto-reconnect with exponential backoff (default: true) */
  autoReconnect?: boolean;
  /** Max reconnect delay in ms (default: 30000) */
  maxReconnectDelay?: number;
}

/**
 * Invoked with the updated document after remote changes were applied.
 * The parameter stays `any` so listeners can declare their own document
 * shape; tightening it to `unknown` would reject such typed callbacks.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type DocChangeCallback = (doc: any) => void;

/** Invoked with every awareness message received for a document. */
type AwarenessCallback = (msg: AwarenessMessage) => void;

/** Invoked without arguments on connection / disconnection events. */
type ConnectionCallback = () => void;

// ============================================================================
// DocSyncManager (Client)
// ============================================================================

/** Interval between keep-alive pings, in milliseconds. */
const PING_INTERVAL_MS = 30000;

/** Quiet period after the last change before a document is written to the store, in milliseconds. */
const SAVE_DEBOUNCE_MS = 2000;

/** Delay before the first reconnect attempt, in milliseconds; doubles with each further attempt. */
const RECONNECT_BASE_DELAY_MS = 1000;

/** Runtime check that an incoming JSON value has the shape of a `sync` envelope. */
function isSyncMessage(value: unknown): value is SyncMessage {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Partial<SyncMessage>;
  return (
    candidate.type === 'sync' &&
    typeof candidate.docId === 'string' &&
    Array.isArray(candidate.data)
  );
}

/** Runtime check that an incoming JSON value has the shape of an `awareness` envelope. */
function isAwarenessMessage(value: unknown): value is AwarenessMessage {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Partial<AwarenessMessage>;
  return (
    candidate.type === 'awareness' &&
    typeof candidate.docId === 'string' &&
    typeof candidate.peer === 'string'
  );
}

/**
 * Client-side sync manager: multiplexes the Automerge sync protocol for many
 * documents over one WebSocket, relays awareness messages, persists documents
 * and sync state to the optional store, and reconnects with exponential
 * backoff when the connection drops.
 */
export class DocSyncManager {
  readonly #documents: DocumentManager;
  readonly #store: EncryptedDocStore | null;
  readonly #peerId: string;
  #ws: WebSocket | null = null;
  #wsUrl: string | null = null;
  #space: string | null = null;
  readonly #autoReconnect: boolean;
  readonly #maxReconnectDelay: number;
  #reconnectAttempts = 0;

  // Connection lifecycle bookkeeping
  /** Pending backoff timer, kept so `disconnect()` can cancel it. */
  #reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  /** In-flight connection attempt, shared by concurrent `connect()` callers. */
  #connecting: Promise<void> | null = null;
  /** True between `disconnect()` and the next `connect()`; suppresses auto-reconnect. */
  #closedByUser = false;

  // Per-document sync state
  readonly #syncStates = new Map<DocumentId, Automerge.SyncState>();
  readonly #subscribedDocs = new Set<DocumentId>();

  // Event listeners
  readonly #changeListeners = new Map<DocumentId, Set<DocChangeCallback>>();
  readonly #awarenessListeners = new Map<DocumentId, Set<AwarenessCallback>>();
  readonly #connectListeners = new Set<ConnectionCallback>();
  readonly #disconnectListeners = new Set<ConnectionCallback>();

  // Save debounce
  readonly #saveTimers = new Map<DocumentId, ReturnType<typeof setTimeout>>();

  // Keep-alive
  #pingInterval: ReturnType<typeof setInterval> | null = null;

  /**
   * @param opts Document manager, optional store, and connection settings.
   *   `peerId` defaults to a random ID, `autoReconnect` to `true`, and
   *   `maxReconnectDelay` to 30000 ms.
   */
  constructor(opts: DocSyncManagerOptions) {
    this.#documents = opts.documents;
    this.#store = opts.store ?? null;
    this.#peerId = opts.peerId ?? generatePeerId();
    this.#autoReconnect = opts.autoReconnect ?? true;
    this.#maxReconnectDelay = opts.maxReconnectDelay ?? 30000;
  }

  /** Peer ID this client identifies itself with. */
  get peerId(): string {
    return this.#peerId;
  }

  /** Whether the WebSocket is currently open. */
  get isConnected(): boolean {
    return this.#ws?.readyState === WebSocket.OPEN;
  }

  /**
   * Connect to the sync server for a space.
   *
   * Resolves once the socket is open (immediately if it already is). A call
   * made while another attempt is still in flight joins that attempt instead
   * of opening a second socket.
   *
   * @param wsUrl WebSocket URL of the sync server.
   * @param space Space name; remembered for reconnects.
   * @throws Rejects with `Error('WebSocket connection failed')` when the
   *   socket errors or closes before it opened.
   */
  async connect(wsUrl: string, space: string): Promise<void> {
    this.#wsUrl = wsUrl;
    this.#space = space;
    // An explicit connect re-enables auto-reconnect after a disconnect() and
    // supersedes any backoff timer that is still pending.
    this.#closedByUser = false;
    this.#clearReconnectTimer();

    if (this.#ws?.readyState === WebSocket.OPEN) return;
    if (this.#connecting) return this.#connecting;

    const attempt = new Promise<void>((resolve, reject) => {
      const ws = new WebSocket(wsUrl);
      ws.binaryType = 'arraybuffer';
      this.#ws = ws;
      let opened = false;

      ws.onopen = () => {
        opened = true;
        this.#reconnectAttempts = 0;
        this.#resumeSubscriptions();
        this.#startPing();
        this.#emit(this.#connectListeners);
        resolve();
      };

      ws.onmessage = (event) => {
        this.#handleMessage(event.data);
      };

      ws.onclose = () => {
        // Only the socket that is still current may tear down shared state or
        // trigger a reconnect; a socket replaced by disconnect()/connect()
        // must not interfere with its successor.
        const wasCurrent = this.#ws === ws;
        if (wasCurrent) {
          this.#ws = null;
          this.#stopPing();
        }
        this.#emit(this.#disconnectListeners);
        // No-op if the promise already settled via onopen/onerror.
        if (!opened) reject(new Error('WebSocket connection failed'));
        if (wasCurrent) this.#attemptReconnect();
      };

      ws.onerror = () => {
        // onclose will fire after onerror
        if (ws.readyState !== WebSocket.OPEN) {
          reject(new Error('WebSocket connection failed'));
        }
      };
    });

    this.#connecting = attempt;
    const release = (): void => {
      if (this.#connecting === attempt) this.#connecting = null;
    };
    attempt.then(release, release);
    return attempt;
  }

  /**
   * Subscribe to sync for one or more documents.
   *
   * Already-subscribed IDs are ignored. For each new ID the persisted sync
   * state is restored from the store when available; otherwise a fresh sync
   * state is created. If connected, the subscription and an initial sync
   * message are sent right away; otherwise they are sent when the
   * connection opens.
   */
  async subscribe(docIds: DocumentId[]): Promise<void> {
    const added: DocumentId[] = [];

    for (const id of docIds) {
      if (this.#subscribedDocs.has(id)) continue;
      this.#subscribedDocs.add(id);
      added.push(id);

      if (!this.#syncStates.has(id)) {
        const restored = await this.#loadSyncState(id);
        // Re-check after the await: a connection opening in the meantime may
        // already have started a handshake with a fresh state for this doc.
        if (!this.#syncStates.has(id)) {
          this.#syncStates.set(id, restored ?? Automerge.initSyncState());
        }
      }
    }

    // Drop IDs that were unsubscribed again while sync state was loading.
    const active = added.filter((id) => this.#subscribedDocs.has(id));
    if (active.length > 0 && this.isConnected) {
      this.#send({ type: 'subscribe', docIds: active });
      for (const id of active) {
        this.#sendSyncMessage(id);
      }
    }
  }

  /**
   * Unsubscribe from sync for one or more documents.
   *
   * IDs that are not subscribed are ignored. The in-memory sync state is
   * kept so that a later re-subscribe can reuse it.
   */
  unsubscribe(docIds: DocumentId[]): void {
    const removed = docIds.filter((id) => this.#subscribedDocs.delete(id));
    if (removed.length > 0 && this.isConnected) {
      this.#send({ type: 'unsubscribe', docIds: removed });
    }
  }

  /**
   * Apply a local change to a document and sync to server.
   *
   * While offline the change is only applied locally and scheduled for
   * persistence; it is sent during the sync handshake on the next connect.
   */
  change<T>(docId: DocumentId, message: string, fn: (doc: T) => void): void {
    this.#documents.change<T>(docId, message, fn);
    this.#sendSyncMessage(docId);
    this.#scheduleSave(docId);
  }

  /**
   * Send awareness/presence update for a document.
   *
   * `type`, `docId` and `peer` are always set by this method; values for
   * those keys in `data` are ignored so a caller cannot address a different
   * document or impersonate another peer by accident.
   */
  sendAwareness(docId: DocumentId, data: Partial<AwarenessMessage>): void {
    this.#send({
      ...data,
      type: 'awareness',
      docId,
      peer: this.#peerId,
    });
  }

  /**
   * Listen for changes on a specific document.
   * @returns Function that removes the listener again.
   */
  onChange(docId: DocumentId, cb: DocChangeCallback): () => void {
    return this.#addDocListener(this.#changeListeners, docId, cb);
  }

  /**
   * Listen for awareness updates on a specific document.
   * @returns Function that removes the listener again.
   */
  onAwareness(docId: DocumentId, cb: AwarenessCallback): () => void {
    return this.#addDocListener(this.#awarenessListeners, docId, cb);
  }

  /**
   * Listen for connection events.
   * @returns Function that removes the listener again.
   */
  onConnect(cb: ConnectionCallback): () => void {
    this.#connectListeners.add(cb);
    return () => {
      this.#connectListeners.delete(cb);
    };
  }

  /**
   * Listen for disconnection events.
   * @returns Function that removes the listener again.
   */
  onDisconnect(cb: ConnectionCallback): () => void {
    this.#disconnectListeners.add(cb);
    return () => {
      this.#disconnectListeners.delete(cb);
    };
  }

  /**
   * Disconnect from server.
   *
   * Cancels any pending reconnect and suppresses auto-reconnect until
   * `connect()` is called again.
   */
  disconnect(): void {
    this.#closedByUser = true;
    this.#clearReconnectTimer();
    this.#stopPing();

    const ws = this.#ws;
    if (ws) {
      this.#ws = null;
      ws.close();
    }
  }

  /**
   * Flush all pending saves and sync state to IndexedDB.
   *
   * Cancels every debounced save and, when a store is configured, writes
   * those documents immediately together with all known sync states.
   * Rejects if any of the writes rejects.
   */
  async flush(): Promise<void> {
    const pendingDocIds = Array.from(this.#saveTimers.keys());
    for (const timer of this.#saveTimers.values()) {
      clearTimeout(timer);
    }
    this.#saveTimers.clear();

    const store = this.#store;
    if (!store) return;

    const writes: Promise<void>[] = [];

    for (const docId of pendingDocIds) {
      const doc = this.#documents.get(docId);
      if (!doc) continue;
      writes.push(
        store.saveImmediate(docId, Automerge.save(doc), this.#storeMeta(docId))
      );
    }

    // Persist all sync states
    for (const [docId, state] of this.#syncStates) {
      writes.push(
        store.saveSyncState(docId, this.#peerId, Automerge.encodeSyncState(state))
      );
    }

    await Promise.all(writes);
  }

  // ---------- Private ----------

  /** Registers `cb` for `docId` in a per-document listener registry. */
  #addDocListener<C>(
    registry: Map<DocumentId, Set<C>>,
    docId: DocumentId,
    cb: C
  ): () => void {
    let listeners = registry.get(docId);
    if (!listeners) {
      listeners = new Set<C>();
      registry.set(docId, listeners);
    }
    const target = listeners;
    target.add(cb);
    return () => {
      target.delete(cb);
    };
  }

  /** Calls every listener; a throwing listener is logged and does not stop the others. */
  #emit<A extends unknown[]>(
    listeners: Iterable<(...args: A) => void> | undefined,
    ...args: A
  ): void {
    if (!listeners) return;
    for (const cb of listeners) {
      try {
        cb(...args);
      } catch (e: unknown) {
        console.error('[DocSyncManager] Listener threw:', e);
      }
    }
  }

  /**
   * Loads and decodes the persisted sync state for a document. Returns
   * `null` when there is no store, nothing was saved, or loading/decoding
   * fails (the caller then starts from a fresh sync state).
   */
  async #loadSyncState(docId: DocumentId): Promise<Automerge.SyncState | null> {
    if (!this.#store) return null;
    try {
      const saved = await this.#store.loadSyncState(docId, this.#peerId);
      return saved ? Automerge.decodeSyncState(saved) : null;
    } catch (e: unknown) {
      console.warn('[DocSyncManager] Failed to restore sync state, starting fresh:', e);
      return null;
    }
  }

  /** Re-announces all subscriptions and restarts the sync handshake after the socket opened. */
  #resumeSubscriptions(): void {
    if (this.#subscribedDocs.size === 0) return;

    this.#send({
      type: 'subscribe',
      docIds: Array.from(this.#subscribedDocs),
    });

    for (const docId of this.#subscribedDocs) {
      // A sync state carries bookkeeping about messages in flight on the
      // previous connection. Round-tripping through encode/decode keeps only
      // the durable part so the handshake restarts cleanly on the new socket.
      const previous = this.#syncStates.get(docId);
      if (previous) {
        this.#syncStates.set(
          docId,
          Automerge.decodeSyncState(Automerge.encodeSyncState(previous))
        );
      }
      this.#sendSyncMessage(docId);
    }
  }

  /**
   * Parses an incoming frame and dispatches it by message type. Frames that
   * are not well-formed `sync` or `awareness` envelopes (including `pong`)
   * require no action and are ignored.
   */
  #handleMessage(raw: ArrayBuffer | string): void {
    try {
      const text = typeof raw === 'string' ? raw : new TextDecoder().decode(raw);
      const msg: unknown = JSON.parse(text);

      if (isSyncMessage(msg)) {
        this.#handleSyncMessage(msg);
      } else if (isAwarenessMessage(msg)) {
        this.#handleAwareness(msg);
      }
    } catch (e: unknown) {
      console.error('[DocSyncManager] Failed to handle message:', e);
    }
  }

  /**
   * Applies a received sync message to the local document, stores the new
   * document and sync state, schedules a save, notifies change listeners if
   * the document object changed, and sends a reply if one is due.
   */
  #handleSyncMessage(msg: SyncMessage): void {
    const docId = msg.docId as DocumentId;
    const doc = this.#documents.get(docId);
    if (!doc) return;

    const syncState = this.#syncStates.get(docId) ?? Automerge.initSyncState();
    const [newDoc, newSyncState] = Automerge.receiveSyncMessage(
      doc,
      syncState,
      new Uint8Array(msg.data)
    );

    this.#documents.set(docId, newDoc);
    this.#syncStates.set(docId, newSyncState);
    this.#scheduleSave(docId);

    // Notify change listeners
    if (newDoc !== doc) {
      this.#emit(this.#changeListeners.get(docId), newDoc);
    }

    // Send response sync message if needed
    this.#sendSyncMessage(docId);
  }

  /** Forwards an awareness message to the listeners registered for its document. */
  #handleAwareness(msg: AwarenessMessage): void {
    this.#emit(this.#awarenessListeners.get(msg.docId as DocumentId), msg);
  }

  /**
   * Generates and sends the next sync message for a document, if one is due,
   * and persists the resulting sync state.
   *
   * Does nothing while offline: generating a message advances the sync
   * state as if the message had been delivered, so generating one that
   * cannot be sent would make the protocol skip those changes later.
   */
  #sendSyncMessage(docId: DocumentId): void {
    if (!this.isConnected) return;

    const doc = this.#documents.get(docId);
    if (!doc) return;

    const syncState = this.#syncStates.get(docId) ?? Automerge.initSyncState();
    const [newSyncState, syncMessage] = Automerge.generateSyncMessage(doc, syncState);
    this.#syncStates.set(docId, newSyncState);

    if (syncMessage) {
      this.#send({
        type: 'sync',
        docId,
        data: Array.from(syncMessage),
      });
    }

    // Persist sync state (best effort; flush() writes it again).
    if (this.#store) {
      const encoded = Automerge.encodeSyncState(newSyncState);
      this.#store.saveSyncState(docId, this.#peerId, encoded).catch((e: unknown) => {
        console.warn('[DocSyncManager] Failed to persist sync state:', e);
      });
    }
  }

  /** Serializes and sends a message; silently dropped when the socket is not open. */
  #send(msg: WireMessage): void {
    if (this.#ws?.readyState === WebSocket.OPEN) {
      this.#ws.send(JSON.stringify(msg));
    }
  }

  /** Builds the metadata passed to the store alongside a document, if the manager has any. */
  #storeMeta(docId: DocumentId) {
    const meta = this.#documents.getMeta(docId);
    return meta
      ? { module: meta.module, collection: meta.collection, version: meta.version }
      : undefined;
  }

  /**
   * Schedules a debounced save of a document; each call restarts the
   * debounce window. No-op without a store.
   */
  #scheduleSave(docId: DocumentId): void {
    const store = this.#store;
    if (!store) return;

    const pending = this.#saveTimers.get(docId);
    if (pending !== undefined) clearTimeout(pending);

    const timer = setTimeout(() => {
      this.#saveTimers.delete(docId);
      const doc = this.#documents.get(docId);
      if (!doc) return;

      const onFailure = (e: unknown): void => {
        console.error('[DocSyncManager] Failed to save document:', e);
      };
      try {
        // Promise.resolve covers both a synchronous and a promise-returning save.
        void Promise.resolve(
          store.save(docId, Automerge.save(doc), this.#storeMeta(docId))
        ).catch(onFailure);
      } catch (e: unknown) {
        onFailure(e);
      }
    }, SAVE_DEBOUNCE_MS);

    this.#saveTimers.set(docId, timer);
  }

  /**
   * Schedules a reconnect with exponential backoff capped at
   * `maxReconnectDelay`. No-op when auto-reconnect is off, after
   * `disconnect()`, or before the first `connect()`.
   */
  #attemptReconnect(): void {
    if (!this.#autoReconnect || this.#closedByUser) return;
    if (!this.#wsUrl || !this.#space) return;

    this.#clearReconnectTimer();
    this.#reconnectAttempts++;
    const delay = Math.min(
      RECONNECT_BASE_DELAY_MS * Math.pow(2, this.#reconnectAttempts - 1),
      this.#maxReconnectDelay
    );

    this.#reconnectTimer = setTimeout(() => {
      this.#reconnectTimer = null;
      // disconnect() may have been called while the timer was pending.
      if (this.#closedByUser || !this.#wsUrl || !this.#space) return;
      this.connect(this.#wsUrl, this.#space).catch(() => {
        // Failure is expected here; the socket's onclose schedules the next attempt.
      });
    }, delay);
  }

  /** Cancels the pending reconnect timer, if any. */
  #clearReconnectTimer(): void {
    if (this.#reconnectTimer !== null) {
      clearTimeout(this.#reconnectTimer);
      this.#reconnectTimer = null;
    }
  }

  /** (Re)starts the periodic keep-alive ping. */
  #startPing(): void {
    this.#stopPing();
    this.#pingInterval = setInterval(() => {
      this.#send({ type: 'ping' });
    }, PING_INTERVAL_MS);
  }

  /** Stops the periodic keep-alive ping, if running. */
  #stopPing(): void {
    if (this.#pingInterval !== null) {
      clearInterval(this.#pingInterval);
      this.#pingInterval = null;
    }
  }
}

/**
 * Generates a random peer ID.
 *
 * Prefers `crypto.randomUUID()`. Where that is unavailable but
 * `crypto.getRandomValues()` exists, returns `peer-` followed by 32 hex
 * characters of random data. As a last resort falls back to a
 * timestamp plus `Math.random()` based ID.
 *
 * @returns A non-empty identifier string.
 */
function generatePeerId(): string {
  if (typeof crypto !== 'undefined') {
    if (typeof crypto.randomUUID === 'function') {
      return crypto.randomUUID();
    }
    if (typeof crypto.getRandomValues === 'function') {
      const bytes = crypto.getRandomValues(new Uint8Array(16));
      const hex = Array.from(bytes, (b) => b.toString(16).padStart(2, '0')).join('');
      return `peer-${hex}`;
    }
  }
  return `peer-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
}