rspace-online/server/space-memory.ts

152 lines
4.3 KiB
TypeScript

/**
* Per-Space Conversation Memory — persists MI turns to disk per space.
*
* Lets MI reference past interactions via trigram-ranked retrieval.
* Storage: /data/files/space-knowledge/{space}/memory.json
*/
import { readFile, writeFile, mkdir } from "node:fs/promises";
import { join } from "node:path";
import { trigrams, jaccardSimilarity } from "./mi-trigrams";
interface ConversationTurn {
query: string;
response: string; // first 500 chars
timestamp: number;
}
interface MemoryFile {
turns: ConversationTurn[];
lastWrittenAt: string;
}
const DATA_ROOT = "/data/files/space-knowledge";
const MAX_TURNS = 50;
const FLUSH_DELAY = 10_000; // 10s debounce
class SpaceMemory {
#pending = new Map<string, ConversationTurn[]>();
#timers = new Map<string, ReturnType<typeof setTimeout>>();
/** Get trigram-ranked past turns relevant to current query. */
async getRelevantTurns(space: string, query: string, topN = 3): Promise<string> {
const file = await this.#load(space);
if (!file || file.turns.length === 0) return "";
const queryTri = trigrams(query);
const scored = file.turns
.map((turn) => {
const text = `${turn.query} ${turn.response}`;
const sim = jaccardSimilarity(queryTri, trigrams(text));
return { turn, score: sim };
})
.filter((s) => s.score > 0.05)
.sort((a, b) => b.score - a.score)
.slice(0, topN);
if (scored.length === 0) return "";
const lines = scored.map((s) => {
const date = new Date(s.turn.timestamp).toISOString().split("T")[0];
const shortA = s.turn.response.slice(0, 120).replace(/\n/g, " ");
return ` - [${date}] Q: ${s.turn.query.slice(0, 80)} -> A: ${shortA}`;
});
return `\n- Past relevant conversations:\n${lines.join("\n")}`;
}
/** Fire-and-forget append. Debounced 10s flush to disk. */
appendTurn(space: string, query: string, response: string): void {
const turn: ConversationTurn = {
query: query.slice(0, 300),
response: response.slice(0, 500),
timestamp: Date.now(),
};
if (!this.#pending.has(space)) this.#pending.set(space, []);
this.#pending.get(space)!.push(turn);
// Debounce flush
if (this.#timers.has(space)) clearTimeout(this.#timers.get(space)!);
this.#timers.set(space, setTimeout(() => this.#flush(space), FLUSH_DELAY));
}
async #flush(space: string): Promise<void> {
const pending = this.#pending.get(space);
if (!pending || pending.length === 0) return;
this.#pending.delete(space);
this.#timers.delete(space);
try {
const file = (await this.#load(space)) || { turns: [], lastWrittenAt: "" };
file.turns.push(...pending);
// FIFO eviction
if (file.turns.length > MAX_TURNS) {
file.turns = file.turns.slice(-MAX_TURNS);
}
file.lastWrittenAt = new Date().toISOString();
const dir = join(DATA_ROOT, space);
await mkdir(dir, { recursive: true });
await writeFile(join(dir, "memory.json"), JSON.stringify(file, null, 2));
} catch (e: any) {
console.error(`[space-memory] flush failed for ${space}:`, e.message);
}
}
async #load(space: string): Promise<MemoryFile | null> {
try {
const raw = await readFile(join(DATA_ROOT, space, "memory.json"), "utf-8");
return JSON.parse(raw);
} catch {
return null;
}
}
}
export const spaceMemory = new SpaceMemory();
/**
* Wrap a ReadableStream to capture assistant response text for memory.
* Parses NDJSON lines, accumulates content from message chunks.
*/
export function streamWithMemoryCapture(
source: ReadableStream,
space: string,
query: string,
): ReadableStream {
let accumulated = "";
const decoder = new TextDecoder();
return source.pipeThrough(
new TransformStream({
transform(chunk, controller) {
// Pass through unchanged
controller.enqueue(chunk);
// Try to extract text content from NDJSON lines
try {
const text = decoder.decode(chunk, { stream: true });
for (const line of text.split("\n")) {
if (!line.trim()) continue;
const parsed = JSON.parse(line);
if (parsed.message?.role === "assistant" && parsed.message.content) {
accumulated += parsed.message.content;
}
}
} catch {
// Non-JSON chunk or partial line — ignore
}
},
flush() {
// Stream complete — save to memory
if (accumulated.length > 10) {
spaceMemory.appendTurn(space, query, accumulated);
}
},
}),
);
}