Harden rinbox agent mailbox pipeline: loop detection, rate limits, envelope fix
CI/CD / deploy (push) Waiting to run Details

- Fix executeApproval SMTP envelope to authenticate as SMTP_USER (Mailcow sender mismatch)
- Add reply loop detection: skip auto-replies, noreply, mailer-daemon, postmaster senders
- Per-sender rate limit: 3 replies/hr per sender per agent mailbox
- Daily send cap: 50 replies/day per agent mailbox
- Reply length cap: truncate agent replies at 2000 chars
- Bootstrap existing spaces on init: provision missing team inbox + agent mailbox docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jeff Emmett 2026-04-01 12:36:23 -07:00
parent adb1c7cb87
commit 9fd3ca931c
1 changed files with 169 additions and 2 deletions

View File

@ -1727,8 +1727,7 @@ async function syncAgentMailbox(creds: AgentImapCreds) {
try {
const parsed = await simpleParser(msg.source);
const threadId = generateId();
const fromAddr = parsed.from?.value?.[0]?.address || '';
const fromAddr = (parsed.from?.value?.[0]?.address || '').toLowerCase();
const fromName = parsed.from?.value?.[0]?.name || '';
const subject = parsed.subject || '(no subject)';
const messageId = parsed.messageId || null;
@ -1737,6 +1736,56 @@ async function syncAgentMailbox(creds: AgentImapCreds) {
const toAddrs = parsed.to?.value?.map((a: any) => a.address).filter(Boolean) || [];
const ccAddrs = parsed.cc?.value?.map((a: any) => a.address).filter(Boolean) || [];
// ── Reply loop detection ──
const isAutoReply =
fromAddr.endsWith('-agent@rspace.online') ||
fromAddr.startsWith('noreply@') ||
fromAddr.startsWith('no-reply@') ||
fromAddr.startsWith('mailer-daemon@') ||
fromAddr.startsWith('postmaster@') ||
/^(auto[_-]?reply|out[_-]?of[_-]?office)/i.test(subject) ||
parsed.headers?.get('auto-submitted')?.toString() !== undefined &&
parsed.headers?.get('auto-submitted')?.toString() !== 'no' ||
parsed.headers?.get('x-auto-response-suppress') !== undefined;
if (isAutoReply) {
// Store as skipped thread but don't trigger MI processing
const skipThreadId = generateId();
_syncServer!.changeDoc<MailboxDoc>(agentDoc.docId, `Agent IMAP (skipped auto-reply): ${subject}`, (d) => {
d.threads[skipThreadId] = {
id: skipThreadId,
mailboxId: d.mailbox.id,
messageId,
subject,
fromAddress: fromAddr,
fromName,
toAddresses: toAddrs,
ccAddresses: ccAddrs,
bodyText: parsed.text || '',
bodyHtml: parsed.html || '',
tags: ['agent-inbound-skipped'],
status: 'closed',
isRead: true,
isStarred: false,
assignedTo: null,
hasAttachments: parsed.attachments?.length > 0 || false,
receivedAt: parsed.date?.getTime() || Date.now(),
createdAt: Date.now(),
comments: [],
inReplyTo: inReplyTo || null,
references,
direction: 'inbound',
parentThreadId: null,
};
});
console.log(`[Inbox] Agent ${creds.email} skipped auto-reply from ${fromAddr}: "${subject}"`);
if (msg.uid > maxUid) maxUid = msg.uid;
count++;
continue;
}
const threadId = generateId();
_syncServer!.changeDoc<MailboxDoc>(agentDoc.docId, `Agent IMAP: ${subject}`, (d) => {
d.threads[threadId] = {
id: threadId,
@ -1802,6 +1851,16 @@ async function syncAllAgentMailboxes() {
}
}
// ── Agent reply rate limiting ──
const _agentReplyTracker = new Map<string, { count: number; windowStart: number }>();
const AGENT_RATE_LIMIT = 3; // max replies per sender per window
const AGENT_RATE_WINDOW = 3600000; // 1 hour
const _agentDailySends = new Map<string, { count: number; day: string }>();
const AGENT_DAILY_CAP = 50; // max replies per agent mailbox per day
const AGENT_REPLY_MAX_LENGTH = 2000; // max characters in agent reply
/**
* Process an inbound agent email through MI agentic loop.
* Creates an auto-approved reply and sends it via SMTP.
@ -1817,6 +1876,44 @@ async function processAgentMI(docId: string, threadId: string, spaceSlug: string
const agent = Object.values(doc.agentInboxes)[0];
if (!agent?.autoReply) return;
// ── Per-sender rate limit ──
const senderKey = `${spaceSlug}:${thread.fromAddress!.toLowerCase()}`;
const now = Date.now();
const tracker = _agentReplyTracker.get(senderKey);
if (tracker) {
if (now - tracker.windowStart < AGENT_RATE_WINDOW) {
if (tracker.count >= AGENT_RATE_LIMIT) {
console.warn(`[Inbox] Rate limit: ${thread.fromAddress} exceeded ${AGENT_RATE_LIMIT} replies/hr for ${spaceSlug}`);
return;
}
tracker.count++;
} else {
tracker.count = 1;
tracker.windowStart = now;
}
} else {
_agentReplyTracker.set(senderKey, { count: 1, windowStart: now });
}
// ── Daily send cap per agent mailbox ──
const today = new Date().toISOString().slice(0, 10);
const dailyKey = spaceSlug;
const daily = _agentDailySends.get(dailyKey);
if (daily) {
if (daily.day === today) {
if (daily.count >= AGENT_DAILY_CAP) {
console.warn(`[Inbox] Daily cap: ${spaceSlug}-agent hit ${AGENT_DAILY_CAP} replies for ${today}`);
return;
}
daily.count++;
} else {
daily.count = 1;
daily.day = today;
}
} else {
_agentDailySends.set(dailyKey, { count: 1, day: today });
}
try {
const { miRegistry } = await import('../../server/mi-provider');
const { runAgenticLoop } = await import('../../server/mi-agent');
@ -1860,6 +1957,11 @@ async function processAgentMI(docId: string, threadId: string, spaceSlug: string
if (!replyText.trim()) return;
// ── Reply length cap ──
if (replyText.length > AGENT_REPLY_MAX_LENGTH) {
replyText = replyText.slice(0, AGENT_REPLY_MAX_LENGTH) + '...[truncated]';
}
// Create auto-approved approval and execute it
const approvalId = generateId();
const replySubject = thread.subject.startsWith('Re:') ? thread.subject : `Re: ${thread.subject}`;
@ -2173,6 +2275,69 @@ function seedTemplateInbox(space: string) {
console.log(`[Inbox] Template seeded for "${space}": 1 mailbox (${space}-inbox), 3 threads`);
}
/**
* Bootstrap mailboxes for spaces created before rinbox existed.
* Iterates all space docs and ensures each has team inbox + agent mailbox.
*/
async function bootstrapExistingSpaces() {
if (!_syncServer) return;
try {
// Collect all known space slugs from existing inbox docs
const existingSlugs = new Set<string>();
const existingAgentSlugs = new Set<string>();
for (const id of _syncServer.listDocs()) {
const parts = id.split(':');
if (parts.length >= 4 && parts[1] === 'inbox' && parts[2] === 'mailboxes') {
const doc = _syncServer.getDoc<MailboxDoc>(id);
if (doc) {
if (doc.mailbox.slug.endsWith('-agent')) {
existingAgentSlugs.add(parts[0]);
} else {
existingSlugs.add(parts[0]);
}
}
}
}
// Find all space slugs from ANY doc on the SyncServer (spaces module stores space docs)
const allSpaceSlugs = new Set<string>();
for (const id of _syncServer.listDocs()) {
const parts = id.split(':');
// Space docs follow pattern: {space}:spaces:... or just use the first segment
if (parts.length >= 2 && parts[0] !== 'global') {
allSpaceSlugs.add(parts[0]);
}
}
let bootstrapped = 0;
for (const slug of allSpaceSlugs) {
if (slug === 'demo' || slug === 'template') continue; // skip demo/template
if (!existingSlugs.has(slug)) {
initSpaceInbox(slug, 'did:bootstrap');
// Provision Mailcow alias
fetch(`${ENCRYPTID_INTERNAL}/api/internal/spaces/${slug}/alias`, { method: 'POST' }).catch(() => {});
bootstrapped++;
}
if (!existingAgentSlugs.has(slug)) {
initAgentMailbox(slug, 'did:bootstrap');
// Provision Mailcow agent mailbox
fetch(`${ENCRYPTID_INTERNAL}/api/internal/spaces/${slug}/agent-mailbox`, { method: 'POST' }).catch(() => {});
bootstrapped++;
}
}
if (bootstrapped > 0) {
console.log(`[Inbox] Bootstrap: provisioned ${bootstrapped} missing mailboxes across ${allSpaceSlugs.size} spaces`);
} else {
console.log(`[Inbox] Bootstrap: all ${allSpaceSlugs.size} spaces already have mailboxes`);
}
} catch (e: any) {
console.error(`[Inbox] Bootstrap error:`, e.message);
}
}
export const inboxModule: RSpaceModule = {
id: "rinbox",
name: "rInbox",
@ -2188,6 +2353,8 @@ export const inboxModule: RSpaceModule = {
console.log("[Inbox] Module initialized (Automerge-only, no PG)");
// Pre-warm SMTP transport
if (SMTP_USER) getSmtpTransport().catch(() => {});
// Bootstrap mailboxes for spaces created before rinbox existed
setTimeout(() => bootstrapExistingSpaces(), 10000);
},
async onSpaceCreate(ctx) {
initSpaceInbox(ctx.spaceSlug, ctx.ownerDID || 'did:unknown');