feat: final temporal touches

This commit is contained in:
Nevo David 2026-01-05 17:28:53 +07:00
parent da0045428a
commit 6633fab924
14 changed files with 236 additions and 87 deletions

View File

@ -14,6 +14,7 @@ import { FILTER } from '@gitroom/nestjs-libraries/sentry/sentry.exception';
import { ChatModule } from '@gitroom/nestjs-libraries/chat/chat.module'; import { ChatModule } from '@gitroom/nestjs-libraries/chat/chat.module';
import { getTemporalModule } from '@gitroom/nestjs-libraries/temporal/temporal.module'; import { getTemporalModule } from '@gitroom/nestjs-libraries/temporal/temporal.module';
import { TemporalRegisterMissingSearchAttributesModule } from '@gitroom/nestjs-libraries/temporal/temporal.register'; import { TemporalRegisterMissingSearchAttributesModule } from '@gitroom/nestjs-libraries/temporal/temporal.register';
import { InfiniteWorkflowRegisterModule } from '@gitroom/nestjs-libraries/temporal/infinite.workflow.register';
@Global() @Global()
@Module({ @Module({
@ -28,6 +29,7 @@ import { TemporalRegisterMissingSearchAttributesModule } from '@gitroom/nestjs-l
ChatModule, ChatModule,
getTemporalModule(false), getTemporalModule(false),
TemporalRegisterMissingSearchAttributesModule, TemporalRegisterMissingSearchAttributesModule,
InfiniteWorkflowRegisterModule,
ThrottlerModule.forRoot([ ThrottlerModule.forRoot([
{ {
ttl: 3600000, ttl: 3600000,

View File

@ -1,5 +1,9 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { Activity, ActivityMethod } from 'nestjs-temporal-core'; import {
Activity,
ActivityMethod,
TemporalService,
} from 'nestjs-temporal-core';
import { PostsService } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.service'; import { PostsService } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.service';
import { import {
NotificationService, NotificationService,
@ -13,6 +17,12 @@ import { RefreshIntegrationService } from '@gitroom/nestjs-libraries/integration
import { timer } from '@gitroom/helpers/utils/timer'; import { timer } from '@gitroom/helpers/utils/timer';
import { IntegrationService } from '@gitroom/nestjs-libraries/database/prisma/integrations/integration.service'; import { IntegrationService } from '@gitroom/nestjs-libraries/database/prisma/integrations/integration.service';
import { WebhooksService } from '@gitroom/nestjs-libraries/database/prisma/webhooks/webhooks.service'; import { WebhooksService } from '@gitroom/nestjs-libraries/database/prisma/webhooks/webhooks.service';
import { TypedSearchAttributes } from '@temporalio/common';
import {
organizationId,
postId as postIdSearchParam,
} from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute';
import { postWorkflow } from '@gitroom/orchestrator/workflows';
@Injectable() @Injectable()
@Activity() @Activity()
@ -23,9 +33,44 @@ export class PostActivity {
private _integrationManager: IntegrationManager, private _integrationManager: IntegrationManager,
private _integrationService: IntegrationService, private _integrationService: IntegrationService,
private _refreshIntegrationService: RefreshIntegrationService, private _refreshIntegrationService: RefreshIntegrationService,
private _webhookService: WebhooksService private _webhookService: WebhooksService,
private _temporalService: TemporalService
) {} ) {}
@ActivityMethod()
async searchForMissingThreeHoursPosts() {
const list = await this._postService.searchForMissingThreeHoursPosts();
for (const post of list) {
await this._temporalService.client
.getRawClient()
.workflow.signalWithStart('postWorkflow', {
workflowId: `post_${post.id}`,
taskQueue: 'main',
signal: 'poke',
signalArgs: [],
args: [
{
taskQueue: post.integration.providerIdentifier
.split('-')[0]
.toLowerCase(),
postId: post.id,
organizationId: post.organizationId,
},
],
typedSearchAttributes: new TypedSearchAttributes([
{
key: postIdSearchParam,
value: post.id,
},
{
key: organizationId,
value: post.organizationId,
},
]),
});
}
}
@ActivityMethod() @ActivityMethod()
async updatePost(id: string, postId: string, releaseURL: string) { async updatePost(id: string, postId: string, releaseURL: string) {
return this._postService.updatePost(id, postId, releaseURL); return this._postService.updatePost(id, postId, releaseURL);

View File

@ -1,3 +1,4 @@
export * from './post.workflow'; export * from './post.workflow';
export * from './autopost.workflow'; export * from './autopost.workflow';
export * from './digest.email.workflow'; export * from './digest.email.workflow';
export * from './missing.post.workflow';

View File

@ -0,0 +1,19 @@
import { proxyActivities, sleep } from '@temporalio/workflow';
import { PostActivity } from '@gitroom/orchestrator/activities/post.activity';
const { searchForMissingThreeHoursPosts } = proxyActivities<PostActivity>({
startToCloseTimeout: '10 minute',
retry: {
maximumAttempts: 3,
backoffCoefficient: 1,
initialInterval: '2 minutes',
},
});
export async function missingPostWorkflow() {
await searchForMissingThreeHoursPosts();
while (true) {
await sleep('1 hour');
await searchForMissingThreeHoursPosts();
}
}

View File

@ -5,6 +5,8 @@ import {
startChild, startChild,
proxyActivities, proxyActivities,
sleep, sleep,
defineSignal,
setHandler,
} from '@temporalio/workflow'; } from '@temporalio/workflow';
import dayjs from 'dayjs'; import dayjs from 'dayjs';
import { Integration } from '@prisma/client'; import { Integration } from '@prisma/client';
@ -42,6 +44,8 @@ const {
}, },
}); });
const poke = defineSignal('poke');
export async function postWorkflow({ export async function postWorkflow({
taskQueue, taskQueue,
postId, postId,
@ -53,7 +57,6 @@ export async function postWorkflow({
organizationId: string; organizationId: string;
postNow?: boolean; postNow?: boolean;
}) { }) {
// Dynamic task queue, for concurrency // Dynamic task queue, for concurrency
const { const {
postSocial, postSocial,
@ -65,6 +68,11 @@ export async function postWorkflow({
processPlug, processPlug,
} = proxyTaskQueue(taskQueue); } = proxyTaskQueue(taskQueue);
let poked = false;
setHandler(poke, () => {
poked = true;
});
const startTime = new Date(); const startTime = new Date();
// get all the posts and comments to post // get all the posts and comments to post
const postsList = await getPostsList(organizationId, postId); const postsList = await getPostsList(organizationId, postId);
@ -77,6 +85,9 @@ export async function postWorkflow({
// if it's a repeatable post, we should ignore this // if it's a repeatable post, we should ignore this
if (!postNow) { if (!postNow) {
if (dayjs(post.publishDate).isBefore(dayjs())) {
return;
}
await sleep(dayjs(post.publishDate).diff(dayjs(), 'millisecond')); await sleep(dayjs(post.publishDate).diff(dayjs(), 'millisecond'));
} }

View File

@ -53,6 +53,87 @@ services:
- postiz-network - postiz-network
restart: always restart: always
temporal-elasticsearch:
container_name: temporal-elasticsearch
image: elasticsearch:7.17.27
environment:
- cluster.routing.allocation.disk.threshold_enabled=true
- cluster.routing.allocation.disk.watermark.low=512mb
- cluster.routing.allocation.disk.watermark.high=256mb
- cluster.routing.allocation.disk.watermark.flood_stage=128mb
- discovery.type=single-node
- ES_JAVA_OPTS=-Xms256m -Xmx256m
- xpack.security.enabled=false
networks:
- temporal-network
expose:
- 9200
volumes:
- /var/lib/elasticsearch/data
temporal-postgresql:
container_name: temporal-postgresql
image: postgres:16
environment:
POSTGRES_PASSWORD: temporal
POSTGRES_USER: temporal
networks:
- temporal-network
expose:
- 5432
volumes:
- /var/lib/postgresql/data
temporal:
container_name: temporal
ports:
- "7233:7233"
image: temporalio/auto-setup:1.28.1
depends_on:
- temporal-postgresql
- temporal-elasticsearch
environment:
- DB=postgres12
- DB_PORT=5432
- POSTGRES_USER=temporal
- POSTGRES_PWD=temporal
- POSTGRES_SEEDS=temporal-postgresql
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
- ENABLE_ES=true
- ES_SEEDS=temporal-elasticsearch
- ES_VERSION=v7
- TEMPORAL_NAMESPACE=default
networks:
- temporal-network
volumes:
- ./dynamicconfig:/etc/temporal/config/dynamicconfig
labels:
kompose.volume.type: configMap
temporal-admin-tools:
container_name: temporal-admin-tools
image: temporalio/admin-tools:1.28.1-tctl-1.18.4-cli-1.4.1
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CLI_ADDRESS=temporal:7233
networks:
- temporal-network
stdin_open: true
depends_on:
- temporal
tty: true
temporal-ui:
container_name: temporal-ui
image: temporalio/ui:2.34.0
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CORS_ORIGINS=http://127.0.0.1:3000
networks:
- temporal-network
ports:
- "8080:8080"
volumes: volumes:
redisinsight: redisinsight:
postgres-volume: postgres-volume:
@ -61,3 +142,6 @@ volumes:
networks: networks:
postiz-network: postiz-network:
external: false external: false
temporal-network:
driver: bridge
name: temporal-network

View File

@ -0,0 +1,3 @@
system.forceSearchAttributesCacheRefreshOnRead:
- value: true # Dev setup only. Please don't turn this on in production.
constraints: {}

View File

@ -0,0 +1,6 @@
limit.maxIDLength:
- value: 255
constraints: {}
system.forceSearchAttributesCacheRefreshOnRead:
- value: true # Dev setup only. Please don't turn this on in production.
constraints: {}

View File

@ -2,7 +2,6 @@ import { Injectable } from '@nestjs/common';
import { NotificationsRepository } from '@gitroom/nestjs-libraries/database/prisma/notifications/notifications.repository'; import { NotificationsRepository } from '@gitroom/nestjs-libraries/database/prisma/notifications/notifications.repository';
import { EmailService } from '@gitroom/nestjs-libraries/services/email.service'; import { EmailService } from '@gitroom/nestjs-libraries/services/email.service';
import { OrganizationRepository } from '@gitroom/nestjs-libraries/database/prisma/organizations/organization.repository'; import { OrganizationRepository } from '@gitroom/nestjs-libraries/database/prisma/organizations/organization.repository';
import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service';
import { TemporalService } from 'nestjs-temporal-core'; import { TemporalService } from 'nestjs-temporal-core';
import { TypedSearchAttributes } from '@temporalio/common'; import { TypedSearchAttributes } from '@temporalio/common';
import { organizationId } from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute'; import { organizationId } from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute';
@ -32,13 +31,6 @@ export class NotificationService {
); );
} }
getNotificationsSince(organizationId: string, since: string) {
return this._notificationRepository.getNotificationsSince(
organizationId,
since
);
}
async inAppNotification( async inAppNotification(
orgId: string, orgId: string,
subject: string, subject: string,
@ -56,8 +48,18 @@ export class NotificationService {
try { try {
await this._temporalService.client await this._temporalService.client
.getRawClient() .getRawClient()
?.workflow.start('digestEmailWorkflow', { ?.workflow.signalWithStart('digestEmailWorkflow', {
workflowId: 'digest_email_workflow_' + orgId, workflowId: 'digest_email_workflow_' + orgId,
signal: 'email',
signalArgs: [
[
{
title: subject,
message,
type,
},
],
],
taskQueue: 'main', taskQueue: 'main',
args: [{ organizationId: orgId }], args: [{ organizationId: orgId }],
typedSearchAttributes: new TypedSearchAttributes([ typedSearchAttributes: new TypedSearchAttributes([
@ -69,20 +71,6 @@ export class NotificationService {
}); });
} catch (err) {} } catch (err) {}
await this._temporalService.signalWorkflow(
'digest_email_workflow_' + orgId,
'email',
[
[
{
title: subject,
message,
type,
},
],
]
);
return; return;
} }
@ -111,45 +99,6 @@ export class NotificationService {
} }
} }
async getDigestTypes(orgId: string): Promise<NotificationType[]> {
const typesKey = 'digest_types_' + orgId;
const types = await ioRedis.smembers(typesKey);
// Clean up the types key after reading
await ioRedis.del(typesKey);
return types as NotificationType[];
}
async sendDigestEmailsToOrg(
orgId: string,
subject: string,
message: string,
types: NotificationType[]
) {
const userOrg = await this._organizationRepository.getAllUsersOrgs(orgId);
const hasInfo = types.includes('info');
const hasSuccess = types.includes('success');
const hasFail = types.includes('fail');
for (const user of userOrg?.users || []) {
// 'info' type is always sent regardless of preferences
if (hasInfo) {
await this.sendEmail(user.user.email, subject, message);
continue;
}
// For digest, check if user wants any of the notification types in the digest
const wantsSuccess = hasSuccess && user.user.sendSuccessEmails;
const wantsFail = hasFail && user.user.sendFailureEmails;
// Only send if user wants at least one type of notification in the digest
if (!wantsSuccess && !wantsFail) {
continue;
}
await this.sendEmail(user.user.email, subject, message);
}
}
async sendEmail(to: string, subject: string, html: string, replyTo?: string) { async sendEmail(to: string, subject: string, html: string, replyTo?: string) {
await this._emailService.sendEmail(to, subject, html, replyTo); await this._emailService.sendEmail(to, subject, html, replyTo);
} }

View File

@ -27,24 +27,6 @@ export class PostsRepository {
private _errors: PrismaRepository<'errors'> private _errors: PrismaRepository<'errors'>
) {} ) {}
checkPending15minutesBack() {
return this._post.model.post.findMany({
where: {
publishDate: {
lte: dayjs.utc().subtract(15, 'minute').toDate(),
gte: dayjs.utc().subtract(30, 'minute').toDate(),
},
state: 'QUEUE',
deletedAt: null,
parentPostId: null,
},
select: {
id: true,
publishDate: true,
},
});
}
searchForMissingThreeHoursPosts() { searchForMissingThreeHoursPosts() {
return this._post.model.post.findMany({ return this._post.model.post.findMany({
where: { where: {
@ -54,8 +36,8 @@ export class PostsRepository {
disabled: false, disabled: false,
}, },
publishDate: { publishDate: {
gte: dayjs.utc().toDate(), gte: dayjs.utc().subtract(2, 'hour').toDate(),
lt: dayjs.utc().add(3, 'hour').toDate(), lt: dayjs.utc().add(2, 'hour').toDate(),
}, },
state: 'QUEUE', state: 'QUEUE',
deletedAt: null, deletedAt: null,
@ -63,6 +45,12 @@ export class PostsRepository {
}, },
select: { select: {
id: true, id: true,
organizationId: true,
integration: {
select: {
providerIdentifier: true,
}
},
publishDate: true, publishDate: true,
}, },
}); });

View File

@ -49,6 +49,10 @@ export class PostsService {
private _temporalService: TemporalService private _temporalService: TemporalService
) {} ) {}
searchForMissingThreeHoursPosts() {
return this._postRepository.searchForMissingThreeHoursPosts();
}
updatePost(id: string, postId: string, releaseURL: string) { updatePost(id: string, postId: string, releaseURL: string) {
return this._postRepository.updatePost(id, postId, releaseURL); return this._postRepository.updatePost(id, postId, releaseURL);
} }

View File

@ -0,0 +1,31 @@
import { Global, Injectable, Module, OnModuleInit } from '@nestjs/common';
import { TemporalService } from 'nestjs-temporal-core';
@Injectable()
export class InfiniteWorkflowRegister implements OnModuleInit {
constructor(private _temporalService: TemporalService) {}
async onModuleInit(): Promise<void> {
if (!!process.env.RUN_CRON) {
try {
await this._temporalService.client
?.getRawClient()
?.workflow?.start('missingPostWorkflow', {
workflowId: 'missing-post-workflow',
taskQueue: 'main',
});
} catch (err) {}
}
}
}
@Global()
@Module({
imports: [],
controllers: [],
providers: [InfiniteWorkflowRegister],
get exports() {
return this.providers;
},
})
export class InfiniteWorkflowRegisterModule {}

View File

@ -16,7 +16,7 @@
"publish-sdk": "pnpm run --filter ./apps/sdk publish", "publish-sdk": "pnpm run --filter ./apps/sdk publish",
"pm2-run": "pm2 delete all || true && pnpm run prisma-db-push && pnpm run --parallel pm2 && pm2 logs", "pm2-run": "pm2 delete all || true && pnpm run prisma-db-push && pnpm run --parallel pm2 && pm2 logs",
"dev:stripe": "pnpm dlx concurrently \"stripe listen --forward-to localhost:3000/stripe\" \"pnpm run dev\"", "dev:stripe": "pnpm dlx concurrently \"stripe listen --forward-to localhost:3000/stripe\" \"pnpm run dev\"",
"build": "pnpm -r --workspace-concurrency=1 --filter ./apps/frontend --filter ./apps/backend --filter ./apps/cron run build", "build": "pnpm -r --workspace-concurrency=1 --filter ./apps/frontend --filter ./apps/backend --filter ./apps/orchestrator run build",
"build:backend": "rm -rf apps/backend/dist && pnpm --filter ./apps/backend run build", "build:backend": "rm -rf apps/backend/dist && pnpm --filter ./apps/backend run build",
"build:frontend": "rm -rf apps/frontend/dist && pnpm --filter ./apps/frontend run build", "build:frontend": "rm -rf apps/frontend/dist && pnpm --filter ./apps/frontend run build",
"build:orchestrator": "rm -rf apps/orchestrator/dist && pnpm --filter ./apps/orchestrator run build", "build:orchestrator": "rm -rf apps/orchestrator/dist && pnpm --filter ./apps/orchestrator run build",
@ -26,7 +26,6 @@
"dev:orchestrator": "rm -rf apps/orchestrator/dist && pnpm --filter ./apps/orchestrator run dev", "dev:orchestrator": "rm -rf apps/orchestrator/dist && pnpm --filter ./apps/orchestrator run dev",
"start:prod:backend": "pnpm --filter ./apps/backend run start", "start:prod:backend": "pnpm --filter ./apps/backend run start",
"start:prod:frontend": "pnpm --filter ./apps/frontend run start", "start:prod:frontend": "pnpm --filter ./apps/frontend run start",
"start:prod:cron": "pnpm --filter ./apps/cron run start",
"dev:docker": "docker compose -f ./docker-compose.dev.yaml up -d", "dev:docker": "docker compose -f ./docker-compose.dev.yaml up -d",
"commands:build:development": "pnpm --filter ./apps/commands run build", "commands:build:development": "pnpm --filter ./apps/commands run build",
"prisma-generate": "pnpm dlx prisma@6.5.0 generate --schema ./libraries/nestjs-libraries/src/database/prisma/schema.prisma", "prisma-generate": "pnpm dlx prisma@6.5.0 generate --schema ./libraries/nestjs-libraries/src/database/prisma/schema.prisma",

View File

@ -0,0 +1,7 @@
#!/bin/sh
sleep 5
tctl namespace create --namespace "default" --description 'Default namespace' --rd 1
tini -s -- sleep infinity