/**
 * Per-Space Conversation Memory — persists MI turns to disk per space.
 *
 * Lets MI reference past interactions via trigram-ranked retrieval.
 * Storage: /data/files/space-knowledge/{space}/memory.json
 */

import { readFile, writeFile, mkdir } from "node:fs/promises";
import { join } from "node:path";
import { trigrams, jaccardSimilarity } from "./mi-trigrams";
/** A single stored query/response exchange. */
interface ConversationTurn {
  query: string; // truncated to 300 chars at append time
  response: string; // first 500 chars
  timestamp: number; // epoch ms (Date.now())
}
/** On-disk shape of {space}/memory.json. */
interface MemoryFile {
  turns: ConversationTurn[]; // oldest first; capped at MAX_TURNS (FIFO eviction)
  lastWrittenAt: string; // ISO-8601 timestamp of the last successful flush
}
// Root directory under which each space's memory.json lives.
const DATA_ROOT = "/data/files/space-knowledge";
// Maximum turns retained per space; older turns are evicted FIFO.
const MAX_TURNS = 50;
const FLUSH_DELAY = 10_000; // 10s debounce
class SpaceMemory {
|
|
#pending = new Map<string, ConversationTurn[]>();
|
|
#timers = new Map<string, ReturnType<typeof setTimeout>>();
|
|
|
|
/** Get trigram-ranked past turns relevant to current query. */
|
|
async getRelevantTurns(space: string, query: string, topN = 3): Promise<string> {
|
|
const file = await this.#load(space);
|
|
if (!file || file.turns.length === 0) return "";
|
|
|
|
const queryTri = trigrams(query);
|
|
const scored = file.turns
|
|
.map((turn) => {
|
|
const text = `${turn.query} ${turn.response}`;
|
|
const sim = jaccardSimilarity(queryTri, trigrams(text));
|
|
return { turn, score: sim };
|
|
})
|
|
.filter((s) => s.score > 0.05)
|
|
.sort((a, b) => b.score - a.score)
|
|
.slice(0, topN);
|
|
|
|
if (scored.length === 0) return "";
|
|
|
|
const lines = scored.map((s) => {
|
|
const date = new Date(s.turn.timestamp).toISOString().split("T")[0];
|
|
const shortA = s.turn.response.slice(0, 120).replace(/\n/g, " ");
|
|
return ` - [${date}] Q: ${s.turn.query.slice(0, 80)} -> A: ${shortA}`;
|
|
});
|
|
|
|
return `\n- Past relevant conversations:\n${lines.join("\n")}`;
|
|
}
|
|
|
|
/** Fire-and-forget append. Debounced 10s flush to disk. */
|
|
appendTurn(space: string, query: string, response: string): void {
|
|
const turn: ConversationTurn = {
|
|
query: query.slice(0, 300),
|
|
response: response.slice(0, 500),
|
|
timestamp: Date.now(),
|
|
};
|
|
|
|
if (!this.#pending.has(space)) this.#pending.set(space, []);
|
|
this.#pending.get(space)!.push(turn);
|
|
|
|
// Debounce flush
|
|
if (this.#timers.has(space)) clearTimeout(this.#timers.get(space)!);
|
|
this.#timers.set(space, setTimeout(() => this.#flush(space), FLUSH_DELAY));
|
|
}
|
|
|
|
async #flush(space: string): Promise<void> {
|
|
const pending = this.#pending.get(space);
|
|
if (!pending || pending.length === 0) return;
|
|
this.#pending.delete(space);
|
|
this.#timers.delete(space);
|
|
|
|
try {
|
|
const file = (await this.#load(space)) || { turns: [], lastWrittenAt: "" };
|
|
file.turns.push(...pending);
|
|
|
|
// FIFO eviction
|
|
if (file.turns.length > MAX_TURNS) {
|
|
file.turns = file.turns.slice(-MAX_TURNS);
|
|
}
|
|
|
|
file.lastWrittenAt = new Date().toISOString();
|
|
|
|
const dir = join(DATA_ROOT, space);
|
|
await mkdir(dir, { recursive: true });
|
|
await writeFile(join(dir, "memory.json"), JSON.stringify(file, null, 2));
|
|
} catch (e: any) {
|
|
console.error(`[space-memory] flush failed for ${space}:`, e.message);
|
|
}
|
|
}
|
|
|
|
async #load(space: string): Promise<MemoryFile | null> {
|
|
try {
|
|
const raw = await readFile(join(DATA_ROOT, space, "memory.json"), "utf-8");
|
|
return JSON.parse(raw);
|
|
} catch {
|
|
return null;
|
|
}
|
|
}
|
|
}
|
|
|
/** Module-level singleton shared by all importers. */
export const spaceMemory = new SpaceMemory();
/**
|
|
* Wrap a ReadableStream to capture assistant response text for memory.
|
|
* Parses NDJSON lines, accumulates content from message chunks.
|
|
*/
|
|
export function streamWithMemoryCapture(
|
|
source: ReadableStream,
|
|
space: string,
|
|
query: string,
|
|
): ReadableStream {
|
|
let accumulated = "";
|
|
const decoder = new TextDecoder();
|
|
|
|
return source.pipeThrough(
|
|
new TransformStream({
|
|
transform(chunk, controller) {
|
|
// Pass through unchanged
|
|
controller.enqueue(chunk);
|
|
|
|
// Try to extract text content from NDJSON lines
|
|
try {
|
|
const text = decoder.decode(chunk, { stream: true });
|
|
for (const line of text.split("\n")) {
|
|
if (!line.trim()) continue;
|
|
const parsed = JSON.parse(line);
|
|
if (parsed.message?.role === "assistant" && parsed.message.content) {
|
|
accumulated += parsed.message.content;
|
|
}
|
|
}
|
|
} catch {
|
|
// Non-JSON chunk or partial line — ignore
|
|
}
|
|
},
|
|
flush() {
|
|
// Stream complete — save to memory
|
|
if (accumulated.length > 10) {
|
|
spaceMemory.appendTurn(space, query, accumulated);
|
|
}
|
|
},
|
|
}),
|
|
);
|
|
}
|