/**
 * MI Agentic Loop — multi-turn streaming orchestrator.
 *
 * Streams LLM output, detects [MI_ACTION:{...}] markers, executes
 * server-side actions (media gen, data queries), and feeds results
 * back for the next LLM turn. Client-side actions (canvas shapes)
 * pass through unchanged in the stream.
 */

import type { MiProvider, MiMessage, MiStreamChunk } from "./mi-provider";
import { parseMiActions } from "../lib/mi-actions";
import type { MiAction } from "../lib/mi-actions";
import { generateImage, generateVideoViaFal } from "./mi-media";
import { queryModuleContent } from "./mi-data-queries";

export interface AgenticLoopOptions {
  /** Conversation so far. It is copied before use; the caller's array is not mutated. */
  messages: MiMessage[];
  /** LLM backend whose `stream()` is called once per turn. */
  provider: MiProvider;
  /** Model identifier passed through to `provider.stream()`. */
  providerModel: string;
  /** Passed to `queryModuleContent` for `query-content` actions. */
  space: string;
  /** Upper bound on LLM turns (default 5). */
  maxTurns?: number;
}

/** NDJSON line types emitted by the agentic loop. */
interface TurnLine {
  type: "turn";
  turn: number;
  maxTurns: number;
}

interface ActionStartLine {
  type: "action-start";
  action: { type: string; ref?: string };
}

interface ActionResultLine {
  type: "action-result";
  actionType: string;
  ok: boolean;
  url?: string;
  data?: unknown;
  summary?: string;
  error?: string;
  ref?: string;
}

/** Streamed text chunk in Ollama NDJSON format (carries no `type` tag). */
interface MessageLine {
  message: { role: string; content: string };
  done: boolean;
}

type NDJSONLine = TurnLine | ActionStartLine | ActionResultLine | MessageLine;

const SERVER_ACTION_TYPES = new Set<string>(["generate-image", "generate-video", "query-content"]);

/** True for actions executed here on the server; all others are left in the text stream for the client. */
function isServerAction(action: MiAction): boolean {
  return SERVER_ACTION_TYPES.has(action.type);
}

/** The action's optional `ref` (client correlation id), if present and a string. */
function actionRef(action: MiAction): string | undefined {
  // Not every MiAction variant declares `ref`, so read it through a narrow cast.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const ref: unknown = "ref" in action ? (action as any).ref : undefined;
  return typeof ref === "string" ? ref : undefined;
}

/**
 * Execute a single server-side MI action.
 *
 * Never rejects. Failures, including exceptions thrown by the media or query
 * helpers, come back as an `ok: false` result. One failing action therefore
 * does not abort the whole loop, and the LLM sees the error on the next turn.
 */
async function executeServerAction(action: MiAction, space: string): Promise<ActionResultLine> {
  const actionType: string = action.type;
  const ref = actionRef(action);
  const fail = (error: string | undefined): ActionResultLine => ({
    type: "action-result",
    actionType,
    ok: false,
    error,
    ref,
  });

  try {
    switch (action.type) {
      case "generate-image": {
        const result = await generateImage(action.prompt, action.style);
        return result.ok
          ? { type: "action-result", actionType, ok: true, url: result.url, ref }
          : fail(result.error);
      }
      case "generate-video": {
        const result = await generateVideoViaFal(action.prompt, action.source_image);
        return result.ok
          ? { type: "action-result", actionType, ok: true, url: result.url, ref }
          : fail(result.error);
      }
      case "query-content": {
        const result = queryModuleContent(space, action.module, action.queryType, action.limit);
        return {
          type: "action-result",
          actionType,
          ok: result.ok,
          data: result.data,
          summary: result.summary,
          ref,
        };
      }
      default:
        return fail("Unknown server action");
    }
  } catch (err: unknown) {
    return fail(err instanceof Error ? err.message : String(err));
  }
}

/** One-line description of an action result, fed back to the LLM on the next turn. */
function summarizeResult(result: ActionResultLine): string {
  const tag = `[${result.actionType}]`;
  if (!result.ok) {
    // query-content reports failures via `summary` rather than `error`.
    return `${tag} Failed: ${result.error ?? result.summary ?? "unknown error"}`;
  }
  if (result.url) return `${tag} Generated: ${result.url}`;
  if (result.summary) return `${tag} ${result.summary}`;
  return `${tag} Completed.`;
}

/** Close the stream, ignoring the error thrown when it is already closed or cancelled. */
function closeQuietly(controller: ReadableStreamDefaultController<Uint8Array>): void {
  try {
    controller.close();
  } catch {
    /* already closed or cancelled by the consumer */
  }
}

/**
 * Run the agentic loop. Returns a ReadableStream of NDJSON lines.
 *
 * Each turn: stream LLM → accumulate text → parse actions →
 * execute server-side actions → feed results back → next turn.
 * Client-side actions pass through in the text stream.
 *
 * The stream ends with a `{ message, done: true }` line in three cases:
 * - a turn produces no server-side actions;
 * - `maxTurns` turns have run;
 * - an error occurs, in which case the line carries the error text
 *   (sent on a best-effort basis if the consumer has gone away).
 */
export function runAgenticLoop(opts: AgenticLoopOptions): ReadableStream<Uint8Array> {
  const { messages, provider, providerModel, space, maxTurns = 5 } = opts;
  const encoder = new TextEncoder();
  // Working copy of conversation; only this copy is appended to.
  const conversation = [...messages];

  return new ReadableStream<Uint8Array>({
    async start(controller) {
      const send = (line: NDJSONLine): void => emit(controller, encoder, line);
      const finish = (): void => {
        send({ message: { role: "assistant", content: "" }, done: true });
        controller.close();
      };

      try {
        for (let turn = 0; turn < maxTurns; turn++) {
          // Emit turn indicator (skip turn 0 for backwards compat)
          if (turn > 0) {
            send({ type: "turn", turn, maxTurns });
          }

          // Stream LLM text to the client in real time (Ollama NDJSON format)
          // while accumulating it for action parsing.
          let fullText = "";
          for await (const chunk of provider.stream(conversation, providerModel)) {
            if (chunk.content) {
              fullText += chunk.content;
              send({ message: { role: "assistant", content: chunk.content }, done: false });
            }
            if (chunk.done) break;
          }

          const serverActions = parseMiActions(fullText).actions.filter(isServerAction);

          // Nothing for the server to do: the LLM's answer is final.
          if (serverActions.length === 0) {
            finish();
            return;
          }

          // Run actions sequentially so each start/result pair stays ordered for the client.
          const resultSummaries: string[] = [];
          for (const action of serverActions) {
            send({ type: "action-start", action: { type: action.type, ref: actionRef(action) } });
            const result = await executeServerAction(action, space);
            send(result);
            resultSummaries.push(summarizeResult(result));
          }

          // Add assistant message + action results to conversation for next turn
          conversation.push({ role: "assistant", content: fullText });
          conversation.push({
            role: "user",
            content: `Action results:\n${resultSummaries.join("\n")}\n\nIncorporate these results and continue. If you generated an image or video, you can now create a canvas shape referencing the URL. Do not re-generate media that already succeeded.`,
          });
        }

        // Max turns reached
        finish();
      } catch (err: unknown) {
        const message = err instanceof Error ? err.message : String(err);
        // Emit error as a message so client can display it
        try {
          send({ message: { role: "assistant", content: `\n\n*Error: ${message}*` }, done: true });
        } catch {
          /* controller may be closed */
        }
        closeQuietly(controller);
      }
    },
  });
}

/**
 * Serialize one NDJSON line (JSON followed by "\n") and enqueue it.
 * `controller.enqueue` throws if the stream was already closed or cancelled;
 * callers handle that.
 */
function emit(
  controller: ReadableStreamDefaultController<Uint8Array>,
  encoder: TextEncoder,
  data: NDJSONLine,
): void {
  controller.enqueue(encoder.encode(JSON.stringify(data) + "\n"));
}