diff --git a/apps/backend/src/api/api.module.ts b/apps/backend/src/api/api.module.ts index 803db821..01156270 100644 --- a/apps/backend/src/api/api.module.ts +++ b/apps/backend/src/api/api.module.ts @@ -11,7 +11,6 @@ import { PermissionsService } from '@gitroom/backend/services/auth/permissions/p import { IntegrationsController } from '@gitroom/backend/api/routes/integrations.controller'; import { IntegrationManager } from '@gitroom/nestjs-libraries/integrations/integration.manager'; import { SettingsController } from '@gitroom/backend/api/routes/settings.controller'; -import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport/bull-mq.module'; import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service'; import { PostsController } from '@gitroom/backend/api/routes/posts.controller'; import { MediaController } from '@gitroom/backend/api/routes/media.controller'; @@ -47,9 +46,6 @@ const authenticatedController = [ @Module({ imports: [ UploadModule, - BullMqModule.forRoot({ - connection: ioRedis, - }), ...(!!process.env.UPLOAD_DIRECTORY && !!process.env.NEXT_PUBLIC_UPLOAD_STATIC_DIRECTORY ? [ @@ -63,7 +59,12 @@ const authenticatedController = [ ] : []), ], - controllers: [StripeController, AuthController, PublicController, ...authenticatedController], + controllers: [ + StripeController, + AuthController, + PublicController, + ...authenticatedController, + ], providers: [ AuthService, StripeService, diff --git a/apps/backend/src/app.module.ts b/apps/backend/src/app.module.ts index ba65b22a..f2f81a3f 100644 --- a/apps/backend/src/app.module.ts +++ b/apps/backend/src/app.module.ts @@ -4,10 +4,11 @@ import {DatabaseModule} from "@gitroom/nestjs-libraries/database/prisma/database import {ApiModule} from "@gitroom/backend/api/api.module"; import {APP_GUARD} from "@nestjs/core"; import {PoliciesGuard} from "@gitroom/backend/services/auth/permissions/permissions.guard"; +import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport-new/bull.mq.module'; @Global() @Module({ - imports: [DatabaseModule, ApiModule], + imports: [BullMqModule, DatabaseModule, ApiModule], controllers: [], providers: [{ provide: APP_GUARD, diff --git a/apps/commands/src/command.module.ts b/apps/commands/src/command.module.ts index 1dcc36ca..dfa6acaa 100644 --- a/apps/commands/src/command.module.ts +++ b/apps/commands/src/command.module.ts @@ -2,18 +2,11 @@ import { Module } from '@nestjs/common'; import { CommandModule as ExternalCommandModule } from 'nestjs-command'; import { CheckStars } from './tasks/check.stars'; import { DatabaseModule } from '@gitroom/nestjs-libraries/database/prisma/database.module'; -import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport/bull-mq.module'; -import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service'; -import {RefreshTokens} from "./tasks/refresh.tokens"; +import { RefreshTokens } from './tasks/refresh.tokens'; +import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport-new/bull.mq.module'; @Module({ - imports: [ - ExternalCommandModule, - DatabaseModule, - BullMqModule.forRoot({ - connection: ioRedis, - }), - ], + imports: [ExternalCommandModule, DatabaseModule, BullMqModule], controllers: [], providers: [CheckStars, RefreshTokens], get exports() { diff --git a/apps/commands/src/tasks/check.stars.ts b/apps/commands/src/tasks/check.stars.ts index 1845811c..b80ce434 100644 --- a/apps/commands/src/tasks/check.stars.ts +++ b/apps/commands/src/tasks/check.stars.ts @@ -1,51 +1,52 @@ -import {Command, Positional} from 'nestjs-command'; +import { Command, Positional } from 'nestjs-command'; import { Injectable } from '@nestjs/common'; -import {BullMqClient} from "@gitroom/nestjs-libraries/bull-mq-transport/client/bull-mq.client"; +import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client'; @Injectable() export class CheckStars { - constructor( - private _workerServiceProducer: BullMqClient - ) { - } - @Command({ - command: 'sync:stars ', - describe: 'Sync stars for a login', + constructor(private _workerServiceProducer: BullMqClient) {} + @Command({ + command: 'sync:stars ', + describe: 'Sync stars for a login', + }) + async create( + @Positional({ + name: 'login', + describe: 'login {owner}/{repo}', + type: 'string', }) - async create( - @Positional({ - name: 'login', - describe: 'login {owner}/{repo}', - type: 'string' - }) - login: string, - ) { - this._workerServiceProducer.emit('check_stars', {payload: {login}}).subscribe(); - return true; - } + login: string + ) { + this._workerServiceProducer + .emit('check_stars', { payload: { login } }) + .subscribe(); + return true; + } - @Command({ - command: 'sync:all_stars ', - describe: 'Sync all stars for a login', + @Command({ + command: 'sync:all_stars ', + describe: 'Sync all stars for a login', + }) + async syncAllStars( + @Positional({ + name: 'login', + describe: 'login {owner}/{repo}', + type: 'string', }) - async syncAllStars( - @Positional({ - name: 'login', - describe: 'login {owner}/{repo}', - type: 'string' - }) - login: string, - ) { - this._workerServiceProducer.emit('sync_all_stars', {payload: {login}}).subscribe(); - return true; - } + login: string + ) { + this._workerServiceProducer + .emit('sync_all_stars', { payload: { login } }) + .subscribe(); + return true; + } - @Command({ - command: 'sync:trending', - describe: 'Sync trending', - }) - async syncTrending() { - this._workerServiceProducer.emit('sync_trending', {}).subscribe(); - return true; - } -} \ No newline at end of file + @Command({ + command: 'sync:trending', + describe: 'Sync trending', + }) + async syncTrending() { + this._workerServiceProducer.emit('sync_trending', {}).subscribe(); + return true; + } +} diff --git a/apps/cron/src/cron.module.ts b/apps/cron/src/cron.module.ts index 151f2f99..a44744b9 100644 --- a/apps/cron/src/cron.module.ts +++ b/apps/cron/src/cron.module.ts @@ -2,18 +2,11 @@ import { Module } from '@nestjs/common'; import { ScheduleModule } from '@nestjs/schedule'; import { CheckStars } from '@gitroom/cron/tasks/check.stars'; import { DatabaseModule } from '@gitroom/nestjs-libraries/database/prisma/database.module'; -import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport/bull-mq.module'; -import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service'; import { SyncTrending } from '@gitroom/cron/tasks/sync.trending'; +import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport-new/bull.mq.module'; @Module({ - imports: [ - DatabaseModule, - ScheduleModule.forRoot(), - BullMqModule.forRoot({ - connection: ioRedis, - }), - ], + imports: [DatabaseModule, ScheduleModule.forRoot(), BullMqModule], controllers: [], providers: [CheckStars, SyncTrending], }) diff --git a/apps/cron/src/tasks/check.stars.ts b/apps/cron/src/tasks/check.stars.ts index 6e77a9ba..20fd63c7 100644 --- a/apps/cron/src/tasks/check.stars.ts +++ b/apps/cron/src/tasks/check.stars.ts @@ -1,7 +1,7 @@ import { Injectable } from '@nestjs/common'; import { Cron } from '@nestjs/schedule'; import { StarsService } from '@gitroom/nestjs-libraries/database/prisma/stars/stars.service'; -import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport/client/bull-mq.client'; +import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client'; @Injectable() export class CheckStars { diff --git a/apps/cron/src/tasks/sync.trending.ts b/apps/cron/src/tasks/sync.trending.ts index dc5a1a57..28f16250 100644 --- a/apps/cron/src/tasks/sync.trending.ts +++ b/apps/cron/src/tasks/sync.trending.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; import { Cron } from '@nestjs/schedule'; -import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport/client/bull-mq.client'; +import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client'; @Injectable() export class SyncTrending { diff --git a/apps/workers/src/app/app.module.ts b/apps/workers/src/app/app.module.ts index 967118e3..efaf1029 100644 --- a/apps/workers/src/app/app.module.ts +++ b/apps/workers/src/app/app.module.ts @@ -2,15 +2,12 @@ import { Module } from '@nestjs/common'; import { StarsController } from './stars.controller'; import {DatabaseModule} from "@gitroom/nestjs-libraries/database/prisma/database.module"; -import {BullMqModule} from "@gitroom/nestjs-libraries/bull-mq-transport/bull-mq.module"; -import {ioRedis} from "@gitroom/nestjs-libraries/redis/redis.service"; import {TrendingService} from "@gitroom/nestjs-libraries/services/trending.service"; import {PostsController} from "@gitroom/workers/app/posts.controller"; +import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport-new/bull.mq.module'; @Module({ - imports: [DatabaseModule, BullMqModule.forRoot({ - connection: ioRedis - })], + imports: [DatabaseModule, BullMqModule], controllers: [StarsController, PostsController], providers: [TrendingService], }) diff --git a/apps/workers/src/app/posts.controller.ts b/apps/workers/src/app/posts.controller.ts index 34b48b69..a0232ddd 100644 --- a/apps/workers/src/app/posts.controller.ts +++ b/apps/workers/src/app/posts.controller.ts @@ -6,12 +6,13 @@ import { PostsService } from '@gitroom/nestjs-libraries/database/prisma/posts/po export class PostsController { constructor(private _postsService: PostsService) {} @EventPattern('post', Transport.REDIS) - async checkStars(data: { id: string }) { + async post(data: { id: string }) { + console.log('proceccsing', data); return this._postsService.post(data.id); } @EventPattern('submit', Transport.REDIS) - async submitOrderItemForPayout(data: { id: string, releaseURL: string }) { + async payout(data: { id: string, releaseURL: string }) { return this._postsService.payout(data.id, data.releaseURL); } } diff --git a/apps/workers/src/main.ts b/apps/workers/src/main.ts index ac1ebb84..6c89fb61 100644 --- a/apps/workers/src/main.ts +++ b/apps/workers/src/main.ts @@ -2,15 +2,12 @@ import {NestFactory} from '@nestjs/core'; import {AppModule} from './app/app.module'; import {MicroserviceOptions} from '@nestjs/microservices'; -import {BullMqServer} from "@gitroom/nestjs-libraries/bull-mq-transport/server/bull-mq.server"; +import { BullMqServer } from '@gitroom/nestjs-libraries/bull-mq-transport-new/strategy'; async function bootstrap() { - const load = await NestFactory.create(AppModule); - const strategy = load.get(BullMqServer); - // some comment again const app = await NestFactory.createMicroservice(AppModule, { - strategy, + strategy: new BullMqServer() }); await app.listen(); diff --git a/libraries/nestjs-libraries/src/bull-mq-transport-new/bull.mq.module.ts b/libraries/nestjs-libraries/src/bull-mq-transport-new/bull.mq.module.ts new file mode 100644 index 00000000..30431ec6 --- /dev/null +++ b/libraries/nestjs-libraries/src/bull-mq-transport-new/bull.mq.module.ts @@ -0,0 +1,9 @@ +import { Global, Module } from '@nestjs/common'; +import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client'; + +@Global() +@Module({ + providers: [BullMqClient], + exports: [BullMqClient], +}) +export class BullMqModule {} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport-new/client.ts b/libraries/nestjs-libraries/src/bull-mq-transport-new/client.ts new file mode 100644 index 00000000..2aed109f --- /dev/null +++ b/libraries/nestjs-libraries/src/bull-mq-transport-new/client.ts @@ -0,0 +1,85 @@ +import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices'; +import { Queue, QueueEvents } from 'bullmq'; +import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service'; +import { v4 } from 'uuid'; +import { Injectable } from '@nestjs/common'; + +@Injectable() +export class BullMqClient extends ClientProxy { + queues = new Map(); + queueEvents = new Map(); + + async connect(): Promise { + return; + } + + async close() { + return; + } + + publish( + packet: ReadPacket, + callback: (packet: WritePacket) => void + ) { + // console.log('hello'); + // this.publishAsync(packet, callback); + return () => console.log('sent'); + } + + delete(pattern: string, jobId: string) { + const queue = this.getQueue(pattern); + return queue.remove(jobId); + } + + async publishAsync( + packet: ReadPacket, + callback: (packet: WritePacket) => void + ) { + const queue = this.getQueue(packet.pattern); + const queueEvents = this.getQueueEvents(packet.pattern); + const job = await queue.add(packet.pattern, packet.data, { + jobId: packet.data.id ?? v4(), + ...packet.data.options, + removeOnComplete: true, + removeOnFail: true, + }); + + try { + await job.waitUntilFinished(queueEvents); + console.log('success'); + callback({ response: job.returnvalue, isDisposed: true }); + } catch (err) { + console.log('err'); + callback({ err, isDisposed: true }); + } + } + + getQueueEvents(pattern: string) { + return ( + this.queueEvents.get(pattern) || + new QueueEvents(pattern, { + connection: ioRedis, + }) + ); + } + + getQueue(pattern: string) { + return ( + this.queues.get(pattern) || + new Queue(pattern, { + connection: ioRedis, + }) + ); + } + + async dispatchEvent(packet: ReadPacket): Promise { + console.log('event to dispatch: ', packet); + const queue = this.getQueue(packet.pattern); + await queue.add(packet.pattern, packet.data, { + jobId: packet.data.id ?? v4(), + ...packet.data.options, + removeOnComplete: true, + removeOnFail: true, + }); + } +} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport-new/strategy.ts b/libraries/nestjs-libraries/src/bull-mq-transport-new/strategy.ts new file mode 100644 index 00000000..ff4566d3 --- /dev/null +++ b/libraries/nestjs-libraries/src/bull-mq-transport-new/strategy.ts @@ -0,0 +1,59 @@ +import { CustomTransportStrategy, Server } from '@nestjs/microservices'; +import { Queue, Worker } from 'bullmq'; +import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service'; + +export class BullMqServer extends Server implements CustomTransportStrategy { + queues: Map; + workers: Worker[] = []; + + /** + * This method is triggered when you run "app.listen()". + */ + listen(callback: () => void) { + this.queues = [...this.messageHandlers.keys()].reduce((all, pattern) => { + all.set(pattern, new Queue(pattern, { connection: ioRedis })); + return all; + }, new Map()); + + this.workers = Array.from(this.messageHandlers).map( + ([pattern, handler]) => { + return new Worker( + pattern, + async (job) => { + const stream$ = this.transformToObservable( + await handler(job.data.payload, job) + ); + + this.send(stream$, (packet) => { + if (packet.err) { + return job.discard(); + } + + return true; + }); + }, + { + connection: ioRedis, + removeOnComplete: { + count: 0, + }, + removeOnFail: { + count: 0, + }, + } + ); + } + ); + + callback(); + } + + /** + * This method is triggered on application shutdown. + */ + close() { + this.workers.map((worker) => worker.close()); + this.queues.forEach((queue) => queue.close()); + return true; + } +} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/bull-mq-core.module.ts b/libraries/nestjs-libraries/src/bull-mq-transport/bull-mq-core.module.ts deleted file mode 100644 index 1c955935..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/bull-mq-core.module.ts +++ /dev/null @@ -1,89 +0,0 @@ -import { DynamicModule, Module } from '@nestjs/common'; -import { BullMqClient } from './client/bull-mq.client'; -import { BULLMQ_MODULE_OPTIONS } from './constants/bull-mq.constants'; -import { QueueEventsFactory } from './factories/queue-events.factory'; -import { QueueFactory } from './factories/queue.factory'; -import { WorkerFactory } from './factories/worker.factory'; -import { IBullMqModuleOptionsAsync } from './interfaces/bull-mq-module-options-async.interface'; -import { IBullMqModuleOptionsFactory } from './interfaces/bull-mq-module-options-factory.interface'; -import { IBullMqModuleOptions } from './interfaces/bull-mq-module-options.interface'; -import { BullMqServer } from './server/bull-mq.server'; - -@Module({}) -export class BullMqCoreModule { - static forRoot(options: IBullMqModuleOptions): DynamicModule { - return { - module: BullMqCoreModule, - global: true, - providers: [ - { provide: BULLMQ_MODULE_OPTIONS, useValue: options }, - QueueFactory, - QueueEventsFactory, - WorkerFactory, - BullMqServer, - BullMqClient, - ], - exports: [BullMqServer, BullMqClient, BULLMQ_MODULE_OPTIONS], - }; - } - - static forRootAsync(options: IBullMqModuleOptionsAsync): DynamicModule { - return { - module: BullMqCoreModule, - global: true, - imports: options.imports ?? [], - providers: [ - ...(options.providers ?? []), - ...this.createAsyncProviders(options), - QueueFactory, - QueueEventsFactory, - WorkerFactory, - BullMqServer, - BullMqClient, - ], - exports: [BullMqServer, BullMqClient, BULLMQ_MODULE_OPTIONS], - }; - } - - private static createAsyncProviders(options: IBullMqModuleOptionsAsync) { - if (options.useExisting ?? options.useFactory) { - return [this.createAsyncOptionsProvider(options)]; - } - - if (options.useClass) { - return [ - this.createAsyncOptionsProvider(options), - { provide: options.useClass, useClass: options.useClass }, - ]; - } - - throw new Error( - 'Invalid BullMqModule async options: one of `useClass`, `useExisting` or `useFactory` should be defined.', - ); - } - - private static createAsyncOptionsProvider( - options: IBullMqModuleOptionsAsync, - ) { - if (options.useFactory) { - return { - provide: BULLMQ_MODULE_OPTIONS, - useFactory: options.useFactory, - inject: options.inject ?? [], - }; - } - - const inject: any[] = []; - - if (options.useClass ?? options.useExisting) { - inject.push(options.useClass ?? options.useExisting); - } - - return { - provide: BULLMQ_MODULE_OPTIONS, - useFactory: async (optionsFactory: IBullMqModuleOptionsFactory) => - await optionsFactory.createModuleOptions(), - inject, - }; - } -} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/bull-mq.module.ts b/libraries/nestjs-libraries/src/bull-mq-transport/bull-mq.module.ts deleted file mode 100644 index 151bfca2..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/bull-mq.module.ts +++ /dev/null @@ -1,19 +0,0 @@ -import { DynamicModule } from '@nestjs/common'; -import { BullMqCoreModule } from './bull-mq-core.module'; -import { IBullMqModuleOptionsAsync } from './interfaces/bull-mq-module-options-async.interface'; -import { IBullMqModuleOptions } from './interfaces/bull-mq-module-options.interface'; -export class BullMqModule { - static forRoot(options: IBullMqModuleOptions): DynamicModule { - return { - module: BullMqModule, - imports: [BullMqCoreModule.forRoot(options)], - }; - } - - static forRootAsync(options: IBullMqModuleOptionsAsync): DynamicModule { - return { - module: BullMqModule, - imports: [BullMqCoreModule.forRootAsync(options)], - }; - } -} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/client/bull-mq.client.ts b/libraries/nestjs-libraries/src/bull-mq-transport/client/bull-mq.client.ts deleted file mode 100644 index 356db032..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/client/bull-mq.client.ts +++ /dev/null @@ -1,112 +0,0 @@ -import { Inject, Injectable } from '@nestjs/common'; -import { - ClientProxy, - ReadPacket, - RpcException, - WritePacket, -} from '@nestjs/microservices'; -import { Queue } from 'bullmq'; -import { v4 } from 'uuid'; -import { BULLMQ_MODULE_OPTIONS } from '../constants/bull-mq.constants'; -import { QueueEventsFactory } from '../factories/queue-events.factory'; -import { QueueFactory } from '../factories/queue.factory'; -import { IBullMqEvent } from '../interfaces/bull-mq-event.interface'; -import {IBullMqModuleOptions} from "@gitroom/nestjs-libraries/bull-mq-transport/interfaces/bull-mq-module-options.interface"; - -@Injectable() -export class BullMqClient extends ClientProxy { - constructor( - @Inject(BULLMQ_MODULE_OPTIONS) - private readonly options: IBullMqModuleOptions, - private readonly queueFactory: QueueFactory, - private readonly queueEventsFactory: QueueEventsFactory, - ) { - super(); - } - - async connect(): Promise { - return; - } - - async close(): Promise { - return; - } - - protected publish( - packet: ReadPacket>, - callback: (packet: WritePacket) => void, - ): () => void { - const queue = this.getQueue(packet.pattern); - const events = this.queueEventsFactory.create(packet.pattern, { - connection: this.options.connection, - }); - events.on('completed', (job) => - callback({ - response: job.returnvalue, - isDisposed: true, - }), - ); - events.on('failed', async (jobInfo) => { - const job = await queue.getJob(jobInfo.jobId); - const err = new RpcException(jobInfo.failedReason); - err.stack = job?.stacktrace?.[0]; - callback({ - err, - isDisposed: true, - }); - }); - queue - .add(packet.pattern, packet.data, { - jobId: packet.data.id ?? v4(), - removeOnComplete: true, - removeOnFail: true, - ...packet.data.options, - }) - .then(async (job) => { - try { - await job.waitUntilFinished(events); - } catch { - // BullMq unnecessarily re-throws the error we're handling in - // waitUntilFinished(), so we ignore that here. - } finally { - await events.close(); - await queue.close(); - } - }); - return () => void 0; - } - - delay(pattern: string, jobId: string, delay: number) { - const queue = this.getQueue(pattern); - return queue.getJob(jobId).then((job) => job?.changeDelay(delay)); - } - - async delete(pattern: string, jobId: string) { - const queue = this.getQueue(pattern); - return queue.getJob(jobId).then((job) => job?.remove()); - } - - job(pattern: string, jobId: string) { - const queue = this.getQueue(pattern); - return queue.getJob(jobId); - } - - protected async dispatchEvent( - packet: ReadPacket>, - ): Promise { - const queue = this.getQueue(packet.pattern); - console.log(packet); - await queue.add(packet.pattern, packet.data, { - jobId: packet.data.id ?? v4(), - ...packet.data.options, - }); - await queue.close(); - } - - protected getQueue(pattern: any): Queue { - const queue = this.queueFactory.create(pattern, { - connection: this.options.connection, - }); - return queue; - } -} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/constants/bull-mq.constants.ts b/libraries/nestjs-libraries/src/bull-mq-transport/constants/bull-mq.constants.ts deleted file mode 100644 index f6757110..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/constants/bull-mq.constants.ts +++ /dev/null @@ -1 +0,0 @@ -export const BULLMQ_MODULE_OPTIONS = 'BULLMQ_MODULE_OPTIONS'; diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/exceptions/bull-mq-rpc-exception.filter.ts b/libraries/nestjs-libraries/src/bull-mq-transport/exceptions/bull-mq-rpc-exception.filter.ts deleted file mode 100644 index 7ed83006..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/exceptions/bull-mq-rpc-exception.filter.ts +++ /dev/null @@ -1,77 +0,0 @@ -import { - ArgumentsHost, - Catch, - Inject, - InternalServerErrorException, - Logger, - LogLevel, - RpcExceptionFilter, -} from '@nestjs/common'; -import { BaseExceptionFilter } from '@nestjs/core'; -import { RpcException } from '@nestjs/microservices'; -import { Observable, of, throwError } from 'rxjs'; -import { BULLMQ_MODULE_OPTIONS } from '../constants/bull-mq.constants'; -import {IBullMqModuleOptions} from "@gitroom/nestjs-libraries/bull-mq-transport/interfaces/bull-mq-module-options.interface"; - -@Catch(RpcException) -export class BullMqRpcExceptionFilter - extends BaseExceptionFilter - implements RpcExceptionFilter -{ - private readonly logger = new Logger(); - - constructor( - @Inject(BULLMQ_MODULE_OPTIONS) - private readonly options: IBullMqModuleOptions, - ) { - super(); - } - - override catch(exception: RpcException, host: ArgumentsHost): Observable { - if (host.getType() === 'http') { - const err = new InternalServerErrorException( - exception.message, - exception.constructor.name, - ); - if (exception.stack) { - err.stack = exception.stack; - } - this.logException(err, host); - return of(super.catch(err, host)); - } - - const err = { - name: exception.name, - error: exception.name, - message: exception.message, - stack: exception.stack || undefined, - }; - - this.logException(err, host); - - return throwError(() => err); - } - - logException(exception: Error, host: ArgumentsHost): void { - const defaultLogLevel: LogLevel = 'error'; - - switch (this.options.logExceptionsAsLevel ?? defaultLogLevel) { - case 'off': - return; - case 'log': - return this.logger.log(exception.stack, host.getType()); - case 'error': - return this.logger.error( - exception.message, - exception.stack, - host.getType(), - ); - case 'warn': - return this.logger.warn(exception.stack, host.getType()); - case 'debug': - return this.logger.debug(exception.stack, host.getType()); - case 'verbose': - return this.logger.verbose(exception.stack, host.getType()); - } - } -} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/exceptions/bull-mq-rpc-validation.exception.ts b/libraries/nestjs-libraries/src/bull-mq-transport/exceptions/bull-mq-rpc-validation.exception.ts deleted file mode 100644 index d8f5d1f3..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/exceptions/bull-mq-rpc-validation.exception.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { RpcException } from '@nestjs/microservices'; - -export class BullMqRpcValidationException extends RpcException { - override name = this.constructor.name; -} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/factories/queue-events.factory.ts b/libraries/nestjs-libraries/src/bull-mq-transport/factories/queue-events.factory.ts deleted file mode 100644 index 13dc70f1..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/factories/queue-events.factory.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { QueueEvents, QueueEventsOptions } from 'bullmq'; - -@Injectable() -export class QueueEventsFactory { - create(name: string, options?: QueueEventsOptions): QueueEvents { - return new QueueEvents(name, options); - } -} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/factories/queue.factory.ts b/libraries/nestjs-libraries/src/bull-mq-transport/factories/queue.factory.ts deleted file mode 100644 index 158a8400..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/factories/queue.factory.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { Queue, QueueOptions } from 'bullmq'; - -@Injectable() -export class QueueFactory { - create(name: string, options?: QueueOptions): Queue { - return new Queue(name, options); - } -} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/factories/worker.factory.ts b/libraries/nestjs-libraries/src/bull-mq-transport/factories/worker.factory.ts deleted file mode 100644 index 7cea0168..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/factories/worker.factory.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { Processor, Worker, WorkerOptions } from 'bullmq'; - -@Injectable() -export class WorkerFactory { - create( - name: string, - processor?: string | Processor, - opts?: WorkerOptions, - ): Worker { - return new Worker(name, processor, opts); - } -} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/interfaces/bull-mq-event.interface.ts b/libraries/nestjs-libraries/src/bull-mq-transport/interfaces/bull-mq-event.interface.ts deleted file mode 100644 index cde75d0b..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/interfaces/bull-mq-event.interface.ts +++ /dev/null @@ -1,7 +0,0 @@ -import { JobsOptions } from 'bullmq'; - -export interface IBullMqEvent { - id?: string; - payload: T; - options?: JobsOptions; -} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/interfaces/bull-mq-module-options-async.interface.ts b/libraries/nestjs-libraries/src/bull-mq-transport/interfaces/bull-mq-module-options-async.interface.ts deleted file mode 100644 index d334d2a2..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/interfaces/bull-mq-module-options-async.interface.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { Type } from '@nestjs/common'; -import { IBullMqModuleOptionsFactory } from './bull-mq-module-options-factory.interface'; -import { IBullMqModuleOptions } from './bull-mq-module-options.interface'; - -export interface IBullMqModuleOptionsAsync { - imports?: any[]; - providers?: any[]; - inject?: any[]; - useClass?: Type; - useExisting?: Type; - useFactory?: ( - ...args: any[] - ) => IBullMqModuleOptions | Promise; -} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/interfaces/bull-mq-module-options-factory.interface.ts b/libraries/nestjs-libraries/src/bull-mq-transport/interfaces/bull-mq-module-options-factory.interface.ts deleted file mode 100644 index 148baca3..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/interfaces/bull-mq-module-options-factory.interface.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { IBullMqModuleOptions } from './bull-mq-module-options.interface'; - -export interface IBullMqModuleOptionsFactory { - createModuleOptions(): IBullMqModuleOptions | Promise; -} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/interfaces/bull-mq-module-options.interface.ts b/libraries/nestjs-libraries/src/bull-mq-transport/interfaces/bull-mq-module-options.interface.ts deleted file mode 100644 index 0e5ede5c..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/interfaces/bull-mq-module-options.interface.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { LogLevel } from '@nestjs/common'; -import { WorkerOptions } from 'bullmq'; - -export interface IBullMqModuleOptions extends WorkerOptions { - logExceptionsAsLevel?: LogLevel | 'off'; -} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/pipes/bull-mq-rpc-validation.pipe.ts b/libraries/nestjs-libraries/src/bull-mq-transport/pipes/bull-mq-rpc-validation.pipe.ts deleted file mode 100644 index 42a5121f..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/pipes/bull-mq-rpc-validation.pipe.ts +++ /dev/null @@ -1,19 +0,0 @@ -import { - ValidationError, - ValidationPipe, - ValidationPipeOptions, -} from '@nestjs/common'; -import {BullMqRpcValidationException} from "@gitroom/nestjs-libraries/bull-mq-transport/exceptions/bull-mq-rpc-validation.exception"; - -export class BullMqRpcValidationPipe extends ValidationPipe { - constructor(options?: ValidationPipeOptions) { - super({ - exceptionFactory: ( - errors: ValidationError[], - ): BullMqRpcValidationException => { - return new BullMqRpcValidationException(errors.toString()); - }, - ...options, - }); - } -} diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/server/bull-mq.server.ts b/libraries/nestjs-libraries/src/bull-mq-transport/server/bull-mq.server.ts deleted file mode 100644 index 3aef1bc5..00000000 --- a/libraries/nestjs-libraries/src/bull-mq-transport/server/bull-mq.server.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; -import { - CustomTransportStrategy, - Server, - Transport, -} from '@nestjs/microservices'; -import { Job, Worker } from 'bullmq'; -import { BULLMQ_MODULE_OPTIONS } from '../constants/bull-mq.constants'; -import { WorkerFactory } from '../factories/worker.factory'; -import { IBullMqModuleOptions } from '@gitroom/nestjs-libraries/bull-mq-transport/interfaces/bull-mq-module-options.interface'; - -@Injectable() -export class BullMqServer extends Server implements CustomTransportStrategy { - transportId = Transport.REDIS; - - protected override readonly logger = new Logger(this.constructor.name); - protected readonly workers = new Map(); - - constructor( - @Inject(BULLMQ_MODULE_OPTIONS) - private readonly options: IBullMqModuleOptions, - private readonly workerFactory: WorkerFactory - ) { - super(); - - this.initializeSerializer(this.serializer); - this.initializeDeserializer(this.deserializer); - } - - listen(callback: (...optionalParams: unknown[]) => void) { - for (const [pattern, handler] of this.messageHandlers) { - if (pattern && handler && !this.workers.has(pattern)) { - const worker = this.workerFactory.create( - pattern, - (job: Job) => { - // eslint-disable-next-line no-async-promise-executor - return new Promise(async (resolve, reject) => { - const stream$ = this.transformToObservable( - await handler(job.data.payload, job) - ); - this.send(stream$, (packet) => { - if (packet.err) { - return reject(packet.err); - } - resolve(packet.response); - }); - }); - }, - { - ...this.options, - ...{ removeOnComplete: { count: 0 }, removeOnFail: { count: 0 } }, - ...handler?.extras, - } - ); - this.workers.set(pattern, worker); - this.logger.log(`Registered queue "${pattern}"`); - } - } - callback(); - } - - async close() { - for (const worker of this.workers.values()) { - await worker.close(); - } - } -} diff --git a/libraries/nestjs-libraries/src/database/prisma/marketplace/messages.service.ts b/libraries/nestjs-libraries/src/database/prisma/marketplace/messages.service.ts index 6e9a3c20..6e0e87a1 100644 --- a/libraries/nestjs-libraries/src/database/prisma/marketplace/messages.service.ts +++ b/libraries/nestjs-libraries/src/database/prisma/marketplace/messages.service.ts @@ -4,10 +4,10 @@ import { NewConversationDto } from '@gitroom/nestjs-libraries/dtos/marketplace/n import { AddMessageDto } from '@gitroom/nestjs-libraries/dtos/messages/add.message'; import { CreateOfferDto } from '@gitroom/nestjs-libraries/dtos/marketplace/create.offer.dto'; import { From, OrderStatus, User } from '@prisma/client'; -import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport/client/bull-mq.client'; import { OrganizationRepository } from '@gitroom/nestjs-libraries/database/prisma/organizations/organization.repository'; import { NotificationService } from '@gitroom/nestjs-libraries/database/prisma/notifications/notification.service'; import dayjs from 'dayjs'; +import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client'; @Injectable() export class MessagesService { diff --git a/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts b/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts index 80eee3be..3222e75e 100644 --- a/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts +++ b/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts @@ -1,13 +1,12 @@ import { Injectable } from '@nestjs/common'; import { PostsRepository } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.repository'; import { CreatePostDto } from '@gitroom/nestjs-libraries/dtos/posts/create.post.dto'; -import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport/client/bull-mq.client'; import dayjs from 'dayjs'; import { IntegrationManager } from '@gitroom/nestjs-libraries/integrations/integration.manager'; import { Integration, Post, Media, From } from '@prisma/client'; import { GetPostsDto } from '@gitroom/nestjs-libraries/dtos/posts/get.posts.dto'; import { NotificationService } from '@gitroom/nestjs-libraries/database/prisma/notifications/notification.service'; -import { capitalize, chunk, shuffle } from 'lodash'; +import { capitalize, shuffle } from 'lodash'; import { MessagesService } from '@gitroom/nestjs-libraries/database/prisma/marketplace/messages.service'; import { StripeService } from '@gitroom/nestjs-libraries/services/stripe.service'; import { GeneratorDto } from '@gitroom/nestjs-libraries/dtos/generator/generator.dto'; @@ -17,6 +16,7 @@ import { CreateGeneratedPostsDto } from '@gitroom/nestjs-libraries/dtos/generato import { IntegrationService } from '@gitroom/nestjs-libraries/database/prisma/integrations/integration.service'; import { makeId } from '@gitroom/nestjs-libraries/services/make.is'; import { RefreshToken } from '@gitroom/nestjs-libraries/integrations/social.abstract'; +import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client'; type PostWithConditionals = Post & { integration?: Integration; diff --git a/libraries/nestjs-libraries/src/database/prisma/stars/stars.service.ts b/libraries/nestjs-libraries/src/database/prisma/stars/stars.service.ts index e3dde497..fd8130ec 100644 --- a/libraries/nestjs-libraries/src/database/prisma/stars/stars.service.ts +++ b/libraries/nestjs-libraries/src/database/prisma/stars/stars.service.ts @@ -4,8 +4,8 @@ import { chunk, groupBy } from 'lodash'; import dayjs from 'dayjs'; import { NotificationService } from '@gitroom/nestjs-libraries/database/prisma/notifications/notification.service'; import { StarsListDto } from '@gitroom/nestjs-libraries/dtos/analytics/stars.list.dto'; -import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport/client/bull-mq.client'; import { mean } from 'simple-statistics'; +import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client'; enum Inform { Removed, New, diff --git a/package-lock.json b/package-lock.json index 0f7f392a..49fcffdd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -62,7 +62,7 @@ "axios": "1.6.7", "bcrypt": "^5.1.1", "bufferutil": "^4.0.8", - "bullmq": "^5.1.5", + "bullmq": "^5.12.12", "cache-manager-redis-store": "^2.0.0", "chart.js": "^4.4.1", "class-transformer": "^0.5.1", @@ -17182,9 +17182,9 @@ } }, "node_modules/bullmq": { - "version": "5.8.3", - "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.8.3.tgz", - "integrity": "sha512-RJgQu/vgSZqjOYrZ7F1UJsSAzveNx7FFpR3Tp/1TxOMXXN9TtZMSly5MT+vjzOhQX//3+YWNRbMWpC1mkqBc9w==", + "version": "5.12.12", + "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.12.12.tgz", + "integrity": "sha512-xrWKDj1ZwnGKmrlmFqF6Vmub3WqDFfdBcIRLCooIs5+jeVzbHK7/1usgYSFg2pZiwK6h6eMivTb9WvcKkNW/+w==", "dependencies": { "cron-parser": "^4.6.0", "ioredis": "^5.4.1", diff --git a/package.json b/package.json index 2fb58d91..3c7c1504 100644 --- a/package.json +++ b/package.json @@ -66,7 +66,7 @@ "axios": "1.6.7", "bcrypt": "^5.1.1", "bufferutil": "^4.0.8", - "bullmq": "^5.1.5", + "bullmq": "^5.12.12", "cache-manager-redis-store": "^2.0.0", "chart.js": "^4.4.1", "class-transformer": "^0.5.1",