From 9c5dbb2b4fd1f4663902b056d031f24b8ff53faf Mon Sep 17 00:00:00 2001 From: Nevo David Date: Mon, 21 Jul 2025 15:55:26 +0700 Subject: [PATCH] feat: monitor stuck queues --- apps/backend/src/api/api.module.ts | 2 ++ apps/backend/src/api/routes/monitor.controller.ts | 14 ++++++++++++++ .../src/bull-mq-transport-new/client.ts | 13 +++++++++++++ package.json | 2 +- 4 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 apps/backend/src/api/routes/monitor.controller.ts diff --git a/apps/backend/src/api/api.module.ts b/apps/backend/src/api/api.module.ts index e1d47314..a56b8fd9 100644 --- a/apps/backend/src/api/api.module.ts +++ b/apps/backend/src/api/api.module.ts @@ -35,6 +35,7 @@ import { McpService } from '@gitroom/nestjs-libraries/mcp/mcp.service'; import { McpController } from '@gitroom/backend/api/routes/mcp.controller'; import { SetsController } from '@gitroom/backend/api/routes/sets.controller'; import { ThirdPartyController } from '@gitroom/backend/api/routes/third-party.controller'; +import { MonitorController } from '@gitroom/backend/api/routes/monitor.controller'; const authenticatedController = [ UsersController, @@ -63,6 +64,7 @@ const authenticatedController = [ AuthController, PublicController, McpController, + MonitorController, ...authenticatedController, ], providers: [ diff --git a/apps/backend/src/api/routes/monitor.controller.ts b/apps/backend/src/api/routes/monitor.controller.ts new file mode 100644 index 00000000..c90fe4b1 --- /dev/null +++ b/apps/backend/src/api/routes/monitor.controller.ts @@ -0,0 +1,14 @@ +import { Controller, Get, Param } from '@nestjs/common'; +import { ApiTags } from '@nestjs/swagger'; +import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client'; + +@ApiTags('Monitor') +@Controller('/monitor') +export class MonitorController { + constructor(private _workerServiceProducer: BullMqClient) {} + + @Get('/queue/:name') + getMessagesGroup(@Param('name') name: string) { + return this._workerServiceProducer.checkForStuckWaitingJobs(name); + } +} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport-new/client.ts b/libraries/nestjs-libraries/src/bull-mq-transport-new/client.ts index 0354b3f3..b7581521 100644 --- a/libraries/nestjs-libraries/src/bull-mq-transport-new/client.ts +++ b/libraries/nestjs-libraries/src/bull-mq-transport-new/client.ts @@ -77,6 +77,19 @@ export class BullMqClient extends ClientProxy { ); } + async checkForStuckWaitingJobs(queueName: string) { + const queue = this.getQueue(queueName); + const getJobs = await queue.getJobs('waiting' as const); + const now = Date.now(); + const thresholdMs = 60 * 60 * 1000; + return { + valid: !getJobs.some((job) => { + const age = now - job.timestamp; + return age > thresholdMs; + }), + }; + } + async dispatchEvent(packet: ReadPacket): Promise { console.log('event to dispatch: ', packet); const queue = this.getQueue(packet.pattern); diff --git a/package.json b/package.json index eac61e35..8b2efd15 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ }, "packageManager": "pnpm@10.6.1", "scripts": { - "dev": "pnpm run --filter ./apps/extension --filter ./apps/workers --filter ./apps/backend --filter ./apps/frontend --parallel dev", + "dev": "pnpm run --filter ./apps/extension --filter ./apps/backend --filter ./apps/frontend --parallel dev", "pm2": "pnpm dlx concurrently \"pnpm run pm2-run\" \"pnpm run entryfile\"", "entryfile": "./entrypoint.sh", "publish-sdk": "pnpm run --filter ./apps/sdk publish",