From 9fd3ca931cd68968b6a8f7f87d0960798c25cb5d Mon Sep 17 00:00:00 2001 From: Jeff Emmett Date: Wed, 1 Apr 2026 12:36:23 -0700 Subject: [PATCH] Harden rinbox agent mailbox pipeline: loop detection, rate limits, envelope fix - Fix executeApproval SMTP envelope to authenticate as SMTP_USER (Mailcow sender mismatch) - Add reply loop detection: skip auto-replies, noreply, mailer-daemon, postmaster senders - Per-sender rate limit: 3 replies/hr per sender per agent mailbox - Daily send cap: 50 replies/day per agent mailbox - Reply length cap: truncate agent replies at 2000 chars - Bootstrap existing spaces on init: provision missing team inbox + agent mailbox docs Co-Authored-By: Claude Opus 4.6 --- modules/rinbox/mod.ts | 171 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 169 insertions(+), 2 deletions(-) diff --git a/modules/rinbox/mod.ts b/modules/rinbox/mod.ts index 60938ba..128a2d9 100644 --- a/modules/rinbox/mod.ts +++ b/modules/rinbox/mod.ts @@ -1727,8 +1727,7 @@ async function syncAgentMailbox(creds: AgentImapCreds) { try { const parsed = await simpleParser(msg.source); - const threadId = generateId(); - const fromAddr = parsed.from?.value?.[0]?.address || ''; + const fromAddr = (parsed.from?.value?.[0]?.address || '').toLowerCase(); const fromName = parsed.from?.value?.[0]?.name || ''; const subject = parsed.subject || '(no subject)'; const messageId = parsed.messageId || null; @@ -1737,6 +1736,56 @@ async function syncAgentMailbox(creds: AgentImapCreds) { const toAddrs = parsed.to?.value?.map((a: any) => a.address).filter(Boolean) || []; const ccAddrs = parsed.cc?.value?.map((a: any) => a.address).filter(Boolean) || []; + // ── Reply loop detection ── + const isAutoReply = + fromAddr.endsWith('-agent@rspace.online') || + fromAddr.startsWith('noreply@') || + fromAddr.startsWith('no-reply@') || + fromAddr.startsWith('mailer-daemon@') || + fromAddr.startsWith('postmaster@') || + /^(auto[_-]?reply|out[_-]?of[_-]?office)/i.test(subject) || + parsed.headers?.get('auto-submitted')?.toString() !== undefined && + parsed.headers?.get('auto-submitted')?.toString() !== 'no' || + parsed.headers?.get('x-auto-response-suppress') !== undefined; + + if (isAutoReply) { + // Store as skipped thread but don't trigger MI processing + const skipThreadId = generateId(); + _syncServer!.changeDoc(agentDoc.docId, `Agent IMAP (skipped auto-reply): ${subject}`, (d) => { + d.threads[skipThreadId] = { + id: skipThreadId, + mailboxId: d.mailbox.id, + messageId, + subject, + fromAddress: fromAddr, + fromName, + toAddresses: toAddrs, + ccAddresses: ccAddrs, + bodyText: parsed.text || '', + bodyHtml: parsed.html || '', + tags: ['agent-inbound-skipped'], + status: 'closed', + isRead: true, + isStarred: false, + assignedTo: null, + hasAttachments: parsed.attachments?.length > 0 || false, + receivedAt: parsed.date?.getTime() || Date.now(), + createdAt: Date.now(), + comments: [], + inReplyTo: inReplyTo || null, + references, + direction: 'inbound', + parentThreadId: null, + }; + }); + console.log(`[Inbox] Agent ${creds.email} skipped auto-reply from ${fromAddr}: "${subject}"`); + if (msg.uid > maxUid) maxUid = msg.uid; + count++; + continue; + } + + const threadId = generateId(); + _syncServer!.changeDoc(agentDoc.docId, `Agent IMAP: ${subject}`, (d) => { d.threads[threadId] = { id: threadId, @@ -1802,6 +1851,16 @@ async function syncAllAgentMailboxes() { } } +// ── Agent reply rate limiting ── +const _agentReplyTracker = new Map(); +const AGENT_RATE_LIMIT = 3; // max replies per sender per window +const AGENT_RATE_WINDOW = 3600000; // 1 hour + +const _agentDailySends = new Map(); +const AGENT_DAILY_CAP = 50; // max replies per agent mailbox per day + +const AGENT_REPLY_MAX_LENGTH = 2000; // max characters in agent reply + /** * Process an inbound agent email through MI agentic loop. * Creates an auto-approved reply and sends it via SMTP. @@ -1817,6 +1876,44 @@ async function processAgentMI(docId: string, threadId: string, spaceSlug: string const agent = Object.values(doc.agentInboxes)[0]; if (!agent?.autoReply) return; + // ── Per-sender rate limit ── + const senderKey = `${spaceSlug}:${thread.fromAddress!.toLowerCase()}`; + const now = Date.now(); + const tracker = _agentReplyTracker.get(senderKey); + if (tracker) { + if (now - tracker.windowStart < AGENT_RATE_WINDOW) { + if (tracker.count >= AGENT_RATE_LIMIT) { + console.warn(`[Inbox] Rate limit: ${thread.fromAddress} exceeded ${AGENT_RATE_LIMIT} replies/hr for ${spaceSlug}`); + return; + } + tracker.count++; + } else { + tracker.count = 1; + tracker.windowStart = now; + } + } else { + _agentReplyTracker.set(senderKey, { count: 1, windowStart: now }); + } + + // ── Daily send cap per agent mailbox ── + const today = new Date().toISOString().slice(0, 10); + const dailyKey = spaceSlug; + const daily = _agentDailySends.get(dailyKey); + if (daily) { + if (daily.day === today) { + if (daily.count >= AGENT_DAILY_CAP) { + console.warn(`[Inbox] Daily cap: ${spaceSlug}-agent hit ${AGENT_DAILY_CAP} replies for ${today}`); + return; + } + daily.count++; + } else { + daily.count = 1; + daily.day = today; + } + } else { + _agentDailySends.set(dailyKey, { count: 1, day: today }); + } + try { const { miRegistry } = await import('../../server/mi-provider'); const { runAgenticLoop } = await import('../../server/mi-agent'); @@ -1860,6 +1957,11 @@ async function processAgentMI(docId: string, threadId: string, spaceSlug: string if (!replyText.trim()) return; + // ── Reply length cap ── + if (replyText.length > AGENT_REPLY_MAX_LENGTH) { + replyText = replyText.slice(0, AGENT_REPLY_MAX_LENGTH) + '...[truncated]'; + } + // Create auto-approved approval and execute it const approvalId = generateId(); const replySubject = thread.subject.startsWith('Re:') ? thread.subject : `Re: ${thread.subject}`; @@ -2173,6 +2275,69 @@ function seedTemplateInbox(space: string) { console.log(`[Inbox] Template seeded for "${space}": 1 mailbox (${space}-inbox), 3 threads`); } +/** + * Bootstrap mailboxes for spaces created before rinbox existed. + * Iterates all space docs and ensures each has team inbox + agent mailbox. + */ +async function bootstrapExistingSpaces() { + if (!_syncServer) return; + + try { + // Collect all known space slugs from existing inbox docs + const existingSlugs = new Set(); + const existingAgentSlugs = new Set(); + for (const id of _syncServer.listDocs()) { + const parts = id.split(':'); + if (parts.length >= 4 && parts[1] === 'inbox' && parts[2] === 'mailboxes') { + const doc = _syncServer.getDoc(id); + if (doc) { + if (doc.mailbox.slug.endsWith('-agent')) { + existingAgentSlugs.add(parts[0]); + } else { + existingSlugs.add(parts[0]); + } + } + } + } + + // Find all space slugs from ANY doc on the SyncServer (spaces module stores space docs) + const allSpaceSlugs = new Set(); + for (const id of _syncServer.listDocs()) { + const parts = id.split(':'); + // Space docs follow pattern: {space}:spaces:... or just use the first segment + if (parts.length >= 2 && parts[0] !== 'global') { + allSpaceSlugs.add(parts[0]); + } + } + + let bootstrapped = 0; + for (const slug of allSpaceSlugs) { + if (slug === 'demo' || slug === 'template') continue; // skip demo/template + + if (!existingSlugs.has(slug)) { + initSpaceInbox(slug, 'did:bootstrap'); + // Provision Mailcow alias + fetch(`${ENCRYPTID_INTERNAL}/api/internal/spaces/${slug}/alias`, { method: 'POST' }).catch(() => {}); + bootstrapped++; + } + if (!existingAgentSlugs.has(slug)) { + initAgentMailbox(slug, 'did:bootstrap'); + // Provision Mailcow agent mailbox + fetch(`${ENCRYPTID_INTERNAL}/api/internal/spaces/${slug}/agent-mailbox`, { method: 'POST' }).catch(() => {}); + bootstrapped++; + } + } + + if (bootstrapped > 0) { + console.log(`[Inbox] Bootstrap: provisioned ${bootstrapped} missing mailboxes across ${allSpaceSlugs.size} spaces`); + } else { + console.log(`[Inbox] Bootstrap: all ${allSpaceSlugs.size} spaces already have mailboxes`); + } + } catch (e: any) { + console.error(`[Inbox] Bootstrap error:`, e.message); + } +} + export const inboxModule: RSpaceModule = { id: "rinbox", name: "rInbox", @@ -2188,6 +2353,8 @@ export const inboxModule: RSpaceModule = { console.log("[Inbox] Module initialized (Automerge-only, no PG)"); // Pre-warm SMTP transport if (SMTP_USER) getSmtpTransport().catch(() => {}); + // Bootstrap mailboxes for spaces created before rinbox existed + setTimeout(() => bootstrapExistingSpaces(), 10000); }, async onSpaceCreate(ctx) { initSpaceInbox(ctx.spaceSlug, ctx.ownerDID || 'did:unknown');