// braid-transport/src/sync/braid-state-sync.ts
// 283 lines, 7.3 KiB, TypeScript

/**
* Braid State Sync
*
* Drop-in replacement for StateSync that uses Braid-HTTP subscriptions
* instead of polling-based WebSocket sync.
*
* Key improvement: instead of periodic syncWithAllPeers() on a timer,
* state changes are pushed to subscribers immediately via Braid-HTTP.
* This reduces latency from syncIntervalMs (default 1s) to near-zero.
*/
import * as Automerge from '@automerge/automerge';
import pino from 'pino';
import type { BraidPeerManager } from './braid-peer-manager.js';
// Shared logger for this module, created with the name 'braid-state-sync'.
const logger = pino({ name: 'braid-state-sync' });
/**
 * Tunable options accepted by the BraidStateSync constructor.
 * Every field is optional; omitted fields fall back to built-in defaults.
 */
export interface BraidStateSyncConfig {
  /**
   * How long to wait (in milliseconds) after a change notification before
   * pushing, so that a burst of rapid changes results in a single push.
   */
  pushDebounceMs?: number;
}
/** Fallback value for each configuration option the caller leaves out. */
const DEFAULT_CONFIG: Required<BraidStateSyncConfig> = { pushDebounceMs: 50 };
/**
 * Bookkeeping kept for each peer that Automerge sync messages are
 * exchanged with.
 */
interface PeerSyncState {
  /** Peer identifier; entries are stored in the peer map under this same id. */
  nodeId: string;
  /**
   * Automerge sync state for this peer. It is replaced with the state
   * returned each time a sync message is generated for, or received from,
   * the peer.
   */
  syncState: Automerge.SyncState;
  /**
   * `Date.now()` of the most recent sync pass that finished without
   * throwing; 0 until the first one has run.
   */
  lastSyncAt: number;
}
/**
 * Braid-HTTP state synchronization.
 *
 * Keeps one Automerge sync state per peer and exchanges Automerge sync
 * messages through the supplied peer manager. Nothing is polled: an outbound
 * message is generated when a peer connects, when a peer requests a sync,
 * after an inbound sync message has been applied, or (debounced) after
 * `notifyChange()` is called.
 *
 * The peer-manager listeners are registered in the constructor and are not
 * removed by `stop()`, so peer and message events are also handled before
 * `start()` and after `stop()`.
 */
export class BraidStateSync {
  private readonly config: Required<BraidStateSyncConfig>;
  // ConsensusDocument — typed as any to avoid a circular dependency. Its
  // Automerge document is read and replaced through `document['doc']`.
  private readonly document: any;
  private readonly peerManager: BraidPeerManager;
  private readonly peerSyncStates: Map<string, PeerSyncState> = new Map();
  private isRunning = false;
  private pushTimer: NodeJS.Timeout | null = null;

  /**
   * @param document Object whose `doc` property holds the Automerge document.
   * @param peerManager Source of peer/message events and transport for
   *   outbound sync messages.
   * @param config Optional overrides; unset options use the defaults.
   */
  constructor(
    document: any,
    peerManager: BraidPeerManager,
    config: BraidStateSyncConfig = {}
  ) {
    this.document = document;
    this.peerManager = peerManager;
    // Resolve with `??` rather than object spread so that an explicit
    // `pushDebounceMs: undefined` does not overwrite the default.
    this.config = {
      pushDebounceMs: config.pushDebounceMs ?? DEFAULT_CONFIG.pushDebounceMs,
    };

    // A newly connected peer gets a fresh sync state and an immediate push
    // of the current document state.
    this.peerManager.on('peer:connected', (nodeId: string) => {
      this.initPeerSyncState(nodeId);
      void this.syncWithPeer(nodeId);
    });

    this.peerManager.on('peer:disconnected', (nodeId: string) => {
      this.peerSyncStates.delete(nodeId);
    });

    // Incoming sync traffic (delivered via the Braid subscription).
    this.peerManager.on('message:sync_request', (nodeId: string) => {
      void this.handleSyncRequest(nodeId);
    });

    this.peerManager.on(
      'message:sync_response',
      (nodeId: string, data: Uint8Array) => {
        this.handleSyncResponse(nodeId, data);
      }
    );
  }

  /**
   * Start the sync manager.
   *
   * Ensures every currently connected peer has a sync state and enables
   * `notifyChange()`. No timer is started: pushes happen on change, not on
   * a polling interval. Calling it while already running is a no-op.
   */
  start(): void {
    if (this.isRunning) return;

    logger.info('Starting Braid state sync');

    for (const nodeId of this.peerManager.connectedPeerIds) {
      this.initPeerSyncState(nodeId);
    }

    this.isRunning = true;
  }

  /**
   * Stop the sync manager.
   *
   * Cancels any pending debounced push and discards all per-peer sync
   * state. Calling it while not running is a no-op.
   */
  stop(): void {
    if (!this.isRunning) return;

    logger.info('Stopping Braid state sync');

    if (this.pushTimer) {
      clearTimeout(this.pushTimer);
      this.pushTimer = null;
    }

    this.peerSyncStates.clear();
    this.isRunning = false;
  }

  /**
   * Notify that the document has changed.
   *
   * Call this after any Automerge.change() to trigger a push to peers.
   * Ignored unless the manager is running. The first notification arms a
   * timer of `pushDebounceMs`; notifications arriving while the timer is
   * armed are absorbed, and one push to all peers happens when it fires.
   */
  notifyChange(): void {
    if (!this.isRunning) return;

    // A push is already scheduled; it will pick up this change too.
    if (this.pushTimer) return;

    this.pushTimer = setTimeout(() => {
      this.pushTimer = null;
      this.pushToAllPeers();
    }, this.config.pushDebounceMs);
  }

  /**
   * Run a sync pass for every peer the peer manager reports as connected.
   */
  private pushToAllPeers(): void {
    for (const nodeId of this.peerManager.connectedPeerIds) {
      void this.syncWithPeer(nodeId);
    }
  }

  /**
   * Create a fresh Automerge sync state for a peer.
   * Does nothing when the peer already has one.
   */
  private initPeerSyncState(nodeId: string): void {
    if (this.peerSyncStates.has(nodeId)) return;

    this.peerSyncStates.set(nodeId, {
      nodeId,
      syncState: Automerge.initSyncState(),
      lastSyncAt: 0,
    });

    logger.debug({ nodeId }, 'Initialized sync state for peer');
  }

  /**
   * Generate the next Automerge sync message for a peer and send it.
   *
   * Returns without doing anything when the peer has no tracked sync state.
   * The sync state returned by Automerge is always stored; a message is only
   * sent when Automerge produces one. Failures are logged and never
   * rethrown, so the returned promise does not reject.
   */
  private async syncWithPeer(nodeId: string): Promise<void> {
    const peerState = this.peerSyncStates.get(nodeId);
    if (!peerState) return;

    try {
      const doc = this.document['doc'];
      const [newSyncState, syncMessage] = Automerge.generateSyncMessage(
        doc,
        peerState.syncState
      );
      peerState.syncState = newSyncState;

      if (syncMessage) {
        // Push via Braid-HTTP to subscribers.
        // NOTE(review): pushUpdate receives no peer id, so this peer-specific
        // sync message is presumably delivered to every subscriber — confirm
        // against BraidPeerManager.
        this.peerManager.pushUpdate(syncMessage, 'automerge');

        // Also send as a direct message for peers that aren't subscribed.
        try {
          await this.peerManager.sendTo(nodeId, {
            type: 'SYNC_RESPONSE',
            // NOTE(review): intended to carry our own node id, but
            // connectedPeerIds[0] is the id of the first connected *remote*
            // peer. Left unchanged because the peer manager's accessor for
            // the local node id is not visible here — confirm and replace.
            nodeId: this.peerManager.connectedPeerIds[0] ?? '',
            timestamp: Date.now(),
            payload: {
              data: Buffer.from(syncMessage).toString('base64'),
            },
          });
        } catch (err) {
          // Best-effort: the peer might not be reachable via direct send;
          // the Braid push above covers subscribed peers.
          logger.debug(
            { nodeId, err },
            'Direct sync send failed; relying on Braid push'
          );
        }

        logger.debug(
          { nodeId, messageSize: syncMessage.length },
          'Pushed sync via Braid'
        );
      }

      peerState.lastSyncAt = Date.now();
    } catch (err) {
      // Logged under `err` so pino's standard error serializer applies.
      logger.error({ nodeId, err }, 'Sync with peer failed');
    }
  }

  /**
   * Handle a sync request from a peer: make sure it has a sync state, then
   * run a sync pass for it.
   */
  private async handleSyncRequest(nodeId: string): Promise<void> {
    this.initPeerSyncState(nodeId);
    await this.syncWithPeer(nodeId);
  }

  /**
   * Apply a sync message received from a peer, then reply if needed.
   *
   * A message from a peer with no tracked sync state is still applied: a
   * fresh sync state is created for it first. After the document and sync
   * state are updated, a sync pass runs for the peer; that pass sends
   * nothing when Automerge has no further message to produce. Failures to
   * apply the message are logged and not rethrown.
   *
   * @param nodeId Peer the message came from.
   * @param data Binary Automerge sync message.
   */
  private handleSyncResponse(nodeId: string, data: Uint8Array): void {
    // No-op for known peers; guarantees a state exists for unknown ones so
    // their first message is not lost.
    this.initPeerSyncState(nodeId);
    const peerState = this.peerSyncStates.get(nodeId);
    if (!peerState) return;

    try {
      const [newDoc, newSyncState] = Automerge.receiveSyncMessage(
        this.document['doc'],
        peerState.syncState,
        data
      );

      this.document['doc'] = newDoc;
      peerState.syncState = newSyncState;

      logger.debug({ nodeId }, 'Applied Braid sync message from peer');

      // Continue the exchange; syncWithPeer only sends when there is more
      // to say, so no separate probe message is generated here.
      void this.syncWithPeer(nodeId);
    } catch (err) {
      logger.error({ nodeId, err }, 'Failed to apply sync message');
    }
  }

  /**
   * Force a full sync with a peer (for recovery).
   *
   * Replaces any existing sync state for the peer with a fresh one and
   * runs a sync pass.
   */
  async forceSync(nodeId: string): Promise<void> {
    this.peerSyncStates.set(nodeId, {
      nodeId,
      syncState: Automerge.initSyncState(),
      lastSyncAt: 0,
    });

    await this.syncWithPeer(nodeId);
  }

  /**
   * Get sync status for every peer that currently has a tracked sync state.
   *
   * @returns One entry per tracked peer with its id, its `lastSyncAt`
   *   timestamp, and whether the peer manager lists it as connected.
   */
  getSyncStatus(): Array<{
    nodeId: string;
    lastSyncAt: number;
    isConnected: boolean;
  }> {
    // Build the lookup once instead of scanning the id list per entry.
    const connected = new Set(this.peerManager.connectedPeerIds);

    return Array.from(this.peerSyncStates.values(), (state) => ({
      nodeId: state.nodeId,
      lastSyncAt: state.lastSyncAt,
      isConnected: connected.has(state.nodeId),
    }));
  }
}