fix: don't wait for workflow start, for faster creation

This commit is contained in:
Nevo David 2026-01-07 15:40:31 +07:00
parent 82a92afabb
commit bd24282d2b
1 changed files with 61 additions and 93 deletions

View File

@ -414,6 +414,57 @@ export class PostsService {
getPostByForWebhookId(id: string) {
return this._postRepository.getPostByForWebhookId(id);
}
async startWorkflow(taskQueue: string, postId: string, orgId: string) {
try {
const workflows = this._temporalService.client
.getRawClient()
?.workflow.list({
query: `postId="${postId}" AND ExecutionStatus="Running"`,
});
for await (const executionInfo of workflows) {
try {
const workflow = await this._temporalService.client.getWorkflowHandle(
executionInfo.workflowId
);
if (
workflow &&
(await workflow.describe()).status.name !== 'TERMINATED'
) {
await workflow.terminate();
}
} catch (err) {}
}
} catch (err) {}
try {
await this._temporalService.client
.getRawClient()
?.workflow.start('postWorkflowV101', {
workflowId: `post_${postId}`,
taskQueue: 'main',
args: [
{
taskQueue: taskQueue,
postId: postId,
organizationId: orgId,
},
],
typedSearchAttributes: new TypedSearchAttributes([
{
key: postIdSearchParam,
value: postId,
},
{
key: organizationId,
value: orgId,
},
]),
});
} catch (err) {}
}
async createPost(orgId: string, body: CreatePostDto): Promise<any[]> {
const postList = [];
for (const post of body.posts) {
@ -440,54 +491,11 @@ export class PostsService {
return [] as any[];
}
new Promise(async () => {
try {
const workflows = this._temporalService.client
.getRawClient()
?.workflow.list({
query: `postId="${posts[0].id}" AND ExecutionStatus="Running"`,
});
for await (const executionInfo of workflows) {
try {
const workflow =
await this._temporalService.client.getWorkflowHandle(
executionInfo.workflowId
);
if (
workflow &&
(await workflow.describe()).status.name !== 'TERMINATED'
) {
await workflow.terminate();
}
} catch (err) {}
}
} catch (err) {}
await this._temporalService.client
.getRawClient()
?.workflow.start('postWorkflowV101', {
workflowId: `post_${posts[0].id}`,
taskQueue: 'main',
args: [
{
taskQueue: post.settings.__type.split('-')[0].toLowerCase(),
postId: posts[0].id,
organizationId: orgId,
},
],
typedSearchAttributes: new TypedSearchAttributes([
{
key: postIdSearchParam,
value: posts[0].id,
},
{
key: organizationId,
value: orgId,
},
]),
});
}).catch((err) => {});
this.startWorkflow(
post.settings.__type.split('-')[0].toLowerCase(),
posts[0].id,
orgId
).catch((err) => {});
Sentry.metrics.count('post_created', 1);
postList.push({
@ -512,53 +520,13 @@ export class PostsService {
const newDate = await this._postRepository.changeDate(orgId, id, date);
try {
const workflows = this._temporalService.client
.getRawClient()
?.workflow.list({
query: `postId="${getPostById.id}" AND ExecutionStatus="Running"`,
});
for await (const executionInfo of workflows) {
try {
const workflow = await this._temporalService.client.getWorkflowHandle(
executionInfo.workflowId
);
if (
workflow &&
(await workflow.describe()).status.name !== 'TERMINATED'
) {
await workflow.terminate();
}
} catch (err) {}
}
await this.startWorkflow(
getPostById.integration.providerIdentifier.split('-')[0].toLowerCase(),
getPostById.id,
orgId
);
} catch (err) {}
await this._temporalService.client
.getRawClient()
?.workflow.start('postWorkflowV101', {
workflowId: `post_${getPostById.id}`,
taskQueue: 'main',
args: [
{
taskQueue: getPostById.integration.providerIdentifier
.split('-')[0]
.toLowerCase(),
postId: getPostById.id,
organizationId: orgId,
},
],
typedSearchAttributes: new TypedSearchAttributes([
{
key: postIdSearchParam,
value: getPostById.id,
},
{
key: organizationId,
value: orgId,
},
]),
});
return newDate;
}