/**
 * MI Agentic Loop — multi-turn streaming orchestrator.
 *
 * Streams LLM output, detects [MI_ACTION:{...}] markers, executes
 * server-side actions (media gen, data queries), and feeds results
 * back for the next LLM turn. Client-side actions (canvas shapes)
 * pass through unchanged in the stream.
 */
|
|
|
|
import type { MiProvider, MiMessage, MiStreamChunk } from "./mi-provider";
|
|
import { parseMiActions } from "../lib/mi-actions";
|
|
import type { MiAction } from "../lib/mi-actions";
|
|
import { generateImage, generateVideoViaFal } from "./mi-media";
|
|
import { queryModuleContent } from "./mi-data-queries";
|
|
|
|
/** Inputs for the agentic loop (see runAgenticLoop). */
export interface AgenticLoopOptions {
  /** Conversation history sent to the LLM on the first turn; not mutated by the loop. */
  messages: MiMessage[];
  /** Streaming LLM backend used for every turn. */
  provider: MiProvider;
  /** Model identifier passed straight through to `provider.stream`. */
  providerModel: string;
  /** Scope identifier forwarded to server-side data queries (queryModuleContent) — presumably a workspace/tenant key; confirm against caller. */
  space: string;
  /** Upper bound on LLM turns; defaults to 5. */
  maxTurns?: number;
}
|
|
|
|
/** NDJSON line types emitted by the agentic loop. */

/** Marks the start of LLM turn `turn`; turn 0 is never emitted (backwards compat). */
interface TurnLine { type: "turn"; turn: number; maxTurns: number }

/** Announces that a server-side action is about to execute. */
interface ActionStartLine { type: "action-start"; action: { type: string; ref?: string } }

/** Outcome of one server-side action. */
interface ActionResultLine {
  type: "action-result";
  // Which action produced this result, e.g. "generate-image".
  actionType: string;
  // True when the action succeeded.
  ok: boolean;
  // Media URL on generate-image / generate-video success.
  url?: string;
  // Raw payload from query-content. NOTE(review): `any` — shape is set by queryModuleContent; consider `unknown`.
  data?: any;
  // Human-readable summary from query-content.
  summary?: string;
  // Failure reason when `ok` is false.
  error?: string;
  // Correlation ref copied from the originating action, when present.
  ref?: string;
}

/** Streamed assistant text chunk (Ollama-style NDJSON); `done: true` terminates the stream. */
interface MessageLine { message: { role: string; content: string }; done: boolean }

/** Union of every NDJSON line shape the loop can emit. */
type NDJSONLine = TurnLine | ActionStartLine | ActionResultLine | MessageLine;
|
|
|
|
const SERVER_ACTION_TYPES = new Set(["generate-image", "generate-video", "query-content"]);
|
|
|
|
function isServerAction(action: MiAction): boolean {
|
|
return SERVER_ACTION_TYPES.has(action.type);
|
|
}
|
|
|
|
/**
|
|
* Execute a single server-side MI action.
|
|
*/
|
|
async function executeServerAction(
|
|
action: MiAction,
|
|
space: string,
|
|
): Promise<ActionResultLine> {
|
|
const ref = "ref" in action ? (action as any).ref : undefined;
|
|
|
|
switch (action.type) {
|
|
case "generate-image": {
|
|
const result = await generateImage(action.prompt, action.style);
|
|
if (result.ok) {
|
|
return { type: "action-result", actionType: "generate-image", ok: true, url: result.url, ref };
|
|
}
|
|
return { type: "action-result", actionType: "generate-image", ok: false, error: result.error, ref };
|
|
}
|
|
|
|
case "generate-video": {
|
|
const result = await generateVideoViaFal(action.prompt, action.source_image);
|
|
if (result.ok) {
|
|
return { type: "action-result", actionType: "generate-video", ok: true, url: result.url, ref };
|
|
}
|
|
return { type: "action-result", actionType: "generate-video", ok: false, error: result.error, ref };
|
|
}
|
|
|
|
case "query-content": {
|
|
const result = queryModuleContent(space, action.module, action.queryType, action.limit);
|
|
return {
|
|
type: "action-result",
|
|
actionType: "query-content",
|
|
ok: result.ok,
|
|
data: result.data,
|
|
summary: result.summary,
|
|
ref,
|
|
};
|
|
}
|
|
|
|
default:
|
|
return { type: "action-result", actionType: (action as any).type, ok: false, error: "Unknown server action", ref };
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Run the agentic loop. Returns a ReadableStream of NDJSON lines.
|
|
*
|
|
* Each turn: stream LLM → accumulate text → parse actions →
|
|
* execute server-side actions → feed results back → next turn.
|
|
* Client-side actions pass through in the text stream.
|
|
*/
|
|
export function runAgenticLoop(opts: AgenticLoopOptions): ReadableStream {
|
|
const { messages, provider, providerModel, space, maxTurns = 5 } = opts;
|
|
const encoder = new TextEncoder();
|
|
|
|
// Working copy of conversation
|
|
const conversation = [...messages];
|
|
|
|
return new ReadableStream({
|
|
async start(controller) {
|
|
try {
|
|
for (let turn = 0; turn < maxTurns; turn++) {
|
|
// Emit turn indicator (skip turn 0 for backwards compat)
|
|
if (turn > 0) {
|
|
emit(controller, encoder, { type: "turn", turn, maxTurns });
|
|
}
|
|
|
|
// Stream LLM response
|
|
let fullText = "";
|
|
const gen = provider.stream(conversation, providerModel);
|
|
|
|
for await (const chunk of gen) {
|
|
if (chunk.content) {
|
|
fullText += chunk.content;
|
|
// Stream text to client in real-time (Ollama NDJSON format)
|
|
emit(controller, encoder, {
|
|
message: { role: "assistant", content: chunk.content },
|
|
done: false,
|
|
});
|
|
}
|
|
if (chunk.done) break;
|
|
}
|
|
|
|
// Parse actions from accumulated text
|
|
const { actions } = parseMiActions(fullText);
|
|
const serverActions = actions.filter(isServerAction);
|
|
|
|
// If no server-side actions, we're done
|
|
if (serverActions.length === 0) {
|
|
emit(controller, encoder, {
|
|
message: { role: "assistant", content: "" },
|
|
done: true,
|
|
});
|
|
controller.close();
|
|
return;
|
|
}
|
|
|
|
// Execute server-side actions
|
|
const resultSummaries: string[] = [];
|
|
for (const action of serverActions) {
|
|
// Notify client that action is starting
|
|
emit(controller, encoder, {
|
|
type: "action-start",
|
|
action: { type: action.type, ref: "ref" in action ? (action as any).ref : undefined },
|
|
});
|
|
|
|
const result = await executeServerAction(action, space);
|
|
emit(controller, encoder, result);
|
|
|
|
// Build summary for next LLM turn
|
|
if (result.ok) {
|
|
if (result.url) {
|
|
resultSummaries.push(`[${result.actionType}] Generated: ${result.url}`);
|
|
} else if (result.summary) {
|
|
resultSummaries.push(`[${result.actionType}] ${result.summary}`);
|
|
}
|
|
} else {
|
|
resultSummaries.push(`[${result.actionType}] Failed: ${result.error}`);
|
|
}
|
|
}
|
|
|
|
// Add assistant message + action results to conversation for next turn
|
|
conversation.push({ role: "assistant", content: fullText });
|
|
conversation.push({
|
|
role: "user",
|
|
content: `Action results:\n${resultSummaries.join("\n")}\n\nIncorporate these results and continue. If you generated an image or video, you can now create a canvas shape referencing the URL. Do not re-generate media that already succeeded.`,
|
|
});
|
|
}
|
|
|
|
// Max turns reached
|
|
emit(controller, encoder, {
|
|
message: { role: "assistant", content: "" },
|
|
done: true,
|
|
});
|
|
controller.close();
|
|
} catch (err: any) {
|
|
// Emit error as a message so client can display it
|
|
try {
|
|
emit(controller, encoder, {
|
|
message: { role: "assistant", content: `\n\n*Error: ${err.message}*` },
|
|
done: true,
|
|
});
|
|
} catch { /* controller may be closed */ }
|
|
controller.close();
|
|
}
|
|
},
|
|
});
|
|
}
|
|
|
|
function emit(
|
|
controller: ReadableStreamDefaultController,
|
|
encoder: TextEncoder,
|
|
data: NDJSONLine,
|
|
): void {
|
|
controller.enqueue(encoder.encode(JSON.stringify(data) + "\n"));
|
|
}
|