From 849becc2819d43b27595d51bf3fb0fd7989134e5 Mon Sep 17 00:00:00 2001 From: Nevo David Date: Thu, 7 Aug 2025 22:05:19 +0700 Subject: [PATCH] feat: post pending --- apps/cron/src/cron.module.ts | 3 +- .../cron/src/tasks/post.now.pending.queues.ts | 41 +++++++++++++++++++ .../database/prisma/posts/posts.repository.ts | 20 ++++++++- .../database/prisma/posts/posts.service.ts | 6 ++- 4 files changed, 66 insertions(+), 4 deletions(-) create mode 100644 apps/cron/src/tasks/post.now.pending.queues.ts diff --git a/apps/cron/src/cron.module.ts b/apps/cron/src/cron.module.ts index 3a010154..01eb6ed7 100644 --- a/apps/cron/src/cron.module.ts +++ b/apps/cron/src/cron.module.ts @@ -5,6 +5,7 @@ import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport-new/bu import { SentryModule } from '@sentry/nestjs/setup'; import { FILTER } from '@gitroom/nestjs-libraries/sentry/sentry.exception'; import { CheckMissingQueues } from '@gitroom/cron/tasks/check.missing.queues'; +import { PostNowPendingQueues } from '@gitroom/cron/tasks/post.now.pending.queues'; @Module({ imports: [ @@ -14,6 +15,6 @@ import { CheckMissingQueues } from '@gitroom/cron/tasks/check.missing.queues'; BullMqModule, ], controllers: [], - providers: [FILTER, CheckMissingQueues], + providers: [FILTER, CheckMissingQueues, PostNowPendingQueues], }) export class CronModule {} diff --git a/apps/cron/src/tasks/post.now.pending.queues.ts b/apps/cron/src/tasks/post.now.pending.queues.ts new file mode 100644 index 00000000..3138a25c --- /dev/null +++ b/apps/cron/src/tasks/post.now.pending.queues.ts @@ -0,0 +1,41 @@ +import { Injectable } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { PostsService } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.service'; +import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client'; + +@Injectable() +export class PostNowPendingQueues { + constructor( + private _postService: PostsService, + private _workerServiceProducer: BullMqClient + ) {} + @Cron('*/16 * * * *') + async handleCron() { + const list = await this._postService.checkPending15minutesBack(); + const notExists = ( + await Promise.all( + list.map(async (p) => ({ + id: p.id, + publishDate: p.publishDate, + isJob: + (await this._workerServiceProducer + .getQueue('post') + .getJobState(p.id)) === 'delayed', + })) + ) + ).filter((p) => !p.isJob); + + for (const job of notExists) { + this._workerServiceProducer.emit('post', { + id: job.id, + options: { + delay: 0, + }, + payload: { + id: job.id, + delay: 0, + }, + }); + } + } +} diff --git a/libraries/nestjs-libraries/src/database/prisma/posts/posts.repository.ts b/libraries/nestjs-libraries/src/database/prisma/posts/posts.repository.ts index 8598b8ac..9508a26c 100644 --- a/libraries/nestjs-libraries/src/database/prisma/posts/posts.repository.ts +++ b/libraries/nestjs-libraries/src/database/prisma/posts/posts.repository.ts @@ -27,12 +27,30 @@ export class PostsRepository { private _errors: PrismaRepository<'errors'> ) {} + checkPending15minutesBack() { + return this._post.model.post.findMany({ + where: { + publishDate: { + lte: dayjs.utc().subtract(15, 'minute').toDate(), + gte: dayjs.utc().subtract(30, 'minute').toDate(), + }, + state: 'QUEUE', + deletedAt: null, + parentPostId: null, + }, + select: { + id: true, + publishDate: true, + }, + }); + } + searchForMissingThreeHoursPosts() { return this._post.model.post.findMany({ where: { publishDate: { gte: dayjs.utc().toDate(), - lt: dayjs.utc().add(3, 'hour').toDate() + lt: dayjs.utc().add(3, 'hour').toDate(), }, state: 'QUEUE', deletedAt: null, diff --git a/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts b/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts index 06b362dd..fea33271 100644 --- a/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts +++ b/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts @@ -44,7 +44,6 @@ type PostWithConditionals = Post & { childrenPost: Post[]; }; - @Injectable() export class PostsService { private storage = UploadFactory.createStorage(); @@ -62,6 +61,9 @@ export class PostsService { private openaiService: OpenaiService ) {} + checkPending15minutesBack() { + return this._postRepository.checkPending15minutesBack(); + } searchForMissingThreeHoursPosts() { return this._postRepository.searchForMissingThreeHoursPosts(); } @@ -479,7 +481,7 @@ export class PostsService { p.content, true, false, - !(/<\/?[a-z][\s\S]*>/i.test(p.content)), + !/<\/?[a-z][\s\S]*>/i.test(p.content), getIntegration.mentionFormat ), settings: JSON.parse(p.settings || '{}'),