// rspace-online/server/mi-agent.ts

/**
* MI Agentic Loop — multi-turn streaming orchestrator.
*
* Streams LLM output, detects [MI_ACTION:{...}] markers, executes
* server-side actions (media gen, data queries), and feeds results
* back for the next LLM turn. Client-side actions (canvas shapes)
* pass through unchanged in the stream.
*/
import type { MiProvider, MiMessage, MiStreamChunk } from "./mi-provider";
import { parseMiActions } from "../lib/mi-actions";
import type { MiAction } from "../lib/mi-actions";
import { generateImage, generateVideoViaFal } from "./mi-media";
import { queryModuleContent } from "./mi-data-queries";
/** Options for the agentic loop orchestrator (`runAgenticLoop`). */
export interface AgenticLoopOptions {
  /** Conversation history sent to the LLM; the loop copies it and never mutates the caller's array. */
  messages: MiMessage[];
  /** LLM backend used to stream each turn. */
  provider: MiProvider;
  /** Model identifier passed through to the provider on every turn. */
  providerModel: string;
  /** Space id scoping server-side content queries. */
  space: string;
  /** Maximum number of LLM turns before the loop force-terminates (defaults to 5). */
  maxTurns?: number;
}
/** NDJSON line types emitted by the agentic loop. */

/** Announces the start of an LLM turn (never emitted for turn 0, for backwards compat). */
interface TurnLine { type: "turn"; turn: number; maxTurns: number }

/** Announces that a server-side action is about to execute. */
interface ActionStartLine { type: "action-start"; action: { type: string; ref?: string } }

/** Outcome of one server-side action. */
interface ActionResultLine {
  type: "action-result";
  /** The action's `type` discriminator, echoed back for the client. */
  actionType: string;
  /** True when the action succeeded. */
  ok: boolean;
  /** Media URL — set on successful image/video generation. */
  url?: string;
  /** Raw query payload — set by query-content results. */
  data?: any;
  /** Human-readable result summary — set by query-content results. */
  summary?: string;
  /** Failure description when `ok` is false. */
  error?: string;
  /** Optional correlation ref carried on the action, echoed back. */
  ref?: string;
}

/**
 * Streamed assistant text chunk. Mirrors the Ollama NDJSON chat format
 * (which is why it carries no `type` discriminator, unlike the other lines).
 */
interface MessageLine { message: { role: string; content: string }; done: boolean }

/** Union of every line shape written to the NDJSON stream. */
type NDJSONLine = TurnLine | ActionStartLine | ActionResultLine | MessageLine;
/** Action types that must be executed on the server (media generation, data queries). */
const SERVER_ACTION_TYPES = new Set<string>([
  "generate-image",
  "generate-video",
  "query-content",
]);

/** True when `action` must be executed server-side rather than passed through to the client. */
function isServerAction(action: MiAction): boolean {
  const { type } = action;
  return SERVER_ACTION_TYPES.has(type);
}
/**
 * Execute a single server-side MI action.
 *
 * Never rejects: media generation and query helpers hit external services
 * and may throw/reject; any such failure is caught and reported as an
 * `ok: false` result line so one bad action does not abort the whole
 * agentic stream (it is fed back to the LLM as a failed action instead).
 *
 * @param action parsed MI action (expected to be one of SERVER_ACTION_TYPES)
 * @param space  space id scoping content queries
 * @returns an ActionResultLine describing success or failure
 */
async function executeServerAction(
  action: MiAction,
  space: string,
): Promise<ActionResultLine> {
  // `ref` is an optional client-supplied correlation id; not every action shape carries it.
  const ref = "ref" in action ? (action as any).ref : undefined;
  try {
    switch (action.type) {
      case "generate-image": {
        const result = await generateImage(action.prompt, action.style);
        if (result.ok) {
          return { type: "action-result", actionType: "generate-image", ok: true, url: result.url, ref };
        }
        return { type: "action-result", actionType: "generate-image", ok: false, error: result.error, ref };
      }
      case "generate-video": {
        const result = await generateVideoViaFal(action.prompt, action.source_image);
        if (result.ok) {
          return { type: "action-result", actionType: "generate-video", ok: true, url: result.url, ref };
        }
        return { type: "action-result", actionType: "generate-video", ok: false, error: result.error, ref };
      }
      case "query-content": {
        const result = queryModuleContent(space, action.module, action.queryType, action.limit);
        return {
          type: "action-result",
          actionType: "query-content",
          ok: result.ok,
          data: result.data,
          summary: result.summary,
          ref,
        };
      }
      default:
        return { type: "action-result", actionType: (action as any).type, ok: false, error: "Unknown server action", ref };
    }
  } catch (err: unknown) {
    // Defensive: narrow before reading .message — thrown values need not be Errors.
    const message = err instanceof Error ? err.message : String(err);
    return { type: "action-result", actionType: action.type, ok: false, error: message, ref };
  }
}
/**
 * Run the agentic loop. Returns a ReadableStream of NDJSON lines.
 *
 * Each turn: stream LLM → accumulate text → parse actions →
 * execute server-side actions → feed results back → next turn.
 * Client-side actions pass through in the text stream.
 */
export function runAgenticLoop(opts: AgenticLoopOptions): ReadableStream {
  const { messages, provider, providerModel, space, maxTurns = 5 } = opts;
  const encoder = new TextEncoder();
  // Working copy of conversation — turn results are appended here without
  // mutating the caller's array.
  const conversation = [...messages];
  return new ReadableStream({
    async start(controller) {
      // close() throws if the controller is already closed/errored (e.g. the
      // client disconnected mid-stream); treat that as a no-op so it never
      // surfaces as an unhandled rejection.
      const safeClose = (): void => {
        try {
          controller.close();
        } catch { /* already closed */ }
      };
      try {
        for (let turn = 0; turn < maxTurns; turn++) {
          // Emit turn indicator (skip turn 0 for backwards compat)
          if (turn > 0) {
            emit(controller, encoder, { type: "turn", turn, maxTurns });
          }
          // Stream LLM response, forwarding each chunk immediately while
          // accumulating full text so actions can be parsed after the turn.
          let fullText = "";
          const gen = provider.stream(conversation, providerModel);
          for await (const chunk of gen) {
            if (chunk.content) {
              fullText += chunk.content;
              // Stream text to client in real-time (Ollama NDJSON format)
              emit(controller, encoder, {
                message: { role: "assistant", content: chunk.content },
                done: false,
              });
            }
            if (chunk.done) break;
          }
          // Parse actions from accumulated text
          const { actions } = parseMiActions(fullText);
          const serverActions = actions.filter(isServerAction);
          // If no server-side actions, we're done — client-side actions
          // already went through in the text stream above.
          if (serverActions.length === 0) {
            emit(controller, encoder, {
              message: { role: "assistant", content: "" },
              done: true,
            });
            safeClose();
            return;
          }
          // Execute server-side actions sequentially (preserves ordering of
          // action-start/action-result lines on the wire).
          const resultSummaries: string[] = [];
          for (const action of serverActions) {
            // Notify client that action is starting
            emit(controller, encoder, {
              type: "action-start",
              action: { type: action.type, ref: "ref" in action ? (action as any).ref : undefined },
            });
            const result = await executeServerAction(action, space);
            emit(controller, encoder, result);
            // Build summary for next LLM turn
            if (result.ok) {
              if (result.url) {
                resultSummaries.push(`[${result.actionType}] Generated: ${result.url}`);
              } else if (result.summary) {
                resultSummaries.push(`[${result.actionType}] ${result.summary}`);
              }
            } else {
              resultSummaries.push(`[${result.actionType}] Failed: ${result.error}`);
            }
          }
          // Add assistant message + action results to conversation for next
          // turn. NOTE(review): results produced on the final turn are pushed
          // but never fed back to the LLM — the loop exits first.
          conversation.push({ role: "assistant", content: fullText });
          conversation.push({
            role: "user",
            content: `Action results:\n${resultSummaries.join("\n")}\n\nIncorporate these results and continue. If you generated an image or video, you can now create a canvas shape referencing the URL. Do not re-generate media that already succeeded.`,
          });
        }
        // Max turns reached
        emit(controller, encoder, {
          message: { role: "assistant", content: "" },
          done: true,
        });
        safeClose();
      } catch (err: unknown) {
        // Emit error as a message so client can display it. Narrow first —
        // thrown values need not be Error instances, and reading `.message`
        // off an arbitrary value would render "*Error: undefined*".
        const message = err instanceof Error ? err.message : String(err);
        try {
          emit(controller, encoder, {
            message: { role: "assistant", content: `\n\n*Error: ${message}*` },
            done: true,
          });
        } catch { /* controller may be closed */ }
        safeClose();
      }
    },
  });
}
/** Serialize a single NDJSON line and enqueue its UTF-8 bytes on the stream controller. */
function emit(
  controller: ReadableStreamDefaultController,
  encoder: TextEncoder,
  data: NDJSONLine,
): void {
  const line = `${JSON.stringify(data)}\n`;
  controller.enqueue(encoder.encode(line));
}