refactor bull mq

This commit is contained in:
Nevo David 2024-09-02 11:44:59 +07:00
parent 8ac7c652f6
commit d26b3c93b2
33 changed files with 228 additions and 543 deletions

View File

@ -11,7 +11,6 @@ import { PermissionsService } from '@gitroom/backend/services/auth/permissions/p
import { IntegrationsController } from '@gitroom/backend/api/routes/integrations.controller';
import { IntegrationManager } from '@gitroom/nestjs-libraries/integrations/integration.manager';
import { SettingsController } from '@gitroom/backend/api/routes/settings.controller';
import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport/bull-mq.module';
import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service';
import { PostsController } from '@gitroom/backend/api/routes/posts.controller';
import { MediaController } from '@gitroom/backend/api/routes/media.controller';
@ -47,9 +46,6 @@ const authenticatedController = [
@Module({
imports: [
UploadModule,
BullMqModule.forRoot({
connection: ioRedis,
}),
...(!!process.env.UPLOAD_DIRECTORY &&
!!process.env.NEXT_PUBLIC_UPLOAD_STATIC_DIRECTORY
? [
@ -63,7 +59,12 @@ const authenticatedController = [
]
: []),
],
controllers: [StripeController, AuthController, PublicController, ...authenticatedController],
controllers: [
StripeController,
AuthController,
PublicController,
...authenticatedController,
],
providers: [
AuthService,
StripeService,

View File

@ -4,10 +4,11 @@ import {DatabaseModule} from "@gitroom/nestjs-libraries/database/prisma/database
import {ApiModule} from "@gitroom/backend/api/api.module";
import {APP_GUARD} from "@nestjs/core";
import {PoliciesGuard} from "@gitroom/backend/services/auth/permissions/permissions.guard";
import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport-new/bull.mq.module';
@Global()
@Module({
imports: [DatabaseModule, ApiModule],
imports: [BullMqModule, DatabaseModule, ApiModule],
controllers: [],
providers: [{
provide: APP_GUARD,

View File

@ -2,18 +2,11 @@ import { Module } from '@nestjs/common';
import { CommandModule as ExternalCommandModule } from 'nestjs-command';
import { CheckStars } from './tasks/check.stars';
import { DatabaseModule } from '@gitroom/nestjs-libraries/database/prisma/database.module';
import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport/bull-mq.module';
import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service';
import {RefreshTokens} from "./tasks/refresh.tokens";
import { RefreshTokens } from './tasks/refresh.tokens';
import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport-new/bull.mq.module';
@Module({
imports: [
ExternalCommandModule,
DatabaseModule,
BullMqModule.forRoot({
connection: ioRedis,
}),
],
imports: [ExternalCommandModule, DatabaseModule, BullMqModule],
controllers: [],
providers: [CheckStars, RefreshTokens],
get exports() {

View File

@ -1,51 +1,52 @@
import {Command, Positional} from 'nestjs-command';
import { Command, Positional } from 'nestjs-command';
import { Injectable } from '@nestjs/common';
import {BullMqClient} from "@gitroom/nestjs-libraries/bull-mq-transport/client/bull-mq.client";
import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client';
@Injectable()
export class CheckStars {
constructor(
private _workerServiceProducer: BullMqClient
) {
}
@Command({
command: 'sync:stars <login>',
describe: 'Sync stars for a login',
constructor(private _workerServiceProducer: BullMqClient) {}
@Command({
command: 'sync:stars <login>',
describe: 'Sync stars for a login',
})
async create(
@Positional({
name: 'login',
describe: 'login {owner}/{repo}',
type: 'string',
})
async create(
@Positional({
name: 'login',
describe: 'login {owner}/{repo}',
type: 'string'
})
login: string,
) {
this._workerServiceProducer.emit('check_stars', {payload: {login}}).subscribe();
return true;
}
login: string
) {
this._workerServiceProducer
.emit('check_stars', { payload: { login } })
.subscribe();
return true;
}
@Command({
command: 'sync:all_stars <login>',
describe: 'Sync all stars for a login',
@Command({
command: 'sync:all_stars <login>',
describe: 'Sync all stars for a login',
})
async syncAllStars(
@Positional({
name: 'login',
describe: 'login {owner}/{repo}',
type: 'string',
})
async syncAllStars(
@Positional({
name: 'login',
describe: 'login {owner}/{repo}',
type: 'string'
})
login: string,
) {
this._workerServiceProducer.emit('sync_all_stars', {payload: {login}}).subscribe();
return true;
}
login: string
) {
this._workerServiceProducer
.emit('sync_all_stars', { payload: { login } })
.subscribe();
return true;
}
@Command({
command: 'sync:trending',
describe: 'Sync trending',
})
async syncTrending() {
this._workerServiceProducer.emit('sync_trending', {}).subscribe();
return true;
}
}
@Command({
command: 'sync:trending',
describe: 'Sync trending',
})
async syncTrending() {
this._workerServiceProducer.emit('sync_trending', {}).subscribe();
return true;
}
}

View File

@ -2,18 +2,11 @@ import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { CheckStars } from '@gitroom/cron/tasks/check.stars';
import { DatabaseModule } from '@gitroom/nestjs-libraries/database/prisma/database.module';
import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport/bull-mq.module';
import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service';
import { SyncTrending } from '@gitroom/cron/tasks/sync.trending';
import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport-new/bull.mq.module';
@Module({
imports: [
DatabaseModule,
ScheduleModule.forRoot(),
BullMqModule.forRoot({
connection: ioRedis,
}),
],
imports: [DatabaseModule, ScheduleModule.forRoot(), BullMqModule],
controllers: [],
providers: [CheckStars, SyncTrending],
})

View File

@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { StarsService } from '@gitroom/nestjs-libraries/database/prisma/stars/stars.service';
import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport/client/bull-mq.client';
import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client';
@Injectable()
export class CheckStars {

View File

@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport/client/bull-mq.client';
import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client';
@Injectable()
export class SyncTrending {

View File

@ -2,15 +2,12 @@ import { Module } from '@nestjs/common';
import { StarsController } from './stars.controller';
import {DatabaseModule} from "@gitroom/nestjs-libraries/database/prisma/database.module";
import {BullMqModule} from "@gitroom/nestjs-libraries/bull-mq-transport/bull-mq.module";
import {ioRedis} from "@gitroom/nestjs-libraries/redis/redis.service";
import {TrendingService} from "@gitroom/nestjs-libraries/services/trending.service";
import {PostsController} from "@gitroom/workers/app/posts.controller";
import { BullMqModule } from '@gitroom/nestjs-libraries/bull-mq-transport-new/bull.mq.module';
@Module({
imports: [DatabaseModule, BullMqModule.forRoot({
connection: ioRedis
})],
imports: [DatabaseModule, BullMqModule],
controllers: [StarsController, PostsController],
providers: [TrendingService],
})

View File

@ -6,12 +6,13 @@ import { PostsService } from '@gitroom/nestjs-libraries/database/prisma/posts/po
export class PostsController {
constructor(private _postsService: PostsService) {}
@EventPattern('post', Transport.REDIS)
async checkStars(data: { id: string }) {
async post(data: { id: string }) {
console.log('proceccsing', data);
return this._postsService.post(data.id);
}
@EventPattern('submit', Transport.REDIS)
async submitOrderItemForPayout(data: { id: string, releaseURL: string }) {
async payout(data: { id: string, releaseURL: string }) {
return this._postsService.payout(data.id, data.releaseURL);
}
}

View File

@ -2,15 +2,12 @@ import {NestFactory} from '@nestjs/core';
import {AppModule} from './app/app.module';
import {MicroserviceOptions} from '@nestjs/microservices';
import {BullMqServer} from "@gitroom/nestjs-libraries/bull-mq-transport/server/bull-mq.server";
import { BullMqServer } from '@gitroom/nestjs-libraries/bull-mq-transport-new/strategy';
async function bootstrap() {
const load = await NestFactory.create(AppModule);
const strategy = load.get(BullMqServer);
// some comment again
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
strategy,
strategy: new BullMqServer()
});
await app.listen();

View File

@ -0,0 +1,9 @@
import { Global, Module } from '@nestjs/common';
import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client';
@Global()
@Module({
providers: [BullMqClient],
exports: [BullMqClient],
})
export class BullMqModule {}

View File

@ -0,0 +1,85 @@
import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices';
import { Queue, QueueEvents } from 'bullmq';
import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service';
import { v4 } from 'uuid';
import { Injectable } from '@nestjs/common';
@Injectable()
export class BullMqClient extends ClientProxy {
queues = new Map<string, Queue>();
queueEvents = new Map<string, QueueEvents>();
async connect(): Promise<any> {
return;
}
async close() {
return;
}
publish(
packet: ReadPacket<any>,
callback: (packet: WritePacket<any>) => void
) {
// console.log('hello');
// this.publishAsync(packet, callback);
return () => console.log('sent');
}
delete(pattern: string, jobId: string) {
const queue = this.getQueue(pattern);
return queue.remove(jobId);
}
async publishAsync(
packet: ReadPacket<any>,
callback: (packet: WritePacket<any>) => void
) {
const queue = this.getQueue(packet.pattern);
const queueEvents = this.getQueueEvents(packet.pattern);
const job = await queue.add(packet.pattern, packet.data, {
jobId: packet.data.id ?? v4(),
...packet.data.options,
removeOnComplete: true,
removeOnFail: true,
});
try {
await job.waitUntilFinished(queueEvents);
console.log('success');
callback({ response: job.returnvalue, isDisposed: true });
} catch (err) {
console.log('err');
callback({ err, isDisposed: true });
}
}
getQueueEvents(pattern: string) {
return (
this.queueEvents.get(pattern) ||
new QueueEvents(pattern, {
connection: ioRedis,
})
);
}
getQueue(pattern: string) {
return (
this.queues.get(pattern) ||
new Queue(pattern, {
connection: ioRedis,
})
);
}
async dispatchEvent(packet: ReadPacket<any>): Promise<any> {
console.log('event to dispatch: ', packet);
const queue = this.getQueue(packet.pattern);
await queue.add(packet.pattern, packet.data, {
jobId: packet.data.id ?? v4(),
...packet.data.options,
removeOnComplete: true,
removeOnFail: true,
});
}
}

View File

@ -0,0 +1,59 @@
import { CustomTransportStrategy, Server } from '@nestjs/microservices';
import { Queue, Worker } from 'bullmq';
import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service';
export class BullMqServer extends Server implements CustomTransportStrategy {
queues: Map<string, Queue>;
workers: Worker[] = [];
/**
* This method is triggered when you run "app.listen()".
*/
listen(callback: () => void) {
this.queues = [...this.messageHandlers.keys()].reduce((all, pattern) => {
all.set(pattern, new Queue(pattern, { connection: ioRedis }));
return all;
}, new Map());
this.workers = Array.from(this.messageHandlers).map(
([pattern, handler]) => {
return new Worker(
pattern,
async (job) => {
const stream$ = this.transformToObservable(
await handler(job.data.payload, job)
);
this.send(stream$, (packet) => {
if (packet.err) {
return job.discard();
}
return true;
});
},
{
connection: ioRedis,
removeOnComplete: {
count: 0,
},
removeOnFail: {
count: 0,
},
}
);
}
);
callback();
}
/**
* This method is triggered on application shutdown.
*/
close() {
this.workers.map((worker) => worker.close());
this.queues.forEach((queue) => queue.close());
return true;
}
}

View File

@ -1,89 +0,0 @@
import { DynamicModule, Module } from '@nestjs/common';
import { BullMqClient } from './client/bull-mq.client';
import { BULLMQ_MODULE_OPTIONS } from './constants/bull-mq.constants';
import { QueueEventsFactory } from './factories/queue-events.factory';
import { QueueFactory } from './factories/queue.factory';
import { WorkerFactory } from './factories/worker.factory';
import { IBullMqModuleOptionsAsync } from './interfaces/bull-mq-module-options-async.interface';
import { IBullMqModuleOptionsFactory } from './interfaces/bull-mq-module-options-factory.interface';
import { IBullMqModuleOptions } from './interfaces/bull-mq-module-options.interface';
import { BullMqServer } from './server/bull-mq.server';
@Module({})
export class BullMqCoreModule {
static forRoot(options: IBullMqModuleOptions): DynamicModule {
return {
module: BullMqCoreModule,
global: true,
providers: [
{ provide: BULLMQ_MODULE_OPTIONS, useValue: options },
QueueFactory,
QueueEventsFactory,
WorkerFactory,
BullMqServer,
BullMqClient,
],
exports: [BullMqServer, BullMqClient, BULLMQ_MODULE_OPTIONS],
};
}
static forRootAsync(options: IBullMqModuleOptionsAsync): DynamicModule {
return {
module: BullMqCoreModule,
global: true,
imports: options.imports ?? [],
providers: [
...(options.providers ?? []),
...this.createAsyncProviders(options),
QueueFactory,
QueueEventsFactory,
WorkerFactory,
BullMqServer,
BullMqClient,
],
exports: [BullMqServer, BullMqClient, BULLMQ_MODULE_OPTIONS],
};
}
private static createAsyncProviders(options: IBullMqModuleOptionsAsync) {
if (options.useExisting ?? options.useFactory) {
return [this.createAsyncOptionsProvider(options)];
}
if (options.useClass) {
return [
this.createAsyncOptionsProvider(options),
{ provide: options.useClass, useClass: options.useClass },
];
}
throw new Error(
'Invalid BullMqModule async options: one of `useClass`, `useExisting` or `useFactory` should be defined.',
);
}
private static createAsyncOptionsProvider(
options: IBullMqModuleOptionsAsync,
) {
if (options.useFactory) {
return {
provide: BULLMQ_MODULE_OPTIONS,
useFactory: options.useFactory,
inject: options.inject ?? [],
};
}
const inject: any[] = [];
if (options.useClass ?? options.useExisting) {
inject.push(options.useClass ?? options.useExisting);
}
return {
provide: BULLMQ_MODULE_OPTIONS,
useFactory: async (optionsFactory: IBullMqModuleOptionsFactory) =>
await optionsFactory.createModuleOptions(),
inject,
};
}
}

View File

@ -1,19 +0,0 @@
import { DynamicModule } from '@nestjs/common';
import { BullMqCoreModule } from './bull-mq-core.module';
import { IBullMqModuleOptionsAsync } from './interfaces/bull-mq-module-options-async.interface';
import { IBullMqModuleOptions } from './interfaces/bull-mq-module-options.interface';
export class BullMqModule {
static forRoot(options: IBullMqModuleOptions): DynamicModule {
return {
module: BullMqModule,
imports: [BullMqCoreModule.forRoot(options)],
};
}
static forRootAsync(options: IBullMqModuleOptionsAsync): DynamicModule {
return {
module: BullMqModule,
imports: [BullMqCoreModule.forRootAsync(options)],
};
}
}

View File

@ -1,112 +0,0 @@
import { Inject, Injectable } from '@nestjs/common';
import {
ClientProxy,
ReadPacket,
RpcException,
WritePacket,
} from '@nestjs/microservices';
import { Queue } from 'bullmq';
import { v4 } from 'uuid';
import { BULLMQ_MODULE_OPTIONS } from '../constants/bull-mq.constants';
import { QueueEventsFactory } from '../factories/queue-events.factory';
import { QueueFactory } from '../factories/queue.factory';
import { IBullMqEvent } from '../interfaces/bull-mq-event.interface';
import {IBullMqModuleOptions} from "@gitroom/nestjs-libraries/bull-mq-transport/interfaces/bull-mq-module-options.interface";
@Injectable()
export class BullMqClient extends ClientProxy {
constructor(
@Inject(BULLMQ_MODULE_OPTIONS)
private readonly options: IBullMqModuleOptions,
private readonly queueFactory: QueueFactory,
private readonly queueEventsFactory: QueueEventsFactory,
) {
super();
}
async connect(): Promise<void> {
return;
}
async close(): Promise<void> {
return;
}
protected publish(
packet: ReadPacket<IBullMqEvent<any>>,
callback: (packet: WritePacket<any>) => void,
): () => void {
const queue = this.getQueue(packet.pattern);
const events = this.queueEventsFactory.create(packet.pattern, {
connection: this.options.connection,
});
events.on('completed', (job) =>
callback({
response: job.returnvalue,
isDisposed: true,
}),
);
events.on('failed', async (jobInfo) => {
const job = await queue.getJob(jobInfo.jobId);
const err = new RpcException(jobInfo.failedReason);
err.stack = job?.stacktrace?.[0];
callback({
err,
isDisposed: true,
});
});
queue
.add(packet.pattern, packet.data, {
jobId: packet.data.id ?? v4(),
removeOnComplete: true,
removeOnFail: true,
...packet.data.options,
})
.then(async (job) => {
try {
await job.waitUntilFinished(events);
} catch {
// BullMq unnecessarily re-throws the error we're handling in
// waitUntilFinished(), so we ignore that here.
} finally {
await events.close();
await queue.close();
}
});
return () => void 0;
}
delay(pattern: string, jobId: string, delay: number) {
const queue = this.getQueue(pattern);
return queue.getJob(jobId).then((job) => job?.changeDelay(delay));
}
async delete(pattern: string, jobId: string) {
const queue = this.getQueue(pattern);
return queue.getJob(jobId).then((job) => job?.remove());
}
job(pattern: string, jobId: string) {
const queue = this.getQueue(pattern);
return queue.getJob(jobId);
}
protected async dispatchEvent(
packet: ReadPacket<IBullMqEvent<any>>,
): Promise<any> {
const queue = this.getQueue(packet.pattern);
console.log(packet);
await queue.add(packet.pattern, packet.data, {
jobId: packet.data.id ?? v4(),
...packet.data.options,
});
await queue.close();
}
protected getQueue(pattern: any): Queue {
const queue = this.queueFactory.create(pattern, {
connection: this.options.connection,
});
return queue;
}
}

View File

@ -1 +0,0 @@
export const BULLMQ_MODULE_OPTIONS = 'BULLMQ_MODULE_OPTIONS';

View File

@ -1,77 +0,0 @@
import {
ArgumentsHost,
Catch,
Inject,
InternalServerErrorException,
Logger,
LogLevel,
RpcExceptionFilter,
} from '@nestjs/common';
import { BaseExceptionFilter } from '@nestjs/core';
import { RpcException } from '@nestjs/microservices';
import { Observable, of, throwError } from 'rxjs';
import { BULLMQ_MODULE_OPTIONS } from '../constants/bull-mq.constants';
import {IBullMqModuleOptions} from "@gitroom/nestjs-libraries/bull-mq-transport/interfaces/bull-mq-module-options.interface";
@Catch(RpcException)
export class BullMqRpcExceptionFilter
extends BaseExceptionFilter
implements RpcExceptionFilter<RpcException>
{
private readonly logger = new Logger();
constructor(
@Inject(BULLMQ_MODULE_OPTIONS)
private readonly options: IBullMqModuleOptions,
) {
super();
}
override catch(exception: RpcException, host: ArgumentsHost): Observable<void> {
if (host.getType() === 'http') {
const err = new InternalServerErrorException(
exception.message,
exception.constructor.name,
);
if (exception.stack) {
err.stack = exception.stack;
}
this.logException(err, host);
return of(super.catch(err, host));
}
const err = {
name: exception.name,
error: exception.name,
message: exception.message,
stack: exception.stack || undefined,
};
this.logException(err, host);
return throwError(() => err);
}
logException(exception: Error, host: ArgumentsHost): void {
const defaultLogLevel: LogLevel = 'error';
switch (this.options.logExceptionsAsLevel ?? defaultLogLevel) {
case 'off':
return;
case 'log':
return this.logger.log(exception.stack, host.getType());
case 'error':
return this.logger.error(
exception.message,
exception.stack,
host.getType(),
);
case 'warn':
return this.logger.warn(exception.stack, host.getType());
case 'debug':
return this.logger.debug(exception.stack, host.getType());
case 'verbose':
return this.logger.verbose(exception.stack, host.getType());
}
}
}

View File

@ -1,5 +0,0 @@
import { RpcException } from '@nestjs/microservices';
export class BullMqRpcValidationException extends RpcException {
override name = this.constructor.name;
}

View File

@ -1,9 +0,0 @@
import { Injectable } from '@nestjs/common';
import { QueueEvents, QueueEventsOptions } from 'bullmq';
@Injectable()
export class QueueEventsFactory {
create(name: string, options?: QueueEventsOptions): QueueEvents {
return new QueueEvents(name, options);
}
}

View File

@ -1,9 +0,0 @@
import { Injectable } from '@nestjs/common';
import { Queue, QueueOptions } from 'bullmq';
@Injectable()
export class QueueFactory {
create(name: string, options?: QueueOptions): Queue {
return new Queue(name, options);
}
}

View File

@ -1,13 +0,0 @@
import { Injectable } from '@nestjs/common';
import { Processor, Worker, WorkerOptions } from 'bullmq';
@Injectable()
export class WorkerFactory {
create<T, R, N extends string>(
name: string,
processor?: string | Processor<T, R, N>,
opts?: WorkerOptions,
): Worker<T, R, N> {
return new Worker(name, processor, opts);
}
}

View File

@ -1,7 +0,0 @@
import { JobsOptions } from 'bullmq';
export interface IBullMqEvent<T> {
id?: string;
payload: T;
options?: JobsOptions;
}

View File

@ -1,14 +0,0 @@
import { Type } from '@nestjs/common';
import { IBullMqModuleOptionsFactory } from './bull-mq-module-options-factory.interface';
import { IBullMqModuleOptions } from './bull-mq-module-options.interface';
export interface IBullMqModuleOptionsAsync {
imports?: any[];
providers?: any[];
inject?: any[];
useClass?: Type<IBullMqModuleOptionsFactory>;
useExisting?: Type<IBullMqModuleOptionsFactory>;
useFactory?: (
...args: any[]
) => IBullMqModuleOptions | Promise<IBullMqModuleOptions>;
}

View File

@ -1,5 +0,0 @@
import { IBullMqModuleOptions } from './bull-mq-module-options.interface';
export interface IBullMqModuleOptionsFactory {
createModuleOptions(): IBullMqModuleOptions | Promise<IBullMqModuleOptions>;
}

View File

@ -1,6 +0,0 @@
import { LogLevel } from '@nestjs/common';
import { WorkerOptions } from 'bullmq';
export interface IBullMqModuleOptions extends WorkerOptions {
logExceptionsAsLevel?: LogLevel | 'off';
}

View File

@ -1,19 +0,0 @@
import {
ValidationError,
ValidationPipe,
ValidationPipeOptions,
} from '@nestjs/common';
import {BullMqRpcValidationException} from "@gitroom/nestjs-libraries/bull-mq-transport/exceptions/bull-mq-rpc-validation.exception";
export class BullMqRpcValidationPipe extends ValidationPipe {
constructor(options?: ValidationPipeOptions) {
super({
exceptionFactory: (
errors: ValidationError[],
): BullMqRpcValidationException => {
return new BullMqRpcValidationException(errors.toString());
},
...options,
});
}
}

View File

@ -1,67 +0,0 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import {
CustomTransportStrategy,
Server,
Transport,
} from '@nestjs/microservices';
import { Job, Worker } from 'bullmq';
import { BULLMQ_MODULE_OPTIONS } from '../constants/bull-mq.constants';
import { WorkerFactory } from '../factories/worker.factory';
import { IBullMqModuleOptions } from '@gitroom/nestjs-libraries/bull-mq-transport/interfaces/bull-mq-module-options.interface';
@Injectable()
export class BullMqServer extends Server implements CustomTransportStrategy {
transportId = Transport.REDIS;
protected override readonly logger = new Logger(this.constructor.name);
protected readonly workers = new Map<string, Worker>();
constructor(
@Inject(BULLMQ_MODULE_OPTIONS)
private readonly options: IBullMqModuleOptions,
private readonly workerFactory: WorkerFactory
) {
super();
this.initializeSerializer(this.serializer);
this.initializeDeserializer(this.deserializer);
}
listen(callback: (...optionalParams: unknown[]) => void) {
for (const [pattern, handler] of this.messageHandlers) {
if (pattern && handler && !this.workers.has(pattern)) {
const worker = this.workerFactory.create(
pattern,
(job: Job) => {
// eslint-disable-next-line no-async-promise-executor
return new Promise<unknown>(async (resolve, reject) => {
const stream$ = this.transformToObservable(
await handler(job.data.payload, job)
);
this.send(stream$, (packet) => {
if (packet.err) {
return reject(packet.err);
}
resolve(packet.response);
});
});
},
{
...this.options,
...{ removeOnComplete: { count: 0 }, removeOnFail: { count: 0 } },
...handler?.extras,
}
);
this.workers.set(pattern, worker);
this.logger.log(`Registered queue "${pattern}"`);
}
}
callback();
}
async close() {
for (const worker of this.workers.values()) {
await worker.close();
}
}
}

View File

@ -4,10 +4,10 @@ import { NewConversationDto } from '@gitroom/nestjs-libraries/dtos/marketplace/n
import { AddMessageDto } from '@gitroom/nestjs-libraries/dtos/messages/add.message';
import { CreateOfferDto } from '@gitroom/nestjs-libraries/dtos/marketplace/create.offer.dto';
import { From, OrderStatus, User } from '@prisma/client';
import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport/client/bull-mq.client';
import { OrganizationRepository } from '@gitroom/nestjs-libraries/database/prisma/organizations/organization.repository';
import { NotificationService } from '@gitroom/nestjs-libraries/database/prisma/notifications/notification.service';
import dayjs from 'dayjs';
import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client';
@Injectable()
export class MessagesService {

View File

@ -1,13 +1,12 @@
import { Injectable } from '@nestjs/common';
import { PostsRepository } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.repository';
import { CreatePostDto } from '@gitroom/nestjs-libraries/dtos/posts/create.post.dto';
import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport/client/bull-mq.client';
import dayjs from 'dayjs';
import { IntegrationManager } from '@gitroom/nestjs-libraries/integrations/integration.manager';
import { Integration, Post, Media, From } from '@prisma/client';
import { GetPostsDto } from '@gitroom/nestjs-libraries/dtos/posts/get.posts.dto';
import { NotificationService } from '@gitroom/nestjs-libraries/database/prisma/notifications/notification.service';
import { capitalize, chunk, shuffle } from 'lodash';
import { capitalize, shuffle } from 'lodash';
import { MessagesService } from '@gitroom/nestjs-libraries/database/prisma/marketplace/messages.service';
import { StripeService } from '@gitroom/nestjs-libraries/services/stripe.service';
import { GeneratorDto } from '@gitroom/nestjs-libraries/dtos/generator/generator.dto';
@ -17,6 +16,7 @@ import { CreateGeneratedPostsDto } from '@gitroom/nestjs-libraries/dtos/generato
import { IntegrationService } from '@gitroom/nestjs-libraries/database/prisma/integrations/integration.service';
import { makeId } from '@gitroom/nestjs-libraries/services/make.is';
import { RefreshToken } from '@gitroom/nestjs-libraries/integrations/social.abstract';
import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client';
type PostWithConditionals = Post & {
integration?: Integration;

View File

@ -4,8 +4,8 @@ import { chunk, groupBy } from 'lodash';
import dayjs from 'dayjs';
import { NotificationService } from '@gitroom/nestjs-libraries/database/prisma/notifications/notification.service';
import { StarsListDto } from '@gitroom/nestjs-libraries/dtos/analytics/stars.list.dto';
import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport/client/bull-mq.client';
import { mean } from 'simple-statistics';
import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client';
enum Inform {
Removed,
New,

8
package-lock.json generated
View File

@ -62,7 +62,7 @@
"axios": "1.6.7",
"bcrypt": "^5.1.1",
"bufferutil": "^4.0.8",
"bullmq": "^5.1.5",
"bullmq": "^5.12.12",
"cache-manager-redis-store": "^2.0.0",
"chart.js": "^4.4.1",
"class-transformer": "^0.5.1",
@ -17182,9 +17182,9 @@
}
},
"node_modules/bullmq": {
"version": "5.8.3",
"resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.8.3.tgz",
"integrity": "sha512-RJgQu/vgSZqjOYrZ7F1UJsSAzveNx7FFpR3Tp/1TxOMXXN9TtZMSly5MT+vjzOhQX//3+YWNRbMWpC1mkqBc9w==",
"version": "5.12.12",
"resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.12.12.tgz",
"integrity": "sha512-xrWKDj1ZwnGKmrlmFqF6Vmub3WqDFfdBcIRLCooIs5+jeVzbHK7/1usgYSFg2pZiwK6h6eMivTb9WvcKkNW/+w==",
"dependencies": {
"cron-parser": "^4.6.0",
"ioredis": "^5.4.1",

View File

@ -66,7 +66,7 @@
"axios": "1.6.7",
"bcrypt": "^5.1.1",
"bufferutil": "^4.0.8",
"bullmq": "^5.1.5",
"bullmq": "^5.12.12",
"cache-manager-redis-store": "^2.0.0",
"chart.js": "^4.4.1",
"class-transformer": "^0.5.1",