import { BadRequestException, Injectable, ValidationPipe, } from '@nestjs/common'; import { PostsRepository } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.repository'; import { CreatePostDto } from '@gitroom/nestjs-libraries/dtos/posts/create.post.dto'; import dayjs from 'dayjs'; import { IntegrationManager } from '@gitroom/nestjs-libraries/integrations/integration.manager'; import { Integration, Post, Media, From, State } from '@prisma/client'; import { GetPostsDto } from '@gitroom/nestjs-libraries/dtos/posts/get.posts.dto'; import { shuffle } from 'lodash'; import { CreateGeneratedPostsDto } from '@gitroom/nestjs-libraries/dtos/generator/create.generated.posts.dto'; import { IntegrationService } from '@gitroom/nestjs-libraries/database/prisma/integrations/integration.service'; import { makeId } from '@gitroom/nestjs-libraries/services/make.is'; import utc from 'dayjs/plugin/utc'; import { MediaService } from '@gitroom/nestjs-libraries/database/prisma/media/media.service'; import { ShortLinkService } from '@gitroom/nestjs-libraries/short-linking/short.link.service'; import { CreateTagDto } from '@gitroom/nestjs-libraries/dtos/posts/create.tag.dto'; import axios from 'axios'; import sharp from 'sharp'; import { UploadFactory } from '@gitroom/nestjs-libraries/upload/upload.factory'; import { Readable } from 'stream'; import { OpenaiService } from '@gitroom/nestjs-libraries/openai/openai.service'; dayjs.extend(utc); import * as Sentry from '@sentry/nestjs'; import { TemporalService } from 'nestjs-temporal-core'; import { TypedSearchAttributes } from '@temporalio/common'; import { organizationId, postId as postIdSearchParam, } from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute'; import { AnalyticsData } from '@gitroom/nestjs-libraries/integrations/social/social.integrations.interface'; import { timer } from '@gitroom/helpers/utils/timer'; import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service'; import { RefreshToken } from '@gitroom/nestjs-libraries/integrations/social.abstract'; import { RefreshIntegrationService } from '@gitroom/nestjs-libraries/integrations/refresh.integration.service'; type PostWithConditionals = Post & { integration?: Integration; childrenPost: Post[]; }; @Injectable() export class PostsService { private storage = UploadFactory.createStorage(); constructor( private _postRepository: PostsRepository, private _integrationManager: IntegrationManager, private _integrationService: IntegrationService, private _mediaService: MediaService, private _shortLinkService: ShortLinkService, private _openaiService: OpenaiService, private _temporalService: TemporalService, private _refreshIntegrationService: RefreshIntegrationService ) {} searchForMissingThreeHoursPosts() { return this._postRepository.searchForMissingThreeHoursPosts(); } updatePost(id: string, postId: string, releaseURL: string) { return this._postRepository.updatePost(id, postId, releaseURL); } async checkPostAnalytics( orgId: string, postId: string, date: number, forceRefresh = false ): Promise { const post = await this._postRepository.getPostById(postId, orgId); if (!post || !post.releaseId) { return []; } const integrationProvider = this._integrationManager.getSocialIntegration( post.integration.providerIdentifier ); if (!integrationProvider.postAnalytics) { return []; } const getIntegration = post.integration!; if ( dayjs(getIntegration?.tokenExpiration).isBefore(dayjs()) || forceRefresh ) { const data = await this._refreshIntegrationService.refresh( getIntegration ); if (!data) { return []; } const { accessToken } = data; if (accessToken) { getIntegration.token = accessToken; if (integrationProvider.refreshWait) { await timer(10000); } } else { await this._integrationService.disconnectChannel(orgId, getIntegration); return []; } } const getIntegrationData = await ioRedis.get( `integration:${orgId}:${post.id}:${date}` ); if (getIntegrationData) { return JSON.parse(getIntegrationData); } try { const loadAnalytics = await integrationProvider.postAnalytics( getIntegration.internalId, getIntegration.token, post.releaseId, date ); await ioRedis.set( `integration:${orgId}:${post.id}:${date}`, JSON.stringify(loadAnalytics), 'EX', !process.env.NODE_ENV || process.env.NODE_ENV === 'development' ? 1 : 3600 ); return loadAnalytics; } catch (e) { console.log(e); if (e instanceof RefreshToken) { return this.checkPostAnalytics(orgId, postId, date, true); } } return []; } async getStatistics(orgId: string, id: string) { const getPost = await this.getPostsRecursively(id, true, orgId, true); const content = getPost.map((p) => p.content); const shortLinksTracking = await this._shortLinkService.getStatistics( content ); return { clicks: shortLinksTracking, }; } async mapTypeToPost( body: CreatePostDto, organization: string, replaceDraft: boolean = false ): Promise { if (!body?.posts?.every((p) => p?.integration?.id)) { throw new BadRequestException('All posts must have an integration id'); } const mappedValues = { ...body, type: replaceDraft ? 'schedule' : body.type, posts: await Promise.all( body.posts.map(async (post) => { const integration = await this._integrationService.getIntegrationById( organization, post.integration.id ); if (!integration) { throw new BadRequestException( `Integration with id ${post.integration.id} not found` ); } return { ...post, settings: { ...(post.settings || ({} as any)), __type: integration.providerIdentifier, }, }; }) ), }; const validationPipe = new ValidationPipe({ skipMissingProperties: false, transform: true, transformOptions: { enableImplicitConversion: true, }, }); return await validationPipe.transform(mappedValues, { type: 'body', metatype: CreatePostDto, }); } async getPostsRecursively( id: string, includeIntegration = false, orgId?: string, isFirst?: boolean ): Promise { const post = await this._postRepository.getPost( id, includeIntegration, orgId, isFirst ); if (!post) { return []; } return [ post!, ...(post?.childrenPost?.length ? await this.getPostsRecursively( post?.childrenPost?.[0]?.id, false, orgId, false ) : []), ]; } async getPosts(orgId: string, query: GetPostsDto) { return this._postRepository.getPosts(orgId, query); } async updateMedia(id: string, imagesList: any[], convertToJPEG = false) { try { let imageUpdateNeeded = false; const getImageList = await Promise.all( ( await Promise.all( (imagesList || []).map(async (p: any) => { if (!p.path && p.id) { imageUpdateNeeded = true; return this._mediaService.getMediaById(p.id); } return p; }) ) ) .map((m) => { return { ...m, url: m.path.indexOf('http') === -1 ? process.env.FRONTEND_URL + '/' + process.env.NEXT_PUBLIC_UPLOAD_STATIC_DIRECTORY + m.path : m.path, type: 'image', path: m.path.indexOf('http') === -1 ? process.env.UPLOAD_DIRECTORY + m.path : m.path, }; }) .map(async (m) => { if (!convertToJPEG) { return m; } if (m.path.indexOf('.png') > -1) { imageUpdateNeeded = true; const response = await axios.get(m.url, { responseType: 'arraybuffer', }); const imageBuffer = Buffer.from(response.data); // Use sharp to get the metadata of the image const buffer = await sharp(imageBuffer) .jpeg({ quality: 100 }) .toBuffer(); const { path, originalname } = await this.storage.uploadFile({ buffer, mimetype: 'image/jpeg', size: buffer.length, path: '', fieldname: '', destination: '', stream: new Readable(), filename: '', originalname: '', encoding: '', }); return { ...m, name: originalname, url: path.indexOf('http') === -1 ? process.env.FRONTEND_URL + '/' + process.env.NEXT_PUBLIC_UPLOAD_STATIC_DIRECTORY + path : path, type: 'image', path: path.indexOf('http') === -1 ? process.env.UPLOAD_DIRECTORY + path : path, }; } return m; }) ); if (imageUpdateNeeded) { await this._postRepository.updateImages( id, JSON.stringify(getImageList) ); } return getImageList; } catch (err: any) { return imagesList; } } async getPostsByGroup(orgId: string, group: string) { const convertToJPEG = false; const loadAll = await this._postRepository.getPostsByGroup(orgId, group); const posts = this.arrangePostsByGroup(loadAll, undefined); return { group: posts?.[0]?.group, posts: await Promise.all( (posts || []).map(async (post) => ({ ...post, image: await this.updateMedia( post.id, JSON.parse(post.image || '[]'), convertToJPEG ), })) ), integrationPicture: posts[0]?.integration?.picture, integration: posts[0].integrationId, settings: JSON.parse(posts[0].settings || '{}'), }; } arrangePostsByGroup(all: any, parent?: string): PostWithConditionals[] { const findAll = all .filter((p: any) => !parent ? !p.parentPostId : p.parentPostId === parent ) .map(({ integration, ...all }: any) => ({ ...all, ...(!parent ? { integration } : {}), })); return [ ...findAll, ...(findAll.length ? findAll.flatMap((p: any) => this.arrangePostsByGroup(all, p.id)) : []), ]; } async getPost(orgId: string, id: string, convertToJPEG = false) { const posts = await this.getPostsRecursively(id, true, orgId, true); const list = { group: posts?.[0]?.group, posts: await Promise.all( (posts || []).map(async (post) => ({ ...post, image: await this.updateMedia( post.id, JSON.parse(post.image || '[]'), convertToJPEG ), })) ), integrationPicture: posts[0]?.integration?.picture, integration: posts[0].integrationId, settings: JSON.parse(posts[0].settings || '{}'), }; return list; } async getOldPosts(orgId: string, date: string) { return this._postRepository.getOldPosts(orgId, date); } public async updateTags(orgId: string, post: Post[]): Promise { const plainText = JSON.stringify(post); const extract = Array.from( plainText.match(/\(post:[a-zA-Z0-9-_]+\)/g) || [] ); if (!extract.length) { return post; } const ids = (extract || []).map((e) => e.replace('(post:', '').replace(')', '') ); const urls = await this._postRepository.getPostUrls(orgId, ids); const newPlainText = ids.reduce((acc, value) => { const findUrl = urls?.find?.((u) => u.id === value)?.releaseURL || ''; return acc.replace( new RegExp(`\\(post:${value}\\)`, 'g'), findUrl.split(',')[0] ); }, plainText); return this.updateTags(orgId, JSON.parse(newPlainText) as Post[]); } public async checkInternalPlug( integration: Integration, orgId: string, id: string, settings: any ) { const plugs = Object.entries(settings).filter(([key]) => { return key.indexOf('plug-') > -1; }); if (plugs.length === 0) { return []; } const parsePlugs = plugs.reduce((all, [key, value]) => { const [_, name, identifier] = key.split('--'); all[name] = all[name] || { name }; all[name][identifier] = value; return all; }, {} as any); const list: { name: string; integrations: { id: string }[]; delay: string; active: boolean; }[] = Object.values(parsePlugs); return (list || []).flatMap((trigger) => { return (trigger?.integrations || []).flatMap((int) => ({ type: 'internal-plug', post: id, originalIntegration: integration.id, integration: int.id, plugName: trigger.name, orgId: orgId, delay: +trigger.delay, information: trigger, })); }); } public async checkPlugs( orgId: string, providerName: string, integrationId: string ) { const loadAllPlugs = this._integrationManager.getAllPlugs(); const getPlugs = await this._integrationService.getPlugs( orgId, integrationId ); const currentPlug = loadAllPlugs.find((p) => p.identifier === providerName); return getPlugs .filter((plug) => { return currentPlug?.plugs?.some( (p: any) => p.methodName === plug.plugFunction ); }) .map((plug) => { const runPlug = currentPlug?.plugs?.find( (p: any) => p.methodName === plug.plugFunction )!; return { type: 'global', plugId: plug.id, delay: runPlug.runEveryMilliseconds, totalRuns: runPlug.totalRuns, }; }); } async deletePost(orgId: string, group: string) { const post = await this._postRepository.deletePost(orgId, group); if (post?.id) { try { const workflows = this._temporalService.client .getRawClient() ?.workflow.list({ query: `postId="${post.id}" AND ExecutionStatus="Running"`, }); for await (const executionInfo of workflows) { try { const workflow = await this._temporalService.client.getWorkflowHandle( executionInfo.workflowId ); if ( workflow && (await workflow.describe()).status.name !== 'TERMINATED' ) { await workflow.terminate(); } } catch (err) {} } } catch (err) {} } return { error: true }; } async countPostsFromDay(orgId: string, date: Date) { return this._postRepository.countPostsFromDay(orgId, date); } getPostByForWebhookId(id: string) { return this._postRepository.getPostByForWebhookId(id); } async startWorkflow(taskQueue: string, postId: string, orgId: string) { try { const workflows = this._temporalService.client .getRawClient() ?.workflow.list({ query: `postId="${postId}" AND ExecutionStatus="Running"`, }); for await (const executionInfo of workflows) { try { const workflow = await this._temporalService.client.getWorkflowHandle( executionInfo.workflowId ); if ( workflow && (await workflow.describe()).status.name !== 'TERMINATED' ) { await workflow.terminate(); } } catch (err) {} } } catch (err) {} try { await this._temporalService.client .getRawClient() ?.workflow.start('postWorkflowV101', { workflowId: `post_${postId}`, taskQueue: 'main', args: [ { taskQueue: taskQueue, postId: postId, organizationId: orgId, }, ], typedSearchAttributes: new TypedSearchAttributes([ { key: postIdSearchParam, value: postId, }, { key: organizationId, value: orgId, }, ]), }); } catch (err) {} } async createPost(orgId: string, body: CreatePostDto): Promise { const postList = []; for (const post of body.posts) { const messages = (post.value || []).map((p) => p.content); const updateContent = !body.shortLink ? messages : await this._shortLinkService.convertTextToShortLinks(orgId, messages); post.value = (post.value || []).map((p, i) => ({ ...p, content: updateContent[i], })); const { posts } = await this._postRepository.createOrUpdatePost( body.type, orgId, body.type === 'now' ? dayjs().format('YYYY-MM-DDTHH:mm:00') : body.date, post, body.tags, body.inter ); if (!posts?.length) { return [] as any[]; } this.startWorkflow( post.settings.__type.split('-')[0].toLowerCase(), posts[0].id, orgId ).catch((err) => {}); Sentry.metrics.count('post_created', 1); postList.push({ postId: posts[0].id, integration: post.integration.id, }); } return postList; } async separatePosts(content: string, len: number) { return this._openaiService.separatePosts(content, len); } async changeState(id: string, state: State, err?: any, body?: any) { return this._postRepository.changeState(id, state, err, body); } async changeDate(orgId: string, id: string, date: string) { const getPostById = await this._postRepository.getPostById(id, orgId); const newDate = await this._postRepository.changeDate(orgId, id, date); try { await this.startWorkflow( getPostById.integration.providerIdentifier.split('-')[0].toLowerCase(), getPostById.id, orgId ); } catch (err) {} return newDate; } async generatePostsDraft(orgId: string, body: CreateGeneratedPostsDto) { const getAllIntegrations = ( await this._integrationService.getIntegrationsList(orgId) ).filter((f) => !f.disabled && f.providerIdentifier !== 'reddit'); // const posts = chunk(body.posts, getAllIntegrations.length); const allDates = dayjs() .isoWeek(body.week) .year(body.year) .startOf('isoWeek'); const dates = [...new Array(7)].map((_, i) => { return allDates.add(i, 'day').format('YYYY-MM-DD'); }); const findTime = (): string => { const totalMinutes = Math.floor(Math.random() * 144) * 10; // Convert total minutes to hours and minutes const hours = Math.floor(totalMinutes / 60); const minutes = totalMinutes % 60; // Format hours and minutes to always be two digits const formattedHours = hours.toString().padStart(2, '0'); const formattedMinutes = minutes.toString().padStart(2, '0'); const randomDate = shuffle(dates)[0] + 'T' + `${formattedHours}:${formattedMinutes}:00`; if (dayjs(randomDate).isBefore(dayjs())) { return findTime(); } return randomDate; }; for (const integration of getAllIntegrations) { for (const toPost of body.posts) { const group = makeId(10); const randomDate = findTime(); await this.createPost(orgId, { type: 'draft', date: randomDate, order: '', shortLink: false, tags: [], posts: [ { group, integration: { id: integration.id, }, settings: { __type: integration.providerIdentifier as any, title: '', tags: [], subreddit: [], }, value: [ ...toPost.list.map((l) => ({ id: '', content: l.post, delay: 0, image: [], })), { id: '', delay: 0, content: `Check out the full story here:\n${ body.postId || body.url }`, image: [], }, ], }, ], }); } } } findAllExistingCategories() { return this._postRepository.findAllExistingCategories(); } findAllExistingTopicsOfCategory(category: string) { return this._postRepository.findAllExistingTopicsOfCategory(category); } findPopularPosts(category: string, topic?: string) { return this._postRepository.findPopularPosts(category, topic); } async findFreeDateTime(orgId: string, integrationId?: string) { const findTimes = await this._integrationService.findFreeDateTime( orgId, integrationId ); return this.findFreeDateTimeRecursive( orgId, findTimes, dayjs.utc().startOf('day') ); } async createPopularPosts(post: { category: string; topic: string; content: string; hook: string; }) { return this._postRepository.createPopularPosts(post); } private async findFreeDateTimeRecursive( orgId: string, times: number[], date: dayjs.Dayjs ): Promise { const list = await this._postRepository.getPostsCountsByDates( orgId, times, date ); if (!list.length) { return this.findFreeDateTimeRecursive(orgId, times, date.add(1, 'day')); } const num = list.reduce((prev, curr) => { if (prev === null || prev > curr) { return curr; } return prev; }, null) as number; return date.clone().add(num, 'minutes').format('YYYY-MM-DDTHH:mm:00'); } getComments(postId: string) { return this._postRepository.getComments(postId); } getTags(orgId: string) { return this._postRepository.getTags(orgId); } createTag(orgId: string, body: CreateTagDto) { return this._postRepository.createTag(orgId, body); } editTag(id: string, orgId: string, body: CreateTagDto) { return this._postRepository.editTag(id, orgId, body); } createComment( orgId: string, userId: string, postId: string, comment: string ) { return this._postRepository.createComment(orgId, userId, postId, comment); } }