diff --git a/apps/backend/src/api/routes/integrations.controller.ts b/apps/backend/src/api/routes/integrations.controller.ts index 78179693..7f3c31b1 100644 --- a/apps/backend/src/api/routes/integrations.controller.ts +++ b/apps/backend/src/api/routes/integrations.controller.ts @@ -488,30 +488,39 @@ export class IntegrationsController { throw new HttpException('', 412); } - return this._integrationService.createOrUpdateIntegration( - additionalSettings, - !!integrationProvider.oneTimeToken, - org.id, - validName.trim(), - picture, - 'social', - String(id), - integration, - accessToken, - refreshToken, - expiresIn, - username, - refresh ? false : integrationProvider.isBetweenSteps, - body.refresh, - +body.timezone, - details - ? AuthService.fixedEncryption(details) - : integrationProvider.customFields - ? AuthService.fixedEncryption( - Buffer.from(body.code, 'base64').toString() - ) - : undefined - ); + const createUpdate = + await this._integrationService.createOrUpdateIntegration( + additionalSettings, + !!integrationProvider.oneTimeToken, + org.id, + validName.trim(), + picture, + 'social', + String(id), + integration, + accessToken, + refreshToken, + expiresIn, + username, + refresh ? false : integrationProvider.isBetweenSteps, + body.refresh, + +body.timezone, + details + ? AuthService.fixedEncryption(details) + : integrationProvider.customFields + ? AuthService.fixedEncryption( + Buffer.from(body.code, 'base64').toString() + ) + : undefined + ); + + this._refreshIntegrationService + .startRefreshWorkflow(org.id, createUpdate.id, integrationProvider) + .catch((err) => { + console.log(err); + }); + + return createUpdate; } @Post('/disable') diff --git a/apps/orchestrator/src/activities/integrations.activity.ts b/apps/orchestrator/src/activities/integrations.activity.ts new file mode 100644 index 00000000..4a668bb6 --- /dev/null +++ b/apps/orchestrator/src/activities/integrations.activity.ts @@ -0,0 +1,23 @@ +import { Injectable } from '@nestjs/common'; +import { Activity, ActivityMethod } from 'nestjs-temporal-core'; +import { IntegrationService } from '@gitroom/nestjs-libraries/database/prisma/integrations/integration.service'; +import { Integration } from '@prisma/client'; +import { RefreshIntegrationService } from '@gitroom/nestjs-libraries/integrations/refresh.integration.service'; + +@Injectable() +@Activity() +export class IntegrationsActivity { + constructor( + private _integrationService: IntegrationService, + private _refreshIntegrationService: RefreshIntegrationService + ) {} + + @ActivityMethod() + async getIntegrationsById(id: string, orgId: string) { + return this._integrationService.getIntegrationById(orgId, id); + } + + async refreshToken(integration: Integration) { + return this._refreshIntegrationService.refresh(integration); + } +} diff --git a/apps/orchestrator/src/app.module.ts b/apps/orchestrator/src/app.module.ts index 1d2acf0b..a0c277db 100644 --- a/apps/orchestrator/src/app.module.ts +++ b/apps/orchestrator/src/app.module.ts @@ -4,8 +4,14 @@ import { getTemporalModule } from '@gitroom/nestjs-libraries/temporal/temporal.m import { DatabaseModule } from '@gitroom/nestjs-libraries/database/prisma/database.module'; import { AutopostService } from '@gitroom/nestjs-libraries/database/prisma/autopost/autopost.service'; import { EmailActivity } from '@gitroom/orchestrator/activities/email.activity'; +import { IntegrationsActivity } from '@gitroom/orchestrator/activities/integrations.activity'; -const activities = [PostActivity, AutopostService, EmailActivity]; +const activities = [ + PostActivity, + AutopostService, + EmailActivity, + IntegrationsActivity, +]; @Module({ imports: [ DatabaseModule, diff --git a/apps/orchestrator/src/workflows/index.ts b/apps/orchestrator/src/workflows/index.ts index 9796a976..72f10d68 100644 --- a/apps/orchestrator/src/workflows/index.ts +++ b/apps/orchestrator/src/workflows/index.ts @@ -3,3 +3,4 @@ export * from './autopost.workflow'; export * from './digest.email.workflow'; export * from './missing.post.workflow'; export * from './send.email.workflow'; +export * from './refresh.token.workflow'; diff --git a/apps/orchestrator/src/workflows/refresh.token.workflow.ts b/apps/orchestrator/src/workflows/refresh.token.workflow.ts new file mode 100644 index 00000000..dc189ff0 --- /dev/null +++ b/apps/orchestrator/src/workflows/refresh.token.workflow.ts @@ -0,0 +1,55 @@ +import { proxyActivities, sleep } from '@temporalio/workflow'; +import { IntegrationsActivity } from '@gitroom/orchestrator/activities/integrations.activity'; + +const { getIntegrationsById, refreshToken } = + proxyActivities({ + startToCloseTimeout: '10 minute', + retry: { + maximumAttempts: 3, + backoffCoefficient: 1, + initialInterval: '2 minutes', + }, + }); + +export async function refreshTokenWorkflow({ + organizationId, + integrationId, +}: { + integrationId: string; + organizationId: string; +}) { + while (true) { + let integration = await getIntegrationsById(integrationId, organizationId); + if ( + !integration || + integration.deletedAt || + integration.inBetweenSteps || + integration.refreshNeeded + ) { + return false; + } + + const today = new Date(); + const endDate = new Date(integration.tokenExpiration); + + const minMax = Math.max(0, endDate.getTime() - today.getTime()); + if (!minMax) { + return false; + } + + await sleep(minMax as number); + + // while we were sleeping, the integration might have been deleted + integration = await getIntegrationsById(integrationId, organizationId); + if ( + !integration || + integration.deletedAt || + integration.inBetweenSteps || + integration.refreshNeeded + ) { + return false; + } + + await refreshToken(integration); + } +} diff --git a/libraries/nestjs-libraries/src/integrations/refresh.integration.service.ts b/libraries/nestjs-libraries/src/integrations/refresh.integration.service.ts index 1966aed8..a14d990b 100644 --- a/libraries/nestjs-libraries/src/integrations/refresh.integration.service.ts +++ b/libraries/nestjs-libraries/src/integrations/refresh.integration.service.ts @@ -6,13 +6,15 @@ import { AuthTokenDetails, SocialProvider, } from '@gitroom/nestjs-libraries/integrations/social/social.integrations.interface'; +import { TemporalService } from 'nestjs-temporal-core'; @Injectable() export class RefreshIntegrationService { constructor( private _integrationManager: IntegrationManager, @Inject(forwardRef(() => IntegrationService)) - private _integrationService: IntegrationService + private _integrationService: IntegrationService, + private _temporalService: TemporalService ) {} async refresh(integration: Integration): Promise { const socialProvider = this._integrationManager.getSocialIntegration( @@ -50,6 +52,21 @@ export class RefreshIntegrationService { ); } + public async startRefreshWorkflow(orgId: string, id: string, integration: SocialProvider) { + if (!integration.refreshCron) { + return false; + } + + return this._temporalService.client + .getRawClient() + ?.workflow.start(`refreshTokenWorkflow`, { + workflowId: `refresh_${id}`, + args: [{integrationId: id, organizationId: orgId}], + taskQueue: 'main', + workflowIdConflictPolicy: 'TERMINATE_EXISTING', + }); + } + private async refreshProcess( integration: Integration, socialProvider: SocialProvider diff --git a/libraries/nestjs-libraries/src/integrations/social/instagram.standalone.provider.ts b/libraries/nestjs-libraries/src/integrations/social/instagram.standalone.provider.ts index e0bac74f..fecd0470 100644 --- a/libraries/nestjs-libraries/src/integrations/social/instagram.standalone.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/instagram.standalone.provider.ts @@ -24,6 +24,7 @@ export class InstagramStandaloneProvider identifier = 'instagram-standalone'; name = 'Instagram\n(Standalone)'; isBetweenSteps = false; + refreshCron = true; scopes = [ 'instagram_business_basic', 'instagram_business_content_publish', @@ -69,7 +70,7 @@ export class InstagramStandaloneProvider name, accessToken: access_token, refreshToken: access_token, - expiresIn: dayjs().add(59, 'days').unix() - dayjs().unix(), + expiresIn: dayjs().add(58, 'days').unix() - dayjs().unix(), picture: profile_picture_url || '', username, }; @@ -144,7 +145,7 @@ export class InstagramStandaloneProvider name, accessToken: access_token, refreshToken: access_token, - expiresIn: dayjs().add(59, 'days').unix() - dayjs().unix(), + expiresIn: dayjs().add(58, 'days').unix() - dayjs().unix(), picture: profile_picture_url, username, }; diff --git a/libraries/nestjs-libraries/src/integrations/social/social.integrations.interface.ts b/libraries/nestjs-libraries/src/integrations/social/social.integrations.interface.ts index 8ade3a81..3ecfba88 100644 --- a/libraries/nestjs-libraries/src/integrations/social/social.integrations.interface.ts +++ b/libraries/nestjs-libraries/src/integrations/social/social.integrations.interface.ts @@ -130,6 +130,7 @@ export interface SocialProvider identifier: string; refreshWait?: boolean; convertToJPEG?: boolean; + refreshCron?: boolean; dto?: any; maxLength: (additionalSettings?: any) => number; isWeb3?: boolean; diff --git a/libraries/nestjs-libraries/src/integrations/social/threads.provider.ts b/libraries/nestjs-libraries/src/integrations/social/threads.provider.ts index fa8e0dc2..0087c34a 100644 --- a/libraries/nestjs-libraries/src/integrations/social/threads.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/threads.provider.ts @@ -26,6 +26,7 @@ export class ThreadsProvider extends SocialAbstract implements SocialProvider { // 'threads_profile_discovery', ]; override maxConcurrentJob = 2; // Threads has moderate rate limits + refreshCron = true; editor = 'normal' as const; maxLength() { @@ -61,7 +62,7 @@ export class ThreadsProvider extends SocialAbstract implements SocialProvider { name, accessToken: access_token, refreshToken: access_token, - expiresIn: dayjs().add(59, 'days').unix() - dayjs().unix(), + expiresIn: dayjs().add(58, 'days').unix() - dayjs().unix(), picture: picture || '', username: '', }; @@ -114,7 +115,7 @@ export class ThreadsProvider extends SocialAbstract implements SocialProvider { 'https://graph.threads.net/access_token' + '?grant_type=th_exchange_token' + `&client_secret=${process.env.THREADS_APP_SECRET}` + - `&access_token=${getAccessToken.access_token}&fields=access_token,expires_in` + `&access_token=${getAccessToken.access_token}` ) ).json(); @@ -127,7 +128,7 @@ export class ThreadsProvider extends SocialAbstract implements SocialProvider { name, accessToken: access_token, refreshToken: access_token, - expiresIn: dayjs().add(59, 'days').unix() - dayjs().unix(), + expiresIn: dayjs().add(58, 'days').unix() - dayjs().unix(), picture: picture || '', username: username, };