import { Injectable } from '@nestjs/common'; import { PostsRepository } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.repository'; import { CreatePostDto } from '@gitroom/nestjs-libraries/dtos/posts/create.post.dto'; import dayjs from 'dayjs'; import { IntegrationManager } from '@gitroom/nestjs-libraries/integrations/integration.manager'; import { Integration, Post, Media, From } from '@prisma/client'; import { GetPostsDto } from '@gitroom/nestjs-libraries/dtos/posts/get.posts.dto'; import { NotificationService } from '@gitroom/nestjs-libraries/database/prisma/notifications/notification.service'; import { capitalize, shuffle, uniq } from 'lodash'; import { MessagesService } from '@gitroom/nestjs-libraries/database/prisma/marketplace/messages.service'; import { StripeService } from '@gitroom/nestjs-libraries/services/stripe.service'; import { CreateGeneratedPostsDto } from '@gitroom/nestjs-libraries/dtos/generator/create.generated.posts.dto'; import { IntegrationService } from '@gitroom/nestjs-libraries/database/prisma/integrations/integration.service'; import { makeId } from '@gitroom/nestjs-libraries/services/make.is'; import { BadBody, RefreshToken, } from '@gitroom/nestjs-libraries/integrations/social.abstract'; import { BullMqClient } from '@gitroom/nestjs-libraries/bull-mq-transport-new/client'; import { timer } from '@gitroom/helpers/utils/timer'; import { AuthTokenDetails } from '@gitroom/nestjs-libraries/integrations/social/social.integrations.interface'; import utc from 'dayjs/plugin/utc'; import { MediaService } from '@gitroom/nestjs-libraries/database/prisma/media/media.service'; import { ShortLinkService } from '@gitroom/nestjs-libraries/short-linking/short.link.service'; dayjs.extend(utc); type PostWithConditionals = Post & { integration?: Integration; childrenPost: Post[]; }; @Injectable() export class PostsService { constructor( private _postRepository: PostsRepository, private _workerServiceProducer: BullMqClient, private _integrationManager: IntegrationManager, private _notificationService: NotificationService, private _messagesService: MessagesService, private _stripeService: StripeService, private _integrationService: IntegrationService, private _mediaService: MediaService, private _shortLinkService: ShortLinkService ) {} async getStatistics(orgId: string, id: string) { const getPost = await this.getPostsRecursively(id, true, orgId, true); const content = getPost.map((p) => p.content); const shortLinksTracking = await this._shortLinkService.getStatistics(content); return { clicks: shortLinksTracking } } async getPostsRecursively( id: string, includeIntegration = false, orgId?: string, isFirst?: boolean ): Promise { const post = await this._postRepository.getPost( id, includeIntegration, orgId, isFirst ); if (!post) { return []; } return [ post!, ...(post?.childrenPost?.length ? await this.getPostsRecursively( post?.childrenPost?.[0]?.id, false, orgId, false ) : []), ]; } getPosts(orgId: string, query: GetPostsDto) { return this._postRepository.getPosts(orgId, query); } async updateMedia(id: string, imagesList: any[]) { let imageUpdateNeeded = false; const getImageList = ( await Promise.all( imagesList.map(async (p: any) => { if (!p.path && p.id) { imageUpdateNeeded = true; return this._mediaService.getMediaById(p.id); } return p; }) ) ).map((m) => { return { ...m, url: m.path.indexOf('http') === -1 ? process.env.FRONTEND_URL + '/' + process.env.NEXT_PUBLIC_UPLOAD_STATIC_DIRECTORY + m.path : m.path, type: 'image', path: m.path.indexOf('http') === -1 ? process.env.UPLOAD_DIRECTORY + m.path : m.path, }; }); if (imageUpdateNeeded) { await this._postRepository.updateImages(id, JSON.stringify(getImageList)); } return getImageList; } async getPost(orgId: string, id: string) { const posts = await this.getPostsRecursively(id, true, orgId, true); const list = { group: posts?.[0]?.group, posts: await Promise.all( posts.map(async (post) => ({ ...post, image: await this.updateMedia( post.id, JSON.parse(post.image || '[]') ), })) ), integrationPicture: posts[0]?.integration?.picture, integration: posts[0].integrationId, settings: JSON.parse(posts[0].settings || '{}'), }; return list; } async getOldPosts(orgId: string, date: string) { return this._postRepository.getOldPosts(orgId, date); } async post(id: string) { const [firstPost, ...morePosts] = await this.getPostsRecursively(id, true); if (!firstPost) { return; } if (firstPost.integration?.refreshNeeded) { await this._notificationService.inAppNotification( firstPost.organizationId, `We couldn't post to ${firstPost.integration?.providerIdentifier} for ${firstPost?.integration?.name}`, `We couldn't post to ${firstPost.integration?.providerIdentifier} for ${firstPost?.integration?.name} because you need to reconnect it. Please enable it and try again.`, true ); return; } if (firstPost.integration?.disabled) { await this._notificationService.inAppNotification( firstPost.organizationId, `We couldn't post to ${firstPost.integration?.providerIdentifier} for ${firstPost?.integration?.name}`, `We couldn't post to ${firstPost.integration?.providerIdentifier} for ${firstPost?.integration?.name} because it's disabled. Please enable it and try again.`, true ); return; } try { const finalPost = firstPost.integration?.type === 'article' ? await this.postArticle(firstPost.integration!, [ firstPost, ...morePosts, ]) : await this.postSocial(firstPost.integration!, [ firstPost, ...morePosts, ]); if (!finalPost?.postId || !finalPost?.releaseURL) { await this._postRepository.changeState(firstPost.id, 'ERROR'); await this._notificationService.inAppNotification( firstPost.organizationId, `Error posting on ${firstPost.integration?.providerIdentifier} for ${firstPost?.integration?.name}`, `An error occurred while posting on ${firstPost.integration?.providerIdentifier}`, true ); return; } if (firstPost.submittedForOrderId) { this._workerServiceProducer.emit('submit', { payload: { id: firstPost.id, releaseURL: finalPost.releaseURL, }, }); } } catch (err: any) { await this._postRepository.changeState(firstPost.id, 'ERROR', err); await this._notificationService.inAppNotification( firstPost.organizationId, `Error posting on ${firstPost.integration?.providerIdentifier} for ${firstPost?.integration?.name}`, `An error occurred while posting on ${ firstPost.integration?.providerIdentifier } ${ !process.env.NODE_ENV || process.env.NODE_ENV === 'development' ? err : '' }`, true ); if (err instanceof BadBody) { console.error( '[Error] posting on', firstPost.integration?.providerIdentifier, err.identifier, err.json, err.body, err ); return; } console.error( '[Error] posting on', firstPost.integration?.providerIdentifier, err ); } } private async updateTags(orgId: string, post: Post[]): Promise { const plainText = JSON.stringify(post); const extract = Array.from( plainText.match(/\(post:[a-zA-Z0-9-_]+\)/g) || [] ); if (!extract.length) { return post; } const ids = extract.map((e) => e.replace('(post:', '').replace(')', '')); const urls = await this._postRepository.getPostUrls(orgId, ids); const newPlainText = ids.reduce((acc, value) => { const findUrl = urls?.find?.((u) => u.id === value)?.releaseURL || ''; return acc.replace( new RegExp(`\\(post:${value}\\)`, 'g'), findUrl.split(',')[0] ); }, plainText); return this.updateTags(orgId, JSON.parse(newPlainText) as Post[]); } private async postSocial( integration: Integration, posts: Post[], forceRefresh = false ): Promise> { const getIntegration = this._integrationManager.getSocialIntegration( integration.providerIdentifier ); if (!getIntegration) { return {}; } if (dayjs(integration?.tokenExpiration).isBefore(dayjs()) || forceRefresh) { const { accessToken, expiresIn, refreshToken, additionalSettings } = await new Promise((res) => { getIntegration .refreshToken(integration.refreshToken!) .then((r) => res(r)) .catch(() => res({ accessToken: '', expiresIn: 0, refreshToken: '', id: '', name: '', username: '', picture: '', additionalSettings: undefined, }) ); }); if (!accessToken) { await this._integrationService.refreshNeeded( integration.organizationId, integration.id ); await this._integrationService.informAboutRefreshError( integration.organizationId, integration ); return {}; } await this._integrationService.createOrUpdateIntegration( additionalSettings, !!getIntegration.oneTimeToken, integration.organizationId, integration.name, integration.picture!, 'social', integration.internalId, integration.providerIdentifier, accessToken, refreshToken, expiresIn ); integration.token = accessToken; if (getIntegration.refreshWait) { await timer(10000); } } const newPosts = await this.updateTags(integration.organizationId, posts); try { const publishedPosts = await getIntegration.post( integration.internalId, integration.token, await Promise.all( newPosts.map(async (p) => ({ id: p.id, message: p.content, settings: JSON.parse(p.settings || '{}'), media: await this.updateMedia(p.id, JSON.parse(p.image || '[]')), })) ), integration ); for (const post of publishedPosts) { await this._postRepository.updatePost( post.id, post.postId, post.releaseURL ); } await this._notificationService.inAppNotification( integration.organizationId, `Your post has been published on ${capitalize( integration.providerIdentifier )}`, `Your post has been published at ${publishedPosts[0].releaseURL}`, true ); await this.checkPlugs( integration.organizationId, getIntegration.identifier, integration.id, publishedPosts[0].postId ); await this.checkInternalPlug( integration, integration.organizationId, publishedPosts[0].postId, JSON.parse(newPosts[0].settings || '{}') ); return { postId: publishedPosts[0].postId, releaseURL: publishedPosts[0].releaseURL, }; } catch (err) { if (err instanceof RefreshToken) { return this.postSocial(integration, posts, true); } throw err; } } private async checkInternalPlug( integration: Integration, orgId: string, id: string, settings: any ) { const plugs = Object.entries(settings).filter(([key]) => { return key.indexOf('plug-') > -1; }); if (plugs.length === 0) { return; } const parsePlugs = plugs.reduce((all, [key, value]) => { const [_, name, identifier] = key.split('--'); all[name] = all[name] || { name }; all[name][identifier] = value; return all; }, {} as any); const list: { name: string; integrations: { id: string }[]; delay: string; active: boolean; }[] = Object.values(parsePlugs); for (const trigger of list || []) { for (const int of trigger?.integrations || []) { this._workerServiceProducer.emit('internal-plugs', { id: 'plug_' + id + '_' + trigger.name + '_' + int.id, options: { delay: +trigger.delay, }, payload: { post: id, originalIntegration: integration.id, integration: int.id, plugName: trigger.name, orgId: orgId, delay: +trigger.delay, information: trigger, }, }); } } } private async checkPlugs( orgId: string, providerName: string, integrationId: string, postId: string ) { const loadAllPlugs = this._integrationManager.getAllPlugs(); const getPlugs = await this._integrationService.getPlugs( orgId, integrationId ); const currentPlug = loadAllPlugs.find((p) => p.identifier === providerName); for (const plug of getPlugs) { const runPlug = currentPlug?.plugs?.find( (p: any) => p.methodName === plug.plugFunction )!; if (!runPlug) { continue; } this._workerServiceProducer.emit('plugs', { id: 'plug_' + postId + '_' + runPlug.identifier, options: { delay: runPlug.runEveryMilliseconds, }, payload: { plugId: plug.id, postId, delay: runPlug.runEveryMilliseconds, totalRuns: runPlug.totalRuns, currentRun: 1, }, }); } } private async postArticle(integration: Integration, posts: Post[]) { const getIntegration = this._integrationManager.getArticlesIntegration( integration.providerIdentifier ); if (!getIntegration) { return; } const newPosts = await this.updateTags(integration.organizationId, posts); const { postId, releaseURL } = await getIntegration.post( integration.token, newPosts.map((p) => p.content).join('\n\n'), JSON.parse(newPosts[0].settings || '{}') ); await this._notificationService.inAppNotification( integration.organizationId, `Your article has been published on ${capitalize( integration.providerIdentifier )}`, `Your article has been published at ${releaseURL}`, true ); await this._postRepository.updatePost(newPosts[0].id, postId, releaseURL); return { postId, releaseURL, }; } async deletePost(orgId: string, group: string) { const post = await this._postRepository.deletePost(orgId, group); if (post?.id) { await this._workerServiceProducer.delete('post', post.id); return {id: post.id}; } return {error: true}; } async countPostsFromDay(orgId: string, date: Date) { return this._postRepository.countPostsFromDay(orgId, date); } async submit( id: string, order: string, message: string, integrationId: string ) { if (!(await this._messagesService.canAddPost(id, order, integrationId))) { throw new Error('You can not add a post to this publication'); } const getOrgByOrder = await this._messagesService.getOrgByOrder(order); const submit = await this._postRepository.submit( id, order, getOrgByOrder?.messageGroup?.buyerOrganizationId! ); const messageModel = await this._messagesService.createNewMessage( submit?.submittedForOrder?.messageGroupId || '', From.SELLER, '', { type: 'post', data: { id: order, postId: id, status: 'PENDING', integration: integrationId, description: message.slice(0, 300) + '...', }, } ); await this._postRepository.updateMessage(id, messageModel.id); return messageModel; } async createPost(orgId: string, body: CreatePostDto) { const postList = []; for (const post of body.posts) { const messages = post.value.map(p => p.content); const updateContent = !body.shortLink ? messages : await this._shortLinkService.convertTextToShortLinks(orgId, messages); post.value = post.value.map((p, i) => ({ ...p, content: updateContent[i], })); const { previousPost, posts } = await this._postRepository.createOrUpdatePost( body.type, orgId, body.type === 'now' ? dayjs().format('YYYY-MM-DDTHH:mm:00') : body.date, post ); if (!posts?.length) { return; } await this._workerServiceProducer.delete( 'post', previousPost ? previousPost : posts?.[0]?.id ); if (body.order && body.type !== 'draft') { await this.submit( posts[0].id, body.order, post.value[0].content, post.integration.id ); continue; } if ( body.type === 'now' || (body.type === 'schedule' && dayjs(body.date).isAfter(dayjs())) ) { this._workerServiceProducer.emit('post', { id: posts[0].id, options: { delay: body.type === 'now' ? 0 : dayjs(posts[0].publishDate).diff(dayjs(), 'millisecond'), }, payload: { id: posts[0].id, }, }); } postList.push({ postId: posts[0].id, integration: post.integration.id, }) } return postList; } async changeDate(orgId: string, id: string, date: string) { const getPostById = await this._postRepository.getPostById(id, orgId); if ( getPostById?.submittedForOrderId && getPostById.approvedSubmitForOrder !== 'NO' ) { throw new Error( 'You can not change the date of a post that has been submitted' ); } await this._workerServiceProducer.delete('post', id); if (getPostById?.state !== 'DRAFT' && !getPostById?.submittedForOrderId) { this._workerServiceProducer.emit('post', { id: id, options: { delay: dayjs(date).diff(dayjs(), 'millisecond'), }, payload: { id: id, }, }); } return this._postRepository.changeDate(orgId, id, date); } async payout(id: string, url: string) { const getPost = await this._postRepository.getPostById(id); if (!getPost || !getPost.submittedForOrder) { return; } const findPrice = getPost.submittedForOrder.ordersItems.find( (orderItem) => orderItem.integrationId === getPost.integrationId )!; await this._messagesService.createNewMessage( getPost.submittedForOrder.messageGroupId, From.SELLER, '', { type: 'published', data: { id: getPost.submittedForOrder.id, postId: id, status: 'PUBLISHED', integrationId: getPost.integrationId, integration: getPost.integration.providerIdentifier, picture: getPost.integration.picture, name: getPost.integration.name, url, }, } ); const totalItems = getPost.submittedForOrder.ordersItems.reduce( (all, p) => all + p.quantity, 0 ); const totalPosts = getPost.submittedForOrder.posts.length; if (totalItems === totalPosts) { await this._messagesService.completeOrder(getPost.submittedForOrder.id); await this._messagesService.createNewMessage( getPost.submittedForOrder.messageGroupId, From.SELLER, '', { type: 'order-completed', data: { id: getPost.submittedForOrder.id, postId: id, status: 'PUBLISHED', }, } ); } try { await this._stripeService.payout( getPost.submittedForOrder.id, getPost.submittedForOrder.captureId!, getPost.submittedForOrder.seller.account!, findPrice.price ); return this._notificationService.inAppNotification( getPost.integration.organizationId, 'Payout completed', `You have received a payout of $${findPrice.price}`, true ); } catch (err) { await this._messagesService.payoutProblem( getPost.submittedForOrder.id, getPost.submittedForOrder.seller.id, findPrice.price, id ); } } async generatePostsDraft(orgId: string, body: CreateGeneratedPostsDto) { const getAllIntegrations = ( await this._integrationService.getIntegrationsList(orgId) ).filter((f) => !f.disabled && f.providerIdentifier !== 'reddit'); // const posts = chunk(body.posts, getAllIntegrations.length); const allDates = dayjs() .isoWeek(body.week) .year(body.year) .startOf('isoWeek'); const dates = [...new Array(7)].map((_, i) => { return allDates.add(i, 'day').format('YYYY-MM-DD'); }); const findTime = (): string => { const totalMinutes = Math.floor(Math.random() * 144) * 10; // Convert total minutes to hours and minutes const hours = Math.floor(totalMinutes / 60); const minutes = totalMinutes % 60; // Format hours and minutes to always be two digits const formattedHours = hours.toString().padStart(2, '0'); const formattedMinutes = minutes.toString().padStart(2, '0'); const randomDate = shuffle(dates)[0] + 'T' + `${formattedHours}:${formattedMinutes}:00`; if (dayjs(randomDate).isBefore(dayjs())) { return findTime(); } return randomDate; }; for (const integration of getAllIntegrations) { for (const toPost of body.posts) { const group = makeId(10); const randomDate = findTime(); await this.createPost(orgId, { type: 'draft', date: randomDate, order: '', shortLink: false, posts: [ { group, integration: { id: integration.id, }, settings: { subtitle: '', title: '', tags: [], subreddit: [], }, value: [ ...toPost.list.map((l) => ({ id: '', content: l.post, image: [], })), { id: '', content: `Check out the full story here:\n${ body.postId || body.url }`, image: [], }, ], }, ], }); } } } findAllExistingCategories() { return this._postRepository.findAllExistingCategories(); } findAllExistingTopicsOfCategory(category: string) { return this._postRepository.findAllExistingTopicsOfCategory(category); } findPopularPosts(category: string, topic?: string) { return this._postRepository.findPopularPosts(category, topic); } async findFreeDateTime(orgId: string) { const findTimes = await this._integrationService.findFreeDateTime(orgId); return this.findFreeDateTimeRecursive( orgId, findTimes, dayjs.utc().startOf('day') ); } async createPopularPosts(post: { category: string; topic: string; content: string; hook: string; }) { return this._postRepository.createPopularPosts(post); } private async findFreeDateTimeRecursive( orgId: string, times: number[], date: dayjs.Dayjs ): Promise { const list = await this._postRepository.getPostsCountsByDates( orgId, times, date ); if (!list.length) { return this.findFreeDateTimeRecursive(orgId, times, date.add(1, 'day')); } const num = list.reduce((prev, curr) => { if (prev === null || prev > curr) { return curr; } return prev; }, null) as number; return date.clone().add(num, 'minutes').format('YYYY-MM-DDTHH:mm:00'); } getComments(postId: string) { return this._postRepository.getComments(postId); } createComment( orgId: string, userId: string, postId: string, comment: string ) { return this._postRepository.createComment(orgId, userId, postId, comment); } }