From 6633fab9249d6a38911a74e37482e932b234e8d6 Mon Sep 17 00:00:00 2001 From: Nevo David Date: Mon, 5 Jan 2026 17:28:53 +0700 Subject: [PATCH] feat: final temporal touches --- apps/backend/src/app.module.ts | 2 + .../src/activities/post.activity.ts | 49 ++++++++++- apps/orchestrator/src/workflows/index.ts | 1 + .../src/workflows/missing.post.workflow.ts | 19 +++++ .../src/workflows/post.workflow.ts | 13 ++- docker-compose.dev.yaml | 84 +++++++++++++++++++ dynamicconfig/development-cass.yaml | 3 + dynamicconfig/development-sql.yaml | 6 ++ .../notifications/notification.service.ts | 73 +++------------- .../database/prisma/posts/posts.repository.ts | 28 ++----- .../database/prisma/posts/posts.service.ts | 4 + .../temporal/infinite.workflow.register.ts | 31 +++++++ package.json | 3 +- var/docker/create-namespace-default.sh | 7 ++ 14 files changed, 236 insertions(+), 87 deletions(-) create mode 100644 apps/orchestrator/src/workflows/missing.post.workflow.ts create mode 100644 dynamicconfig/development-cass.yaml create mode 100644 dynamicconfig/development-sql.yaml create mode 100644 libraries/nestjs-libraries/src/temporal/infinite.workflow.register.ts create mode 100755 var/docker/create-namespace-default.sh diff --git a/apps/backend/src/app.module.ts b/apps/backend/src/app.module.ts index 1a5f89b0..64135061 100644 --- a/apps/backend/src/app.module.ts +++ b/apps/backend/src/app.module.ts @@ -14,6 +14,7 @@ import { FILTER } from '@gitroom/nestjs-libraries/sentry/sentry.exception'; import { ChatModule } from '@gitroom/nestjs-libraries/chat/chat.module'; import { getTemporalModule } from '@gitroom/nestjs-libraries/temporal/temporal.module'; import { TemporalRegisterMissingSearchAttributesModule } from '@gitroom/nestjs-libraries/temporal/temporal.register'; +import { InfiniteWorkflowRegisterModule } from '@gitroom/nestjs-libraries/temporal/infinite.workflow.register'; @Global() @Module({ @@ -28,6 +29,7 @@ import { TemporalRegisterMissingSearchAttributesModule } from '@gitroom/nestjs-l ChatModule, getTemporalModule(false), TemporalRegisterMissingSearchAttributesModule, + InfiniteWorkflowRegisterModule, ThrottlerModule.forRoot([ { ttl: 3600000, diff --git a/apps/orchestrator/src/activities/post.activity.ts b/apps/orchestrator/src/activities/post.activity.ts index 88fdcef0..b7e3e027 100644 --- a/apps/orchestrator/src/activities/post.activity.ts +++ b/apps/orchestrator/src/activities/post.activity.ts @@ -1,5 +1,9 @@ import { Injectable } from '@nestjs/common'; -import { Activity, ActivityMethod } from 'nestjs-temporal-core'; +import { + Activity, + ActivityMethod, + TemporalService, +} from 'nestjs-temporal-core'; import { PostsService } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.service'; import { NotificationService, @@ -13,6 +17,12 @@ import { RefreshIntegrationService } from '@gitroom/nestjs-libraries/integration import { timer } from '@gitroom/helpers/utils/timer'; import { IntegrationService } from '@gitroom/nestjs-libraries/database/prisma/integrations/integration.service'; import { WebhooksService } from '@gitroom/nestjs-libraries/database/prisma/webhooks/webhooks.service'; +import { TypedSearchAttributes } from '@temporalio/common'; +import { + organizationId, + postId as postIdSearchParam, +} from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute'; +import { postWorkflow } from '@gitroom/orchestrator/workflows'; @Injectable() @Activity() @@ -23,9 +33,44 @@ export class PostActivity { private _integrationManager: IntegrationManager, private _integrationService: IntegrationService, private _refreshIntegrationService: RefreshIntegrationService, - private _webhookService: WebhooksService + private _webhookService: WebhooksService, + private _temporalService: TemporalService ) {} + @ActivityMethod() + async searchForMissingThreeHoursPosts() { + const list = await this._postService.searchForMissingThreeHoursPosts(); + for (const post of list) { + await this._temporalService.client + .getRawClient() + .workflow.signalWithStart('postWorkflow', { + workflowId: `post_${post.id}`, + taskQueue: 'main', + signal: 'poke', + signalArgs: [], + args: [ + { + taskQueue: post.integration.providerIdentifier + .split('-')[0] + .toLowerCase(), + postId: post.id, + organizationId: post.organizationId, + }, + ], + typedSearchAttributes: new TypedSearchAttributes([ + { + key: postIdSearchParam, + value: post.id, + }, + { + key: organizationId, + value: post.organizationId, + }, + ]), + }); + } + } + @ActivityMethod() async updatePost(id: string, postId: string, releaseURL: string) { return this._postService.updatePost(id, postId, releaseURL); diff --git a/apps/orchestrator/src/workflows/index.ts b/apps/orchestrator/src/workflows/index.ts index f6c0dabc..1781c2ea 100644 --- a/apps/orchestrator/src/workflows/index.ts +++ b/apps/orchestrator/src/workflows/index.ts @@ -1,3 +1,4 @@ export * from './post.workflow'; export * from './autopost.workflow'; export * from './digest.email.workflow'; +export * from './missing.post.workflow'; diff --git a/apps/orchestrator/src/workflows/missing.post.workflow.ts b/apps/orchestrator/src/workflows/missing.post.workflow.ts new file mode 100644 index 00000000..17babfd2 --- /dev/null +++ b/apps/orchestrator/src/workflows/missing.post.workflow.ts @@ -0,0 +1,19 @@ +import { proxyActivities, sleep } from '@temporalio/workflow'; +import { PostActivity } from '@gitroom/orchestrator/activities/post.activity'; + +const { searchForMissingThreeHoursPosts } = proxyActivities({ + startToCloseTimeout: '10 minute', + retry: { + maximumAttempts: 3, + backoffCoefficient: 1, + initialInterval: '2 minutes', + }, +}); + +export async function missingPostWorkflow() { + await searchForMissingThreeHoursPosts(); + while (true) { + await sleep('1 hour'); + await searchForMissingThreeHoursPosts(); + } +} diff --git a/apps/orchestrator/src/workflows/post.workflow.ts b/apps/orchestrator/src/workflows/post.workflow.ts index 8705519f..8fe685b2 100644 --- a/apps/orchestrator/src/workflows/post.workflow.ts +++ b/apps/orchestrator/src/workflows/post.workflow.ts @@ -5,6 +5,8 @@ import { startChild, proxyActivities, sleep, + defineSignal, + setHandler, } from '@temporalio/workflow'; import dayjs from 'dayjs'; import { Integration } from '@prisma/client'; @@ -42,6 +44,8 @@ const { }, }); +const poke = defineSignal('poke'); + export async function postWorkflow({ taskQueue, postId, @@ -53,7 +57,6 @@ export async function postWorkflow({ organizationId: string; postNow?: boolean; }) { - // Dynamic task queue, for concurrency const { postSocial, @@ -65,6 +68,11 @@ export async function postWorkflow({ processPlug, } = proxyTaskQueue(taskQueue); + let poked = false; + setHandler(poke, () => { + poked = true; + }); + const startTime = new Date(); // get all the posts and comments to post const postsList = await getPostsList(organizationId, postId); @@ -77,6 +85,9 @@ export async function postWorkflow({ // if it's a repeatable post, we should ignore this if (!postNow) { + if (dayjs(post.publishDate).isBefore(dayjs())) { + return; + } await sleep(dayjs(post.publishDate).diff(dayjs(), 'millisecond')); } diff --git a/docker-compose.dev.yaml b/docker-compose.dev.yaml index e0f06084..38ee985b 100644 --- a/docker-compose.dev.yaml +++ b/docker-compose.dev.yaml @@ -53,6 +53,87 @@ services: - postiz-network restart: always + temporal-elasticsearch: + container_name: temporal-elasticsearch + image: elasticsearch:7.17.27 + environment: + - cluster.routing.allocation.disk.threshold_enabled=true + - cluster.routing.allocation.disk.watermark.low=512mb + - cluster.routing.allocation.disk.watermark.high=256mb + - cluster.routing.allocation.disk.watermark.flood_stage=128mb + - discovery.type=single-node + - ES_JAVA_OPTS=-Xms256m -Xmx256m + - xpack.security.enabled=false + networks: + - temporal-network + expose: + - 9200 + volumes: + - /var/lib/elasticsearch/data + + temporal-postgresql: + container_name: temporal-postgresql + image: postgres:16 + environment: + POSTGRES_PASSWORD: temporal + POSTGRES_USER: temporal + networks: + - temporal-network + expose: + - 5432 + volumes: + - /var/lib/postgresql/data + + temporal: + container_name: temporal + ports: + - "7233:7233" + image: temporalio/auto-setup:1.28.1 + depends_on: + - temporal-postgresql + - temporal-elasticsearch + environment: + - DB=postgres12 + - DB_PORT=5432 + - POSTGRES_USER=temporal + - POSTGRES_PWD=temporal + - POSTGRES_SEEDS=temporal-postgresql + - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml + - ENABLE_ES=true + - ES_SEEDS=temporal-elasticsearch + - ES_VERSION=v7 + - TEMPORAL_NAMESPACE=default + networks: + - temporal-network + volumes: + - ./dynamicconfig:/etc/temporal/config/dynamicconfig + labels: + kompose.volume.type: configMap + + temporal-admin-tools: + container_name: temporal-admin-tools + image: temporalio/admin-tools:1.28.1-tctl-1.18.4-cli-1.4.1 + environment: + - TEMPORAL_ADDRESS=temporal:7233 + - TEMPORAL_CLI_ADDRESS=temporal:7233 + networks: + - temporal-network + stdin_open: true + depends_on: + - temporal + tty: true + + temporal-ui: + container_name: temporal-ui + image: temporalio/ui:2.34.0 + environment: + - TEMPORAL_ADDRESS=temporal:7233 + - TEMPORAL_CORS_ORIGINS=http://127.0.0.1:3000 + networks: + - temporal-network + ports: + - "8080:8080" + volumes: redisinsight: postgres-volume: @@ -61,3 +142,6 @@ volumes: networks: postiz-network: external: false + temporal-network: + driver: bridge + name: temporal-network diff --git a/dynamicconfig/development-cass.yaml b/dynamicconfig/development-cass.yaml new file mode 100644 index 00000000..4b916163 --- /dev/null +++ b/dynamicconfig/development-cass.yaml @@ -0,0 +1,3 @@ +system.forceSearchAttributesCacheRefreshOnRead: + - value: true # Dev setup only. Please don't turn this on in production. + constraints: {} diff --git a/dynamicconfig/development-sql.yaml b/dynamicconfig/development-sql.yaml new file mode 100644 index 00000000..228c752f --- /dev/null +++ b/dynamicconfig/development-sql.yaml @@ -0,0 +1,6 @@ +limit.maxIDLength: + - value: 255 + constraints: {} +system.forceSearchAttributesCacheRefreshOnRead: + - value: true # Dev setup only. Please don't turn this on in production. + constraints: {} \ No newline at end of file diff --git a/libraries/nestjs-libraries/src/database/prisma/notifications/notification.service.ts b/libraries/nestjs-libraries/src/database/prisma/notifications/notification.service.ts index 0a926318..a8ba9e12 100644 --- a/libraries/nestjs-libraries/src/database/prisma/notifications/notification.service.ts +++ b/libraries/nestjs-libraries/src/database/prisma/notifications/notification.service.ts @@ -2,7 +2,6 @@ import { Injectable } from '@nestjs/common'; import { NotificationsRepository } from '@gitroom/nestjs-libraries/database/prisma/notifications/notifications.repository'; import { EmailService } from '@gitroom/nestjs-libraries/services/email.service'; import { OrganizationRepository } from '@gitroom/nestjs-libraries/database/prisma/organizations/organization.repository'; -import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service'; import { TemporalService } from 'nestjs-temporal-core'; import { TypedSearchAttributes } from '@temporalio/common'; import { organizationId } from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute'; @@ -32,13 +31,6 @@ export class NotificationService { ); } - getNotificationsSince(organizationId: string, since: string) { - return this._notificationRepository.getNotificationsSince( - organizationId, - since - ); - } - async inAppNotification( orgId: string, subject: string, @@ -56,8 +48,18 @@ export class NotificationService { try { await this._temporalService.client .getRawClient() - ?.workflow.start('digestEmailWorkflow', { + ?.workflow.signalWithStart('digestEmailWorkflow', { workflowId: 'digest_email_workflow_' + orgId, + signal: 'email', + signalArgs: [ + [ + { + title: subject, + message, + type, + }, + ], + ], taskQueue: 'main', args: [{ organizationId: orgId }], typedSearchAttributes: new TypedSearchAttributes([ @@ -69,20 +71,6 @@ export class NotificationService { }); } catch (err) {} - await this._temporalService.signalWorkflow( - 'digest_email_workflow_' + orgId, - 'email', - [ - [ - { - title: subject, - message, - type, - }, - ], - ] - ); - return; } @@ -111,45 +99,6 @@ export class NotificationService { } } - async getDigestTypes(orgId: string): Promise { - const typesKey = 'digest_types_' + orgId; - const types = await ioRedis.smembers(typesKey); - // Clean up the types key after reading - await ioRedis.del(typesKey); - return types as NotificationType[]; - } - - async sendDigestEmailsToOrg( - orgId: string, - subject: string, - message: string, - types: NotificationType[] - ) { - const userOrg = await this._organizationRepository.getAllUsersOrgs(orgId); - const hasInfo = types.includes('info'); - const hasSuccess = types.includes('success'); - const hasFail = types.includes('fail'); - - for (const user of userOrg?.users || []) { - // 'info' type is always sent regardless of preferences - if (hasInfo) { - await this.sendEmail(user.user.email, subject, message); - continue; - } - - // For digest, check if user wants any of the notification types in the digest - const wantsSuccess = hasSuccess && user.user.sendSuccessEmails; - const wantsFail = hasFail && user.user.sendFailureEmails; - - // Only send if user wants at least one type of notification in the digest - if (!wantsSuccess && !wantsFail) { - continue; - } - - await this.sendEmail(user.user.email, subject, message); - } - } - async sendEmail(to: string, subject: string, html: string, replyTo?: string) { await this._emailService.sendEmail(to, subject, html, replyTo); } diff --git a/libraries/nestjs-libraries/src/database/prisma/posts/posts.repository.ts b/libraries/nestjs-libraries/src/database/prisma/posts/posts.repository.ts index 5dba914f..937a9d87 100644 --- a/libraries/nestjs-libraries/src/database/prisma/posts/posts.repository.ts +++ b/libraries/nestjs-libraries/src/database/prisma/posts/posts.repository.ts @@ -27,24 +27,6 @@ export class PostsRepository { private _errors: PrismaRepository<'errors'> ) {} - checkPending15minutesBack() { - return this._post.model.post.findMany({ - where: { - publishDate: { - lte: dayjs.utc().subtract(15, 'minute').toDate(), - gte: dayjs.utc().subtract(30, 'minute').toDate(), - }, - state: 'QUEUE', - deletedAt: null, - parentPostId: null, - }, - select: { - id: true, - publishDate: true, - }, - }); - } - searchForMissingThreeHoursPosts() { return this._post.model.post.findMany({ where: { @@ -54,8 +36,8 @@ export class PostsRepository { disabled: false, }, publishDate: { - gte: dayjs.utc().toDate(), - lt: dayjs.utc().add(3, 'hour').toDate(), + gte: dayjs.utc().subtract(2, 'hour').toDate(), + lt: dayjs.utc().add(2, 'hour').toDate(), }, state: 'QUEUE', deletedAt: null, @@ -63,6 +45,12 @@ export class PostsRepository { }, select: { id: true, + organizationId: true, + integration: { + select: { + providerIdentifier: true, + } + }, publishDate: true, }, }); diff --git a/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts b/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts index 8196345d..36f56392 100644 --- a/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts +++ b/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts @@ -49,6 +49,10 @@ export class PostsService { private _temporalService: TemporalService ) {} + searchForMissingThreeHoursPosts() { + return this._postRepository.searchForMissingThreeHoursPosts(); + } + updatePost(id: string, postId: string, releaseURL: string) { return this._postRepository.updatePost(id, postId, releaseURL); } diff --git a/libraries/nestjs-libraries/src/temporal/infinite.workflow.register.ts b/libraries/nestjs-libraries/src/temporal/infinite.workflow.register.ts new file mode 100644 index 00000000..405bac5e --- /dev/null +++ b/libraries/nestjs-libraries/src/temporal/infinite.workflow.register.ts @@ -0,0 +1,31 @@ +import { Global, Injectable, Module, OnModuleInit } from '@nestjs/common'; +import { TemporalService } from 'nestjs-temporal-core'; + +@Injectable() +export class InfiniteWorkflowRegister implements OnModuleInit { + constructor(private _temporalService: TemporalService) {} + + async onModuleInit(): Promise { + if (!!process.env.RUN_CRON) { + try { + await this._temporalService.client + ?.getRawClient() + ?.workflow?.start('missingPostWorkflow', { + workflowId: 'missing-post-workflow', + taskQueue: 'main', + }); + } catch (err) {} + } + } +} + +@Global() +@Module({ + imports: [], + controllers: [], + providers: [InfiniteWorkflowRegister], + get exports() { + return this.providers; + }, +}) +export class InfiniteWorkflowRegisterModule {} diff --git a/package.json b/package.json index 546d680f..01dcdf6b 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,7 @@ "publish-sdk": "pnpm run --filter ./apps/sdk publish", "pm2-run": "pm2 delete all || true && pnpm run prisma-db-push && pnpm run --parallel pm2 && pm2 logs", "dev:stripe": "pnpm dlx concurrently \"stripe listen --forward-to localhost:3000/stripe\" \"pnpm run dev\"", - "build": "pnpm -r --workspace-concurrency=1 --filter ./apps/frontend --filter ./apps/backend --filter ./apps/cron run build", + "build": "pnpm -r --workspace-concurrency=1 --filter ./apps/frontend --filter ./apps/backend --filter ./apps/orchestrator run build", "build:backend": "rm -rf apps/backend/dist && pnpm --filter ./apps/backend run build", "build:frontend": "rm -rf apps/frontend/dist && pnpm --filter ./apps/frontend run build", "build:orchestrator": "rm -rf apps/orchestrator/dist && pnpm --filter ./apps/orchestrator run build", @@ -26,7 +26,6 @@ "dev:orchestrator": "rm -rf apps/orchestrator/dist && pnpm --filter ./apps/orchestrator run dev", "start:prod:backend": "pnpm --filter ./apps/backend run start", "start:prod:frontend": "pnpm --filter ./apps/frontend run start", - "start:prod:cron": "pnpm --filter ./apps/cron run start", "dev:docker": "docker compose -f ./docker-compose.dev.yaml up -d", "commands:build:development": "pnpm --filter ./apps/commands run build", "prisma-generate": "pnpm dlx prisma@6.5.0 generate --schema ./libraries/nestjs-libraries/src/database/prisma/schema.prisma", diff --git a/var/docker/create-namespace-default.sh b/var/docker/create-namespace-default.sh new file mode 100755 index 00000000..79eea34f --- /dev/null +++ b/var/docker/create-namespace-default.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +sleep 5 + +tctl namespace create --namespace "default" --description 'Default namespace' --rd 1 + +tini -s -- sleep infinity \ No newline at end of file