From 5d76475801cd0c4dfa5a867b73ed62609ab03abd Mon Sep 17 00:00:00 2001 From: Nevo David Date: Tue, 6 Jan 2026 12:34:54 +0700 Subject: [PATCH] fix: 50ms between emails, prevent resend from crashing --- .../src/activities/email.activity.ts | 2 +- .../src/signals/send.email.signal.ts | 9 +++ .../src/workflows/digest.email.workflow.ts | 2 +- apps/orchestrator/src/workflows/index.ts | 1 + .../src/workflows/send.email.workflow.ts | 59 +++++++++++++++++++ .../src/services/email.service.ts | 20 ++++++- 6 files changed, 90 insertions(+), 3 deletions(-) create mode 100644 apps/orchestrator/src/signals/send.email.signal.ts create mode 100644 apps/orchestrator/src/workflows/send.email.workflow.ts diff --git a/apps/orchestrator/src/activities/email.activity.ts b/apps/orchestrator/src/activities/email.activity.ts index a48cbc0b..3430a871 100644 --- a/apps/orchestrator/src/activities/email.activity.ts +++ b/apps/orchestrator/src/activities/email.activity.ts @@ -13,7 +13,7 @@ export class EmailActivity { @ActivityMethod() async sendEmail(to: string, subject: string, html: string, replyTo?: string) { - return this._emailService.sendEmail(to, subject, html, replyTo); + return this._emailService.sendEmailSync(to, subject, html, replyTo); } @ActivityMethod() diff --git a/apps/orchestrator/src/signals/send.email.signal.ts b/apps/orchestrator/src/signals/send.email.signal.ts new file mode 100644 index 00000000..f1a99bbd --- /dev/null +++ b/apps/orchestrator/src/signals/send.email.signal.ts @@ -0,0 +1,9 @@ +import { defineSignal } from '@temporalio/workflow'; + +export type SendEmail = { + to: string; + subject: string; + html: string; + replyTo?: string; +}; +export const sendEmailSignal = defineSignal<[SendEmail]>('sendEmail'); diff --git a/apps/orchestrator/src/workflows/digest.email.workflow.ts b/apps/orchestrator/src/workflows/digest.email.workflow.ts index 92802ec2..acfe9eb6 100644 --- a/apps/orchestrator/src/workflows/digest.email.workflow.ts +++ b/apps/orchestrator/src/workflows/digest.email.workflow.ts @@ -61,7 +61,7 @@ export async function digestEmailWorkflow({ ); } - return continueAsNew({ + return await continueAsNew({ organizationId, queue, }); diff --git a/apps/orchestrator/src/workflows/index.ts b/apps/orchestrator/src/workflows/index.ts index 1781c2ea..3a865864 100644 --- a/apps/orchestrator/src/workflows/index.ts +++ b/apps/orchestrator/src/workflows/index.ts @@ -2,3 +2,4 @@ export * from './post.workflow'; export * from './autopost.workflow'; export * from './digest.email.workflow'; export * from './missing.post.workflow'; +export * from './send.email.workflow'; diff --git a/apps/orchestrator/src/workflows/send.email.workflow.ts b/apps/orchestrator/src/workflows/send.email.workflow.ts new file mode 100644 index 00000000..71e552eb --- /dev/null +++ b/apps/orchestrator/src/workflows/send.email.workflow.ts @@ -0,0 +1,59 @@ +import { + proxyActivities, + setHandler, + condition, + sleep, + continueAsNew, +} from '@temporalio/workflow'; +import { EmailActivity } from '@gitroom/orchestrator/activities/email.activity'; +import { + SendEmail, + sendEmailSignal, +} from '@gitroom/orchestrator/signals/send.email.signal'; + +const { sendEmail } = proxyActivities({ + startToCloseTimeout: '10 minute', + taskQueue: 'main', + retry: { + maximumAttempts: 3, + backoffCoefficient: 1, + initialInterval: '2 minutes', + }, +}); + +// Rate limit: 2 requests per second = 500ms between requests +const RATE_LIMIT_MS = 500; + +export async function sendEmailWorkflow({ + queue = [], +}: { + queue: SendEmail[]; +}) { + let processedThisRun = 0; + // Handle incoming email signals + setHandler(sendEmailSignal, (email: SendEmail) => { + queue.push(email); + }); + + // Process emails with rate limiting + while (true) { + // Wait until there's an email in the queue or timeout after 1 hour of inactivity + const waitForQueue = await condition(() => queue.length > 0, '1 hour'); + if (!waitForQueue) { + break; + } + + try { + const email = queue.shift()!; + + await sendEmail(email.to, email.subject, email.html, email.replyTo); + processedThisRun++; + } catch (err) {} + + await sleep(RATE_LIMIT_MS); + + if (processedThisRun >= 100) { + return await continueAsNew({ queue }); + } + } +} diff --git a/libraries/nestjs-libraries/src/services/email.service.ts b/libraries/nestjs-libraries/src/services/email.service.ts index 574167d6..529228e1 100644 --- a/libraries/nestjs-libraries/src/services/email.service.ts +++ b/libraries/nestjs-libraries/src/services/email.service.ts @@ -3,11 +3,12 @@ import { EmailInterface } from '@gitroom/nestjs-libraries/emails/email.interface import { ResendProvider } from '@gitroom/nestjs-libraries/emails/resend.provider'; import { EmptyProvider } from '@gitroom/nestjs-libraries/emails/empty.provider'; import { NodeMailerProvider } from '@gitroom/nestjs-libraries/emails/node.mailer.provider'; +import { TemporalService } from 'nestjs-temporal-core'; @Injectable() export class EmailService { emailService: EmailInterface; - constructor() { + constructor(private _temporalService: TemporalService) { this.emailService = this.selectProvider(process.env.EMAIL_PROVIDER!); console.log('Email service provider:', this.emailService.name); for (const key of this.emailService.validateEnvKeys) { @@ -33,6 +34,23 @@ export class EmailService { } async sendEmail(to: string, subject: string, html: string, replyTo?: string) { + return this._temporalService.client + .getRawClient() + ?.workflow.signalWithStart('sendEmailWorkflow', { + taskQueue: 'main', + workflowId: 'send_email', + signal: 'sendEmail', + args: [{ queue: [] }], + signalArgs: [{ to, subject, html, replyTo }], + }); + } + + async sendEmailSync( + to: string, + subject: string, + html: string, + replyTo?: string + ) { if (to.indexOf('@') === -1) { return; }