From 33438d4d43b66485d2ef7350d231425e8cadbeb5 Mon Sep 17 00:00:00 2001 From: Jeff Emmett Date: Sun, 15 Mar 2026 18:36:12 +0000 Subject: [PATCH] fix: Exchange public keys during Braid subscription handshake Peers now send X-Public-Key header when subscribing, so the receiving node can verify consensus signatures immediately without waiting for Redis discovery. Co-Authored-By: Claude Opus 4.6 --- src/transport/braid-peer-manager.ts | 66 ++++++++++++++--------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/src/transport/braid-peer-manager.ts b/src/transport/braid-peer-manager.ts index e2879a2..eb0685f 100644 --- a/src/transport/braid-peer-manager.ts +++ b/src/transport/braid-peer-manager.ts @@ -221,6 +221,7 @@ export class BraidPeerManager extends EventEmitter { private handleSyncSubscribe(req: Request, res: Response): void { const subscribe = req.headers['subscribe']; const nodeId = req.headers['x-node-id'] as string; + const publicKey = req.headers['x-public-key'] as string; if (!nodeId) { res.status(400).json({ error: 'Missing X-Node-Id header' }); @@ -241,6 +242,9 @@ export class BraidPeerManager extends EventEmitter { 'X-Node-Id': this.config.nodeId, }); + // Flush headers by writing an initial empty comment line + res.write('\r\n'); + // Track subscriber this.subscribers.set(nodeId, { nodeId, @@ -248,13 +252,23 @@ export class BraidPeerManager extends EventEmitter { connectedAt: Date.now(), }); - // Register as connected peer + // Register as connected peer and update knownPeers with public key + const peerInfo = this.knownPeers.get(nodeId); + const endpoint = peerInfo?.endpoint ?? `http://${nodeId}:${this.config.port}`; + + // Store/update peer info so getPeerInfo() returns the public key + if (publicKey && (!peerInfo || !peerInfo.publicKey || peerInfo.publicKey === '0x' + '0'.repeat(64))) { + this.knownPeers.set(nodeId, { + nodeId, + publicKey, + endpoint, + lastSeen: Date.now(), + }); + } + if (!this.peers.has(nodeId)) { - const peerInfo = this.knownPeers.get(nodeId); - if (peerInfo) { - this.peers.set(nodeId, new BraidPeer(nodeId, peerInfo.endpoint)); - this.emit('peer:connected', nodeId); - } + this.peers.set(nodeId, new BraidPeer(nodeId, endpoint)); + this.emit('peer:connected', nodeId); } // Trigger initial sync @@ -477,6 +491,8 @@ export class BraidPeerManager extends EventEmitter { async subscribeToPeer(peerInfo: BraidPeerInfo): Promise { if (this.outboundSubs.has(peerInfo.nodeId)) return; + logger.info({ nodeId: peerInfo.nodeId, endpoint: peerInfo.endpoint }, 'Attempting Braid subscription'); + const controller = new AbortController(); const endpoint = peerInfo.endpoint; @@ -493,6 +509,7 @@ export class BraidPeerManager extends EventEmitter { headers: { 'Subscribe': 'keep-alive', 'X-Node-Id': this.config.nodeId, + 'X-Public-Key': this.config.publicKey, 'Accept': 'application/octet-stream', }, signal: controller.signal, @@ -549,35 +566,18 @@ export class BraidPeerManager extends EventEmitter { /** * Parse and handle an incoming Braid update from a subscription stream. + * + * The Braid subscription stream is used for: + * - Peer liveness (keeping the connection open) + * - Simpleton JSON state updates (future) + * + * Automerge binary sync goes through HTTP POST (SYNC_RESPONSE messages), + * NOT through this text stream — binary data gets corrupted in text mode. */ private handleBraidUpdate(nodeId: string, chunk: string): void { - try { - // Extract headers and body from the Braid update - const headerEnd = chunk.indexOf('\r\n\r\n'); - if (headerEnd === -1) { - // Body might be right after headers with a single newline separation - const lines = chunk.split('\r\n'); - const bodyStartIdx = lines.findIndex(l => l === ''); - if (bodyStartIdx === -1) return; - - // Everything after the empty line is body - const bodyLines = lines.slice(bodyStartIdx + 1); - const body = bodyLines.join('\r\n'); - if (body.length > 0) { - const data = new Uint8Array(Buffer.from(body, 'binary')); - this.emit('message:sync_response', nodeId, data); - } - return; - } - - const body = chunk.slice(headerEnd + 4); - if (body.length > 0) { - const data = new Uint8Array(Buffer.from(body, 'binary')); - this.emit('message:sync_response', nodeId, data); - } - } catch (error) { - logger.error({ nodeId, error }, 'Failed to parse Braid update'); - } + // Currently the stream is used for liveness only. + // Automerge sync messages are exchanged via HTTP POST to /braid/consensus. + logger.debug({ nodeId, chunkLen: chunk.length }, 'Received Braid stream update'); } // --- Redis-based peer discovery (reused from WebSocket version) ---