feat: ensure jobs are not missed

This commit is contained in:
Nevo David 2025-08-04 20:02:59 +07:00
parent 2163b59c07
commit ef0b668770
5 changed files with 67 additions and 4 deletions

View File

@ -4,6 +4,7 @@ import { DatabaseModule } from '@gitroom/nestjs-libraries/database/prisma/databa
import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport-new/bull.mq.module';
import { SentryModule } from '@sentry/nestjs/setup';
import { FILTER } from '@gitroom/nestjs-libraries/sentry/sentry.exception';
import { CheckMissingQueues } from '@gitroom/cron/tasks/check.missing.queues';
@Module({
imports: [
@ -13,8 +14,6 @@ import { FILTER } from '@gitroom/nestjs-libraries/sentry/sentry.exception';
BullMqModule,
],
controllers: [],
providers: [
FILTER
],
providers: [FILTER, CheckMissingQueues],
})
export class CronModule {}

View File

@ -0,0 +1,42 @@
import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { PostsService } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.service';
import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client';
import dayjs from 'dayjs';
@Injectable()
export class CheckMissingQueues {
constructor(
private _postService: PostsService,
private _workerServiceProducer: BullMqClient
) {}
@Cron('0 * * * *')
async handleCron() {
const list = await this._postService.searchForMissingThreeHoursPosts();
const notExists = (
await Promise.all(
list.map(async (p) => ({
id: p.id,
publishDate: p.publishDate,
isJob:
(await this._workerServiceProducer
.getQueue('post')
.getJobState(p.id)) === 'delayed',
}))
)
).filter((p) => !p.isJob);
for (const job of notExists) {
this._workerServiceProducer.emit('post', {
id: job.id,
options: {
delay: dayjs(job.publishDate).diff(dayjs(), 'millisecond'),
},
payload: {
id: job.id,
delay: dayjs(job.publishDate).diff(dayjs(), 'millisecond'),
},
});
}
}
}

View File

@ -27,6 +27,24 @@ export class PostsRepository {
private _errors: PrismaRepository<'errors'>
) {}
searchForMissingThreeHoursPosts() {
return this._post.model.post.findMany({
where: {
publishDate: {
gte: dayjs.utc().toDate(),
lt: dayjs.utc().add(3, 'hour').toDate()
},
state: 'QUEUE',
deletedAt: null,
parentPostId: null,
},
select: {
id: true,
publishDate: true,
},
});
}
getOldPosts(orgId: string, date: string) {
return this._post.model.post.findMany({
where: {

View File

@ -61,6 +61,10 @@ export class PostsService {
private openaiService: OpenaiService
) {}
searchForMissingThreeHoursPosts() {
return this._postRepository.searchForMissingThreeHoursPosts();
}
async getStatistics(orgId: string, id: string) {
const getPost = await this.getPostsRecursively(id, true, orgId, true);
const content = getPost.map((p) => p.content);

View File

@ -11,7 +11,7 @@
},
"packageManager": "pnpm@10.6.1",
"scripts": {
"dev": "pnpm run --filter ./apps/extension --filter ./apps/workers --filter ./apps/backend --filter ./apps/frontend --parallel dev",
"dev": "pnpm run --filter ./apps/extension --filter ./apps/cron --filter ./apps/workers --filter ./apps/backend --filter ./apps/frontend --parallel dev",
"pm2": "pnpm run pm2-run",
"publish-sdk": "pnpm run --filter ./apps/sdk publish",
"pm2-run": "pm2 delete all || true && pnpm run prisma-db-push && pnpm run --parallel pm2 && pm2 logs",