postiz/apps/orchestrator/src/activities/post.activity.ts

295 lines
8.0 KiB
TypeScript

import { Injectable } from '@nestjs/common';
import {
Activity,
ActivityMethod,
TemporalService,
} from 'nestjs-temporal-core';
import { PostsService } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.service';
import {
NotificationService,
NotificationType,
} from '@gitroom/nestjs-libraries/database/prisma/notifications/notification.service';
import { Integration, Post, State } from '@prisma/client';
import { stripHtmlValidation } from '@gitroom/helpers/utils/strip.html.validation';
import { IntegrationManager } from '@gitroom/nestjs-libraries/integrations/integration.manager';
import { AuthTokenDetails } from '@gitroom/nestjs-libraries/integrations/social/social.integrations.interface';
import { RefreshIntegrationService } from '@gitroom/nestjs-libraries/integrations/refresh.integration.service';
import { timer } from '@gitroom/helpers/utils/timer';
import { IntegrationService } from '@gitroom/nestjs-libraries/database/prisma/integrations/integration.service';
import { WebhooksService } from '@gitroom/nestjs-libraries/database/prisma/webhooks/webhooks.service';
import { TypedSearchAttributes } from '@temporalio/common';
import {
organizationId,
postId as postIdSearchParam,
} from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute';
import { postWorkflow } from '@gitroom/orchestrator/workflows';
/**
 * Temporal activities backing the post publishing workflow.
 *
 * Most methods are thin wrappers that expose an injected service call as an
 * activity; the ones with real logic are documented individually below.
 */
@Injectable()
@Activity()
export class PostActivity {
  constructor(
    private readonly _postService: PostsService,
    private readonly _notificationService: NotificationService,
    private readonly _integrationManager: IntegrationManager,
    private readonly _integrationService: IntegrationService,
    private readonly _refreshIntegrationService: RefreshIntegrationService,
    private readonly _webhookService: WebhooksService,
    private readonly _temporalService: TemporalService
  ) {}

  /**
   * Issues a signal-with-start for every post returned by
   * `PostsService.searchForMissingThreeHoursPosts()`.
   *
   * Each post targets workflow type `'postWorkflow'` with id `post_<post.id>`
   * on task queue `'main'` and receives the `'poke'` signal. The workflow
   * argument carries its own `taskQueue`: the provider identifier up to the
   * first `-`, lower-cased.
   *
   * Posts are handled one at a time; the first failing call rejects the
   * activity and the remaining posts are not attempted in that run.
   */
  @ActivityMethod()
  async searchForMissingThreeHoursPosts(): Promise<void> {
    const list = await this._postService.searchForMissingThreeHoursPosts();
    for (const post of list) {
      await this._temporalService.client
        .getRawClient()
        .workflow.signalWithStart('postWorkflow', {
          workflowId: `post_${post.id}`,
          taskQueue: 'main',
          signal: 'poke',
          signalArgs: [],
          args: [
            {
              taskQueue: post.integration.providerIdentifier
                .split('-')[0]
                .toLowerCase(),
              postId: post.id,
              organizationId: post.organizationId,
            },
          ],
          typedSearchAttributes: new TypedSearchAttributes([
            {
              key: postIdSearchParam,
              value: post.id,
            },
            {
              key: organizationId,
              value: post.organizationId,
            },
          ]),
        });
    }
  }

  /** Delegates to `PostsService.updatePost(id, postId, releaseURL)`. */
  @ActivityMethod()
  async updatePost(id: string, postId: string, releaseURL: string) {
    return this._postService.updatePost(id, postId, releaseURL);
  }

  /**
   * Delegates to `PostsService.getPostsRecursively(postId, true, orgId)`.
   * NOTE(review): the meaning of the literal `true` is defined by the service
   * and is not visible from this file.
   */
  @ActivityMethod()
  async getPostsList(orgId: string, postId: string) {
    return this._postService.getPostsRecursively(postId, true, orgId);
  }

  /**
   * Reports whether the provider resolved for `integration` exposes a
   * `comment` member.
   *
   * @returns `true` when `comment` is truthy on the provider, else `false`.
   */
  @ActivityMethod()
  async isCommentable(integration: Integration) {
    const provider = this._integrationManager.getSocialIntegration(
      integration.providerIdentifier
    );
    return Boolean(provider.comment);
  }

  /**
   * Prepares `posts` and hands them to the provider's `comment` method.
   *
   * @param postId forwarded to the provider as its second argument.
   * @param lastPostId forwarded to the provider as its third argument.
   * @param integration supplies the provider identifier, internal id, token
   *   and organization id.
   * @param posts posts to prepare and send.
   * @returns whatever the provider's `comment` call resolves to.
   */
  @ActivityMethod()
  async postComment(
    postId: string,
    lastPostId: string | undefined,
    integration: Integration,
    posts: Post[]
  ) {
    const provider = this._integrationManager.getSocialIntegration(
      integration.providerIdentifier
    );
    const details = await this._buildPostDetails(
      provider,
      integration.organizationId,
      posts
    );
    return provider.comment(
      integration.internalId,
      postId,
      lastPostId,
      integration.token,
      details,
      integration
    );
  }

  /**
   * Prepares `posts` and hands them to the provider's `post` method.
   *
   * @param integration supplies the provider identifier, internal id, token
   *   and organization id.
   * @param posts posts to prepare and send.
   * @returns whatever the provider's `post` call resolves to.
   */
  @ActivityMethod()
  async postSocial(integration: Integration, posts: Post[]) {
    const provider = this._integrationManager.getSocialIntegration(
      integration.providerIdentifier
    );
    const details = await this._buildPostDetails(
      provider,
      integration.organizationId,
      posts
    );
    return provider.post(
      integration.internalId,
      integration.token,
      details,
      integration
    );
  }

  /**
   * Delegates to `NotificationService.inAppNotification` with the arguments
   * in the same order.
   *
   * @param sendEmail defaults to `false`.
   * @param digest defaults to `false`.
   * @param type defaults to `'success'`.
   */
  @ActivityMethod()
  async inAppNotification(
    orgId: string,
    subject: string,
    message: string,
    sendEmail = false,
    digest = false,
    type: NotificationType = 'success'
  ) {
    return this._notificationService.inAppNotification(
      orgId,
      subject,
      message,
      sendEmail,
      digest,
      type
    );
  }

  /**
   * Delegates to `PostsService.checkPlugs` with the integration's
   * organization id, provider identifier and id.
   */
  @ActivityMethod()
  async globalPlugs(integration: Integration) {
    return this._postService.checkPlugs(
      integration.organizationId,
      integration.providerIdentifier,
      integration.id
    );
  }

  /** Delegates to `PostsService.changeState(id, state, err, body)`. */
  @ActivityMethod()
  async changeState(id: string, state: State, err?: any, body?: any) {
    return this._postService.changeState(id, state, err, body);
  }

  /**
   * Delegates to `PostsService.checkInternalPlug` with the integration, its
   * organization id, its id and the given `settings`.
   */
  @ActivityMethod()
  async internalPlugs(integration: Integration, settings: any) {
    return this._postService.checkInternalPlug(
      integration,
      integration.organizationId,
      integration.id,
      settings
    );
  }

  /**
   * POSTs the post (as JSON) to every webhook of `orgId` that either lists no
   * integrations or lists `integrationId`.
   *
   * Delivery is best-effort: a failed request or a non-2xx response is logged
   * and never rejects the activity. When no webhook matches, the post is not
   * loaded at all.
   *
   * @returns an array with one `undefined` entry per matching webhook.
   */
  @ActivityMethod()
  async sendWebhooks(postId: string, orgId: string, integrationId: string) {
    const webhooks = (await this._webhookService.getWebhooks(orgId)).filter(
      (f) =>
        f.integrations.length === 0 ||
        f.integrations.some((i) => i.integration.id === integrationId)
    );
    if (webhooks.length === 0) {
      // Nothing to deliver; skip the post lookup entirely.
      return [];
    }

    const post = await this._postService.getPostByForWebhookId(postId);
    return Promise.all(
      webhooks.map(async (webhook) => {
        try {
          const response = await fetch(webhook.url, {
            method: 'POST',
            headers: {
              'Content-Type': 'application/json',
            },
            body: JSON.stringify(post),
          });
          // fetch only rejects on network-level failures, so HTTP error
          // statuses have to be checked explicitly.
          if (!response.ok) {
            console.warn(
              `Webhook for post ${postId} responded with HTTP ${response.status}`
            );
          }
        } catch (e) {
          // The webhook URL is deliberately kept out of the log line because
          // webhook URLs frequently embed secrets.
          console.error(`Webhook delivery failed for post ${postId}`, e);
        }
      })
    );
  }

  /** Delegates to `IntegrationService.processPlugs(data)`. */
  @ActivityMethod()
  async processPlug(data: {
    plugId: string;
    postId: string;
    delay: number;
    totalRuns: number;
    currentRun: number;
  }) {
    return this._integrationService.processPlugs(data);
  }

  /** Delegates to `IntegrationService.processInternalPlug(data)`. */
  @ActivityMethod()
  async processInternalPlug(data: {
    post: string;
    originalIntegration: string;
    integration: string;
    plugName: string;
    orgId: string;
    delay: number;
    information: any;
  }) {
    return this._integrationService.processInternalPlug(data);
  }

  /**
   * Refreshes the integration's token through `RefreshIntegrationService`.
   *
   * When the provider sets `refreshWait`, the method awaits `timer(10000)`
   * (presumably 10 seconds - confirm against the `timer` helper) before
   * returning the refreshed details.
   *
   * @returns the refreshed token details, or `false` when the refresh yields
   *   a falsy value or throws. A thrown error is logged rather than
   *   propagated. Resolving the provider happens outside the `try`, so a
   *   failure there still rejects.
   */
  @ActivityMethod()
  async refreshToken(
    integration: Integration
  ): Promise<false | AuthTokenDetails> {
    const provider = this._integrationManager.getSocialIntegration(
      integration.providerIdentifier
    );
    try {
      const refresh = await this._refreshIntegrationService.refresh(
        integration
      );
      if (!refresh) {
        return false;
      }
      if (provider.refreshWait) {
        await timer(10000);
      }
      return refresh;
    } catch (err) {
      console.error(
        `Failed to refresh token for integration ${integration.id}`,
        err
      );
      return false;
    }
  }

  /**
   * Shared preparation step for `postSocial` and `postComment`.
   *
   * Runs `PostsService.updateTags` over `posts`, then maps each resulting post
   * to `{ id, message, settings, media }`:
   * - `message` is produced by `stripHtmlValidation` using the provider's
   *   `editor` and `mentionFormat`; its fifth argument is `true` when the
   *   content contains no HTML-like tag.
   * - `settings` is the post's `settings` JSON, or `{}` when empty.
   * - `media` is the result of `PostsService.updateMedia` for the post's
   *   `image` JSON (`[]` when empty) and the provider's `convertToJPEG` flag.
   *
   * A falsy result from `updateTags` is treated as an empty list.
   */
  private async _buildPostDetails(
    provider: ReturnType<IntegrationManager['getSocialIntegration']>,
    orgId: string,
    posts: Post[]
  ) {
    // No `g` flag, so reusing the same regex across posts is stateless.
    const htmlTag = /<\/?[a-z][\s\S]*>/i;
    const taggedPosts = await this._postService.updateTags(orgId, posts);
    return Promise.all(
      (taggedPosts || []).map(async (p) => ({
        id: p.id,
        message: stripHtmlValidation(
          provider.editor,
          p.content,
          true,
          false,
          !htmlTag.test(p.content),
          provider.mentionFormat
        ),
        settings: JSON.parse(p.settings || '{}'),
        media: await this._postService.updateMedia(
          p.id,
          JSON.parse(p.image || '[]'),
          provider.convertToJPEG || false
        ),
      }))
    );
  }
}