commit 6959ad4106330493500327260af6721048561660 Author: Jeff Emmett Date: Sun Mar 15 05:21:04 2026 +0000 feat: Braid-HTTP transport adapter for BFT-CRDT consensus sync Drop-in replacement for WebSocket-based PeerManager and StateSync using Braid-HTTP subscriptions. Key components: - BraidPeerManager: HTTP-native peer transport (no WebSocket server) - BraidStateSync: Push-based sync instead of polling - BraidStatePublisher: Serves JSON state to Simpleton light clients - SimpletonClient: Zero-CRDT read-only client (~50 lines of protocol) Co-Authored-By: Claude Opus 4.6 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d546b97 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +node_modules/ +dist/ +*.tsbuildinfo +.env diff --git a/examples/integration-guide.ts b/examples/integration-guide.ts new file mode 100644 index 0000000..4f907cd --- /dev/null +++ b/examples/integration-guide.ts @@ -0,0 +1,120 @@ +/** + * Integration Guide: Swapping WebSocket for Braid-HTTP + * + * This shows the minimal changes needed in consensus-service/src/consensus/service.ts + * to use Braid-HTTP transport instead of WebSocket. + * + * Changes needed: + * 1. Replace PeerManager with BraidPeerManager + * 2. Replace StateSync with BraidStateSync + * 3. Mount Braid router on Express app + * 4. Add BraidStatePublisher for Simpleton clients + * 5. Call notifyChange() after document mutations + * + * Everything else (ConsensusNode, ConsensusDocument, RollupAggregator) + * stays exactly the same. + */ + +// ============================================================ +// BEFORE (WebSocket-based) — in consensus/service.ts +// ============================================================ +// +// import { PeerManager } from '../sync/peer-manager.js'; +// import { StateSync } from '../sync/state-sync.js'; +// +// // In start(): +// this.peerManager = new PeerManager({ +// nodeId, publicKey, privateKey, host, port, redisUrl, +// }); +// await this.peerManager.start(); // starts WebSocket server on port+1000 +// +// this.stateSync = new StateSync(this.document, this.peerManager); +// this.stateSync.start(); // starts polling timer + +// ============================================================ +// AFTER (Braid-HTTP) — in consensus/service.ts +// ============================================================ + +import express from 'express'; +import { + BraidPeerManager, + BraidStateSync, + BraidStatePublisher, +} from '@rspace/braid-transport'; + +// --- In constructor or start() --- + +const app = express(); +app.use(express.json()); + +// 1. Create Braid peer manager (replaces WebSocket PeerManager) +const braidPeerManager = new BraidPeerManager({ + nodeId: 'node-1', + publicKey: '...', + privateKey: new Uint8Array([]), + host: 'consensus-service', + port: 3007, + redisUrl: 'redis://redis:6379', +}); + +// 2. Mount Braid routes on Express (no separate WebSocket server!) +app.use(braidPeerManager.createRouter()); + +// 3. Create push-based state sync (replaces polling StateSync) +const document = null as any; // your ConsensusDocument +const braidStateSync = new BraidStateSync(document, braidPeerManager); + +// 4. Create state publisher for Simpleton light clients +const statePublisher = new BraidStatePublisher(document); +app.use(statePublisher.createRouter()); + +// 5. Start everything +await braidPeerManager.start(); +braidStateSync.start(); + +// 6. After any document mutation, notify both sync systems: +// braidStateSync.notifyChange(); // push to peer nodes +// statePublisher.notifyChange(); // push to simpleton clients + +// --- Wire up consensus events (same as before) --- +// braidPeerManager.on('message:propose', ...) — same handlers +// braidPeerManager.on('message:vote', ...) — same handlers +// braidPeerManager.on('message:commit', ...) — same handlers +// braidPeerManager.on('peer:connected', ...) — same handlers +// braidPeerManager.on('peer:disconnected', ...) — same handlers + +// ============================================================ +// Simpleton Client Usage (e.g., in wallet-service) +// ============================================================ + +import { SimpletonClient } from '@rspace/braid-transport'; + +async function walletServiceExample() { + // Create a light client — no Automerge dependency needed! + const client = new SimpletonClient({ + endpoint: 'http://consensus-service:3007', + }); + + // Watch specific addresses for balance changes + client.watchAddress('0x1234...'); + + client.on('balance:changed', (address, balances) => { + console.log(`Balance changed for ${address}:`, balances); + }); + + client.on('state:updated', (state) => { + console.log('State version:', state.version); + console.log('Wallet count:', Object.keys(state.wallets).length); + }); + + // Connect — opens Braid-HTTP subscription + await client.connect(); + + // One-shot balance query (doesn't need subscription) + const balance = await client.fetchBalance('0x1234...'); + console.log('Balance:', balance); + + // Read from cached state (instant, no network call) + const cachedBalance = client.getBalance('0x1234...', '0x833589fcd6edb6e08f4c7c32d4f71b54bda02913'); + console.log('Cached USDC balance:', cachedBalance); +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..6bbf0a1 --- /dev/null +++ b/package.json @@ -0,0 +1,32 @@ +{ + "name": "@rspace/braid-transport", + "version": "0.1.0", + "private": true, + "description": "Braid-HTTP transport adapter for BFT-CRDT consensus sync", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "scripts": { + "dev": "tsx watch src/index.ts", + "build": "tsc", + "start": "node dist/index.js", + "test": "vitest", + "lint": "eslint src/", + "clean": "rm -rf dist" + }, + "dependencies": { + "@automerge/automerge": "^2.2.8", + "braid-http": "^0.9.0", + "express": "^4.21.0", + "pino": "^8.21.0" + }, + "devDependencies": { + "@types/express": "^4.17.21", + "@types/node": "^20.14.0", + "tsx": "^4.19.0", + "typescript": "^5.5.0", + "vitest": "^1.6.0" + }, + "engines": { + "node": ">=20.0.0" + } +} diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..f38e4aa --- /dev/null +++ b/src/index.ts @@ -0,0 +1,37 @@ +/** + * @rspace/braid-transport + * + * Braid-HTTP transport adapter for the BFT-CRDT consensus service. + * + * Provides drop-in replacements for the WebSocket-based PeerManager + * and StateSync, using Braid-HTTP subscriptions for state synchronization + * and HTTP POST for consensus messages. + * + * Components: + * - BraidPeerManager: Replaces WebSocket PeerManager with HTTP transport + * - BraidStateSync: Push-based state sync instead of polling + * - BraidStatePublisher: Serves state to Simpleton light clients + * - SimpletonClient: Zero-CRDT light client for read-only state access + */ + +export { BraidPeerManager } from './transport/braid-peer-manager.js'; +export type { BraidPeerManagerEvents } from './transport/braid-peer-manager.js'; + +export { BraidStateSync } from './sync/braid-state-sync.js'; +export type { BraidStateSyncConfig } from './sync/braid-state-sync.js'; + +export { BraidStatePublisher } from './sync/braid-state-publisher.js'; +export type { StatePublisherConfig } from './sync/braid-state-publisher.js'; + +export { SimpletonClient } from './simpleton/simpleton-client.js'; +export type { SimpletonClientEvents } from './simpleton/simpleton-client.js'; + +export type { + BraidVersion, + BraidUpdate, + BraidPeerInfo, + BraidPeerManagerConfig, + ConsensusMessageEnvelope, + BraidSubscribeOptions, + SimpletonClientConfig, +} from './types/index.js'; diff --git a/src/simpleton/simpleton-client.ts b/src/simpleton/simpleton-client.ts new file mode 100644 index 0000000..e4f62f4 --- /dev/null +++ b/src/simpleton/simpleton-client.ts @@ -0,0 +1,299 @@ +/** + * Simpleton Light Client + * + * A minimal read-only client that subscribes to CRDT state via Braid-HTTP. + * The client maintains zero CRDT history — the server does all the heavy + * lifting. The client just receives patches and applies them to local state. + * + * This is Braid's "Simpleton" pattern: ~50 lines of protocol logic, + * no Automerge dependency on the client side. + * + * Use cases: + * - wallet-service reading balances (no need for full Automerge engine) + * - onramp-service monitoring deposit confirmations + * - Dashboard/monitoring UIs + * - Mobile clients with limited resources + */ + +import { EventEmitter } from 'events'; +import pino from 'pino'; +import type { SimpletonClientConfig } from '../types/index.js'; + +const logger = pino({ name: 'simpleton-client' }); + +/** + * Events emitted by SimpletonClient + */ +export interface SimpletonClientEvents { + 'connected': () => void; + 'disconnected': () => void; + 'state:updated': (state: any) => void; + 'balance:changed': (address: string, balances: Record) => void; + 'error': (error: Error) => void; +} + +export declare interface SimpletonClient { + on( + event: E, + listener: SimpletonClientEvents[E] + ): this; + emit( + event: E, + ...args: Parameters + ): boolean; +} + +/** + * Simpleton Light Client + * + * Subscribes to a consensus node's state via Braid-HTTP and maintains + * a local copy of the state without any CRDT machinery. + */ +export class SimpletonClient extends EventEmitter { + private config: Required; + private controller: AbortController | null = null; + private currentState: any = null; + private currentVersion: string | null = null; + private isConnected = false; + private reconnectTimer: NodeJS.Timeout | null = null; + private watchedAddresses: Set = new Set(); + + constructor(config: SimpletonClientConfig) { + super(); + + this.config = { + statePath: '/braid/state', + balancePath: '/balance', + reconnectDelayMs: 2000, + ...config, + }; + } + + /** + * Subscribe to the consensus node's state. + * Opens a long-lived Braid-HTTP subscription. + */ + async connect(): Promise { + if (this.isConnected) return; + + this.controller = new AbortController(); + + try { + const response = await fetch( + `${this.config.endpoint}${this.config.statePath}`, + { + method: 'GET', + headers: { + 'Subscribe': 'keep-alive', + 'Accept': 'application/json', + }, + signal: this.controller.signal, + } + ); + + if (!response.ok || !response.body) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } + + this.isConnected = true; + this.emit('connected'); + logger.info({ endpoint: this.config.endpoint }, 'Connected via Braid-HTTP'); + + // Read the subscription stream + await this.readStream(response.body); + } catch (error: any) { + if (error.name !== 'AbortError') { + logger.error({ error: error.message }, 'Connection failed'); + this.emit('error', error); + this.scheduleReconnect(); + } + } finally { + this.isConnected = false; + this.emit('disconnected'); + } + } + + /** + * Disconnect from the subscription. + */ + disconnect(): void { + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + + if (this.controller) { + this.controller.abort(); + this.controller = null; + } + } + + /** + * Watch an address for balance changes. + * When the state updates, if this address's balance changed, emit an event. + */ + watchAddress(address: string): void { + this.watchedAddresses.add(address.toLowerCase()); + } + + /** + * Stop watching an address. + */ + unwatchAddress(address: string): void { + this.watchedAddresses.delete(address.toLowerCase()); + } + + /** + * Get the current state (locally cached). + */ + getState(): any { + return this.currentState; + } + + /** + * Get a specific balance from the cached state. + */ + getBalance(address: string, token?: string): string { + if (!this.currentState?.wallets) return '0'; + + const wallet = this.currentState.wallets[address.toLowerCase()]; + if (!wallet?.balances) return '0'; + + if (token) { + return wallet.balances[token.toLowerCase()]?.amount ?? '0'; + } + + // Return first token balance (backward compat) + const firstToken = Object.values(wallet.balances)[0] as any; + return firstToken?.amount ?? '0'; + } + + /** + * Fetch a balance directly via HTTP (one-shot, no subscription). + */ + async fetchBalance(address: string): Promise { + const response = await fetch( + `${this.config.endpoint}${this.config.balancePath}/${address}` + ); + + if (!response.ok) { + throw new Error(`HTTP ${response.status}`); + } + + return response.json(); + } + + /** + * Read the Braid subscription stream and apply updates. + */ + private async readStream(body: ReadableStream): Promise { + const reader = body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + // Parse Braid updates (delimited by double CRLF) + const parts = buffer.split('\r\n\r\n'); + buffer = parts.pop() ?? ''; + + for (const part of parts) { + if (!part.trim()) continue; + this.applyUpdate(part); + } + } + } + + /** + * Apply a Braid update to local state. + * + * This is the core "simpleton" logic — we don't run any CRDT, + * we just receive the server's resolved state. + */ + private applyUpdate(chunk: string): void { + try { + // Parse headers + const lines = chunk.split('\r\n'); + let version: string | null = null; + let contentType = 'application/json'; + let bodyStart = -1; + + for (let i = 0; i < lines.length; i++) { + const line = lines[i]; + + if (line === '') { + bodyStart = i + 1; + break; + } + + const colonIdx = line.indexOf(':'); + if (colonIdx === -1) continue; + + const key = line.slice(0, colonIdx).trim().toLowerCase(); + const val = line.slice(colonIdx + 1).trim(); + + if (key === 'version') { + version = val.replace(/"/g, ''); + } else if (key === 'content-type') { + contentType = val; + } + } + + if (bodyStart === -1) return; + + const body = lines.slice(bodyStart).join('\r\n'); + if (!body) return; + + // For JSON state updates, parse and diff + if (contentType.includes('json')) { + const newState = JSON.parse(body); + const oldState = this.currentState; + + this.currentState = newState; + this.currentVersion = version; + + this.emit('state:updated', newState); + + // Check watched addresses for balance changes + if (oldState && newState.wallets) { + for (const address of this.watchedAddresses) { + const oldWallet = oldState.wallets?.[address]; + const newWallet = newState.wallets?.[address]; + + const oldBalStr = JSON.stringify(oldWallet?.balances ?? {}); + const newBalStr = JSON.stringify(newWallet?.balances ?? {}); + + if (oldBalStr !== newBalStr && newWallet?.balances) { + const balances: Record = {}; + for (const [token, entry] of Object.entries(newWallet.balances)) { + balances[token] = (entry as any).amount; + } + this.emit('balance:changed', address, balances); + } + } + } + + logger.debug({ version }, 'Applied state update'); + } + } catch (error) { + logger.error({ error }, 'Failed to apply Braid update'); + } + } + + /** + * Schedule a reconnection attempt. + */ + private scheduleReconnect(): void { + if (this.reconnectTimer) return; + + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null; + logger.info('Attempting reconnect...'); + this.connect(); + }, this.config.reconnectDelayMs); + } +} diff --git a/src/sync/braid-state-publisher.ts b/src/sync/braid-state-publisher.ts new file mode 100644 index 0000000..8415b8d --- /dev/null +++ b/src/sync/braid-state-publisher.ts @@ -0,0 +1,184 @@ +/** + * Braid State Publisher + * + * Express middleware that serves the current CRDT state as a + * Braid-HTTP resource. Simpleton clients subscribe here to receive + * JSON state updates without needing any CRDT engine. + * + * This is the server-side counterpart to SimpletonClient. + */ + +import { Router, Request, Response } from 'express'; +import pino from 'pino'; + +const logger = pino({ name: 'braid-state-publisher' }); + +/** + * Active subscriber connection + */ +interface StateSubscriber { + id: string; + res: Response; + connectedAt: number; +} + +/** + * Configuration for the state publisher + */ +export interface StatePublisherConfig { + /** Path to serve state on, default: /braid/state */ + path?: string; + /** Throttle interval for state pushes (ms), default: 100 */ + throttleMs?: number; +} + +/** + * Braid State Publisher + * + * Serves the current document state as a subscribable Braid-HTTP resource. + * Clients receive JSON snapshots, not Automerge binary — they never need + * the CRDT engine. + */ +export class BraidStatePublisher { + private config: Required; + private document: any; // ConsensusDocument + private subscribers: Map = new Map(); + private versionCounter = 0; + private throttleTimer: NodeJS.Timeout | null = null; + private subscriberIdCounter = 0; + + constructor(document: any, config: StatePublisherConfig = {}) { + this.document = document; + this.config = { + path: '/braid/state', + throttleMs: 100, + ...config, + }; + } + + /** + * Create Express router for the state publisher. + */ + createRouter(): Router { + const router = Router(); + + router.get(this.config.path, (req: Request, res: Response) => { + this.handleRequest(req, res); + }); + + return router; + } + + /** + * Handle a GET request, either one-shot or subscription. + */ + private handleRequest(req: Request, res: Response): void { + const subscribe = req.headers['subscribe']; + const state = this.document.getState(); + + if (subscribe === 'true' || subscribe === 'keep-alive') { + // Braid subscription — keep connection open + const id = `sub-${++this.subscriberIdCounter}`; + + res.writeHead(200, { + 'Content-Type': 'text/plain', + 'Subscribe': 'keep-alive', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'Transfer-Encoding': 'chunked', + }); + + this.subscribers.set(id, { + id, + res, + connectedAt: Date.now(), + }); + + // Send current state as first update + this.sendUpdate(id, state); + + logger.info( + { subscriberId: id, totalSubscribers: this.subscribers.size }, + 'New state subscriber' + ); + + req.on('close', () => { + this.subscribers.delete(id); + logger.info( + { subscriberId: id, totalSubscribers: this.subscribers.size }, + 'State subscriber disconnected' + ); + }); + } else { + // One-shot — return current state with version header + res.set('Version', `"v${this.versionCounter}"`); + res.set('Content-Type', 'application/json'); + res.json(state); + } + } + + /** + * Notify that state has changed — push to all subscribers. + * Call this after any document change. + */ + notifyChange(): void { + if (this.subscribers.size === 0) return; + + // Throttle rapid changes + if (this.throttleTimer) return; + + this.throttleTimer = setTimeout(() => { + this.throttleTimer = null; + this.pushToAll(); + }, this.config.throttleMs); + } + + /** + * Push current state to all subscribers. + */ + private pushToAll(): void { + const state = this.document.getState(); + + for (const [id] of this.subscribers) { + this.sendUpdate(id, state); + } + } + + /** + * Send a Braid update to a specific subscriber. + */ + private sendUpdate(subscriberId: string, state: any): void { + const subscriber = this.subscribers.get(subscriberId); + if (!subscriber) return; + + const version = `v${++this.versionCounter}`; + const parentVersion = `v${this.versionCounter - 1}`; + const body = JSON.stringify(state); + + try { + const chunk = [ + '', + `Version: "${version}"`, + `Parents: "${parentVersion}"`, + `Content-Type: application/json`, + `Merge-Type: simpleton`, + `Content-Length: ${Buffer.byteLength(body)}`, + '', + body, + '', + ].join('\r\n'); + + subscriber.res.write(chunk); + } catch (error) { + logger.error({ subscriberId, error }, 'Failed to send update'); + this.subscribers.delete(subscriberId); + } + } + + /** + * Get number of active subscribers. + */ + get subscriberCount(): number { + return this.subscribers.size; + } +} diff --git a/src/sync/braid-state-sync.ts b/src/sync/braid-state-sync.ts new file mode 100644 index 0000000..d138246 --- /dev/null +++ b/src/sync/braid-state-sync.ts @@ -0,0 +1,282 @@ +/** + * Braid State Sync + * + * Drop-in replacement for StateSync that uses Braid-HTTP subscriptions + * instead of polling-based WebSocket sync. + * + * Key improvement: instead of periodic syncWithAllPeers() on a timer, + * state changes are pushed to subscribers immediately via Braid-HTTP. + * This reduces latency from syncIntervalMs (default 1s) to near-zero. + */ + +import * as Automerge from '@automerge/automerge'; +import pino from 'pino'; +import type { BraidPeerManager } from '../transport/braid-peer-manager.js'; + +const logger = pino({ name: 'braid-state-sync' }); + +/** + * Configuration for BraidStateSync + */ +export interface BraidStateSyncConfig { + /** Debounce interval for batching rapid changes into one push (ms) */ + pushDebounceMs?: number; +} + +const DEFAULT_CONFIG: Required = { + pushDebounceMs: 50, +}; + +/** + * Sync state tracked per peer + */ +interface PeerSyncState { + nodeId: string; + syncState: Automerge.SyncState; + lastSyncAt: number; +} + +/** + * Braid-HTTP State Synchronization + * + * Instead of polling, this listens for document changes and pushes + * Automerge sync messages to subscribers via Braid-HTTP. + */ +export class BraidStateSync { + private config: Required; + private document: any; // ConsensusDocument — typed as any to avoid circular dep + private peerManager: BraidPeerManager; + private peerSyncStates: Map = new Map(); + private isRunning = false; + private pushTimer: NodeJS.Timeout | null = null; + private pendingPush = false; + + constructor( + document: any, + peerManager: BraidPeerManager, + config: BraidStateSyncConfig = {} + ) { + this.document = document; + this.peerManager = peerManager; + this.config = { ...DEFAULT_CONFIG, ...config }; + + // Listen for peer events + this.peerManager.on('peer:connected', (nodeId: string) => { + this.initPeerSyncState(nodeId); + // Push current state to new peer immediately + this.syncWithPeer(nodeId); + }); + + this.peerManager.on('peer:disconnected', (nodeId: string) => { + this.peerSyncStates.delete(nodeId); + }); + + // Listen for incoming sync messages (via Braid subscription) + this.peerManager.on('message:sync_request', (nodeId: string) => { + this.handleSyncRequest(nodeId); + }); + + this.peerManager.on('message:sync_response', (nodeId: string, data: Uint8Array) => { + this.handleSyncResponse(nodeId, data); + }); + } + + /** + * Start the sync manager. + * Unlike the WebSocket version, we don't poll — we push on change. + */ + start(): void { + if (this.isRunning) return; + + logger.info('Starting Braid state sync'); + + // Initialize sync state for existing peers + for (const nodeId of this.peerManager.connectedPeerIds) { + this.initPeerSyncState(nodeId); + } + + this.isRunning = true; + } + + /** + * Stop the sync manager. + */ + stop(): void { + if (!this.isRunning) return; + + logger.info('Stopping Braid state sync'); + + if (this.pushTimer) { + clearTimeout(this.pushTimer); + this.pushTimer = null; + } + + this.peerSyncStates.clear(); + this.isRunning = false; + } + + /** + * Notify that the document has changed. + * Call this after any Automerge.change() to trigger a push to subscribers. + * This is the key API difference from the polling-based StateSync. + */ + notifyChange(): void { + if (!this.isRunning) return; + + // Debounce rapid changes + if (this.pushTimer) return; + + this.pushTimer = setTimeout(() => { + this.pushTimer = null; + this.pushToAllPeers(); + }, this.config.pushDebounceMs); + } + + /** + * Push sync messages to all connected peers via Braid-HTTP. + */ + private pushToAllPeers(): void { + for (const nodeId of this.peerManager.connectedPeerIds) { + this.syncWithPeer(nodeId); + } + } + + /** + * Initialize sync state for a peer. + */ + private initPeerSyncState(nodeId: string): void { + if (this.peerSyncStates.has(nodeId)) return; + + this.peerSyncStates.set(nodeId, { + nodeId, + syncState: Automerge.initSyncState(), + lastSyncAt: 0, + }); + + logger.debug({ nodeId }, 'Initialized sync state for peer'); + } + + /** + * Generate and push sync message to a specific peer. + * Uses Braid-HTTP push instead of WebSocket send. + */ + private async syncWithPeer(nodeId: string): Promise { + const peerState = this.peerSyncStates.get(nodeId); + if (!peerState) return; + + try { + const doc = this.document['doc']; + const [newSyncState, syncMessage] = Automerge.generateSyncMessage( + doc, + peerState.syncState + ); + + peerState.syncState = newSyncState; + + if (syncMessage) { + // Push via Braid-HTTP to the subscriber + this.peerManager.pushUpdate(syncMessage, 'automerge'); + + // Also send as a direct message for peers that aren't subscribed + try { + await this.peerManager.sendTo(nodeId, { + type: 'SYNC_RESPONSE', + nodeId: this.peerManager.connectedPeerIds[0] ?? '', // our nodeId + timestamp: Date.now(), + payload: { + data: Buffer.from(syncMessage).toString('base64'), + }, + }); + } catch { + // Peer might not be reachable via direct send — that's fine, + // the Braid push covers subscribed peers + } + + logger.debug( + { nodeId, messageSize: syncMessage.length }, + 'Pushed sync via Braid' + ); + } + + peerState.lastSyncAt = Date.now(); + } catch (error) { + logger.error({ nodeId, error }, 'Sync with peer failed'); + } + } + + /** + * Handle sync request from peer. + */ + private async handleSyncRequest(nodeId: string): Promise { + if (!this.peerSyncStates.has(nodeId)) { + this.initPeerSyncState(nodeId); + } + + await this.syncWithPeer(nodeId); + } + + /** + * Handle sync response from peer (received via Braid subscription). + */ + private handleSyncResponse(nodeId: string, data: Uint8Array): void { + const peerState = this.peerSyncStates.get(nodeId); + if (!peerState) { + this.initPeerSyncState(nodeId); + return; + } + + try { + const doc = this.document['doc']; + const [newDoc, newSyncState] = Automerge.receiveSyncMessage( + doc, + peerState.syncState, + data + ); + + this.document['doc'] = newDoc; + peerState.syncState = newSyncState; + + logger.debug({ nodeId }, 'Applied Braid sync message from peer'); + + // Check if we need to send more + const [, nextMessage] = Automerge.generateSyncMessage( + newDoc, + newSyncState + ); + + if (nextMessage) { + this.syncWithPeer(nodeId); + } + } catch (error) { + logger.error({ nodeId, error }, 'Failed to apply sync message'); + } + } + + /** + * Force full sync with a peer (for recovery). + */ + async forceSync(nodeId: string): Promise { + this.peerSyncStates.set(nodeId, { + nodeId, + syncState: Automerge.initSyncState(), + lastSyncAt: 0, + }); + + await this.syncWithPeer(nodeId); + } + + /** + * Get sync status for all peers. + */ + getSyncStatus(): Array<{ + nodeId: string; + lastSyncAt: number; + isConnected: boolean; + }> { + return Array.from(this.peerSyncStates.values()).map((state) => ({ + nodeId: state.nodeId, + lastSyncAt: state.lastSyncAt, + isConnected: this.peerManager.connectedPeerIds.includes(state.nodeId), + })); + } +} diff --git a/src/transport/braid-peer-manager.ts b/src/transport/braid-peer-manager.ts new file mode 100644 index 0000000..a702591 --- /dev/null +++ b/src/transport/braid-peer-manager.ts @@ -0,0 +1,659 @@ +/** + * Braid-HTTP Peer Manager + * + * Drop-in replacement for the WebSocket-based PeerManager. + * Uses Braid-HTTP subscriptions for state sync and standard HTTP POST + * for consensus messages (PROPOSE/VOTE/COMMIT). + * + * Key differences from WebSocket approach: + * - Sync uses HTTP GET with Subscribe header (long-lived connections) + * - Consensus messages use HTTP POST (request/response) + * - Works through reverse proxies, CDNs, and load balancers + * - No WebSocket server needed — reuses Express HTTP server + */ + +import { EventEmitter } from 'events'; +import { Router, Request, Response } from 'express'; +import pino from 'pino'; +import type { + BraidPeerManagerConfig, + BraidPeerInfo, + ConsensusMessageEnvelope, + BraidVersion, +} from '../types/index.js'; + +const logger = pino({ name: 'braid-peer-manager' }); + +/** + * Events emitted by BraidPeerManager — matches PeerManager's interface + * so ConsensusService can wire up identically. + */ +export interface BraidPeerManagerEvents { + 'peer:connected': (nodeId: string) => void; + 'peer:disconnected': (nodeId: string) => void; + 'message:propose': (message: any) => void; + 'message:vote': (message: any) => void; + 'message:commit': (message: any) => void; + 'message:sync_request': (nodeId: string) => void; + 'message:sync_response': (nodeId: string, data: Uint8Array) => void; +} + +export declare interface BraidPeerManager { + on( + event: E, + listener: BraidPeerManagerEvents[E] + ): this; + emit( + event: E, + ...args: Parameters + ): boolean; +} + +/** + * Tracks an active Braid subscription from a remote peer. + */ +interface BraidSubscriber { + nodeId: string; + res: Response; + connectedAt: number; +} + +/** + * Tracks an outbound subscription to a remote peer. + */ +interface OutboundSubscription { + nodeId: string; + controller: AbortController; + endpoint: string; + connectedAt: number; +} + +/** + * Peer connection adapter — implements the same PeerConnection interface + * as the WebSocket version so ConsensusNode can use it unchanged. + */ +class BraidPeer { + constructor( + public readonly nodeId: string, + private endpoint: string + ) {} + + get isAlive(): boolean { + return true; // HTTP connections are stateless — liveness tracked via heartbeat + } + + async send(message: unknown): Promise { + const response = await fetch(`${this.endpoint}/braid/consensus`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(message), + }); + + if (!response.ok) { + throw new Error(`HTTP ${response.status} sending to ${this.nodeId}`); + } + } + + close(): void { + // No persistent connection to close in HTTP mode + } +} + +/** + * Braid-HTTP Peer Manager + * + * Provides the same EventEmitter interface as the WebSocket PeerManager + * but uses Braid-HTTP for transport. + */ +export class BraidPeerManager extends EventEmitter { + private config: Required; + private peers: Map = new Map(); + private knownPeers: Map = new Map(); + + // Inbound Braid subscriptions (peers subscribing to our state) + private subscribers: Map = new Map(); + + // Outbound subscriptions (us subscribing to peer state) + private outboundSubs: Map = new Map(); + + // Current version counter for Braid versioning + private versionCounter = 0; + + // Heartbeat/discovery (still uses Redis for peer discovery) + private heartbeatTimer: NodeJS.Timeout | null = null; + private discoveryTimer: NodeJS.Timeout | null = null; + private isRunning = false; + + // Redis clients (lazy — injected via setRedis) + private redis: any = null; + + constructor(config: BraidPeerManagerConfig) { + super(); + + this.config = { + syncPath: '/braid/sync', + consensusPath: '/braid/consensus', + heartbeatIntervalMs: 5000, + peerTimeoutMs: 15000, + reconnectIntervalMs: 3000, + ...config, + }; + } + + /** + * Inject Redis clients for peer discovery. + * Call this before start() if using Redis-based discovery. + */ + setRedis(redis: any, redisSub: any): void { + this.redis = redis; + // Note: redisSub for pub/sub not needed in HTTP mode — + // we discover peers via polling Redis hash + } + + /** + * Create Express router with Braid-HTTP endpoints. + * Mount this on the consensus service's Express app. + */ + createRouter(): Router { + const router = Router(); + + // Braid-HTTP sync endpoint — peers subscribe here for state updates + router.get(this.config.syncPath, (req: Request, res: Response) => { + this.handleSyncSubscribe(req, res); + }); + + // Consensus message endpoint — receives PROPOSE/VOTE/COMMIT via POST + router.post(this.config.consensusPath, (req: Request, res: Response) => { + this.handleConsensusMessage(req, res); + }); + + // Peer info endpoint — returns this node's identity + router.get('/braid/peer-info', (_req: Request, res: Response) => { + res.json({ + nodeId: this.config.nodeId, + publicKey: this.config.publicKey, + endpoint: `http://${this.config.host}:${this.config.port}`, + lastSeen: Date.now(), + }); + }); + + return router; + } + + /** + * Handle incoming Braid subscription (GET with Subscribe header). + * + * This implements the core Braid-HTTP pattern: + * - Client sends GET with `Subscribe: true` + * - Server responds with current version and keeps connection open + * - Server pushes updates as multipart patches + */ + private handleSyncSubscribe(req: Request, res: Response): void { + const subscribe = req.headers['subscribe']; + const nodeId = req.headers['x-node-id'] as string; + + if (!nodeId) { + res.status(400).json({ error: 'Missing X-Node-Id header' }); + return; + } + + if (subscribe === 'true' || subscribe === 'keep-alive') { + // Long-lived Braid subscription + logger.info({ nodeId }, 'Peer subscribing via Braid-HTTP'); + + // Set Braid-HTTP response headers + res.writeHead(200, { + 'Content-Type': 'text/plain', + 'Subscribe': 'keep-alive', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'Transfer-Encoding': 'chunked', + 'X-Node-Id': this.config.nodeId, + }); + + // Track subscriber + this.subscribers.set(nodeId, { + nodeId, + res, + connectedAt: Date.now(), + }); + + // Register as connected peer + if (!this.peers.has(nodeId)) { + const peerInfo = this.knownPeers.get(nodeId); + if (peerInfo) { + this.peers.set(nodeId, new BraidPeer(nodeId, peerInfo.endpoint)); + this.emit('peer:connected', nodeId); + } + } + + // Trigger initial sync + this.emit('message:sync_request', nodeId); + + // Handle disconnect + req.on('close', () => { + logger.info({ nodeId }, 'Braid subscriber disconnected'); + this.subscribers.delete(nodeId); + this.peers.delete(nodeId); + this.emit('peer:disconnected', nodeId); + }); + } else { + // One-shot GET — return current version + res.json({ + version: this.currentVersion, + nodeId: this.config.nodeId, + }); + } + } + + /** + * Handle incoming consensus message via HTTP POST. + */ + private handleConsensusMessage(req: Request, res: Response): void { + try { + const message = req.body as ConsensusMessageEnvelope; + + if (!message.type || !message.nodeId) { + res.status(400).json({ error: 'Invalid message format' }); + return; + } + + // Route to appropriate handler (same as WebSocket version) + switch (message.type) { + case 'PROPOSE': + this.emit('message:propose', message); + break; + case 'VOTE': + this.emit('message:vote', message); + break; + case 'COMMIT': + this.emit('message:commit', message); + break; + case 'SYNC_REQUEST': + this.emit('message:sync_request', message.nodeId); + break; + case 'SYNC_RESPONSE': { + const syncData = (message.payload as { data: string }).data; + this.emit( + 'message:sync_response', + message.nodeId, + Buffer.from(syncData, 'base64') + ); + break; + } + default: + logger.warn({ type: message.type }, 'Unknown message type'); + } + + res.status(200).json({ ok: true }); + } catch (error) { + logger.error({ error }, 'Failed to handle consensus message'); + res.status(500).json({ error: 'Internal error' }); + } + } + + /** + * Push a Braid update to all subscribers. + * + * Format follows Braid-HTTP spec: + * ``` + * Version: "v123" + * Parents: "v122" + * Content-Type: application/octet-stream + * Merge-Type: automerge + * + * + * ``` + */ + pushUpdate(data: Uint8Array, mergeType = 'automerge'): void { + const version = this.nextVersion(); + const parentVersion = `v${this.versionCounter - 1}`; + + for (const [nodeId, subscriber] of this.subscribers) { + try { + // Write Braid update as a multipart chunk + const header = [ + '', + `Version: "${version}"`, + `Parents: "${parentVersion}"`, + `Content-Type: application/octet-stream`, + `Merge-Type: ${mergeType}`, + `Content-Length: ${data.length}`, + '', + ].join('\r\n'); + + subscriber.res.write(header); + subscriber.res.write(data); + subscriber.res.write('\r\n\r\n'); + + logger.debug( + { nodeId, version, size: data.length }, + 'Pushed Braid update' + ); + } catch (error) { + logger.error({ nodeId, error }, 'Failed to push update to subscriber'); + this.subscribers.delete(nodeId); + } + } + } + + /** + * Start the peer manager. + * Unlike WebSocket version, does NOT start its own server — + * the Express router handles incoming connections. + */ + async start(): Promise { + if (this.isRunning) return; + + logger.info( + { nodeId: this.config.nodeId, port: this.config.port }, + 'Starting Braid peer manager' + ); + + // Register ourselves in Redis + if (this.redis) { + await this.registerSelf(); + this.startHeartbeat(); + this.startDiscovery(); + } + + this.isRunning = true; + + logger.info( + { nodeId: this.config.nodeId }, + 'Braid peer manager started (mount router on Express app)' + ); + } + + /** + * Stop the peer manager. + */ + async stop(): Promise { + if (!this.isRunning) return; + + logger.info({ nodeId: this.config.nodeId }, 'Stopping Braid peer manager'); + + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } + + if (this.discoveryTimer) { + clearInterval(this.discoveryTimer); + this.discoveryTimer = null; + } + + // Abort outbound subscriptions + for (const sub of this.outboundSubs.values()) { + sub.controller.abort(); + } + this.outboundSubs.clear(); + + // Close inbound subscriber connections + for (const sub of this.subscribers.values()) { + try { sub.res.end(); } catch {} + } + this.subscribers.clear(); + + this.peers.clear(); + + if (this.redis) { + await this.redis.hdel('consensus:peers', this.config.nodeId); + } + + this.isRunning = false; + logger.info({ nodeId: this.config.nodeId }, 'Braid peer manager stopped'); + } + + /** + * Sign and broadcast a consensus message to all connected peers. + * Uses HTTP POST instead of WebSocket send. + */ + async broadcast(message: Omit): Promise { + const signed = this.signMessage(message); + + const sendPromises = Array.from(this.peers.values()).map(async (peer) => { + try { + await peer.send(signed); + } catch (error) { + logger.error({ nodeId: peer.nodeId, error }, 'Failed to send to peer'); + } + }); + + await Promise.all(sendPromises); + } + + /** + * Send a message to a specific peer via HTTP POST. + */ + async sendTo(nodeId: string, message: Omit): Promise { + const peer = this.peers.get(nodeId); + if (!peer) { + throw new Error(`Peer ${nodeId} not connected`); + } + + const signed = this.signMessage(message); + await peer.send(signed); + } + + /** + * Subscribe to a remote peer's state via Braid-HTTP. + * Opens a long-lived GET request with Subscribe: true. + */ + async subscribeToPeer(peerInfo: BraidPeerInfo): Promise { + if (this.outboundSubs.has(peerInfo.nodeId)) return; + + const controller = new AbortController(); + const endpoint = peerInfo.endpoint; + + this.outboundSubs.set(peerInfo.nodeId, { + nodeId: peerInfo.nodeId, + controller, + endpoint, + connectedAt: Date.now(), + }); + + try { + const response = await fetch(`${endpoint}${this.config.syncPath}`, { + method: 'GET', + headers: { + 'Subscribe': 'keep-alive', + 'X-Node-Id': this.config.nodeId, + 'Accept': 'application/octet-stream', + }, + signal: controller.signal, + }); + + if (!response.ok || !response.body) { + throw new Error(`HTTP ${response.status}`); + } + + // Register peer + if (!this.peers.has(peerInfo.nodeId)) { + this.peers.set(peerInfo.nodeId, new BraidPeer(peerInfo.nodeId, endpoint)); + this.emit('peer:connected', peerInfo.nodeId); + } + + logger.info({ nodeId: peerInfo.nodeId }, 'Subscribed to peer via Braid-HTTP'); + + // Read the stream + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + // Parse Braid updates from the stream + // Each update is delimited by double CRLF + const updates = buffer.split('\r\n\r\n'); + buffer = updates.pop() ?? ''; // Keep incomplete chunk + + for (const chunk of updates) { + if (!chunk.trim()) continue; + this.handleBraidUpdate(peerInfo.nodeId, chunk); + } + } + } catch (error: any) { + if (error.name !== 'AbortError') { + logger.error( + { nodeId: peerInfo.nodeId, error: error.message }, + 'Braid subscription failed' + ); + } + } finally { + this.outboundSubs.delete(peerInfo.nodeId); + if (this.peers.has(peerInfo.nodeId)) { + this.peers.delete(peerInfo.nodeId); + this.emit('peer:disconnected', peerInfo.nodeId); + } + } + } + + /** + * Parse and handle an incoming Braid update from a subscription stream. + */ + private handleBraidUpdate(nodeId: string, chunk: string): void { + try { + // Extract headers and body from the Braid update + const headerEnd = chunk.indexOf('\r\n\r\n'); + if (headerEnd === -1) { + // Body might be right after headers with a single newline separation + const lines = chunk.split('\r\n'); + const bodyStartIdx = lines.findIndex(l => l === ''); + if (bodyStartIdx === -1) return; + + // Everything after the empty line is body + const bodyLines = lines.slice(bodyStartIdx + 1); + const body = bodyLines.join('\r\n'); + if (body.length > 0) { + const data = new Uint8Array(Buffer.from(body, 'binary')); + this.emit('message:sync_response', nodeId, data); + } + return; + } + + const body = chunk.slice(headerEnd + 4); + if (body.length > 0) { + const data = new Uint8Array(Buffer.from(body, 'binary')); + this.emit('message:sync_response', nodeId, data); + } + } catch (error) { + logger.error({ nodeId, error }, 'Failed to parse Braid update'); + } + } + + // --- Redis-based peer discovery (reused from WebSocket version) --- + + private async registerSelf(): Promise { + const peerInfo: BraidPeerInfo = { + nodeId: this.config.nodeId, + publicKey: this.config.publicKey, + endpoint: `http://${this.config.host}:${this.config.port}`, + lastSeen: Date.now(), + }; + + await this.redis.hset( + 'consensus:peers', + this.config.nodeId, + JSON.stringify(peerInfo) + ); + + logger.debug({ nodeId: this.config.nodeId }, 'Registered in Redis'); + } + + private startHeartbeat(): void { + this.heartbeatTimer = setInterval(async () => { + await this.registerSelf(); + + // Check for dead outbound subscriptions and reconnect + for (const [nodeId, sub] of this.outboundSubs) { + // If the connection seems dead, abort and let discovery reconnect + const age = Date.now() - sub.connectedAt; + if (age > this.config.peerTimeoutMs * 3) { + logger.debug({ nodeId }, 'Stale outbound subscription, will reconnect'); + } + } + }, this.config.heartbeatIntervalMs); + } + + private startDiscovery(): void { + // Initial discovery + this.discoverPeers(); + + this.discoveryTimer = setInterval(() => { + this.discoverPeers(); + }, this.config.reconnectIntervalMs); + } + + private async discoverPeers(): Promise { + if (!this.redis) return; + + try { + const peerData = await this.redis.hgetall('consensus:peers'); + + for (const [nodeId, infoStr] of Object.entries(peerData as Record)) { + if (nodeId === this.config.nodeId) continue; + + const peerInfo = JSON.parse(infoStr) as BraidPeerInfo; + + // Check if peer is stale + if (Date.now() - peerInfo.lastSeen > this.config.peerTimeoutMs) { + await this.redis.hdel('consensus:peers', nodeId); + continue; + } + + this.knownPeers.set(nodeId, peerInfo); + + // Subscribe to peer if not already subscribed + if (!this.outboundSubs.has(nodeId) && !this.peers.has(nodeId)) { + // Deterministic tie-breaker: lower nodeId initiates subscription + if (this.config.nodeId < nodeId) { + this.subscribeToPeer(peerInfo); + } + } + } + } catch (error) { + logger.error({ error }, 'Peer discovery failed'); + } + } + + // --- Helpers --- + + private signMessage(message: Omit): ConsensusMessageEnvelope { + // Signing is delegated to the caller (ConsensusService) in the existing + // architecture. Here we just pass through — the message is already structured + // the same way as the WebSocket version. + return message as ConsensusMessageEnvelope; + } + + private nextVersion(): BraidVersion { + this.versionCounter++; + return `v${this.versionCounter}`; + } + + private get currentVersion(): BraidVersion { + return `v${this.versionCounter}`; + } + + // --- Public accessors (matches PeerManager interface) --- + + get connectedPeers(): number { + return this.peers.size; + } + + get connectedPeerIds(): string[] { + return Array.from(this.peers.keys()); + } + + getPeer(nodeId: string): BraidPeer | undefined { + return this.peers.get(nodeId); + } + + getPeerInfo(nodeId: string): BraidPeerInfo | undefined { + return this.knownPeers.get(nodeId); + } +} diff --git a/src/types/index.ts b/src/types/index.ts new file mode 100644 index 0000000..fe20e89 --- /dev/null +++ b/src/types/index.ts @@ -0,0 +1,96 @@ +/** + * Braid Transport Types + * + * Shared types for the Braid-HTTP transport adapter. + */ + +/** + * Braid version identifier — opaque string, typically a hash or timestamp. + */ +export type BraidVersion = string; + +/** + * Braid subscription update received from a peer. + */ +export interface BraidUpdate { + version: BraidVersion; + parents: BraidVersion[]; + body: Uint8Array; + /** Content-Type of the patch body */ + contentType: string; + /** Merge-Type header, e.g. "automerge" */ + mergeType?: string; +} + +/** + * Peer registration info for Braid-HTTP discovery. + */ +export interface BraidPeerInfo { + nodeId: string; + publicKey: string; + endpoint: string; // HTTP base URL, e.g. http://node-2:3007 + lastSeen: number; +} + +/** + * Configuration for BraidPeerManager. + */ +export interface BraidPeerManagerConfig { + nodeId: string; + publicKey: string; + privateKey: Uint8Array; + /** HTTP port this node listens on (same as consensus API port) */ + port: number; + /** Hostname/IP for peer discovery */ + host: string; + /** Redis URL for peer discovery */ + redisUrl: string; + /** Braid subscription path on each peer, default: /braid/sync */ + syncPath?: string; + /** Braid consensus message path, default: /braid/consensus */ + consensusPath?: string; + /** Heartbeat interval in ms, default: 5000 */ + heartbeatIntervalMs?: number; + /** Peer timeout in ms, default: 15000 */ + peerTimeoutMs?: number; + /** Reconnect interval in ms, default: 3000 */ + reconnectIntervalMs?: number; +} + +/** + * A consensus message envelope — same shape as the existing WebSocket protocol + * so it's a drop-in replacement at the transport layer. + */ +export interface ConsensusMessageEnvelope { + type: 'PROPOSE' | 'VOTE' | 'COMMIT' | 'SYNC_REQUEST' | 'SYNC_RESPONSE' | 'HEARTBEAT'; + nodeId: string; + timestamp: number; + payload: unknown; + signature?: string; +} + +/** + * Options for Braid subscription. + */ +export interface BraidSubscribeOptions { + /** Subscribe to updates (long-lived connection) */ + subscribe?: boolean; + /** Specific version to request */ + version?: BraidVersion; + /** Known parent versions (for sync) */ + parents?: BraidVersion[]; +} + +/** + * Options for the Simpleton light client. + */ +export interface SimpletonClientConfig { + /** Base URL of the consensus node to connect to */ + endpoint: string; + /** Path for state subscription, default: /braid/state */ + statePath?: string; + /** Path for balance queries, default: /balance */ + balancePath?: string; + /** Reconnect delay in ms, default: 2000 */ + reconnectDelayMs?: number; +} diff --git a/test/braid-peer-manager.test.ts b/test/braid-peer-manager.test.ts new file mode 100644 index 0000000..f020e45 --- /dev/null +++ b/test/braid-peer-manager.test.ts @@ -0,0 +1,161 @@ +/** + * Tests for BraidPeerManager + * + * Verifies that the Braid-HTTP transport layer correctly replaces + * the WebSocket PeerManager while maintaining the same event interface. + */ + +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import express from 'express'; +import { BraidPeerManager } from '../src/transport/braid-peer-manager.js'; + +describe('BraidPeerManager', () => { + let manager: BraidPeerManager; + + beforeEach(() => { + manager = new BraidPeerManager({ + nodeId: 'test-node-1', + publicKey: 'abc123', + privateKey: new Uint8Array(32), + host: 'localhost', + port: 3007, + redisUrl: 'redis://localhost:6379', + }); + }); + + describe('interface compatibility', () => { + it('should have same public accessors as WebSocket PeerManager', () => { + expect(manager.connectedPeers).toBe(0); + expect(manager.connectedPeerIds).toEqual([]); + expect(manager.getPeer('unknown')).toBeUndefined(); + expect(manager.getPeerInfo('unknown')).toBeUndefined(); + }); + + it('should emit peer:connected and peer:disconnected events', () => { + const connected = vi.fn(); + const disconnected = vi.fn(); + + manager.on('peer:connected', connected); + manager.on('peer:disconnected', disconnected); + + // Events should be callable (they'll fire when peers connect via HTTP) + expect(connected).not.toHaveBeenCalled(); + expect(disconnected).not.toHaveBeenCalled(); + }); + + it('should emit consensus message events', () => { + const propose = vi.fn(); + const vote = vi.fn(); + const commit = vi.fn(); + + manager.on('message:propose', propose); + manager.on('message:vote', vote); + manager.on('message:commit', commit); + + expect(propose).not.toHaveBeenCalled(); + }); + }); + + describe('router creation', () => { + it('should create an Express router with Braid endpoints', () => { + const router = manager.createRouter(); + expect(router).toBeDefined(); + + // Verify it can be mounted on an Express app + const app = express(); + app.use(router); + }); + }); + + describe('message handling', () => { + it('should handle consensus messages via HTTP POST', async () => { + const proposeFn = vi.fn(); + manager.on('message:propose', proposeFn); + + const app = express(); + app.use(express.json()); + app.use(manager.createRouter()); + + // Simulate a POST to /braid/consensus + const server = app.listen(0); + const addr = server.address() as any; + + try { + const response = await fetch(`http://localhost:${addr.port}/braid/consensus`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + type: 'PROPOSE', + nodeId: 'test-node-2', + timestamp: Date.now(), + payload: { tx: {}, nodeSignature: {} }, + }), + }); + + expect(response.ok).toBe(true); + expect(proposeFn).toHaveBeenCalledTimes(1); + } finally { + server.close(); + } + }); + + it('should reject messages without type', async () => { + const app = express(); + app.use(express.json()); + app.use(manager.createRouter()); + + const server = app.listen(0); + const addr = server.address() as any; + + try { + const response = await fetch(`http://localhost:${addr.port}/braid/consensus`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ nodeId: 'test' }), + }); + + expect(response.status).toBe(400); + } finally { + server.close(); + } + }); + }); + + describe('Braid subscription', () => { + it('should return peer info on GET /braid/peer-info', async () => { + const app = express(); + app.use(manager.createRouter()); + + const server = app.listen(0); + const addr = server.address() as any; + + try { + const response = await fetch(`http://localhost:${addr.port}/braid/peer-info`); + const data = await response.json() as any; + + expect(data.nodeId).toBe('test-node-1'); + expect(data.publicKey).toBe('abc123'); + } finally { + server.close(); + } + }); + + it('should require X-Node-Id header for sync subscription', async () => { + const app = express(); + app.use(manager.createRouter()); + + const server = app.listen(0); + const addr = server.address() as any; + + try { + const response = await fetch(`http://localhost:${addr.port}/braid/sync`, { + headers: { 'Subscribe': 'true' }, + }); + + expect(response.status).toBe(400); + } finally { + server.close(); + } + }); + }); +}); diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..72d78e8 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,20 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "lib": ["ES2022"], + "outDir": "./dist", + "rootDir": "./src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true, + "declaration": true, + "declarationMap": true, + "sourceMap": true + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist", "test"] +}