feat: Braid-HTTP transport adapter for BFT-CRDT consensus sync
Drop-in replacement for WebSocket-based PeerManager and StateSync using Braid-HTTP subscriptions. Key components: - BraidPeerManager: HTTP-native peer transport (no WebSocket server) - BraidStateSync: Push-based sync instead of polling - BraidStatePublisher: Serves JSON state to Simpleton light clients - SimpletonClient: Zero-CRDT read-only client (~50 lines of protocol) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
commit
6959ad4106
|
|
@ -0,0 +1,4 @@
|
|||
node_modules/
|
||||
dist/
|
||||
*.tsbuildinfo
|
||||
.env
|
||||
|
|
@ -0,0 +1,120 @@
|
|||
/**
|
||||
* Integration Guide: Swapping WebSocket for Braid-HTTP
|
||||
*
|
||||
* This shows the minimal changes needed in consensus-service/src/consensus/service.ts
|
||||
* to use Braid-HTTP transport instead of WebSocket.
|
||||
*
|
||||
* Changes needed:
|
||||
* 1. Replace PeerManager with BraidPeerManager
|
||||
* 2. Replace StateSync with BraidStateSync
|
||||
* 3. Mount Braid router on Express app
|
||||
* 4. Add BraidStatePublisher for Simpleton clients
|
||||
* 5. Call notifyChange() after document mutations
|
||||
*
|
||||
* Everything else (ConsensusNode, ConsensusDocument, RollupAggregator)
|
||||
* stays exactly the same.
|
||||
*/
|
||||
|
||||
// ============================================================
|
||||
// BEFORE (WebSocket-based) — in consensus/service.ts
|
||||
// ============================================================
|
||||
//
|
||||
// import { PeerManager } from '../sync/peer-manager.js';
|
||||
// import { StateSync } from '../sync/state-sync.js';
|
||||
//
|
||||
// // In start():
|
||||
// this.peerManager = new PeerManager({
|
||||
// nodeId, publicKey, privateKey, host, port, redisUrl,
|
||||
// });
|
||||
// await this.peerManager.start(); // starts WebSocket server on port+1000
|
||||
//
|
||||
// this.stateSync = new StateSync(this.document, this.peerManager);
|
||||
// this.stateSync.start(); // starts polling timer
|
||||
|
||||
// ============================================================
|
||||
// AFTER (Braid-HTTP) — in consensus/service.ts
|
||||
// ============================================================
|
||||
|
||||
import express from 'express';
|
||||
import {
|
||||
BraidPeerManager,
|
||||
BraidStateSync,
|
||||
BraidStatePublisher,
|
||||
} from '@rspace/braid-transport';
|
||||
|
||||
// --- In constructor or start() ---
|
||||
|
||||
const app = express();
|
||||
app.use(express.json());
|
||||
|
||||
// 1. Create Braid peer manager (replaces WebSocket PeerManager)
|
||||
const braidPeerManager = new BraidPeerManager({
|
||||
nodeId: 'node-1',
|
||||
publicKey: '...',
|
||||
privateKey: new Uint8Array([]),
|
||||
host: 'consensus-service',
|
||||
port: 3007,
|
||||
redisUrl: 'redis://redis:6379',
|
||||
});
|
||||
|
||||
// 2. Mount Braid routes on Express (no separate WebSocket server!)
|
||||
app.use(braidPeerManager.createRouter());
|
||||
|
||||
// 3. Create push-based state sync (replaces polling StateSync)
|
||||
const document = null as any; // your ConsensusDocument
|
||||
const braidStateSync = new BraidStateSync(document, braidPeerManager);
|
||||
|
||||
// 4. Create state publisher for Simpleton light clients
|
||||
const statePublisher = new BraidStatePublisher(document);
|
||||
app.use(statePublisher.createRouter());
|
||||
|
||||
// 5. Start everything
|
||||
await braidPeerManager.start();
|
||||
braidStateSync.start();
|
||||
|
||||
// 6. After any document mutation, notify both sync systems:
|
||||
// braidStateSync.notifyChange(); // push to peer nodes
|
||||
// statePublisher.notifyChange(); // push to simpleton clients
|
||||
|
||||
// --- Wire up consensus events (same as before) ---
|
||||
// braidPeerManager.on('message:propose', ...) — same handlers
|
||||
// braidPeerManager.on('message:vote', ...) — same handlers
|
||||
// braidPeerManager.on('message:commit', ...) — same handlers
|
||||
// braidPeerManager.on('peer:connected', ...) — same handlers
|
||||
// braidPeerManager.on('peer:disconnected', ...) — same handlers
|
||||
|
||||
// ============================================================
|
||||
// Simpleton Client Usage (e.g., in wallet-service)
|
||||
// ============================================================
|
||||
|
||||
import { SimpletonClient } from '@rspace/braid-transport';
|
||||
|
||||
async function walletServiceExample() {
|
||||
// Create a light client — no Automerge dependency needed!
|
||||
const client = new SimpletonClient({
|
||||
endpoint: 'http://consensus-service:3007',
|
||||
});
|
||||
|
||||
// Watch specific addresses for balance changes
|
||||
client.watchAddress('0x1234...');
|
||||
|
||||
client.on('balance:changed', (address, balances) => {
|
||||
console.log(`Balance changed for ${address}:`, balances);
|
||||
});
|
||||
|
||||
client.on('state:updated', (state) => {
|
||||
console.log('State version:', state.version);
|
||||
console.log('Wallet count:', Object.keys(state.wallets).length);
|
||||
});
|
||||
|
||||
// Connect — opens Braid-HTTP subscription
|
||||
await client.connect();
|
||||
|
||||
// One-shot balance query (doesn't need subscription)
|
||||
const balance = await client.fetchBalance('0x1234...');
|
||||
console.log('Balance:', balance);
|
||||
|
||||
// Read from cached state (instant, no network call)
|
||||
const cachedBalance = client.getBalance('0x1234...', '0x833589fcd6edb6e08f4c7c32d4f71b54bda02913');
|
||||
console.log('Cached USDC balance:', cachedBalance);
|
||||
}
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
{
|
||||
"name": "@rspace/braid-transport",
|
||||
"version": "0.1.0",
|
||||
"private": true,
|
||||
"description": "Braid-HTTP transport adapter for BFT-CRDT consensus sync",
|
||||
"main": "dist/index.js",
|
||||
"types": "dist/index.d.ts",
|
||||
"scripts": {
|
||||
"dev": "tsx watch src/index.ts",
|
||||
"build": "tsc",
|
||||
"start": "node dist/index.js",
|
||||
"test": "vitest",
|
||||
"lint": "eslint src/",
|
||||
"clean": "rm -rf dist"
|
||||
},
|
||||
"dependencies": {
|
||||
"@automerge/automerge": "^2.2.8",
|
||||
"braid-http": "^0.9.0",
|
||||
"express": "^4.21.0",
|
||||
"pino": "^8.21.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/express": "^4.17.21",
|
||||
"@types/node": "^20.14.0",
|
||||
"tsx": "^4.19.0",
|
||||
"typescript": "^5.5.0",
|
||||
"vitest": "^1.6.0"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=20.0.0"
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,37 @@
|
|||
/**
|
||||
* @rspace/braid-transport
|
||||
*
|
||||
* Braid-HTTP transport adapter for the BFT-CRDT consensus service.
|
||||
*
|
||||
* Provides drop-in replacements for the WebSocket-based PeerManager
|
||||
* and StateSync, using Braid-HTTP subscriptions for state synchronization
|
||||
* and HTTP POST for consensus messages.
|
||||
*
|
||||
* Components:
|
||||
* - BraidPeerManager: Replaces WebSocket PeerManager with HTTP transport
|
||||
* - BraidStateSync: Push-based state sync instead of polling
|
||||
* - BraidStatePublisher: Serves state to Simpleton light clients
|
||||
* - SimpletonClient: Zero-CRDT light client for read-only state access
|
||||
*/
|
||||
|
||||
export { BraidPeerManager } from './transport/braid-peer-manager.js';
|
||||
export type { BraidPeerManagerEvents } from './transport/braid-peer-manager.js';
|
||||
|
||||
export { BraidStateSync } from './sync/braid-state-sync.js';
|
||||
export type { BraidStateSyncConfig } from './sync/braid-state-sync.js';
|
||||
|
||||
export { BraidStatePublisher } from './sync/braid-state-publisher.js';
|
||||
export type { StatePublisherConfig } from './sync/braid-state-publisher.js';
|
||||
|
||||
export { SimpletonClient } from './simpleton/simpleton-client.js';
|
||||
export type { SimpletonClientEvents } from './simpleton/simpleton-client.js';
|
||||
|
||||
export type {
|
||||
BraidVersion,
|
||||
BraidUpdate,
|
||||
BraidPeerInfo,
|
||||
BraidPeerManagerConfig,
|
||||
ConsensusMessageEnvelope,
|
||||
BraidSubscribeOptions,
|
||||
SimpletonClientConfig,
|
||||
} from './types/index.js';
|
||||
|
|
@ -0,0 +1,299 @@
|
|||
/**
|
||||
* Simpleton Light Client
|
||||
*
|
||||
* A minimal read-only client that subscribes to CRDT state via Braid-HTTP.
|
||||
* The client maintains zero CRDT history — the server does all the heavy
|
||||
* lifting. The client just receives patches and applies them to local state.
|
||||
*
|
||||
* This is Braid's "Simpleton" pattern: ~50 lines of protocol logic,
|
||||
* no Automerge dependency on the client side.
|
||||
*
|
||||
* Use cases:
|
||||
* - wallet-service reading balances (no need for full Automerge engine)
|
||||
* - onramp-service monitoring deposit confirmations
|
||||
* - Dashboard/monitoring UIs
|
||||
* - Mobile clients with limited resources
|
||||
*/
|
||||
|
||||
import { EventEmitter } from 'events';
|
||||
import pino from 'pino';
|
||||
import type { SimpletonClientConfig } from '../types/index.js';
|
||||
|
||||
const logger = pino({ name: 'simpleton-client' });
|
||||
|
||||
/**
|
||||
* Events emitted by SimpletonClient
|
||||
*/
|
||||
export interface SimpletonClientEvents {
|
||||
'connected': () => void;
|
||||
'disconnected': () => void;
|
||||
'state:updated': (state: any) => void;
|
||||
'balance:changed': (address: string, balances: Record<string, string>) => void;
|
||||
'error': (error: Error) => void;
|
||||
}
|
||||
|
||||
export declare interface SimpletonClient {
|
||||
on<E extends keyof SimpletonClientEvents>(
|
||||
event: E,
|
||||
listener: SimpletonClientEvents[E]
|
||||
): this;
|
||||
emit<E extends keyof SimpletonClientEvents>(
|
||||
event: E,
|
||||
...args: Parameters<SimpletonClientEvents[E]>
|
||||
): boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Simpleton Light Client
|
||||
*
|
||||
* Subscribes to a consensus node's state via Braid-HTTP and maintains
|
||||
* a local copy of the state without any CRDT machinery.
|
||||
*/
|
||||
export class SimpletonClient extends EventEmitter {
|
||||
private config: Required<SimpletonClientConfig>;
|
||||
private controller: AbortController | null = null;
|
||||
private currentState: any = null;
|
||||
private currentVersion: string | null = null;
|
||||
private isConnected = false;
|
||||
private reconnectTimer: NodeJS.Timeout | null = null;
|
||||
private watchedAddresses: Set<string> = new Set();
|
||||
|
||||
constructor(config: SimpletonClientConfig) {
|
||||
super();
|
||||
|
||||
this.config = {
|
||||
statePath: '/braid/state',
|
||||
balancePath: '/balance',
|
||||
reconnectDelayMs: 2000,
|
||||
...config,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to the consensus node's state.
|
||||
* Opens a long-lived Braid-HTTP subscription.
|
||||
*/
|
||||
async connect(): Promise<void> {
|
||||
if (this.isConnected) return;
|
||||
|
||||
this.controller = new AbortController();
|
||||
|
||||
try {
|
||||
const response = await fetch(
|
||||
`${this.config.endpoint}${this.config.statePath}`,
|
||||
{
|
||||
method: 'GET',
|
||||
headers: {
|
||||
'Subscribe': 'keep-alive',
|
||||
'Accept': 'application/json',
|
||||
},
|
||||
signal: this.controller.signal,
|
||||
}
|
||||
);
|
||||
|
||||
if (!response.ok || !response.body) {
|
||||
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
|
||||
}
|
||||
|
||||
this.isConnected = true;
|
||||
this.emit('connected');
|
||||
logger.info({ endpoint: this.config.endpoint }, 'Connected via Braid-HTTP');
|
||||
|
||||
// Read the subscription stream
|
||||
await this.readStream(response.body);
|
||||
} catch (error: any) {
|
||||
if (error.name !== 'AbortError') {
|
||||
logger.error({ error: error.message }, 'Connection failed');
|
||||
this.emit('error', error);
|
||||
this.scheduleReconnect();
|
||||
}
|
||||
} finally {
|
||||
this.isConnected = false;
|
||||
this.emit('disconnected');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect from the subscription.
|
||||
*/
|
||||
disconnect(): void {
|
||||
if (this.reconnectTimer) {
|
||||
clearTimeout(this.reconnectTimer);
|
||||
this.reconnectTimer = null;
|
||||
}
|
||||
|
||||
if (this.controller) {
|
||||
this.controller.abort();
|
||||
this.controller = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Watch an address for balance changes.
|
||||
* When the state updates, if this address's balance changed, emit an event.
|
||||
*/
|
||||
watchAddress(address: string): void {
|
||||
this.watchedAddresses.add(address.toLowerCase());
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop watching an address.
|
||||
*/
|
||||
unwatchAddress(address: string): void {
|
||||
this.watchedAddresses.delete(address.toLowerCase());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current state (locally cached).
|
||||
*/
|
||||
getState(): any {
|
||||
return this.currentState;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a specific balance from the cached state.
|
||||
*/
|
||||
getBalance(address: string, token?: string): string {
|
||||
if (!this.currentState?.wallets) return '0';
|
||||
|
||||
const wallet = this.currentState.wallets[address.toLowerCase()];
|
||||
if (!wallet?.balances) return '0';
|
||||
|
||||
if (token) {
|
||||
return wallet.balances[token.toLowerCase()]?.amount ?? '0';
|
||||
}
|
||||
|
||||
// Return first token balance (backward compat)
|
||||
const firstToken = Object.values(wallet.balances)[0] as any;
|
||||
return firstToken?.amount ?? '0';
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch a balance directly via HTTP (one-shot, no subscription).
|
||||
*/
|
||||
async fetchBalance(address: string): Promise<any> {
|
||||
const response = await fetch(
|
||||
`${this.config.endpoint}${this.config.balancePath}/${address}`
|
||||
);
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`HTTP ${response.status}`);
|
||||
}
|
||||
|
||||
return response.json();
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the Braid subscription stream and apply updates.
|
||||
*/
|
||||
private async readStream(body: ReadableStream<Uint8Array>): Promise<void> {
|
||||
const reader = body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = '';
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
|
||||
// Parse Braid updates (delimited by double CRLF)
|
||||
const parts = buffer.split('\r\n\r\n');
|
||||
buffer = parts.pop() ?? '';
|
||||
|
||||
for (const part of parts) {
|
||||
if (!part.trim()) continue;
|
||||
this.applyUpdate(part);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply a Braid update to local state.
|
||||
*
|
||||
* This is the core "simpleton" logic — we don't run any CRDT,
|
||||
* we just receive the server's resolved state.
|
||||
*/
|
||||
private applyUpdate(chunk: string): void {
|
||||
try {
|
||||
// Parse headers
|
||||
const lines = chunk.split('\r\n');
|
||||
let version: string | null = null;
|
||||
let contentType = 'application/json';
|
||||
let bodyStart = -1;
|
||||
|
||||
for (let i = 0; i < lines.length; i++) {
|
||||
const line = lines[i];
|
||||
|
||||
if (line === '') {
|
||||
bodyStart = i + 1;
|
||||
break;
|
||||
}
|
||||
|
||||
const colonIdx = line.indexOf(':');
|
||||
if (colonIdx === -1) continue;
|
||||
|
||||
const key = line.slice(0, colonIdx).trim().toLowerCase();
|
||||
const val = line.slice(colonIdx + 1).trim();
|
||||
|
||||
if (key === 'version') {
|
||||
version = val.replace(/"/g, '');
|
||||
} else if (key === 'content-type') {
|
||||
contentType = val;
|
||||
}
|
||||
}
|
||||
|
||||
if (bodyStart === -1) return;
|
||||
|
||||
const body = lines.slice(bodyStart).join('\r\n');
|
||||
if (!body) return;
|
||||
|
||||
// For JSON state updates, parse and diff
|
||||
if (contentType.includes('json')) {
|
||||
const newState = JSON.parse(body);
|
||||
const oldState = this.currentState;
|
||||
|
||||
this.currentState = newState;
|
||||
this.currentVersion = version;
|
||||
|
||||
this.emit('state:updated', newState);
|
||||
|
||||
// Check watched addresses for balance changes
|
||||
if (oldState && newState.wallets) {
|
||||
for (const address of this.watchedAddresses) {
|
||||
const oldWallet = oldState.wallets?.[address];
|
||||
const newWallet = newState.wallets?.[address];
|
||||
|
||||
const oldBalStr = JSON.stringify(oldWallet?.balances ?? {});
|
||||
const newBalStr = JSON.stringify(newWallet?.balances ?? {});
|
||||
|
||||
if (oldBalStr !== newBalStr && newWallet?.balances) {
|
||||
const balances: Record<string, string> = {};
|
||||
for (const [token, entry] of Object.entries(newWallet.balances)) {
|
||||
balances[token] = (entry as any).amount;
|
||||
}
|
||||
this.emit('balance:changed', address, balances);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug({ version }, 'Applied state update');
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error({ error }, 'Failed to apply Braid update');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule a reconnection attempt.
|
||||
*/
|
||||
private scheduleReconnect(): void {
|
||||
if (this.reconnectTimer) return;
|
||||
|
||||
this.reconnectTimer = setTimeout(() => {
|
||||
this.reconnectTimer = null;
|
||||
logger.info('Attempting reconnect...');
|
||||
this.connect();
|
||||
}, this.config.reconnectDelayMs);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,184 @@
|
|||
/**
|
||||
* Braid State Publisher
|
||||
*
|
||||
* Express middleware that serves the current CRDT state as a
|
||||
* Braid-HTTP resource. Simpleton clients subscribe here to receive
|
||||
* JSON state updates without needing any CRDT engine.
|
||||
*
|
||||
* This is the server-side counterpart to SimpletonClient.
|
||||
*/
|
||||
|
||||
import { Router, Request, Response } from 'express';
|
||||
import pino from 'pino';
|
||||
|
||||
const logger = pino({ name: 'braid-state-publisher' });
|
||||
|
||||
/**
|
||||
* Active subscriber connection
|
||||
*/
|
||||
interface StateSubscriber {
|
||||
id: string;
|
||||
res: Response;
|
||||
connectedAt: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configuration for the state publisher
|
||||
*/
|
||||
export interface StatePublisherConfig {
|
||||
/** Path to serve state on, default: /braid/state */
|
||||
path?: string;
|
||||
/** Throttle interval for state pushes (ms), default: 100 */
|
||||
throttleMs?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Braid State Publisher
|
||||
*
|
||||
* Serves the current document state as a subscribable Braid-HTTP resource.
|
||||
* Clients receive JSON snapshots, not Automerge binary — they never need
|
||||
* the CRDT engine.
|
||||
*/
|
||||
export class BraidStatePublisher {
|
||||
private config: Required<StatePublisherConfig>;
|
||||
private document: any; // ConsensusDocument
|
||||
private subscribers: Map<string, StateSubscriber> = new Map();
|
||||
private versionCounter = 0;
|
||||
private throttleTimer: NodeJS.Timeout | null = null;
|
||||
private subscriberIdCounter = 0;
|
||||
|
||||
constructor(document: any, config: StatePublisherConfig = {}) {
|
||||
this.document = document;
|
||||
this.config = {
|
||||
path: '/braid/state',
|
||||
throttleMs: 100,
|
||||
...config,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Create Express router for the state publisher.
|
||||
*/
|
||||
createRouter(): Router {
|
||||
const router = Router();
|
||||
|
||||
router.get(this.config.path, (req: Request, res: Response) => {
|
||||
this.handleRequest(req, res);
|
||||
});
|
||||
|
||||
return router;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a GET request, either one-shot or subscription.
|
||||
*/
|
||||
private handleRequest(req: Request, res: Response): void {
|
||||
const subscribe = req.headers['subscribe'];
|
||||
const state = this.document.getState();
|
||||
|
||||
if (subscribe === 'true' || subscribe === 'keep-alive') {
|
||||
// Braid subscription — keep connection open
|
||||
const id = `sub-${++this.subscriberIdCounter}`;
|
||||
|
||||
res.writeHead(200, {
|
||||
'Content-Type': 'text/plain',
|
||||
'Subscribe': 'keep-alive',
|
||||
'Cache-Control': 'no-cache',
|
||||
'Connection': 'keep-alive',
|
||||
'Transfer-Encoding': 'chunked',
|
||||
});
|
||||
|
||||
this.subscribers.set(id, {
|
||||
id,
|
||||
res,
|
||||
connectedAt: Date.now(),
|
||||
});
|
||||
|
||||
// Send current state as first update
|
||||
this.sendUpdate(id, state);
|
||||
|
||||
logger.info(
|
||||
{ subscriberId: id, totalSubscribers: this.subscribers.size },
|
||||
'New state subscriber'
|
||||
);
|
||||
|
||||
req.on('close', () => {
|
||||
this.subscribers.delete(id);
|
||||
logger.info(
|
||||
{ subscriberId: id, totalSubscribers: this.subscribers.size },
|
||||
'State subscriber disconnected'
|
||||
);
|
||||
});
|
||||
} else {
|
||||
// One-shot — return current state with version header
|
||||
res.set('Version', `"v${this.versionCounter}"`);
|
||||
res.set('Content-Type', 'application/json');
|
||||
res.json(state);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify that state has changed — push to all subscribers.
|
||||
* Call this after any document change.
|
||||
*/
|
||||
notifyChange(): void {
|
||||
if (this.subscribers.size === 0) return;
|
||||
|
||||
// Throttle rapid changes
|
||||
if (this.throttleTimer) return;
|
||||
|
||||
this.throttleTimer = setTimeout(() => {
|
||||
this.throttleTimer = null;
|
||||
this.pushToAll();
|
||||
}, this.config.throttleMs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Push current state to all subscribers.
|
||||
*/
|
||||
private pushToAll(): void {
|
||||
const state = this.document.getState();
|
||||
|
||||
for (const [id] of this.subscribers) {
|
||||
this.sendUpdate(id, state);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a Braid update to a specific subscriber.
|
||||
*/
|
||||
private sendUpdate(subscriberId: string, state: any): void {
|
||||
const subscriber = this.subscribers.get(subscriberId);
|
||||
if (!subscriber) return;
|
||||
|
||||
const version = `v${++this.versionCounter}`;
|
||||
const parentVersion = `v${this.versionCounter - 1}`;
|
||||
const body = JSON.stringify(state);
|
||||
|
||||
try {
|
||||
const chunk = [
|
||||
'',
|
||||
`Version: "${version}"`,
|
||||
`Parents: "${parentVersion}"`,
|
||||
`Content-Type: application/json`,
|
||||
`Merge-Type: simpleton`,
|
||||
`Content-Length: ${Buffer.byteLength(body)}`,
|
||||
'',
|
||||
body,
|
||||
'',
|
||||
].join('\r\n');
|
||||
|
||||
subscriber.res.write(chunk);
|
||||
} catch (error) {
|
||||
logger.error({ subscriberId, error }, 'Failed to send update');
|
||||
this.subscribers.delete(subscriberId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get number of active subscribers.
|
||||
*/
|
||||
get subscriberCount(): number {
|
||||
return this.subscribers.size;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,282 @@
|
|||
/**
|
||||
* Braid State Sync
|
||||
*
|
||||
* Drop-in replacement for StateSync that uses Braid-HTTP subscriptions
|
||||
* instead of polling-based WebSocket sync.
|
||||
*
|
||||
* Key improvement: instead of periodic syncWithAllPeers() on a timer,
|
||||
* state changes are pushed to subscribers immediately via Braid-HTTP.
|
||||
* This reduces latency from syncIntervalMs (default 1s) to near-zero.
|
||||
*/
|
||||
|
||||
import * as Automerge from '@automerge/automerge';
|
||||
import pino from 'pino';
|
||||
import type { BraidPeerManager } from '../transport/braid-peer-manager.js';
|
||||
|
||||
const logger = pino({ name: 'braid-state-sync' });
|
||||
|
||||
/**
|
||||
* Configuration for BraidStateSync
|
||||
*/
|
||||
export interface BraidStateSyncConfig {
|
||||
/** Debounce interval for batching rapid changes into one push (ms) */
|
||||
pushDebounceMs?: number;
|
||||
}
|
||||
|
||||
const DEFAULT_CONFIG: Required<BraidStateSyncConfig> = {
|
||||
pushDebounceMs: 50,
|
||||
};
|
||||
|
||||
/**
|
||||
* Sync state tracked per peer
|
||||
*/
|
||||
interface PeerSyncState {
|
||||
nodeId: string;
|
||||
syncState: Automerge.SyncState;
|
||||
lastSyncAt: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Braid-HTTP State Synchronization
|
||||
*
|
||||
* Instead of polling, this listens for document changes and pushes
|
||||
* Automerge sync messages to subscribers via Braid-HTTP.
|
||||
*/
|
||||
export class BraidStateSync {
|
||||
private config: Required<BraidStateSyncConfig>;
|
||||
private document: any; // ConsensusDocument — typed as any to avoid circular dep
|
||||
private peerManager: BraidPeerManager;
|
||||
private peerSyncStates: Map<string, PeerSyncState> = new Map();
|
||||
private isRunning = false;
|
||||
private pushTimer: NodeJS.Timeout | null = null;
|
||||
private pendingPush = false;
|
||||
|
||||
constructor(
|
||||
document: any,
|
||||
peerManager: BraidPeerManager,
|
||||
config: BraidStateSyncConfig = {}
|
||||
) {
|
||||
this.document = document;
|
||||
this.peerManager = peerManager;
|
||||
this.config = { ...DEFAULT_CONFIG, ...config };
|
||||
|
||||
// Listen for peer events
|
||||
this.peerManager.on('peer:connected', (nodeId: string) => {
|
||||
this.initPeerSyncState(nodeId);
|
||||
// Push current state to new peer immediately
|
||||
this.syncWithPeer(nodeId);
|
||||
});
|
||||
|
||||
this.peerManager.on('peer:disconnected', (nodeId: string) => {
|
||||
this.peerSyncStates.delete(nodeId);
|
||||
});
|
||||
|
||||
// Listen for incoming sync messages (via Braid subscription)
|
||||
this.peerManager.on('message:sync_request', (nodeId: string) => {
|
||||
this.handleSyncRequest(nodeId);
|
||||
});
|
||||
|
||||
this.peerManager.on('message:sync_response', (nodeId: string, data: Uint8Array) => {
|
||||
this.handleSyncResponse(nodeId, data);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the sync manager.
|
||||
* Unlike the WebSocket version, we don't poll — we push on change.
|
||||
*/
|
||||
start(): void {
|
||||
if (this.isRunning) return;
|
||||
|
||||
logger.info('Starting Braid state sync');
|
||||
|
||||
// Initialize sync state for existing peers
|
||||
for (const nodeId of this.peerManager.connectedPeerIds) {
|
||||
this.initPeerSyncState(nodeId);
|
||||
}
|
||||
|
||||
this.isRunning = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the sync manager.
|
||||
*/
|
||||
stop(): void {
|
||||
if (!this.isRunning) return;
|
||||
|
||||
logger.info('Stopping Braid state sync');
|
||||
|
||||
if (this.pushTimer) {
|
||||
clearTimeout(this.pushTimer);
|
||||
this.pushTimer = null;
|
||||
}
|
||||
|
||||
this.peerSyncStates.clear();
|
||||
this.isRunning = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify that the document has changed.
|
||||
* Call this after any Automerge.change() to trigger a push to subscribers.
|
||||
* This is the key API difference from the polling-based StateSync.
|
||||
*/
|
||||
notifyChange(): void {
|
||||
if (!this.isRunning) return;
|
||||
|
||||
// Debounce rapid changes
|
||||
if (this.pushTimer) return;
|
||||
|
||||
this.pushTimer = setTimeout(() => {
|
||||
this.pushTimer = null;
|
||||
this.pushToAllPeers();
|
||||
}, this.config.pushDebounceMs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Push sync messages to all connected peers via Braid-HTTP.
|
||||
*/
|
||||
private pushToAllPeers(): void {
|
||||
for (const nodeId of this.peerManager.connectedPeerIds) {
|
||||
this.syncWithPeer(nodeId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize sync state for a peer.
|
||||
*/
|
||||
private initPeerSyncState(nodeId: string): void {
|
||||
if (this.peerSyncStates.has(nodeId)) return;
|
||||
|
||||
this.peerSyncStates.set(nodeId, {
|
||||
nodeId,
|
||||
syncState: Automerge.initSyncState(),
|
||||
lastSyncAt: 0,
|
||||
});
|
||||
|
||||
logger.debug({ nodeId }, 'Initialized sync state for peer');
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate and push sync message to a specific peer.
|
||||
* Uses Braid-HTTP push instead of WebSocket send.
|
||||
*/
|
||||
private async syncWithPeer(nodeId: string): Promise<void> {
|
||||
const peerState = this.peerSyncStates.get(nodeId);
|
||||
if (!peerState) return;
|
||||
|
||||
try {
|
||||
const doc = this.document['doc'];
|
||||
const [newSyncState, syncMessage] = Automerge.generateSyncMessage(
|
||||
doc,
|
||||
peerState.syncState
|
||||
);
|
||||
|
||||
peerState.syncState = newSyncState;
|
||||
|
||||
if (syncMessage) {
|
||||
// Push via Braid-HTTP to the subscriber
|
||||
this.peerManager.pushUpdate(syncMessage, 'automerge');
|
||||
|
||||
// Also send as a direct message for peers that aren't subscribed
|
||||
try {
|
||||
await this.peerManager.sendTo(nodeId, {
|
||||
type: 'SYNC_RESPONSE',
|
||||
nodeId: this.peerManager.connectedPeerIds[0] ?? '', // our nodeId
|
||||
timestamp: Date.now(),
|
||||
payload: {
|
||||
data: Buffer.from(syncMessage).toString('base64'),
|
||||
},
|
||||
});
|
||||
} catch {
|
||||
// Peer might not be reachable via direct send — that's fine,
|
||||
// the Braid push covers subscribed peers
|
||||
}
|
||||
|
||||
logger.debug(
|
||||
{ nodeId, messageSize: syncMessage.length },
|
||||
'Pushed sync via Braid'
|
||||
);
|
||||
}
|
||||
|
||||
peerState.lastSyncAt = Date.now();
|
||||
} catch (error) {
|
||||
logger.error({ nodeId, error }, 'Sync with peer failed');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle sync request from peer.
|
||||
*/
|
||||
private async handleSyncRequest(nodeId: string): Promise<void> {
|
||||
if (!this.peerSyncStates.has(nodeId)) {
|
||||
this.initPeerSyncState(nodeId);
|
||||
}
|
||||
|
||||
await this.syncWithPeer(nodeId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle sync response from peer (received via Braid subscription).
|
||||
*/
|
||||
private handleSyncResponse(nodeId: string, data: Uint8Array): void {
|
||||
const peerState = this.peerSyncStates.get(nodeId);
|
||||
if (!peerState) {
|
||||
this.initPeerSyncState(nodeId);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const doc = this.document['doc'];
|
||||
const [newDoc, newSyncState] = Automerge.receiveSyncMessage(
|
||||
doc,
|
||||
peerState.syncState,
|
||||
data
|
||||
);
|
||||
|
||||
this.document['doc'] = newDoc;
|
||||
peerState.syncState = newSyncState;
|
||||
|
||||
logger.debug({ nodeId }, 'Applied Braid sync message from peer');
|
||||
|
||||
// Check if we need to send more
|
||||
const [, nextMessage] = Automerge.generateSyncMessage(
|
||||
newDoc,
|
||||
newSyncState
|
||||
);
|
||||
|
||||
if (nextMessage) {
|
||||
this.syncWithPeer(nodeId);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error({ nodeId, error }, 'Failed to apply sync message');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Force full sync with a peer (for recovery).
|
||||
*/
|
||||
async forceSync(nodeId: string): Promise<void> {
|
||||
this.peerSyncStates.set(nodeId, {
|
||||
nodeId,
|
||||
syncState: Automerge.initSyncState(),
|
||||
lastSyncAt: 0,
|
||||
});
|
||||
|
||||
await this.syncWithPeer(nodeId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get sync status for all peers.
|
||||
*/
|
||||
getSyncStatus(): Array<{
|
||||
nodeId: string;
|
||||
lastSyncAt: number;
|
||||
isConnected: boolean;
|
||||
}> {
|
||||
return Array.from(this.peerSyncStates.values()).map((state) => ({
|
||||
nodeId: state.nodeId,
|
||||
lastSyncAt: state.lastSyncAt,
|
||||
isConnected: this.peerManager.connectedPeerIds.includes(state.nodeId),
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,659 @@
|
|||
/**
|
||||
* Braid-HTTP Peer Manager
|
||||
*
|
||||
* Drop-in replacement for the WebSocket-based PeerManager.
|
||||
* Uses Braid-HTTP subscriptions for state sync and standard HTTP POST
|
||||
* for consensus messages (PROPOSE/VOTE/COMMIT).
|
||||
*
|
||||
* Key differences from WebSocket approach:
|
||||
* - Sync uses HTTP GET with Subscribe header (long-lived connections)
|
||||
* - Consensus messages use HTTP POST (request/response)
|
||||
* - Works through reverse proxies, CDNs, and load balancers
|
||||
* - No WebSocket server needed — reuses Express HTTP server
|
||||
*/
|
||||
|
||||
import { EventEmitter } from 'events';
|
||||
import { Router, Request, Response } from 'express';
|
||||
import pino from 'pino';
|
||||
import type {
|
||||
BraidPeerManagerConfig,
|
||||
BraidPeerInfo,
|
||||
ConsensusMessageEnvelope,
|
||||
BraidVersion,
|
||||
} from '../types/index.js';
|
||||
|
||||
const logger = pino({ name: 'braid-peer-manager' });
|
||||
|
||||
/**
|
||||
* Events emitted by BraidPeerManager — matches PeerManager's interface
|
||||
* so ConsensusService can wire up identically.
|
||||
*/
|
||||
export interface BraidPeerManagerEvents {
|
||||
'peer:connected': (nodeId: string) => void;
|
||||
'peer:disconnected': (nodeId: string) => void;
|
||||
'message:propose': (message: any) => void;
|
||||
'message:vote': (message: any) => void;
|
||||
'message:commit': (message: any) => void;
|
||||
'message:sync_request': (nodeId: string) => void;
|
||||
'message:sync_response': (nodeId: string, data: Uint8Array) => void;
|
||||
}
|
||||
|
||||
export declare interface BraidPeerManager {
|
||||
on<E extends keyof BraidPeerManagerEvents>(
|
||||
event: E,
|
||||
listener: BraidPeerManagerEvents[E]
|
||||
): this;
|
||||
emit<E extends keyof BraidPeerManagerEvents>(
|
||||
event: E,
|
||||
...args: Parameters<BraidPeerManagerEvents[E]>
|
||||
): boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tracks an active Braid subscription from a remote peer.
|
||||
*/
|
||||
interface BraidSubscriber {
|
||||
nodeId: string;
|
||||
res: Response;
|
||||
connectedAt: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tracks an outbound subscription to a remote peer.
|
||||
*/
|
||||
interface OutboundSubscription {
|
||||
nodeId: string;
|
||||
controller: AbortController;
|
||||
endpoint: string;
|
||||
connectedAt: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Peer connection adapter — implements the same PeerConnection interface
|
||||
* as the WebSocket version so ConsensusNode can use it unchanged.
|
||||
*/
|
||||
class BraidPeer {
|
||||
constructor(
|
||||
public readonly nodeId: string,
|
||||
private endpoint: string
|
||||
) {}
|
||||
|
||||
get isAlive(): boolean {
|
||||
return true; // HTTP connections are stateless — liveness tracked via heartbeat
|
||||
}
|
||||
|
||||
async send(message: unknown): Promise<void> {
|
||||
const response = await fetch(`${this.endpoint}/braid/consensus`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify(message),
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`HTTP ${response.status} sending to ${this.nodeId}`);
|
||||
}
|
||||
}
|
||||
|
||||
close(): void {
|
||||
// No persistent connection to close in HTTP mode
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Braid-HTTP Peer Manager
|
||||
*
|
||||
* Provides the same EventEmitter interface as the WebSocket PeerManager
|
||||
* but uses Braid-HTTP for transport.
|
||||
*/
|
||||
export class BraidPeerManager extends EventEmitter {
|
||||
private config: Required<BraidPeerManagerConfig>;
|
||||
private peers: Map<string, BraidPeer> = new Map();
|
||||
private knownPeers: Map<string, BraidPeerInfo> = new Map();
|
||||
|
||||
// Inbound Braid subscriptions (peers subscribing to our state)
|
||||
private subscribers: Map<string, BraidSubscriber> = new Map();
|
||||
|
||||
// Outbound subscriptions (us subscribing to peer state)
|
||||
private outboundSubs: Map<string, OutboundSubscription> = new Map();
|
||||
|
||||
// Current version counter for Braid versioning
|
||||
private versionCounter = 0;
|
||||
|
||||
// Heartbeat/discovery (still uses Redis for peer discovery)
|
||||
private heartbeatTimer: NodeJS.Timeout | null = null;
|
||||
private discoveryTimer: NodeJS.Timeout | null = null;
|
||||
private isRunning = false;
|
||||
|
||||
// Redis clients (lazy — injected via setRedis)
|
||||
private redis: any = null;
|
||||
|
||||
constructor(config: BraidPeerManagerConfig) {
|
||||
super();
|
||||
|
||||
this.config = {
|
||||
syncPath: '/braid/sync',
|
||||
consensusPath: '/braid/consensus',
|
||||
heartbeatIntervalMs: 5000,
|
||||
peerTimeoutMs: 15000,
|
||||
reconnectIntervalMs: 3000,
|
||||
...config,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Inject Redis clients for peer discovery.
|
||||
* Call this before start() if using Redis-based discovery.
|
||||
*/
|
||||
setRedis(redis: any, redisSub: any): void {
|
||||
this.redis = redis;
|
||||
// Note: redisSub for pub/sub not needed in HTTP mode —
|
||||
// we discover peers via polling Redis hash
|
||||
}
|
||||
|
||||
/**
|
||||
* Create Express router with Braid-HTTP endpoints.
|
||||
* Mount this on the consensus service's Express app.
|
||||
*/
|
||||
createRouter(): Router {
|
||||
const router = Router();
|
||||
|
||||
// Braid-HTTP sync endpoint — peers subscribe here for state updates
|
||||
router.get(this.config.syncPath, (req: Request, res: Response) => {
|
||||
this.handleSyncSubscribe(req, res);
|
||||
});
|
||||
|
||||
// Consensus message endpoint — receives PROPOSE/VOTE/COMMIT via POST
|
||||
router.post(this.config.consensusPath, (req: Request, res: Response) => {
|
||||
this.handleConsensusMessage(req, res);
|
||||
});
|
||||
|
||||
// Peer info endpoint — returns this node's identity
|
||||
router.get('/braid/peer-info', (_req: Request, res: Response) => {
|
||||
res.json({
|
||||
nodeId: this.config.nodeId,
|
||||
publicKey: this.config.publicKey,
|
||||
endpoint: `http://${this.config.host}:${this.config.port}`,
|
||||
lastSeen: Date.now(),
|
||||
});
|
||||
});
|
||||
|
||||
return router;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming Braid subscription (GET with Subscribe header).
|
||||
*
|
||||
* This implements the core Braid-HTTP pattern:
|
||||
* - Client sends GET with `Subscribe: true`
|
||||
* - Server responds with current version and keeps connection open
|
||||
* - Server pushes updates as multipart patches
|
||||
*/
|
||||
private handleSyncSubscribe(req: Request, res: Response): void {
|
||||
const subscribe = req.headers['subscribe'];
|
||||
const nodeId = req.headers['x-node-id'] as string;
|
||||
|
||||
if (!nodeId) {
|
||||
res.status(400).json({ error: 'Missing X-Node-Id header' });
|
||||
return;
|
||||
}
|
||||
|
||||
if (subscribe === 'true' || subscribe === 'keep-alive') {
|
||||
// Long-lived Braid subscription
|
||||
logger.info({ nodeId }, 'Peer subscribing via Braid-HTTP');
|
||||
|
||||
// Set Braid-HTTP response headers
|
||||
res.writeHead(200, {
|
||||
'Content-Type': 'text/plain',
|
||||
'Subscribe': 'keep-alive',
|
||||
'Cache-Control': 'no-cache',
|
||||
'Connection': 'keep-alive',
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'X-Node-Id': this.config.nodeId,
|
||||
});
|
||||
|
||||
// Track subscriber
|
||||
this.subscribers.set(nodeId, {
|
||||
nodeId,
|
||||
res,
|
||||
connectedAt: Date.now(),
|
||||
});
|
||||
|
||||
// Register as connected peer
|
||||
if (!this.peers.has(nodeId)) {
|
||||
const peerInfo = this.knownPeers.get(nodeId);
|
||||
if (peerInfo) {
|
||||
this.peers.set(nodeId, new BraidPeer(nodeId, peerInfo.endpoint));
|
||||
this.emit('peer:connected', nodeId);
|
||||
}
|
||||
}
|
||||
|
||||
// Trigger initial sync
|
||||
this.emit('message:sync_request', nodeId);
|
||||
|
||||
// Handle disconnect
|
||||
req.on('close', () => {
|
||||
logger.info({ nodeId }, 'Braid subscriber disconnected');
|
||||
this.subscribers.delete(nodeId);
|
||||
this.peers.delete(nodeId);
|
||||
this.emit('peer:disconnected', nodeId);
|
||||
});
|
||||
} else {
|
||||
// One-shot GET — return current version
|
||||
res.json({
|
||||
version: this.currentVersion,
|
||||
nodeId: this.config.nodeId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming consensus message via HTTP POST.
|
||||
*/
|
||||
private handleConsensusMessage(req: Request, res: Response): void {
|
||||
try {
|
||||
const message = req.body as ConsensusMessageEnvelope;
|
||||
|
||||
if (!message.type || !message.nodeId) {
|
||||
res.status(400).json({ error: 'Invalid message format' });
|
||||
return;
|
||||
}
|
||||
|
||||
// Route to appropriate handler (same as WebSocket version)
|
||||
switch (message.type) {
|
||||
case 'PROPOSE':
|
||||
this.emit('message:propose', message);
|
||||
break;
|
||||
case 'VOTE':
|
||||
this.emit('message:vote', message);
|
||||
break;
|
||||
case 'COMMIT':
|
||||
this.emit('message:commit', message);
|
||||
break;
|
||||
case 'SYNC_REQUEST':
|
||||
this.emit('message:sync_request', message.nodeId);
|
||||
break;
|
||||
case 'SYNC_RESPONSE': {
|
||||
const syncData = (message.payload as { data: string }).data;
|
||||
this.emit(
|
||||
'message:sync_response',
|
||||
message.nodeId,
|
||||
Buffer.from(syncData, 'base64')
|
||||
);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
logger.warn({ type: message.type }, 'Unknown message type');
|
||||
}
|
||||
|
||||
res.status(200).json({ ok: true });
|
||||
} catch (error) {
|
||||
logger.error({ error }, 'Failed to handle consensus message');
|
||||
res.status(500).json({ error: 'Internal error' });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Push a Braid update to all subscribers.
|
||||
*
|
||||
* Format follows Braid-HTTP spec:
|
||||
* ```
|
||||
* Version: "v123"
|
||||
* Parents: "v122"
|
||||
* Content-Type: application/octet-stream
|
||||
* Merge-Type: automerge
|
||||
*
|
||||
* <binary patch data>
|
||||
* ```
|
||||
*/
|
||||
pushUpdate(data: Uint8Array, mergeType = 'automerge'): void {
|
||||
const version = this.nextVersion();
|
||||
const parentVersion = `v${this.versionCounter - 1}`;
|
||||
|
||||
for (const [nodeId, subscriber] of this.subscribers) {
|
||||
try {
|
||||
// Write Braid update as a multipart chunk
|
||||
const header = [
|
||||
'',
|
||||
`Version: "${version}"`,
|
||||
`Parents: "${parentVersion}"`,
|
||||
`Content-Type: application/octet-stream`,
|
||||
`Merge-Type: ${mergeType}`,
|
||||
`Content-Length: ${data.length}`,
|
||||
'',
|
||||
].join('\r\n');
|
||||
|
||||
subscriber.res.write(header);
|
||||
subscriber.res.write(data);
|
||||
subscriber.res.write('\r\n\r\n');
|
||||
|
||||
logger.debug(
|
||||
{ nodeId, version, size: data.length },
|
||||
'Pushed Braid update'
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error({ nodeId, error }, 'Failed to push update to subscriber');
|
||||
this.subscribers.delete(nodeId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the peer manager.
|
||||
* Unlike WebSocket version, does NOT start its own server —
|
||||
* the Express router handles incoming connections.
|
||||
*/
|
||||
async start(): Promise<void> {
|
||||
if (this.isRunning) return;
|
||||
|
||||
logger.info(
|
||||
{ nodeId: this.config.nodeId, port: this.config.port },
|
||||
'Starting Braid peer manager'
|
||||
);
|
||||
|
||||
// Register ourselves in Redis
|
||||
if (this.redis) {
|
||||
await this.registerSelf();
|
||||
this.startHeartbeat();
|
||||
this.startDiscovery();
|
||||
}
|
||||
|
||||
this.isRunning = true;
|
||||
|
||||
logger.info(
|
||||
{ nodeId: this.config.nodeId },
|
||||
'Braid peer manager started (mount router on Express app)'
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the peer manager.
|
||||
*/
|
||||
async stop(): Promise<void> {
|
||||
if (!this.isRunning) return;
|
||||
|
||||
logger.info({ nodeId: this.config.nodeId }, 'Stopping Braid peer manager');
|
||||
|
||||
if (this.heartbeatTimer) {
|
||||
clearInterval(this.heartbeatTimer);
|
||||
this.heartbeatTimer = null;
|
||||
}
|
||||
|
||||
if (this.discoveryTimer) {
|
||||
clearInterval(this.discoveryTimer);
|
||||
this.discoveryTimer = null;
|
||||
}
|
||||
|
||||
// Abort outbound subscriptions
|
||||
for (const sub of this.outboundSubs.values()) {
|
||||
sub.controller.abort();
|
||||
}
|
||||
this.outboundSubs.clear();
|
||||
|
||||
// Close inbound subscriber connections
|
||||
for (const sub of this.subscribers.values()) {
|
||||
try { sub.res.end(); } catch {}
|
||||
}
|
||||
this.subscribers.clear();
|
||||
|
||||
this.peers.clear();
|
||||
|
||||
if (this.redis) {
|
||||
await this.redis.hdel('consensus:peers', this.config.nodeId);
|
||||
}
|
||||
|
||||
this.isRunning = false;
|
||||
logger.info({ nodeId: this.config.nodeId }, 'Braid peer manager stopped');
|
||||
}
|
||||
|
||||
/**
|
||||
* Sign and broadcast a consensus message to all connected peers.
|
||||
* Uses HTTP POST instead of WebSocket send.
|
||||
*/
|
||||
async broadcast(message: Omit<ConsensusMessageEnvelope, 'signature'>): Promise<void> {
|
||||
const signed = this.signMessage(message);
|
||||
|
||||
const sendPromises = Array.from(this.peers.values()).map(async (peer) => {
|
||||
try {
|
||||
await peer.send(signed);
|
||||
} catch (error) {
|
||||
logger.error({ nodeId: peer.nodeId, error }, 'Failed to send to peer');
|
||||
}
|
||||
});
|
||||
|
||||
await Promise.all(sendPromises);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message to a specific peer via HTTP POST.
|
||||
*/
|
||||
async sendTo(nodeId: string, message: Omit<ConsensusMessageEnvelope, 'signature'>): Promise<void> {
|
||||
const peer = this.peers.get(nodeId);
|
||||
if (!peer) {
|
||||
throw new Error(`Peer ${nodeId} not connected`);
|
||||
}
|
||||
|
||||
const signed = this.signMessage(message);
|
||||
await peer.send(signed);
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to a remote peer's state via Braid-HTTP.
|
||||
* Opens a long-lived GET request with Subscribe: true.
|
||||
*/
|
||||
async subscribeToPeer(peerInfo: BraidPeerInfo): Promise<void> {
|
||||
if (this.outboundSubs.has(peerInfo.nodeId)) return;
|
||||
|
||||
const controller = new AbortController();
|
||||
const endpoint = peerInfo.endpoint;
|
||||
|
||||
this.outboundSubs.set(peerInfo.nodeId, {
|
||||
nodeId: peerInfo.nodeId,
|
||||
controller,
|
||||
endpoint,
|
||||
connectedAt: Date.now(),
|
||||
});
|
||||
|
||||
try {
|
||||
const response = await fetch(`${endpoint}${this.config.syncPath}`, {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
'Subscribe': 'keep-alive',
|
||||
'X-Node-Id': this.config.nodeId,
|
||||
'Accept': 'application/octet-stream',
|
||||
},
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
if (!response.ok || !response.body) {
|
||||
throw new Error(`HTTP ${response.status}`);
|
||||
}
|
||||
|
||||
// Register peer
|
||||
if (!this.peers.has(peerInfo.nodeId)) {
|
||||
this.peers.set(peerInfo.nodeId, new BraidPeer(peerInfo.nodeId, endpoint));
|
||||
this.emit('peer:connected', peerInfo.nodeId);
|
||||
}
|
||||
|
||||
logger.info({ nodeId: peerInfo.nodeId }, 'Subscribed to peer via Braid-HTTP');
|
||||
|
||||
// Read the stream
|
||||
const reader = response.body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = '';
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
|
||||
// Parse Braid updates from the stream
|
||||
// Each update is delimited by double CRLF
|
||||
const updates = buffer.split('\r\n\r\n');
|
||||
buffer = updates.pop() ?? ''; // Keep incomplete chunk
|
||||
|
||||
for (const chunk of updates) {
|
||||
if (!chunk.trim()) continue;
|
||||
this.handleBraidUpdate(peerInfo.nodeId, chunk);
|
||||
}
|
||||
}
|
||||
} catch (error: any) {
|
||||
if (error.name !== 'AbortError') {
|
||||
logger.error(
|
||||
{ nodeId: peerInfo.nodeId, error: error.message },
|
||||
'Braid subscription failed'
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
this.outboundSubs.delete(peerInfo.nodeId);
|
||||
if (this.peers.has(peerInfo.nodeId)) {
|
||||
this.peers.delete(peerInfo.nodeId);
|
||||
this.emit('peer:disconnected', peerInfo.nodeId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse and handle an incoming Braid update from a subscription stream.
|
||||
*/
|
||||
private handleBraidUpdate(nodeId: string, chunk: string): void {
|
||||
try {
|
||||
// Extract headers and body from the Braid update
|
||||
const headerEnd = chunk.indexOf('\r\n\r\n');
|
||||
if (headerEnd === -1) {
|
||||
// Body might be right after headers with a single newline separation
|
||||
const lines = chunk.split('\r\n');
|
||||
const bodyStartIdx = lines.findIndex(l => l === '');
|
||||
if (bodyStartIdx === -1) return;
|
||||
|
||||
// Everything after the empty line is body
|
||||
const bodyLines = lines.slice(bodyStartIdx + 1);
|
||||
const body = bodyLines.join('\r\n');
|
||||
if (body.length > 0) {
|
||||
const data = new Uint8Array(Buffer.from(body, 'binary'));
|
||||
this.emit('message:sync_response', nodeId, data);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
const body = chunk.slice(headerEnd + 4);
|
||||
if (body.length > 0) {
|
||||
const data = new Uint8Array(Buffer.from(body, 'binary'));
|
||||
this.emit('message:sync_response', nodeId, data);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error({ nodeId, error }, 'Failed to parse Braid update');
|
||||
}
|
||||
}
|
||||
|
||||
// --- Redis-based peer discovery (reused from WebSocket version) ---
|
||||
|
||||
private async registerSelf(): Promise<void> {
|
||||
const peerInfo: BraidPeerInfo = {
|
||||
nodeId: this.config.nodeId,
|
||||
publicKey: this.config.publicKey,
|
||||
endpoint: `http://${this.config.host}:${this.config.port}`,
|
||||
lastSeen: Date.now(),
|
||||
};
|
||||
|
||||
await this.redis.hset(
|
||||
'consensus:peers',
|
||||
this.config.nodeId,
|
||||
JSON.stringify(peerInfo)
|
||||
);
|
||||
|
||||
logger.debug({ nodeId: this.config.nodeId }, 'Registered in Redis');
|
||||
}
|
||||
|
||||
private startHeartbeat(): void {
|
||||
this.heartbeatTimer = setInterval(async () => {
|
||||
await this.registerSelf();
|
||||
|
||||
// Check for dead outbound subscriptions and reconnect
|
||||
for (const [nodeId, sub] of this.outboundSubs) {
|
||||
// If the connection seems dead, abort and let discovery reconnect
|
||||
const age = Date.now() - sub.connectedAt;
|
||||
if (age > this.config.peerTimeoutMs * 3) {
|
||||
logger.debug({ nodeId }, 'Stale outbound subscription, will reconnect');
|
||||
}
|
||||
}
|
||||
}, this.config.heartbeatIntervalMs);
|
||||
}
|
||||
|
||||
private startDiscovery(): void {
|
||||
// Initial discovery
|
||||
this.discoverPeers();
|
||||
|
||||
this.discoveryTimer = setInterval(() => {
|
||||
this.discoverPeers();
|
||||
}, this.config.reconnectIntervalMs);
|
||||
}
|
||||
|
||||
private async discoverPeers(): Promise<void> {
|
||||
if (!this.redis) return;
|
||||
|
||||
try {
|
||||
const peerData = await this.redis.hgetall('consensus:peers');
|
||||
|
||||
for (const [nodeId, infoStr] of Object.entries(peerData as Record<string, string>)) {
|
||||
if (nodeId === this.config.nodeId) continue;
|
||||
|
||||
const peerInfo = JSON.parse(infoStr) as BraidPeerInfo;
|
||||
|
||||
// Check if peer is stale
|
||||
if (Date.now() - peerInfo.lastSeen > this.config.peerTimeoutMs) {
|
||||
await this.redis.hdel('consensus:peers', nodeId);
|
||||
continue;
|
||||
}
|
||||
|
||||
this.knownPeers.set(nodeId, peerInfo);
|
||||
|
||||
// Subscribe to peer if not already subscribed
|
||||
if (!this.outboundSubs.has(nodeId) && !this.peers.has(nodeId)) {
|
||||
// Deterministic tie-breaker: lower nodeId initiates subscription
|
||||
if (this.config.nodeId < nodeId) {
|
||||
this.subscribeToPeer(peerInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error({ error }, 'Peer discovery failed');
|
||||
}
|
||||
}
|
||||
|
||||
// --- Helpers ---
|
||||
|
||||
private signMessage(message: Omit<ConsensusMessageEnvelope, 'signature'>): ConsensusMessageEnvelope {
|
||||
// Signing is delegated to the caller (ConsensusService) in the existing
|
||||
// architecture. Here we just pass through — the message is already structured
|
||||
// the same way as the WebSocket version.
|
||||
return message as ConsensusMessageEnvelope;
|
||||
}
|
||||
|
||||
private nextVersion(): BraidVersion {
|
||||
this.versionCounter++;
|
||||
return `v${this.versionCounter}`;
|
||||
}
|
||||
|
||||
private get currentVersion(): BraidVersion {
|
||||
return `v${this.versionCounter}`;
|
||||
}
|
||||
|
||||
// --- Public accessors (matches PeerManager interface) ---
|
||||
|
||||
get connectedPeers(): number {
|
||||
return this.peers.size;
|
||||
}
|
||||
|
||||
get connectedPeerIds(): string[] {
|
||||
return Array.from(this.peers.keys());
|
||||
}
|
||||
|
||||
getPeer(nodeId: string): BraidPeer | undefined {
|
||||
return this.peers.get(nodeId);
|
||||
}
|
||||
|
||||
getPeerInfo(nodeId: string): BraidPeerInfo | undefined {
|
||||
return this.knownPeers.get(nodeId);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,96 @@
|
|||
/**
|
||||
* Braid Transport Types
|
||||
*
|
||||
* Shared types for the Braid-HTTP transport adapter.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Braid version identifier — opaque string, typically a hash or timestamp.
|
||||
*/
|
||||
export type BraidVersion = string;
|
||||
|
||||
/**
|
||||
* Braid subscription update received from a peer.
|
||||
*/
|
||||
export interface BraidUpdate {
|
||||
version: BraidVersion;
|
||||
parents: BraidVersion[];
|
||||
body: Uint8Array;
|
||||
/** Content-Type of the patch body */
|
||||
contentType: string;
|
||||
/** Merge-Type header, e.g. "automerge" */
|
||||
mergeType?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Peer registration info for Braid-HTTP discovery.
|
||||
*/
|
||||
export interface BraidPeerInfo {
|
||||
nodeId: string;
|
||||
publicKey: string;
|
||||
endpoint: string; // HTTP base URL, e.g. http://node-2:3007
|
||||
lastSeen: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configuration for BraidPeerManager.
|
||||
*/
|
||||
export interface BraidPeerManagerConfig {
|
||||
nodeId: string;
|
||||
publicKey: string;
|
||||
privateKey: Uint8Array;
|
||||
/** HTTP port this node listens on (same as consensus API port) */
|
||||
port: number;
|
||||
/** Hostname/IP for peer discovery */
|
||||
host: string;
|
||||
/** Redis URL for peer discovery */
|
||||
redisUrl: string;
|
||||
/** Braid subscription path on each peer, default: /braid/sync */
|
||||
syncPath?: string;
|
||||
/** Braid consensus message path, default: /braid/consensus */
|
||||
consensusPath?: string;
|
||||
/** Heartbeat interval in ms, default: 5000 */
|
||||
heartbeatIntervalMs?: number;
|
||||
/** Peer timeout in ms, default: 15000 */
|
||||
peerTimeoutMs?: number;
|
||||
/** Reconnect interval in ms, default: 3000 */
|
||||
reconnectIntervalMs?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* A consensus message envelope — same shape as the existing WebSocket protocol
|
||||
* so it's a drop-in replacement at the transport layer.
|
||||
*/
|
||||
export interface ConsensusMessageEnvelope {
|
||||
type: 'PROPOSE' | 'VOTE' | 'COMMIT' | 'SYNC_REQUEST' | 'SYNC_RESPONSE' | 'HEARTBEAT';
|
||||
nodeId: string;
|
||||
timestamp: number;
|
||||
payload: unknown;
|
||||
signature?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Options for Braid subscription.
|
||||
*/
|
||||
export interface BraidSubscribeOptions {
|
||||
/** Subscribe to updates (long-lived connection) */
|
||||
subscribe?: boolean;
|
||||
/** Specific version to request */
|
||||
version?: BraidVersion;
|
||||
/** Known parent versions (for sync) */
|
||||
parents?: BraidVersion[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Options for the Simpleton light client.
|
||||
*/
|
||||
export interface SimpletonClientConfig {
|
||||
/** Base URL of the consensus node to connect to */
|
||||
endpoint: string;
|
||||
/** Path for state subscription, default: /braid/state */
|
||||
statePath?: string;
|
||||
/** Path for balance queries, default: /balance */
|
||||
balancePath?: string;
|
||||
/** Reconnect delay in ms, default: 2000 */
|
||||
reconnectDelayMs?: number;
|
||||
}
|
||||
|
|
@ -0,0 +1,161 @@
|
|||
/**
|
||||
* Tests for BraidPeerManager
|
||||
*
|
||||
* Verifies that the Braid-HTTP transport layer correctly replaces
|
||||
* the WebSocket PeerManager while maintaining the same event interface.
|
||||
*/
|
||||
|
||||
import { describe, it, expect, beforeEach, vi } from 'vitest';
|
||||
import express from 'express';
|
||||
import { BraidPeerManager } from '../src/transport/braid-peer-manager.js';
|
||||
|
||||
describe('BraidPeerManager', () => {
|
||||
let manager: BraidPeerManager;
|
||||
|
||||
beforeEach(() => {
|
||||
manager = new BraidPeerManager({
|
||||
nodeId: 'test-node-1',
|
||||
publicKey: 'abc123',
|
||||
privateKey: new Uint8Array(32),
|
||||
host: 'localhost',
|
||||
port: 3007,
|
||||
redisUrl: 'redis://localhost:6379',
|
||||
});
|
||||
});
|
||||
|
||||
describe('interface compatibility', () => {
|
||||
it('should have same public accessors as WebSocket PeerManager', () => {
|
||||
expect(manager.connectedPeers).toBe(0);
|
||||
expect(manager.connectedPeerIds).toEqual([]);
|
||||
expect(manager.getPeer('unknown')).toBeUndefined();
|
||||
expect(manager.getPeerInfo('unknown')).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should emit peer:connected and peer:disconnected events', () => {
|
||||
const connected = vi.fn();
|
||||
const disconnected = vi.fn();
|
||||
|
||||
manager.on('peer:connected', connected);
|
||||
manager.on('peer:disconnected', disconnected);
|
||||
|
||||
// Events should be callable (they'll fire when peers connect via HTTP)
|
||||
expect(connected).not.toHaveBeenCalled();
|
||||
expect(disconnected).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should emit consensus message events', () => {
|
||||
const propose = vi.fn();
|
||||
const vote = vi.fn();
|
||||
const commit = vi.fn();
|
||||
|
||||
manager.on('message:propose', propose);
|
||||
manager.on('message:vote', vote);
|
||||
manager.on('message:commit', commit);
|
||||
|
||||
expect(propose).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('router creation', () => {
|
||||
it('should create an Express router with Braid endpoints', () => {
|
||||
const router = manager.createRouter();
|
||||
expect(router).toBeDefined();
|
||||
|
||||
// Verify it can be mounted on an Express app
|
||||
const app = express();
|
||||
app.use(router);
|
||||
});
|
||||
});
|
||||
|
||||
describe('message handling', () => {
|
||||
it('should handle consensus messages via HTTP POST', async () => {
|
||||
const proposeFn = vi.fn();
|
||||
manager.on('message:propose', proposeFn);
|
||||
|
||||
const app = express();
|
||||
app.use(express.json());
|
||||
app.use(manager.createRouter());
|
||||
|
||||
// Simulate a POST to /braid/consensus
|
||||
const server = app.listen(0);
|
||||
const addr = server.address() as any;
|
||||
|
||||
try {
|
||||
const response = await fetch(`http://localhost:${addr.port}/braid/consensus`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
type: 'PROPOSE',
|
||||
nodeId: 'test-node-2',
|
||||
timestamp: Date.now(),
|
||||
payload: { tx: {}, nodeSignature: {} },
|
||||
}),
|
||||
});
|
||||
|
||||
expect(response.ok).toBe(true);
|
||||
expect(proposeFn).toHaveBeenCalledTimes(1);
|
||||
} finally {
|
||||
server.close();
|
||||
}
|
||||
});
|
||||
|
||||
it('should reject messages without type', async () => {
|
||||
const app = express();
|
||||
app.use(express.json());
|
||||
app.use(manager.createRouter());
|
||||
|
||||
const server = app.listen(0);
|
||||
const addr = server.address() as any;
|
||||
|
||||
try {
|
||||
const response = await fetch(`http://localhost:${addr.port}/braid/consensus`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ nodeId: 'test' }),
|
||||
});
|
||||
|
||||
expect(response.status).toBe(400);
|
||||
} finally {
|
||||
server.close();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe('Braid subscription', () => {
|
||||
it('should return peer info on GET /braid/peer-info', async () => {
|
||||
const app = express();
|
||||
app.use(manager.createRouter());
|
||||
|
||||
const server = app.listen(0);
|
||||
const addr = server.address() as any;
|
||||
|
||||
try {
|
||||
const response = await fetch(`http://localhost:${addr.port}/braid/peer-info`);
|
||||
const data = await response.json() as any;
|
||||
|
||||
expect(data.nodeId).toBe('test-node-1');
|
||||
expect(data.publicKey).toBe('abc123');
|
||||
} finally {
|
||||
server.close();
|
||||
}
|
||||
});
|
||||
|
||||
it('should require X-Node-Id header for sync subscription', async () => {
|
||||
const app = express();
|
||||
app.use(manager.createRouter());
|
||||
|
||||
const server = app.listen(0);
|
||||
const addr = server.address() as any;
|
||||
|
||||
try {
|
||||
const response = await fetch(`http://localhost:${addr.port}/braid/sync`, {
|
||||
headers: { 'Subscribe': 'true' },
|
||||
});
|
||||
|
||||
expect(response.status).toBe(400);
|
||||
} finally {
|
||||
server.close();
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
{
|
||||
"compilerOptions": {
|
||||
"target": "ES2022",
|
||||
"module": "NodeNext",
|
||||
"moduleResolution": "NodeNext",
|
||||
"lib": ["ES2022"],
|
||||
"outDir": "./dist",
|
||||
"rootDir": "./src",
|
||||
"strict": true,
|
||||
"esModuleInterop": true,
|
||||
"skipLibCheck": true,
|
||||
"forceConsistentCasingInFileNames": true,
|
||||
"resolveJsonModule": true,
|
||||
"declaration": true,
|
||||
"declarationMap": true,
|
||||
"sourceMap": true
|
||||
},
|
||||
"include": ["src/**/*"],
|
||||
"exclude": ["node_modules", "dist", "test"]
|
||||
}
|
||||
Loading…
Reference in New Issue