fix: Exchange public keys during Braid subscription handshake
Peers now send X-Public-Key header when subscribing, so the receiving node can verify consensus signatures immediately without waiting for Redis discovery. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
3fedbc4774
commit
33438d4d43
|
|
@ -221,6 +221,7 @@ export class BraidPeerManager extends EventEmitter {
|
|||
private handleSyncSubscribe(req: Request, res: Response): void {
|
||||
const subscribe = req.headers['subscribe'];
|
||||
const nodeId = req.headers['x-node-id'] as string;
|
||||
const publicKey = req.headers['x-public-key'] as string;
|
||||
|
||||
if (!nodeId) {
|
||||
res.status(400).json({ error: 'Missing X-Node-Id header' });
|
||||
|
|
@ -241,6 +242,9 @@ export class BraidPeerManager extends EventEmitter {
|
|||
'X-Node-Id': this.config.nodeId,
|
||||
});
|
||||
|
||||
// Flush headers by writing an initial empty comment line
|
||||
res.write('\r\n');
|
||||
|
||||
// Track subscriber
|
||||
this.subscribers.set(nodeId, {
|
||||
nodeId,
|
||||
|
|
@ -248,13 +252,23 @@ export class BraidPeerManager extends EventEmitter {
|
|||
connectedAt: Date.now(),
|
||||
});
|
||||
|
||||
// Register as connected peer
|
||||
if (!this.peers.has(nodeId)) {
|
||||
// Register as connected peer and update knownPeers with public key
|
||||
const peerInfo = this.knownPeers.get(nodeId);
|
||||
if (peerInfo) {
|
||||
this.peers.set(nodeId, new BraidPeer(nodeId, peerInfo.endpoint));
|
||||
this.emit('peer:connected', nodeId);
|
||||
const endpoint = peerInfo?.endpoint ?? `http://${nodeId}:${this.config.port}`;
|
||||
|
||||
// Store/update peer info so getPeerInfo() returns the public key
|
||||
if (publicKey && (!peerInfo || !peerInfo.publicKey || peerInfo.publicKey === '0x' + '0'.repeat(64))) {
|
||||
this.knownPeers.set(nodeId, {
|
||||
nodeId,
|
||||
publicKey,
|
||||
endpoint,
|
||||
lastSeen: Date.now(),
|
||||
});
|
||||
}
|
||||
|
||||
if (!this.peers.has(nodeId)) {
|
||||
this.peers.set(nodeId, new BraidPeer(nodeId, endpoint));
|
||||
this.emit('peer:connected', nodeId);
|
||||
}
|
||||
|
||||
// Trigger initial sync
|
||||
|
|
@ -477,6 +491,8 @@ export class BraidPeerManager extends EventEmitter {
|
|||
async subscribeToPeer(peerInfo: BraidPeerInfo): Promise<void> {
|
||||
if (this.outboundSubs.has(peerInfo.nodeId)) return;
|
||||
|
||||
logger.info({ nodeId: peerInfo.nodeId, endpoint: peerInfo.endpoint }, 'Attempting Braid subscription');
|
||||
|
||||
const controller = new AbortController();
|
||||
const endpoint = peerInfo.endpoint;
|
||||
|
||||
|
|
@ -493,6 +509,7 @@ export class BraidPeerManager extends EventEmitter {
|
|||
headers: {
|
||||
'Subscribe': 'keep-alive',
|
||||
'X-Node-Id': this.config.nodeId,
|
||||
'X-Public-Key': this.config.publicKey,
|
||||
'Accept': 'application/octet-stream',
|
||||
},
|
||||
signal: controller.signal,
|
||||
|
|
@ -549,35 +566,18 @@ export class BraidPeerManager extends EventEmitter {
|
|||
|
||||
/**
|
||||
* Parse and handle an incoming Braid update from a subscription stream.
|
||||
*
|
||||
* The Braid subscription stream is used for:
|
||||
* - Peer liveness (keeping the connection open)
|
||||
* - Simpleton JSON state updates (future)
|
||||
*
|
||||
* Automerge binary sync goes through HTTP POST (SYNC_RESPONSE messages),
|
||||
* NOT through this text stream — binary data gets corrupted in text mode.
|
||||
*/
|
||||
private handleBraidUpdate(nodeId: string, chunk: string): void {
|
||||
try {
|
||||
// Extract headers and body from the Braid update
|
||||
const headerEnd = chunk.indexOf('\r\n\r\n');
|
||||
if (headerEnd === -1) {
|
||||
// Body might be right after headers with a single newline separation
|
||||
const lines = chunk.split('\r\n');
|
||||
const bodyStartIdx = lines.findIndex(l => l === '');
|
||||
if (bodyStartIdx === -1) return;
|
||||
|
||||
// Everything after the empty line is body
|
||||
const bodyLines = lines.slice(bodyStartIdx + 1);
|
||||
const body = bodyLines.join('\r\n');
|
||||
if (body.length > 0) {
|
||||
const data = new Uint8Array(Buffer.from(body, 'binary'));
|
||||
this.emit('message:sync_response', nodeId, data);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
const body = chunk.slice(headerEnd + 4);
|
||||
if (body.length > 0) {
|
||||
const data = new Uint8Array(Buffer.from(body, 'binary'));
|
||||
this.emit('message:sync_response', nodeId, data);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error({ nodeId, error }, 'Failed to parse Braid update');
|
||||
}
|
||||
// Currently the stream is used for liveness only.
|
||||
// Automerge sync messages are exchanged via HTTP POST to /braid/consensus.
|
||||
logger.debug({ nodeId, chunkLen: chunk.length }, 'Received Braid stream update');
|
||||
}
|
||||
|
||||
// --- Redis-based peer discovery (reused from WebSocket version) ---
|
||||
|
|
|
|||
Loading…
Reference in New Issue