/** * Per-Space Conversation Memory — persists MI turns to disk per space. * * Lets MI reference past interactions via trigram-ranked retrieval. * Storage: /data/files/space-knowledge/{space}/memory.json */ import { readFile, writeFile, mkdir } from "node:fs/promises"; import { join } from "node:path"; import { trigrams, jaccardSimilarity } from "./mi-trigrams"; interface ConversationTurn { query: string; response: string; // first 500 chars timestamp: number; } interface MemoryFile { turns: ConversationTurn[]; lastWrittenAt: string; } const DATA_ROOT = "/data/files/space-knowledge"; const MAX_TURNS = 50; const FLUSH_DELAY = 10_000; // 10s debounce class SpaceMemory { #pending = new Map(); #timers = new Map>(); /** Get trigram-ranked past turns relevant to current query. */ async getRelevantTurns(space: string, query: string, topN = 3): Promise { const file = await this.#load(space); if (!file || file.turns.length === 0) return ""; const queryTri = trigrams(query); const scored = file.turns .map((turn) => { const text = `${turn.query} ${turn.response}`; const sim = jaccardSimilarity(queryTri, trigrams(text)); return { turn, score: sim }; }) .filter((s) => s.score > 0.05) .sort((a, b) => b.score - a.score) .slice(0, topN); if (scored.length === 0) return ""; const lines = scored.map((s) => { const date = new Date(s.turn.timestamp).toISOString().split("T")[0]; const shortA = s.turn.response.slice(0, 120).replace(/\n/g, " "); return ` - [${date}] Q: ${s.turn.query.slice(0, 80)} -> A: ${shortA}`; }); return `\n- Past relevant conversations:\n${lines.join("\n")}`; } /** Fire-and-forget append. Debounced 10s flush to disk. */ appendTurn(space: string, query: string, response: string): void { const turn: ConversationTurn = { query: query.slice(0, 300), response: response.slice(0, 500), timestamp: Date.now(), }; if (!this.#pending.has(space)) this.#pending.set(space, []); this.#pending.get(space)!.push(turn); // Debounce flush if (this.#timers.has(space)) clearTimeout(this.#timers.get(space)!); this.#timers.set(space, setTimeout(() => this.#flush(space), FLUSH_DELAY)); } async #flush(space: string): Promise { const pending = this.#pending.get(space); if (!pending || pending.length === 0) return; this.#pending.delete(space); this.#timers.delete(space); try { const file = (await this.#load(space)) || { turns: [], lastWrittenAt: "" }; file.turns.push(...pending); // FIFO eviction if (file.turns.length > MAX_TURNS) { file.turns = file.turns.slice(-MAX_TURNS); } file.lastWrittenAt = new Date().toISOString(); const dir = join(DATA_ROOT, space); await mkdir(dir, { recursive: true }); await writeFile(join(dir, "memory.json"), JSON.stringify(file, null, 2)); } catch (e: any) { console.error(`[space-memory] flush failed for ${space}:`, e.message); } } async #load(space: string): Promise { try { const raw = await readFile(join(DATA_ROOT, space, "memory.json"), "utf-8"); return JSON.parse(raw); } catch { return null; } } } export const spaceMemory = new SpaceMemory(); /** * Wrap a ReadableStream to capture assistant response text for memory. * Parses NDJSON lines, accumulates content from message chunks. */ export function streamWithMemoryCapture( source: ReadableStream, space: string, query: string, ): ReadableStream { let accumulated = ""; const decoder = new TextDecoder(); return source.pipeThrough( new TransformStream({ transform(chunk, controller) { // Pass through unchanged controller.enqueue(chunk); // Try to extract text content from NDJSON lines try { const text = decoder.decode(chunk, { stream: true }); for (const line of text.split("\n")) { if (!line.trim()) continue; const parsed = JSON.parse(line); if (parsed.message?.role === "assistant" && parsed.message.content) { accumulated += parsed.message.content; } } } catch { // Non-JSON chunk or partial line — ignore } }, flush() { // Stream complete — save to memory if (accumulated.length > 10) { spaceMemory.appendTurn(space, query, accumulated); } }, }), ); }