Feat: remove on complete

This commit is contained in:
Nevo David 2024-08-09 21:51:48 +07:00
parent 1fc47a00dd
commit f51b2b500a
1 changed files with 8 additions and 10 deletions

View File

@ -7,7 +7,7 @@ import {
import { Job, Worker } from 'bullmq';
import { BULLMQ_MODULE_OPTIONS } from '../constants/bull-mq.constants';
import { WorkerFactory } from '../factories/worker.factory';
import {IBullMqModuleOptions} from "@gitroom/nestjs-libraries/bull-mq-transport/interfaces/bull-mq-module-options.interface";
import { IBullMqModuleOptions } from '@gitroom/nestjs-libraries/bull-mq-transport/interfaces/bull-mq-module-options.interface';
@Injectable()
export class BullMqServer extends Server implements CustomTransportStrategy {
@ -19,7 +19,7 @@ export class BullMqServer extends Server implements CustomTransportStrategy {
constructor(
@Inject(BULLMQ_MODULE_OPTIONS)
private readonly options: IBullMqModuleOptions,
private readonly workerFactory: WorkerFactory,
private readonly workerFactory: WorkerFactory
) {
super();
@ -29,18 +29,14 @@ export class BullMqServer extends Server implements CustomTransportStrategy {
listen(callback: (...optionalParams: unknown[]) => void) {
for (const [pattern, handler] of this.messageHandlers) {
if (
pattern &&
handler &&
!this.workers.has(pattern)
) {
if (pattern && handler && !this.workers.has(pattern)) {
const worker = this.workerFactory.create(
pattern,
(job: Job) => {
// eslint-disable-next-line no-async-promise-executor
return new Promise<unknown>(async (resolve, reject) => {
const stream$ = this.transformToObservable(
await handler(job.data.payload, job),
await handler(job.data.payload, job)
);
this.send(stream$, (packet) => {
if (packet.err) {
@ -50,10 +46,12 @@ export class BullMqServer extends Server implements CustomTransportStrategy {
});
});
},
// @ts-ignore
{
...this.options,
...handler?.extras
},
...{ removeOnComplete: true, removeOnFail: true },
...handler?.extras,
}
);
this.workers.set(pattern, worker);
this.logger.log(`Registered queue "${pattern}"`);