diff --git a/apps/orchestrator/src/workflows/index.ts b/apps/orchestrator/src/workflows/index.ts index 22f5fce3..9796a976 100644 --- a/apps/orchestrator/src/workflows/index.ts +++ b/apps/orchestrator/src/workflows/index.ts @@ -1,4 +1,3 @@ -export * from './post-workflows/post.workflow'; export * from './post-workflows/post.workflow.v1.0.1'; export * from './autopost.workflow'; export * from './digest.email.workflow'; diff --git a/apps/orchestrator/src/workflows/post-workflows/post.workflow.ts b/apps/orchestrator/src/workflows/post-workflows/post.workflow.ts deleted file mode 100644 index 96e3b06d..00000000 --- a/apps/orchestrator/src/workflows/post-workflows/post.workflow.ts +++ /dev/null @@ -1,368 +0,0 @@ -import { PostActivity } from '@gitroom/orchestrator/activities/post.activity'; -import { - ActivityFailure, - ApplicationFailure, - startChild, - proxyActivities, - sleep, - defineSignal, - setHandler, -} from '@temporalio/workflow'; -import dayjs from 'dayjs'; -import { Integration } from '@prisma/client'; -import { capitalize, sortBy } from 'lodash'; -import { PostResponse } from '@gitroom/nestjs-libraries/integrations/social/social.integrations.interface'; -import { makeId } from '@gitroom/nestjs-libraries/services/make.is'; -import { TypedSearchAttributes } from '@temporalio/common'; -import { postId as postIdSearchParam } from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute'; - -const proxyTaskQueue = (taskQueue: string) => { - return proxyActivities({ - startToCloseTimeout: '10 minute', - taskQueue, - retry: { - maximumAttempts: 3, - backoffCoefficient: 1, - initialInterval: '2 minutes', - }, - }); -}; - -const { - getPostsList, - inAppNotification, - changeState, - updatePost, - sendWebhooks, - isCommentable, -} = proxyActivities({ - startToCloseTimeout: '10 minute', - retry: { - maximumAttempts: 3, - backoffCoefficient: 1, - initialInterval: '2 minutes', - }, -}); - -const poke = defineSignal('poke'); - -export async function postWorkflow({ - taskQueue, - postId, - organizationId, - postNow = false, -}: { - taskQueue: string; - postId: string; - organizationId: string; - postNow?: boolean; -}) { - // Dynamic task queue, for concurrency - const { - postSocial, - postComment, - refreshToken, - internalPlugs, - globalPlugs, - processInternalPlug, - processPlug, - } = proxyTaskQueue(taskQueue); - - let poked = false; - setHandler(poke, () => { - poked = true; - }); - - const startTime = new Date(); - // get all the posts and comments to post - const postsList = await getPostsList(organizationId, postId); - const [post] = postsList; - - // in case doesn't exists for some reason, fail it - if (!post || (!postNow && post.state !== 'QUEUE')) { - return; - } - - // if it's a repeatable post, we should ignore this. - if (!postNow) { - await sleep( - dayjs(post.publishDate).isBefore(dayjs()) - ? 0 - : dayjs(post.publishDate).diff(dayjs(), 'millisecond') - ); - } - - // if refresh is needed from last time, let's inform the user - if (post.integration?.refreshNeeded) { - await inAppNotification( - post.organizationId, - `We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name}`, - `We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name} because you need to reconnect it. Please enable it and try again.`, - true, - false, - 'info' - ); - return; - } - - // if it's disabled, inform the user - if (post.integration?.disabled) { - await inAppNotification( - post.organizationId, - `We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name}`, - `We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name} because it's disabled. Please enable it and try again.`, - true, - false, - 'info' - ); - return; - } - - // Do we need to post comment for this social? - const toComment = - postsList.length === 1 ? false : await isCommentable(post.integration); - - // list of all the saved results - const postsResults: PostResponse[] = []; - - // iterate over the posts - for (let i = 0; i < postsList.length; i++) { - // this is a small trick to repeat an action in case of token refresh - while (true) { - try { - // first post the main post - if (i === 0) { - postsResults.push( - ...(await postSocial(post.integration as Integration, [ - postsList[i], - ])) - ); - - // then post the comments if any - } else { - if (!toComment) { - break; - } - postsResults.push( - ...(await postComment( - postsResults[0].postId, - postsResults.length === 1 - ? undefined - : postsResults[i - 1].postId, - post.integration, - [postsList[i]] - )) - ); - } - - // mark post as successful - await updatePost( - postsList[i].id, - postsResults[i].postId, - postsResults[i].releaseURL - ); - - // break the current while to move to the next post - break; - } catch (err) { - // if token refresh is needed, do it and repeat - if ( - err instanceof ActivityFailure && - err.cause instanceof ApplicationFailure && - err.cause.type === 'refresh_token' - ) { - const refresh = await refreshToken(post.integration); - if (!refresh || !refresh.accessToken) { - await changeState(postsList[0].id, 'ERROR', err, postsList); - return false; - } - - post.integration.token = refresh.accessToken; - continue; - } - - // for other errors, change state and inform the user if needed - await changeState(postsList[0].id, 'ERROR', err, postsList); - - // specific case for bad body errors - if ( - err instanceof ActivityFailure && - err.cause instanceof ApplicationFailure && - err.cause.type === 'bad_body' - ) { - await inAppNotification( - post.organizationId, - `Error posting on ${post.integration?.providerIdentifier} for ${post?.integration?.name}`, - `An error occurred while posting on ${ - post.integration?.providerIdentifier - }${err?.cause?.message ? `: ${err?.cause?.message}` : ``}`, - true, - false, - 'fail' - ); - return false; - } - - return false; - } - } - } - - // send notification on a sucessful post - await inAppNotification( - post.integration.organizationId, - `Your post has been published on ${capitalize( - post.integration.providerIdentifier - )}`, - `Your post has been published on ${capitalize( - post.integration.providerIdentifier - )} at ${postsResults[0].releaseURL}`, - true, - true - ); - - // send webhooks for the post - await sendWebhooks( - postsResults[0].postId, - post.organizationId, - post.integration.id - ); - - // load internal plugs like repost by other users - const internalPlugsList = await internalPlugs( - post.integration, - JSON.parse(post.settings) - ); - - // load global plugs, like repost a post if it gets to a certain number of likes - const globalPlugsList = (await globalPlugs(post.integration)).reduce( - (all, current) => { - for (let i = 1; i <= current.totalRuns; i++) { - all.push({ - ...current, - delay: current.delay * i, - }); - } - - return all; - }, - [] - ); - - // Check if the post is repeatable - const repeatPost = !post.intervalInDays - ? [] - : [ - { - type: 'repeat-post', - delay: - post.intervalInDays * 24 * 60 * 60 * 1000 - - (new Date().getTime() - startTime.getTime()), - }, - ]; - - // Sort all the actions by delay, so we can process them in order - const list = sortBy( - [...internalPlugsList, ...globalPlugsList, ...repeatPost], - 'delay' - ); - - // process all the plugs in order, we are using while because in some cases we need to remove items from the list - while (list.length > 0) { - // get the next to process - const todo = list.shift(); - - // wait for the delay - await sleep(todo.delay); - - // process internal plug - if (todo.type === 'internal-plug') { - while (true) { - try { - await processInternalPlug({ ...todo, post: postsResults[0].postId }); - } catch (err) { - if ( - err instanceof ActivityFailure && - err.cause instanceof ApplicationFailure && - err.cause.type === 'refresh_token' - ) { - const refresh = await refreshToken(post.integration); - if (!refresh || !refresh.accessToken) { - await changeState(postsList[0].id, 'ERROR', err, postsList); - return false; - } - - post.integration.token = refresh.accessToken; - continue; - } - } - break; - } - } - - // process global plug - if (todo.type === 'global') { - while (true) { - try { - const process = await processPlug({ - ...todo, - postId: postsResults[0].postId, - }); - if (process) { - const toDelete = list - .reduce((all, current, index) => { - if (current.plugId === todo.plugId) { - all.push(index); - } - - return all; - }, []) - .reverse(); - - for (const index of toDelete) { - list.splice(index, 1); - } - } - } catch (err) { - if ( - err instanceof ActivityFailure && - err.cause instanceof ApplicationFailure && - err.cause.type === 'refresh_token' - ) { - const refresh = await refreshToken(post.integration); - if (!refresh || !refresh.accessToken) { - await changeState(postsList[0].id, 'ERROR', err, postsList); - return false; - } - - post.integration.token = refresh.accessToken; - continue; - } - } - break; - } - } - - // process repeat post in a new workflow, this is important so the other plugs can keep running - if (todo.type === 'repeat-post') { - await startChild(postWorkflow, { - parentClosePolicy: 'ABANDON', - args: [ - { - taskQueue, - postId, - organizationId, - postNow: true, - }, - ], - workflowId: `post_${post.id}_${makeId(10)}`, - typedSearchAttributes: new TypedSearchAttributes([ - { - key: postIdSearchParam, - value: postId, - }, - ]), - }); - } - } -} diff --git a/apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.1.ts b/apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.1.ts index 7ca6eafa..b8fce6ac 100644 --- a/apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.1.ts +++ b/apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.1.ts @@ -46,7 +46,7 @@ const { const poke = defineSignal('poke'); -const iterate = ['post', 'afterRefresh', 'retry1', 'retry2', 'retry3']; +const iterate = Array.from({ length: 5 }); export async function postWorkflowV101({ taskQueue, @@ -78,8 +78,8 @@ export async function postWorkflowV101({ const startTime = new Date(); // get all the posts and comments to post - const postsList = await getPostsList(organizationId, postId); - const [post] = postsList; + const postsListBefore = await getPostsList(organizationId, postId); + const [post] = postsListBefore; // in case doesn't exists for some reason, fail it if (!post || (!postNow && post.state !== 'QUEUE')) { @@ -122,14 +122,19 @@ export async function postWorkflowV101({ } // Do we need to post comment for this social? - const toComment = - postsList.length === 1 ? false : await isCommentable(post.integration); + const toComment: boolean = + postsListBefore.length === 1 + ? false + : await isCommentable(post.integration); + + const postsList = toComment ? postsListBefore : [postsListBefore[0]]; // list of all the saved results const postsResults: PostResponse[] = []; // iterate over the posts for (let i = 0; i < postsList.length; i++) { + const before = postsResults.length; // this is a small trick to repeat an action in case of token refresh for (const _ of iterate) { try { @@ -143,12 +148,8 @@ export async function postWorkflowV101({ // then post the comments if any } else { - if (!toComment) { - break; - } - if (postsList[i].delay) { - await sleep(60000 * postsList[i].delay); + await sleep(60000 * Math.max(0, Number(postsList[i].delay ?? 0))); } postsResults.push( @@ -229,6 +230,11 @@ export async function postWorkflowV101({ } } } + + if (postsResults.length === before) { + // all retries exhausted without success + return false; + } } // send webhooks for the post @@ -283,7 +289,7 @@ export async function postWorkflowV101({ const todo = list.shift(); // wait for the delay - await sleep(todo.delay); + await sleep(Math.max(0, Number(todo.delay ?? 0))); // process internal plug if (todo.type === 'internal-plug') {