diff --git a/libraries/nestjs-libraries/src/bull-mq-transport/server/bull-mq.server.ts b/libraries/nestjs-libraries/src/bull-mq-transport/server/bull-mq.server.ts index 4573dfd2..39c6803e 100644 --- a/libraries/nestjs-libraries/src/bull-mq-transport/server/bull-mq.server.ts +++ b/libraries/nestjs-libraries/src/bull-mq-transport/server/bull-mq.server.ts @@ -7,7 +7,7 @@ import { import { Job, Worker } from 'bullmq'; import { BULLMQ_MODULE_OPTIONS } from '../constants/bull-mq.constants'; import { WorkerFactory } from '../factories/worker.factory'; -import {IBullMqModuleOptions} from "@gitroom/nestjs-libraries/bull-mq-transport/interfaces/bull-mq-module-options.interface"; +import { IBullMqModuleOptions } from '@gitroom/nestjs-libraries/bull-mq-transport/interfaces/bull-mq-module-options.interface'; @Injectable() export class BullMqServer extends Server implements CustomTransportStrategy { @@ -19,7 +19,7 @@ export class BullMqServer extends Server implements CustomTransportStrategy { constructor( @Inject(BULLMQ_MODULE_OPTIONS) private readonly options: IBullMqModuleOptions, - private readonly workerFactory: WorkerFactory, + private readonly workerFactory: WorkerFactory ) { super(); @@ -29,18 +29,14 @@ export class BullMqServer extends Server implements CustomTransportStrategy { listen(callback: (...optionalParams: unknown[]) => void) { for (const [pattern, handler] of this.messageHandlers) { - if ( - pattern && - handler && - !this.workers.has(pattern) - ) { + if (pattern && handler && !this.workers.has(pattern)) { const worker = this.workerFactory.create( pattern, (job: Job) => { // eslint-disable-next-line no-async-promise-executor return new Promise(async (resolve, reject) => { const stream$ = this.transformToObservable( - await handler(job.data.payload, job), + await handler(job.data.payload, job) ); this.send(stream$, (packet) => { if (packet.err) { @@ -50,10 +46,12 @@ export class BullMqServer extends Server implements CustomTransportStrategy { }); }); }, + // @ts-ignore { ...this.options, - ...handler?.extras - }, + ...{ removeOnComplete: true, removeOnFail: true }, + ...handler?.extras, + } ); this.workers.set(pattern, worker); this.logger.log(`Registered queue "${pattern}"`);