From ef0b668770cba86f32f06c92e836584b49d5418a Mon Sep 17 00:00:00 2001 From: Nevo David Date: Mon, 4 Aug 2025 20:02:59 +0700 Subject: [PATCH] feat: ensure jobs are not missed --- apps/cron/src/cron.module.ts | 5 +-- apps/cron/src/tasks/check.missing.queues.ts | 42 +++++++++++++++++++ .../database/prisma/posts/posts.repository.ts | 18 ++++++++ .../database/prisma/posts/posts.service.ts | 4 ++ package.json | 2 +- 5 files changed, 67 insertions(+), 4 deletions(-) create mode 100644 apps/cron/src/tasks/check.missing.queues.ts diff --git a/apps/cron/src/cron.module.ts b/apps/cron/src/cron.module.ts index 253b8202..3a010154 100644 --- a/apps/cron/src/cron.module.ts +++ b/apps/cron/src/cron.module.ts @@ -4,6 +4,7 @@ import { DatabaseModule } from '@gitroom/nestjs-libraries/database/prisma/databa import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport-new/bull.mq.module'; import { SentryModule } from '@sentry/nestjs/setup'; import { FILTER } from '@gitroom/nestjs-libraries/sentry/sentry.exception'; +import { CheckMissingQueues } from '@gitroom/cron/tasks/check.missing.queues'; @Module({ imports: [ @@ -13,8 +14,6 @@ import { FILTER } from '@gitroom/nestjs-libraries/sentry/sentry.exception'; BullMqModule, ], controllers: [], - providers: [ - FILTER - ], + providers: [FILTER, CheckMissingQueues], }) export class CronModule {} diff --git a/apps/cron/src/tasks/check.missing.queues.ts b/apps/cron/src/tasks/check.missing.queues.ts new file mode 100644 index 00000000..c383ab6a --- /dev/null +++ b/apps/cron/src/tasks/check.missing.queues.ts @@ -0,0 +1,42 @@ +import { Injectable } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { PostsService } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.service'; +import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client'; +import dayjs from 'dayjs'; + +@Injectable() +export class CheckMissingQueues { + constructor( + private _postService: PostsService, + private _workerServiceProducer: BullMqClient + ) {} + @Cron('0 * * * *') + async handleCron() { + const list = await this._postService.searchForMissingThreeHoursPosts(); + const notExists = ( + await Promise.all( + list.map(async (p) => ({ + id: p.id, + publishDate: p.publishDate, + isJob: + (await this._workerServiceProducer + .getQueue('post') + .getJobState(p.id)) === 'delayed', + })) + ) + ).filter((p) => !p.isJob); + + for (const job of notExists) { + this._workerServiceProducer.emit('post', { + id: job.id, + options: { + delay: dayjs(job.publishDate).diff(dayjs(), 'millisecond'), + }, + payload: { + id: job.id, + delay: dayjs(job.publishDate).diff(dayjs(), 'millisecond'), + }, + }); + } + } +} diff --git a/libraries/nestjs-libraries/src/database/prisma/posts/posts.repository.ts b/libraries/nestjs-libraries/src/database/prisma/posts/posts.repository.ts index f6db7693..8598b8ac 100644 --- a/libraries/nestjs-libraries/src/database/prisma/posts/posts.repository.ts +++ b/libraries/nestjs-libraries/src/database/prisma/posts/posts.repository.ts @@ -27,6 +27,24 @@ export class PostsRepository { private _errors: PrismaRepository<'errors'> ) {} + searchForMissingThreeHoursPosts() { + return this._post.model.post.findMany({ + where: { + publishDate: { + gte: dayjs.utc().toDate(), + lt: dayjs.utc().add(3, 'hour').toDate() + }, + state: 'QUEUE', + deletedAt: null, + parentPostId: null, + }, + select: { + id: true, + publishDate: true, + }, + }); + } + getOldPosts(orgId: string, date: string) { return this._post.model.post.findMany({ where: { diff --git a/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts b/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts index b1970d24..0a3d7642 100644 --- a/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts +++ b/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts @@ -61,6 +61,10 @@ export class PostsService { private openaiService: OpenaiService ) {} + searchForMissingThreeHoursPosts() { + return this._postRepository.searchForMissingThreeHoursPosts(); + } + async getStatistics(orgId: string, id: string) { const getPost = await this.getPostsRecursively(id, true, orgId, true); const content = getPost.map((p) => p.content); diff --git a/package.json b/package.json index 37bf62bf..04cccb0d 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ }, "packageManager": "pnpm@10.6.1", "scripts": { - "dev": "pnpm run --filter ./apps/extension --filter ./apps/workers --filter ./apps/backend --filter ./apps/frontend --parallel dev", + "dev": "pnpm run --filter ./apps/extension --filter ./apps/cron --filter ./apps/workers --filter ./apps/backend --filter ./apps/frontend --parallel dev", "pm2": "pnpm run pm2-run", "publish-sdk": "pnpm run --filter ./apps/sdk publish", "pm2-run": "pm2 delete all || true && pnpm run prisma-db-push && pnpm run --parallel pm2 && pm2 logs",