diff --git a/apps/orchestrator/src/activities/post.activity.ts b/apps/orchestrator/src/activities/post.activity.ts index f89ecaca..c4514ec5 100644 --- a/apps/orchestrator/src/activities/post.activity.ts +++ b/apps/orchestrator/src/activities/post.activity.ts @@ -36,6 +36,11 @@ export class PostActivity { private _temporalService: TemporalService ) {} + @ActivityMethod() + async getIntegrationById(orgId: string, id: string) { + return this._integrationService.getIntegrationById(orgId, id); + } + @ActivityMethod() async searchForMissingThreeHoursPosts() { const list = await this._postService.searchForMissingThreeHoursPosts(); diff --git a/apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.1.ts b/apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.1.ts index ed6a167f..7ca6eafa 100644 --- a/apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.1.ts +++ b/apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.1.ts @@ -46,6 +46,8 @@ const { const poke = defineSignal('poke'); +const iterate = ['post', 'afterRefresh', 'retry1', 'retry2', 'retry3']; + export async function postWorkflowV101({ taskQueue, postId, @@ -61,6 +63,7 @@ export async function postWorkflowV101({ const { postSocial, postComment, + getIntegrationById, refreshToken, internalPlugs, globalPlugs, @@ -128,7 +131,7 @@ export async function postWorkflowV101({ // iterate over the posts for (let i = 0; i < postsList.length; i++) { // this is a small trick to repeat an action in case of token refresh - while (true) { + for (const _ of iterate) { try { // first post the main post if (i === 0) { @@ -212,7 +215,9 @@ export async function postWorkflowV101({ ) { await inAppNotification( post.organizationId, - `Error posting${i === 0 ? ' ' : ' comments '}on ${post.integration?.providerIdentifier} for ${post?.integration?.name}`, + `Error posting${i === 0 ? ' ' : ' comments '}on ${ + post.integration?.providerIdentifier + } for ${post?.integration?.name}`, `An error occurred while posting${i === 0 ? ' ' : ' comments '}on ${ post.integration?.providerIdentifier }${err?.cause?.message ? `: ${err?.cause?.message}` : ``}`, @@ -222,8 +227,6 @@ export async function postWorkflowV101({ ); return false; } - - return false; } } } @@ -284,7 +287,7 @@ export async function postWorkflowV101({ // process internal plug if (todo.type === 'internal-plug') { - while (true) { + for (const _ of iterate) { try { await processInternalPlug({ ...todo, post: postsResults[0].postId }); } catch (err) { @@ -293,15 +296,25 @@ export async function postWorkflowV101({ err.cause instanceof ApplicationFailure && err.cause.type === 'refresh_token' ) { - const refresh = await refreshToken(post.integration); + const refresh = await refreshToken( + await getIntegrationById(organizationId, todo.integration) + ); if (!refresh || !refresh.accessToken) { - await changeState(postsList[0].id, 'ERROR', err, postsList); - return false; + break; } - post.integration.token = refresh.accessToken; continue; } + + if ( + err instanceof ActivityFailure && + err.cause instanceof ApplicationFailure && + err.cause.type === 'bad_body' + ) { + break; + } + + continue; } break; } @@ -309,7 +322,7 @@ export async function postWorkflowV101({ // process global plug if (todo.type === 'global') { - while (true) { + for (const _ of iterate) { try { const process = await processPlug({ ...todo, @@ -338,14 +351,23 @@ export async function postWorkflowV101({ ) { const refresh = await refreshToken(post.integration); if (!refresh || !refresh.accessToken) { - await changeState(postsList[0].id, 'ERROR', err, postsList); - return false; + break; } - post.integration.token = refresh.accessToken; continue; } + + if ( + err instanceof ActivityFailure && + err.cause instanceof ApplicationFailure && + err.cause.type === 'bad_body' + ) { + break; + } + + continue; } + break; } }