// postiz/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts
//
// 727 lines
// 20 KiB
// TypeScript

import {
BadRequestException,
Injectable,
ValidationPipe,
} from '@nestjs/common';
import { PostsRepository } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.repository';
import { CreatePostDto } from '@gitroom/nestjs-libraries/dtos/posts/create.post.dto';
import dayjs from 'dayjs';
import { IntegrationManager } from '@gitroom/nestjs-libraries/integrations/integration.manager';
import { Integration, Post, Media, From, State } from '@prisma/client';
import { GetPostsDto } from '@gitroom/nestjs-libraries/dtos/posts/get.posts.dto';
import { shuffle } from 'lodash';
import { CreateGeneratedPostsDto } from '@gitroom/nestjs-libraries/dtos/generator/create.generated.posts.dto';
import { IntegrationService } from '@gitroom/nestjs-libraries/database/prisma/integrations/integration.service';
import { makeId } from '@gitroom/nestjs-libraries/services/make.is';
import utc from 'dayjs/plugin/utc';
import { MediaService } from '@gitroom/nestjs-libraries/database/prisma/media/media.service';
import { ShortLinkService } from '@gitroom/nestjs-libraries/short-linking/short.link.service';
import { CreateTagDto } from '@gitroom/nestjs-libraries/dtos/posts/create.tag.dto';
import axios from 'axios';
import sharp from 'sharp';
import { UploadFactory } from '@gitroom/nestjs-libraries/upload/upload.factory';
import { Readable } from 'stream';
import { OpenaiService } from '@gitroom/nestjs-libraries/openai/openai.service';
dayjs.extend(utc);
import * as Sentry from '@sentry/nestjs';
import { TemporalService } from 'nestjs-temporal-core';
import { TypedSearchAttributes } from '@temporalio/common';
import {
organizationId,
postId as postIdSearchParam,
} from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute';
/**
 * Shape of the rows returned by `PostsService.getPostsRecursively`: a Prisma
 * `Post` plus the relations the service reads from it.
 */
type PostWithConditionals = Post & {
// Read by `getPost` for the integration picture. Presumably only populated
// when the repository is asked to include the integration — confirm there.
integration?: Integration;
// Child posts of this post; the service follows `childrenPost[0]` to walk a thread.
childrenPost: Post[];
};
/**
 * Service layer for posts.
 *
 * Most methods are thin pass-throughs to `PostsRepository`. The remaining ones
 * add orchestration on top of it: validating incoming DTOs, resolving media,
 * expanding `(post:<id>)` references, and starting / terminating the Temporal
 * workflow associated with a post.
 */
@Injectable()
export class PostsService {
  /** Storage backend used by `updateMedia` to upload re-encoded images. */
  private storage = UploadFactory.createStorage();

  constructor(
    private _postRepository: PostsRepository,
    private _integrationManager: IntegrationManager,
    private _integrationService: IntegrationService,
    private _mediaService: MediaService,
    private _shortLinkService: ShortLinkService,
    private _openaiService: OpenaiService,
    private _temporalService: TemporalService
  ) {}

  /** Pass-through to `PostsRepository.searchForMissingThreeHoursPosts`. */
  searchForMissingThreeHoursPosts() {
    return this._postRepository.searchForMissingThreeHoursPosts();
  }

  /** Pass-through to `PostsRepository.updatePost`. */
  updatePost(id: string, postId: string, releaseURL: string) {
    return this._postRepository.updatePost(id, postId, releaseURL);
  }

  /**
   * Collects short-link statistics for a post and every post chained below it.
   *
   * @param orgId Organization that must own the post.
   * @param id    Id of the first post of the thread.
   * @returns `{ clicks }`, where `clicks` is whatever
   *          `ShortLinkService.getStatistics` returns for the thread contents.
   */
  async getStatistics(orgId: string, id: string) {
    const thread = await this.getPostsRecursively(id, true, orgId, true);
    const clicks = await this._shortLinkService.getStatistics(
      thread.map((p) => p.content)
    );
    return { clicks };
  }

  /**
   * Stamps each post's `settings.__type` with the provider identifier of its
   * integration and then validates/transforms the result as a `CreatePostDto`.
   *
   * @param body         Incoming request body.
   * @param organization Organization used to look the integrations up.
   * @param replaceDraft When true, the resulting `type` is forced to `'schedule'`.
   * @throws BadRequestException when a post has no integration id or the
   *         integration cannot be found. Validation failures are raised by the
   *         `ValidationPipe`.
   */
  async mapTypeToPost(
    body: CreatePostDto,
    organization: string,
    replaceDraft: boolean = false
  ): Promise<CreatePostDto> {
    if (!body?.posts?.every((p) => p?.integration?.id)) {
      throw new BadRequestException('All posts must have an integration id');
    }

    const mappedValues = {
      ...body,
      type: replaceDraft ? 'schedule' : body.type,
      posts: await Promise.all(
        body.posts.map(async (post) => {
          const integration = await this._integrationService.getIntegrationById(
            organization,
            post.integration.id
          );

          if (!integration) {
            throw new BadRequestException(
              `Integration with id ${post.integration.id} not found`
            );
          }

          return {
            ...post,
            settings: {
              ...(post.settings || ({} as any)),
              __type: integration.providerIdentifier,
            },
          };
        })
      ),
    };

    const validationPipe = new ValidationPipe({
      skipMissingProperties: false,
      transform: true,
      transformOptions: {
        enableImplicitConversion: true,
      },
    });

    return await validationPipe.transform(mappedValues, {
      type: 'body',
      metatype: CreatePostDto,
    });
  }

  /**
   * Loads a post and then walks down its chain of children, one level at a
   * time, always following the FIRST child (`childrenPost[0]`).
   *
   * @param id                 Id of the post to start from.
   * @param includeIntegration Forwarded to the repository for the first post
   *                           only; descendants are loaded with `false`.
   * @param orgId              Forwarded to the repository on every level.
   * @param isFirst            Forwarded to the repository for the first post
   *                           only; descendants are loaded with `false`.
   * @returns The posts in chain order, or `[]` when the starting post is not found.
   */
  async getPostsRecursively(
    id: string,
    includeIntegration = false,
    orgId?: string,
    isFirst?: boolean
  ): Promise<PostWithConditionals[]> {
    const post = await this._postRepository.getPost(
      id,
      includeIntegration,
      orgId,
      isFirst
    );

    if (!post) {
      return [];
    }

    const descendants = post?.childrenPost?.length
      ? await this.getPostsRecursively(
          post?.childrenPost?.[0]?.id,
          false,
          orgId,
          false
        )
      : [];

    return [post, ...descendants];
  }

  /** Pass-through to `PostsRepository.getPosts`. */
  async getPosts(orgId: string, query: GetPostsDto) {
    return this._postRepository.getPosts(orgId, query);
  }

  /**
   * Normalizes the media list of a post.
   *
   * 1. Entries that carry an `id` but no `path` are replaced by the record
   *    returned from `MediaService.getMediaById`.
   * 2. Every entry gets a `url`, a `type` of `'image'`, and a `path`; values
   *    that do not contain `http` are prefixed from environment variables.
   * 3. When `convertToJPEG` is set, entries whose path contains `.png` are
   *    downloaded, re-encoded as JPEG and uploaded through `this.storage`.
   *
   * If step 1 or step 3 changed anything, the new list is persisted with
   * `PostsRepository.updateImages`.
   *
   * @param id            Post whose images are being processed.
   * @param imagesList    Parsed media list of the post.
   * @param convertToJPEG Enables step 3.
   * @returns The normalized list, or the untouched `imagesList` if anything
   *          in the process throws (deliberate best-effort fallback).
   */
  async updateMedia(id: string, imagesList: any[], convertToJPEG = false) {
    try {
      let imageUpdateNeeded = false;

      // Step 1: hydrate entries that only reference a media id.
      const hydrated = await Promise.all(
        (imagesList || []).map(async (p: any) => {
          if (!p.path && p.id) {
            imageUpdateNeeded = true;
            return this._mediaService.getMediaById(p.id);
          }
          return p;
        })
      );

      // Step 2: derive the public URL and the storage path. A missing media
      // record or a missing `path` throws here and lands in the catch below.
      const located = hydrated.map((m) => {
        const location = this.resolveMediaLocation(m.path);
        return {
          ...m,
          url: location.url,
          type: 'image',
          path: location.path,
        };
      });

      // Step 3: optionally re-encode PNG files as JPEG.
      const getImageList = await Promise.all(
        located.map(async (m) => {
          if (!convertToJPEG || m.path.indexOf('.png') === -1) {
            return m;
          }

          imageUpdateNeeded = true;

          const response = await axios.get(m.url, {
            responseType: 'arraybuffer',
          });
          const imageBuffer = Buffer.from(response.data);

          // Re-encode the downloaded image as a maximum-quality JPEG.
          const buffer = await sharp(imageBuffer)
            .jpeg({ quality: 100 })
            .toBuffer();

          const { path, originalname } = await this.storage.uploadFile({
            buffer,
            mimetype: 'image/jpeg',
            size: buffer.length,
            path: '',
            fieldname: '',
            destination: '',
            stream: new Readable(),
            filename: '',
            originalname: '',
            encoding: '',
          });

          const uploaded = this.resolveMediaLocation(path);
          return {
            ...m,
            name: originalname,
            url: uploaded.url,
            type: 'image',
            path: uploaded.path,
          };
        })
      );

      if (imageUpdateNeeded) {
        await this._postRepository.updateImages(
          id,
          JSON.stringify(getImageList)
        );
      }

      return getImageList;
    } catch (err: any) {
      // Best-effort: any failure leaves the caller with the original list.
      return imagesList;
    }
  }

  /**
   * Loads a post thread for display/editing.
   *
   * @param orgId         Organization that must own the post.
   * @param id            Id of the first post of the thread.
   * @param convertToJPEG Forwarded to `updateMedia` for every post.
   * @returns The thread together with group, integration and settings taken
   *          from its first post. When the post is not found the thread is
   *          empty, `group`/`integration`/`integrationPicture` are `undefined`
   *          and `settings` is `{}` (previously this case threw a TypeError).
   */
  async getPost(orgId: string, id: string, convertToJPEG = false) {
    const posts = await this.getPostsRecursively(id, true, orgId, true);
    const first = posts?.[0];

    return {
      group: first?.group,
      posts: await Promise.all(
        (posts || []).map(async (post) => ({
          ...post,
          image: await this.updateMedia(
            post.id,
            JSON.parse(post.image || '[]'),
            convertToJPEG
          ),
        }))
      ),
      integrationPicture: first?.integration?.picture,
      integration: first?.integrationId,
      settings: JSON.parse(first?.settings || '{}'),
    };
  }

  /** Pass-through to `PostsRepository.getOldPosts`. */
  async getOldPosts(orgId: string, date: string) {
    return this._postRepository.getOldPosts(orgId, date);
  }

  /**
   * Replaces every `(post:<id>)` reference found anywhere in the serialized
   * posts with the first (comma-separated) release URL of the referenced
   * post, or with an empty string when no URL is known.
   *
   * The replacement is done on the JSON text and the method calls itself on
   * the result until no reference is left.
   *
   * @param orgId Organization used to resolve the referenced posts.
   * @param post  Posts to process.
   * @returns The original array when it contains no reference; otherwise a
   *          new array obtained via `JSON.parse` (so non-JSON values such as
   *          `Date` come back in their serialized form).
   */
  public async updateTags(orgId: string, post: Post[]): Promise<Post[]> {
    const plainText = JSON.stringify(post);
    const extract = Array.from(
      plainText.match(/\(post:[a-zA-Z0-9-_]+\)/g) || []
    );

    if (!extract.length) {
      return post;
    }

    const ids = extract.map((e) => e.replace('(post:', '').replace(')', ''));
    const urls = await this._postRepository.getPostUrls(orgId, ids);

    const newPlainText = ids.reduce((acc, value) => {
      const findUrl = urls?.find?.((u) => u.id === value)?.releaseURL || '';
      // The URL is spliced into JSON text, always inside a string literal, so
      // it must be JSON-escaped; slice() drops the quotes JSON.stringify adds.
      const escapedUrl = JSON.stringify(findUrl.split(',')[0]).slice(1, -1);
      return acc.replace(
        new RegExp(`\\(post:${value}\\)`, 'g'),
        // A replacer function keeps `$&`, `$1`, `$$` in the URL literal.
        () => escapedUrl
      );
    }, plainText);

    return this.updateTags(orgId, JSON.parse(newPlainText) as Post[]);
  }

  /**
   * Builds the list of "internal plug" jobs described by a post's settings.
   *
   * Settings keys containing `plug-` are split on `--` into
   * `<prefix>--<name>--<field>` and grouped by `<name>`. One job is emitted
   * for every entry of a group's `integrations` field.
   *
   * @param integration Integration the post was published with.
   * @param orgId       Owning organization.
   * @param id          Post id.
   * @param settings    Settings object of the post.
   * @returns The jobs, or `[]` when the settings contain no plug key.
   */
  public async checkInternalPlug(
    integration: Integration,
    orgId: string,
    id: string,
    settings: any
  ) {
    const plugs = Object.entries(settings).filter(
      ([key]) => key.indexOf('plug-') > -1
    );

    if (plugs.length === 0) {
      return [];
    }

    const parsePlugs = plugs.reduce((all, [key, value]) => {
      const [, name, identifier] = key.split('--');
      all[name] = all[name] || { name };
      all[name][identifier] = value;
      return all;
    }, {} as any);

    const list: {
      name: string;
      integrations: { id: string }[];
      delay: string;
      active: boolean;
    }[] = Object.values(parsePlugs);

    return (list || []).flatMap((trigger) =>
      (trigger?.integrations || []).map((int) => ({
        type: 'internal-plug',
        post: id,
        originalIntegration: integration.id,
        integration: int.id,
        plugName: trigger.name,
        orgId: orgId,
        delay: +trigger.delay,
        information: trigger,
      }))
    );
  }

  /**
   * Lists the "global" plugs configured for an integration that the given
   * provider actually implements.
   *
   * @param orgId         Owning organization.
   * @param providerName  Provider identifier used to pick the plug definitions.
   * @param integrationId Integration whose configured plugs are loaded.
   * @returns One `{ type: 'global', plugId, delay, totalRuns }` entry per
   *          configured plug whose `plugFunction` matches a provider method.
   */
  public async checkPlugs(
    orgId: string,
    providerName: string,
    integrationId: string
  ) {
    const loadAllPlugs = this._integrationManager.getAllPlugs();
    const getPlugs = await this._integrationService.getPlugs(
      orgId,
      integrationId
    );

    const currentPlug = loadAllPlugs.find((p) => p.identifier === providerName);

    return getPlugs
      .filter((plug) =>
        currentPlug?.plugs?.some((p: any) => p.methodName === plug.plugFunction)
      )
      .map((plug) => {
        // The filter above guarantees a match exists.
        const runPlug = currentPlug?.plugs?.find(
          (p: any) => p.methodName === plug.plugFunction
        )!;

        return {
          type: 'global',
          plugId: plug.id,
          delay: runPlug.runEveryMilliseconds,
          totalRuns: runPlug.totalRuns,
        };
      });
  }

  /**
   * Deletes a post group and terminates the running workflows of the post
   * returned by the repository.
   *
   * @returns Always `{ error: true }`.
   *          NOTE(review): the flag is returned on success as well; confirm
   *          what callers expect before changing it.
   */
  async deletePost(orgId: string, group: string) {
    const post = await this._postRepository.deletePost(orgId, group);
    if (post?.id) {
      await this.terminateRunningPostWorkflows(post.id);
    }

    return { error: true };
  }

  /** Pass-through to `PostsRepository.countPostsFromDay`. */
  async countPostsFromDay(orgId: string, date: Date) {
    return this._postRepository.countPostsFromDay(orgId, date);
  }

  /** Pass-through to `PostsRepository.getPostByForWebhookId`. */
  getPostByForWebhookId(id: string) {
    return this._postRepository.getPostByForWebhookId(id);
  }

  /**
   * Creates or updates every post of the request, one after the other, and
   * (re)starts the Temporal workflow of each.
   *
   * For each entry of `body.posts`:
   *  - contents are run through the short-link service when `body.shortLink` is set
   *    (note: `post.value` on the input object is reassigned in place);
   *  - the post is persisted via `PostsRepository.createOrUpdatePost`
   *    (for type `'now'` the current time, truncated to the minute, is used as date);
   *  - running workflows of the post are terminated and a new one is started.
   *
   * @returns `{ postId, integration }` for every processed post.
   *          NOTE(review): if the repository returns no posts for an entry the
   *          method returns `[]` immediately, dropping the results of entries
   *          that were already processed — kept as is, confirm intent.
   */
  async createPost(orgId: string, body: CreatePostDto): Promise<any[]> {
    const postList = [];

    for (const post of body.posts) {
      const messages = (post.value || []).map((p) => p.content);
      const updateContent = !body.shortLink
        ? messages
        : await this._shortLinkService.convertTextToShortLinks(orgId, messages);

      post.value = (post.value || []).map((p, i) => ({
        ...p,
        content: updateContent[i],
      }));

      const { posts } = await this._postRepository.createOrUpdatePost(
        body.type,
        orgId,
        body.type === 'now' ? dayjs().format('YYYY-MM-DDTHH:mm:00') : body.date,
        post,
        body.tags,
        body.inter
      );

      if (!posts?.length) {
        return [] as any[];
      }

      await this.terminateRunningPostWorkflows(posts[0].id);
      await this.startPostWorkflow(posts[0].id, orgId, post.settings.__type);

      Sentry.metrics.count('post_created', 1);
      postList.push({
        postId: posts[0].id,
        integration: post.integration.id,
      });
    }

    return postList;
  }

  /** Pass-through to `OpenaiService.separatePosts`. */
  async separatePosts(content: string, len: number) {
    return this._openaiService.separatePosts(content, len);
  }

  /** Pass-through to `PostsRepository.changeState`. */
  async changeState(id: string, state: State, err?: any, body?: any) {
    return this._postRepository.changeState(id, state, err, body);
  }

  /**
   * Changes the date of a post and restarts its workflow so the new date is
   * picked up.
   *
   * @returns The value returned by `PostsRepository.changeDate`.
   */
  async changeDate(orgId: string, id: string, date: string) {
    const getPostById = await this._postRepository.getPostById(id, orgId);
    const newDate = await this._postRepository.changeDate(orgId, id, date);

    await this.terminateRunningPostWorkflows(getPostById.id);
    await this.startPostWorkflow(
      getPostById.id,
      orgId,
      getPostById.integration.providerIdentifier
    );

    return newDate;
  }

  /**
   * Creates draft posts from generated content: one draft per enabled
   * integration (provider `reddit` excluded) and per generated post, each at a
   * random future 10-minute slot inside the requested ISO week.
   *
   * NOTE(review): relies on the dayjs `isoWeek` plugin, which is not extended
   * in this file — presumably registered elsewhere; confirm.
   *
   * @throws BadRequestException when a slot is needed but the requested week
   *         is already entirely in the past (previously this overflowed the stack).
   */
  async generatePostsDraft(orgId: string, body: CreateGeneratedPostsDto) {
    const getAllIntegrations = (
      await this._integrationService.getIntegrationsList(orgId)
    ).filter((f) => !f.disabled && f.providerIdentifier !== 'reddit');

    const allDates = dayjs()
      .isoWeek(body.week)
      .year(body.year)
      .startOf('isoWeek');

    const dates = [...new Array(7)].map((_, i) => {
      return allDates.add(i, 'day').format('YYYY-MM-DD');
    });

    // 23:50 is the latest slot findTime can produce (143 * 10 minutes).
    const lastSlot = `${dates[dates.length - 1]}T23:50:00`;

    const findTime = (): string => {
      for (;;) {
        // Without this check the search could never end for a past week.
        if (dayjs(lastSlot).isBefore(dayjs())) {
          throw new BadRequestException(
            'Cannot generate posts for a week that is already in the past'
          );
        }

        // Pick one of the 144 ten-minute slots of a day.
        const totalMinutes = Math.floor(Math.random() * 144) * 10;
        const formattedHours = Math.floor(totalMinutes / 60)
          .toString()
          .padStart(2, '0');
        const formattedMinutes = (totalMinutes % 60).toString().padStart(2, '0');

        const randomDate =
          shuffle(dates)[0] + 'T' + `${formattedHours}:${formattedMinutes}:00`;

        if (!dayjs(randomDate).isBefore(dayjs())) {
          return randomDate;
        }
      }
    };

    for (const integration of getAllIntegrations) {
      for (const toPost of body.posts) {
        const group = makeId(10);
        const randomDate = findTime();

        await this.createPost(orgId, {
          type: 'draft',
          date: randomDate,
          order: '',
          shortLink: false,
          tags: [],
          posts: [
            {
              group,
              integration: {
                id: integration.id,
              },
              settings: {
                __type: integration.providerIdentifier as any,
                title: '',
                tags: [],
                subreddit: [],
              },
              value: [
                ...toPost.list.map((l) => ({
                  id: '',
                  content: l.post,
                  delay: 0,
                  image: [],
                })),
                {
                  id: '',
                  delay: 0,
                  content: `Check out the full story here:\n${
                    body.postId || body.url
                  }`,
                  image: [],
                },
              ],
            },
          ],
        });
      }
    }
  }

  /** Pass-through to `PostsRepository.findAllExistingCategories`. */
  findAllExistingCategories() {
    return this._postRepository.findAllExistingCategories();
  }

  /** Pass-through to `PostsRepository.findAllExistingTopicsOfCategory`. */
  findAllExistingTopicsOfCategory(category: string) {
    return this._postRepository.findAllExistingTopicsOfCategory(category);
  }

  /** Pass-through to `PostsRepository.findPopularPosts`. */
  findPopularPosts(category: string, topic?: string) {
    return this._postRepository.findPopularPosts(category, topic);
  }

  /**
   * Finds the next free posting slot, searching day by day from the start of
   * the current UTC day.
   *
   * @param orgId         Owning organization.
   * @param integrationId Optional integration forwarded to
   *                      `IntegrationService.findFreeDateTime`.
   * @returns The slot formatted as `YYYY-MM-DDTHH:mm:00`.
   */
  async findFreeDateTime(orgId: string, integrationId?: string) {
    const findTimes = await this._integrationService.findFreeDateTime(
      orgId,
      integrationId
    );

    return this.findFreeDateTimeRecursive(
      orgId,
      findTimes,
      dayjs.utc().startOf('day')
    );
  }

  /** Pass-through to `PostsRepository.createPopularPosts`. */
  async createPopularPosts(post: {
    category: string;
    topic: string;
    content: string;
    hook: string;
  }) {
    return this._postRepository.createPopularPosts(post);
  }

  /**
   * Asks the repository for the candidate values of `date`; when none come
   * back, retries with the following day. Otherwise the smallest value is
   * added to `date` as minutes.
   *
   * NOTE(review): `times` are presumably minute offsets from the start of the
   * day, and the search has no upper bound — if the repository never returns
   * a value (e.g. `times` is empty) this does not terminate. Confirm against
   * `getPostsCountsByDates`.
   */
  private async findFreeDateTimeRecursive(
    orgId: string,
    times: number[],
    date: dayjs.Dayjs
  ): Promise<string> {
    const list = await this._postRepository.getPostsCountsByDates(
      orgId,
      times,
      date
    );

    if (!list.length) {
      return this.findFreeDateTimeRecursive(orgId, times, date.add(1, 'day'));
    }

    // Smallest value of the list.
    const num = list.reduce<null | number>((prev, curr) => {
      if (prev === null || prev > curr) {
        return curr;
      }
      return prev;
    }, null) as number;

    return date.clone().add(num, 'minutes').format('YYYY-MM-DDTHH:mm:00');
  }

  /** Pass-through to `PostsRepository.getComments`. */
  getComments(postId: string) {
    return this._postRepository.getComments(postId);
  }

  /** Pass-through to `PostsRepository.getTags`. */
  getTags(orgId: string) {
    return this._postRepository.getTags(orgId);
  }

  /** Pass-through to `PostsRepository.createTag`. */
  createTag(orgId: string, body: CreateTagDto) {
    return this._postRepository.createTag(orgId, body);
  }

  /** Pass-through to `PostsRepository.editTag`. */
  editTag(id: string, orgId: string, body: CreateTagDto) {
    return this._postRepository.editTag(id, orgId, body);
  }

  /** Pass-through to `PostsRepository.createComment`. */
  createComment(
    orgId: string,
    userId: string,
    postId: string,
    comment: string
  ) {
    return this._postRepository.createComment(orgId, userId, postId, comment);
  }

  /**
   * Derives the public URL and the storage path of a media file. Values that
   * contain `http` are used unchanged for both; anything else is prefixed
   * from the environment.
   */
  private resolveMediaLocation(path: string): { url: string; path: string } {
    if (path.indexOf('http') !== -1) {
      return { url: path, path };
    }

    return {
      url:
        process.env.FRONTEND_URL +
        '/' +
        process.env.NEXT_PUBLIC_UPLOAD_STATIC_DIRECTORY +
        path,
      path: process.env.UPLOAD_DIRECTORY + path,
    };
  }

  /**
   * Best-effort termination of every running workflow listed for a post.
   * Failures — of the listing itself or of any single termination — are
   * intentionally ignored so that the calling operation can proceed.
   *
   * NOTE(review): the query filters on WorkflowType "postWorkflow", while
   * `startPostWorkflow` starts workflows of type "postWorkflowV101". Confirm
   * that the filter still matches the workflows that need to be terminated.
   */
  private async terminateRunningPostWorkflows(postId: string): Promise<void> {
    try {
      const workflows = this._temporalService.client
        .getRawClient()
        ?.workflow.list({
          query: `WorkflowType="postWorkflow" AND postId="${postId}" AND ExecutionStatus="Running"`,
        });

      for await (const executionInfo of workflows) {
        try {
          const workflow = await this._temporalService.client.getWorkflowHandle(
            executionInfo.workflowId
          );

          if (
            workflow &&
            (await workflow.describe()).status.name !== 'TERMINATED'
          ) {
            await workflow.terminate();
          }
        } catch (err) {
          // Ignored: one failed termination must not stop the others.
        }
      }
    } catch (err) {
      // Ignored: no raw client available or the listing failed.
    }
  }

  /**
   * Starts the `postWorkflowV101` workflow for a post under the workflow id
   * `post_<postId>`. Does nothing when no raw Temporal client is available.
   *
   * @param postId             Post to publish.
   * @param orgId              Owning organization.
   * @param providerIdentifier Provider identifier; the lower-cased part before
   *                           the first `-` is passed to the workflow as its
   *                           `taskQueue` argument.
   */
  private async startPostWorkflow(
    postId: string,
    orgId: string,
    providerIdentifier: string
  ): Promise<void> {
    await this._temporalService.client
      .getRawClient()
      ?.workflow.start('postWorkflowV101', {
        workflowId: `post_${postId}`,
        taskQueue: 'main',
        args: [
          {
            taskQueue: providerIdentifier.split('-')[0].toLowerCase(),
            postId: postId,
            organizationId: orgId,
          },
        ],
        typedSearchAttributes: new TypedSearchAttributes([
          {
            key: postIdSearchParam,
            value: postId,
          },
          {
            key: organizationId,
            value: orgId,
          },
        ]),
      });
  }
}