fix: 50ms between emails, prevent resend from crashing

This commit is contained in:
Nevo David 2026-01-06 12:34:54 +07:00
parent bb9aa1aee6
commit 5d76475801
6 changed files with 90 additions and 3 deletions

View File

@ -13,7 +13,7 @@ export class EmailActivity {
@ActivityMethod()
async sendEmail(to: string, subject: string, html: string, replyTo?: string) {
return this._emailService.sendEmail(to, subject, html, replyTo);
return this._emailService.sendEmailSync(to, subject, html, replyTo);
}
@ActivityMethod()

View File

@ -0,0 +1,9 @@
import { defineSignal } from '@temporalio/workflow';
export type SendEmail = {
to: string;
subject: string;
html: string;
replyTo?: string;
};
export const sendEmailSignal = defineSignal<[SendEmail]>('sendEmail');

View File

@ -61,7 +61,7 @@ export async function digestEmailWorkflow({
);
}
return continueAsNew({
return await continueAsNew({
organizationId,
queue,
});

View File

@ -2,3 +2,4 @@ export * from './post.workflow';
export * from './autopost.workflow';
export * from './digest.email.workflow';
export * from './missing.post.workflow';
export * from './send.email.workflow';

View File

@ -0,0 +1,59 @@
import {
proxyActivities,
setHandler,
condition,
sleep,
continueAsNew,
} from '@temporalio/workflow';
import { EmailActivity } from '@gitroom/orchestrator/activities/email.activity';
import {
SendEmail,
sendEmailSignal,
} from '@gitroom/orchestrator/signals/send.email.signal';
const { sendEmail } = proxyActivities<EmailActivity>({
startToCloseTimeout: '10 minute',
taskQueue: 'main',
retry: {
maximumAttempts: 3,
backoffCoefficient: 1,
initialInterval: '2 minutes',
},
});
// Rate limit: 2 requests per second = 500ms between requests
const RATE_LIMIT_MS = 500;
export async function sendEmailWorkflow({
queue = [],
}: {
queue: SendEmail[];
}) {
let processedThisRun = 0;
// Handle incoming email signals
setHandler(sendEmailSignal, (email: SendEmail) => {
queue.push(email);
});
// Process emails with rate limiting
while (true) {
// Wait until there's an email in the queue or timeout after 1 hour of inactivity
const waitForQueue = await condition(() => queue.length > 0, '1 hour');
if (!waitForQueue) {
break;
}
try {
const email = queue.shift()!;
await sendEmail(email.to, email.subject, email.html, email.replyTo);
processedThisRun++;
} catch (err) {}
await sleep(RATE_LIMIT_MS);
if (processedThisRun >= 100) {
return await continueAsNew({ queue });
}
}
}

View File

@ -3,11 +3,12 @@ import { EmailInterface } from '@gitroom/nestjs-libraries/emails/email.interface
import { ResendProvider } from '@gitroom/nestjs-libraries/emails/resend.provider';
import { EmptyProvider } from '@gitroom/nestjs-libraries/emails/empty.provider';
import { NodeMailerProvider } from '@gitroom/nestjs-libraries/emails/node.mailer.provider';
import { TemporalService } from 'nestjs-temporal-core';
@Injectable()
export class EmailService {
emailService: EmailInterface;
constructor() {
constructor(private _temporalService: TemporalService) {
this.emailService = this.selectProvider(process.env.EMAIL_PROVIDER!);
console.log('Email service provider:', this.emailService.name);
for (const key of this.emailService.validateEnvKeys) {
@ -33,6 +34,23 @@ export class EmailService {
}
async sendEmail(to: string, subject: string, html: string, replyTo?: string) {
return this._temporalService.client
.getRawClient()
?.workflow.signalWithStart('sendEmailWorkflow', {
taskQueue: 'main',
workflowId: 'send_email',
signal: 'sendEmail',
args: [{ queue: [] }],
signalArgs: [{ to, subject, html, replyTo }],
});
}
async sendEmailSync(
to: string,
subject: string,
html: string,
replyTo?: string
) {
if (to.indexOf('@') === -1) {
return;
}