feat: big workflow change

This commit is contained in:
Nevo David 2026-01-08 21:26:29 +07:00
parent 96661cc30d
commit 07238a06a6
2 changed files with 40 additions and 13 deletions

View File

@ -36,6 +36,11 @@ export class PostActivity {
private _temporalService: TemporalService
) {}
@ActivityMethod()
async getIntegrationById(orgId: string, id: string) {
return this._integrationService.getIntegrationById(orgId, id);
}
@ActivityMethod()
async searchForMissingThreeHoursPosts() {
const list = await this._postService.searchForMissingThreeHoursPosts();

View File

@ -46,6 +46,8 @@ const {
const poke = defineSignal('poke');
const iterate = ['post', 'afterRefresh', 'retry1', 'retry2', 'retry3'];
export async function postWorkflowV101({
taskQueue,
postId,
@ -61,6 +63,7 @@ export async function postWorkflowV101({
const {
postSocial,
postComment,
getIntegrationById,
refreshToken,
internalPlugs,
globalPlugs,
@ -128,7 +131,7 @@ export async function postWorkflowV101({
// iterate over the posts
for (let i = 0; i < postsList.length; i++) {
// this is a small trick to repeat an action in case of token refresh
while (true) {
for (const _ of iterate) {
try {
// first post the main post
if (i === 0) {
@ -212,7 +215,9 @@ export async function postWorkflowV101({
) {
await inAppNotification(
post.organizationId,
`Error posting${i === 0 ? ' ' : ' comments '}on ${post.integration?.providerIdentifier} for ${post?.integration?.name}`,
`Error posting${i === 0 ? ' ' : ' comments '}on ${
post.integration?.providerIdentifier
} for ${post?.integration?.name}`,
`An error occurred while posting${i === 0 ? ' ' : ' comments '}on ${
post.integration?.providerIdentifier
}${err?.cause?.message ? `: ${err?.cause?.message}` : ``}`,
@ -222,8 +227,6 @@ export async function postWorkflowV101({
);
return false;
}
return false;
}
}
}
@ -284,7 +287,7 @@ export async function postWorkflowV101({
// process internal plug
if (todo.type === 'internal-plug') {
while (true) {
for (const _ of iterate) {
try {
await processInternalPlug({ ...todo, post: postsResults[0].postId });
} catch (err) {
@ -293,15 +296,25 @@ export async function postWorkflowV101({
err.cause instanceof ApplicationFailure &&
err.cause.type === 'refresh_token'
) {
const refresh = await refreshToken(post.integration);
const refresh = await refreshToken(
await getIntegrationById(organizationId, todo.integration)
);
if (!refresh || !refresh.accessToken) {
await changeState(postsList[0].id, 'ERROR', err, postsList);
return false;
break;
}
post.integration.token = refresh.accessToken;
continue;
}
if (
err instanceof ActivityFailure &&
err.cause instanceof ApplicationFailure &&
err.cause.type === 'bad_body'
) {
break;
}
continue;
}
break;
}
@ -309,7 +322,7 @@ export async function postWorkflowV101({
// process global plug
if (todo.type === 'global') {
while (true) {
for (const _ of iterate) {
try {
const process = await processPlug({
...todo,
@ -338,14 +351,23 @@ export async function postWorkflowV101({
) {
const refresh = await refreshToken(post.integration);
if (!refresh || !refresh.accessToken) {
await changeState(postsList[0].id, 'ERROR', err, postsList);
return false;
break;
}
post.integration.token = refresh.accessToken;
continue;
}
if (
err instanceof ActivityFailure &&
err.cause instanceof ApplicationFailure &&
err.cause.type === 'bad_body'
) {
break;
}
continue;
}
break;
}
}