feat: concurrency per platform

This commit is contained in:
Nevo David 2025-08-07 17:55:15 +07:00
parent 8559feed4e
commit 7bf2549ffc
29 changed files with 53 additions and 19 deletions

View File

@ -6,26 +6,31 @@ const connection = new Bottleneck.IORedisConnection({
client: ioRedis,
});
const bottleneck = new Bottleneck.Group({
maxConcurrent: 1,
datastore: 'ioredis',
connection,
});
const mapper = {} as Record<string, Bottleneck>;
export async function concurrencyService<T>(
export const concurrency = async <T>(
identifier: string,
maxConcurrent = 1,
func: (...args: any[]) => Promise<T>
): Promise<T> {
) => {
const strippedIdentifier = identifier.toLowerCase().split('-')[0];
mapper[strippedIdentifier] ??= new Bottleneck({
id: strippedIdentifier + '-concurrency',
maxConcurrent,
datastore: 'ioredis',
connection,
});
let load: T;
try {
load = await bottleneck
.key(identifier.split('-')[0])
.schedule<T>({ expiration: 120_000 }, async () => {
load = await mapper[strippedIdentifier].schedule<T>(
{ expiration: 120_000 },
async () => {
const res = await func();
await timer(2000);
return res;
});
}
);
} catch (err) {}
return load;
}
};

View File

@ -1,5 +1,5 @@
import { timer } from '@gitroom/helpers/utils/timer';
import { concurrencyService } from '@gitroom/helpers/utils/concurrency.service';
import { concurrency } from '@gitroom/helpers/utils/concurrency.service';
import { Integration } from '@prisma/client';
export class RefreshToken {
@ -25,6 +25,7 @@ export class NotEnoughScopes {
export abstract class SocialAbstract {
abstract identifier: string;
maxConcurrentJob = 1;
public handleErrors(
body: string
@ -42,8 +43,9 @@ export abstract class SocialAbstract {
}
async runInConcurrent<T>(func: (...args: any[]) => Promise<T>) {
const value = await concurrencyService<any>(
this.identifier.split('-')[0],
const value = await concurrency<any>(
this.identifier,
this.maxConcurrentJob,
async () => {
try {
return await func();
@ -67,8 +69,9 @@ export abstract class SocialAbstract {
identifier = '',
totalRetries = 0
): Promise<Response> {
const request = await concurrencyService(
this.identifier.split('-')[0],
const request = await concurrency(
this.identifier,
this.maxConcurrentJob,
() => fetch(url, options)
);

View File

@ -132,6 +132,7 @@ async function uploadVideo(
}
export class BlueskyProvider extends SocialAbstract implements SocialProvider {
override maxConcurrentJob = 2; // Bluesky has moderate rate limits
identifier = 'bluesky';
name = 'Bluesky';
isBetweenSteps = false;

View File

@ -10,6 +10,7 @@ import { Integration } from '@prisma/client';
import { makeId } from '@gitroom/nestjs-libraries/services/make.is';
export class DevToProvider extends SocialAbstract implements SocialProvider {
override maxConcurrentJob = 3; // Dev.to has moderate publishing limits
identifier = 'devto';
name = 'Dev.to';
isBetweenSteps = false;

View File

@ -9,6 +9,7 @@ import { SocialAbstract } from '@gitroom/nestjs-libraries/integrations/social.ab
import { Integration } from '@prisma/client';
export class DiscordProvider extends SocialAbstract implements SocialProvider {
override maxConcurrentJob = 5; // Discord has generous rate limits for webhook posting
identifier = 'discord';
name = 'Discord';
isBetweenSteps = false;

View File

@ -13,6 +13,7 @@ import { DribbbleDto } from '@gitroom/nestjs-libraries/dtos/posts/providers-sett
import mime from 'mime-types';
export class DribbbleProvider extends SocialAbstract implements SocialProvider {
override maxConcurrentJob = 3; // Dribbble has moderate API limits
identifier = 'dribbble';
name = 'Dribbble';
isBetweenSteps = false;

View File

@ -22,6 +22,7 @@ export class FacebookProvider extends SocialAbstract implements SocialProvider {
'pages_read_engagement',
'read_insights',
];
override maxConcurrentJob = 3; // Facebook has reasonable rate limits
editor = 'normal' as const;
override handleErrors(body: string):

View File

@ -25,6 +25,7 @@ export class FarcasterProvider
isBetweenSteps = false;
isWeb3 = true;
scopes = [] as string[];
override maxConcurrentJob = 3; // Farcaster has moderate limits
editor = 'normal' as const;
async refreshToken(refresh_token: string): Promise<AuthTokenDetails> {

View File

@ -13,6 +13,7 @@ import { Integration } from '@prisma/client';
import { makeId } from '@gitroom/nestjs-libraries/services/make.is';
export class HashnodeProvider extends SocialAbstract implements SocialProvider {
override maxConcurrentJob = 3; // Hashnode has lenient publishing limits
identifier = 'hashnode';
name = 'Hashnode';
isBetweenSteps = false;

View File

@ -29,6 +29,7 @@ export class InstagramProvider
'instagram_manage_comments',
'instagram_manage_insights',
];
override maxConcurrentJob = 2; // Instagram/Facebook has moderate rate limits
editor = 'normal' as const;
async refreshToken(refresh_token: string): Promise<AuthTokenDetails> {

View File

@ -26,6 +26,7 @@ export class InstagramStandaloneProvider
'instagram_business_manage_comments',
'instagram_business_manage_insights',
];
override maxConcurrentJob = 1; // Instagram standalone has stricter limits
editor = 'normal' as const;

View File

@ -13,6 +13,7 @@ import { LemmySettingsDto } from '@gitroom/nestjs-libraries/dtos/posts/providers
import { groupBy } from 'lodash';
export class LemmyProvider extends SocialAbstract implements SocialProvider {
override maxConcurrentJob = 3; // Lemmy instances typically have moderate limits
identifier = 'lemmy';
name = 'Lemmy';
isBetweenSteps = false;

View File

@ -20,6 +20,7 @@ export class LinkedinPageProvider
override name = 'LinkedIn Page';
override isBetweenSteps = true;
override refreshWait = true;
override maxConcurrentJob = 2; // LinkedIn Page has professional posting limits
override scopes = [
'openid',
'profile',

View File

@ -30,6 +30,7 @@ export class LinkedinProvider extends SocialAbstract implements SocialProvider {
'w_organization_social',
'r_organization_social',
];
override maxConcurrentJob = 2; // LinkedIn has professional posting limits
refreshWait = true;
editor = 'normal' as const;

View File

@ -9,6 +9,7 @@ import { makeId } from '@gitroom/nestjs-libraries/services/make.is';
export class MastodonCustomProvider extends MastodonProvider {
override identifier = 'mastodon-custom';
override name = 'M. Instance';
override maxConcurrentJob = 5; // Custom Mastodon instances typically have generous limits
editor = 'normal' as const;
async externalUrl(url: string) {

View File

@ -9,6 +9,7 @@ import { SocialAbstract } from '@gitroom/nestjs-libraries/integrations/social.ab
import dayjs from 'dayjs';
export class MastodonProvider extends SocialAbstract implements SocialProvider {
override maxConcurrentJob = 5; // Mastodon instances typically have generous limits
identifier = 'mastodon';
name = 'Mastodon';
isBetweenSteps = false;

View File

@ -10,6 +10,7 @@ import { Integration } from '@prisma/client';
import { makeId } from '@gitroom/nestjs-libraries/services/make.is';
export class MediumProvider extends SocialAbstract implements SocialProvider {
override maxConcurrentJob = 3; // Medium has lenient publishing limits
identifier = 'medium';
name = 'Medium';
isBetweenSteps = false;

View File

@ -24,6 +24,7 @@ const list = [
];
export class NostrProvider extends SocialAbstract implements SocialProvider {
override maxConcurrentJob = 5; // Nostr relays typically have generous limits
identifier = 'nostr';
name = 'Nostr';
isBetweenSteps = false;

View File

@ -27,6 +27,7 @@ export class PinterestProvider
'pins:write',
'user_accounts:read',
];
override maxConcurrentJob = 3; // Pinterest has more lenient rate limits
editor = 'normal' as const;

View File

@ -11,6 +11,7 @@ import { groupBy } from 'lodash';
import { SocialAbstract } from '@gitroom/nestjs-libraries/integrations/social.abstract';
export class RedditProvider extends SocialAbstract implements SocialProvider {
override maxConcurrentJob = 1; // Reddit has strict rate limits (1 request per second)
identifier = 'reddit';
name = 'Reddit';
isBetweenSteps = false;

View File

@ -10,6 +10,7 @@ import dayjs from 'dayjs';
import { Integration } from '@prisma/client';
export class SlackProvider extends SocialAbstract implements SocialProvider {
override maxConcurrentJob = 3; // Slack has moderate API limits
identifier = 'slack';
name = 'Slack';
isBetweenSteps = false;

View File

@ -19,6 +19,7 @@ const frontendURL = process.env.FRONTEND_URL || 'http://localhost:5000';
const mediaStorage = process.env.STORAGE_PROVIDER || 'local';
export class TelegramProvider extends SocialAbstract implements SocialProvider {
override maxConcurrentJob = 3; // Telegram has moderate bot API limits
identifier = 'telegram';
name = 'Telegram';
isBetweenSteps = false;

View File

@ -26,6 +26,7 @@ export class ThreadsProvider extends SocialAbstract implements SocialProvider {
'threads_manage_insights',
// 'threads_profile_discovery',
];
override maxConcurrentJob = 2; // Threads has moderate rate limits
editor = 'normal' as const;

View File

@ -24,6 +24,7 @@ export class TiktokProvider extends SocialAbstract implements SocialProvider {
'video.upload',
'user.info.profile',
];
override maxConcurrentJob = 1; // TikTok has strict video upload limits
editor = 'normal' as const;

View File

@ -13,6 +13,7 @@ import FormDataNew from 'form-data';
import mime from 'mime-types';
export class VkProvider extends SocialAbstract implements SocialProvider {
override maxConcurrentJob = 2; // VK has moderate API limits
identifier = 'vk';
name = 'VK';
isBetweenSteps = false;

View File

@ -22,6 +22,7 @@ export class WordpressProvider
isBetweenSteps = false;
editor = 'html' as const;
scopes = [] as string[];
override maxConcurrentJob = 5; // WordPress self-hosted typically has generous limits
async generateAuthUrl() {
const state = makeId(6);

View File

@ -23,6 +23,7 @@ export class XProvider extends SocialAbstract implements SocialProvider {
name = 'X';
isBetweenSteps = false;
scopes = [] as string[];
override maxConcurrentJob = 1; // X has strict rate limits (300 posts per 3 hours)
toolTip =
'You will be logged in into your current account, if you would like a different account, change it first on X';

View File

@ -48,6 +48,7 @@ const clientAndYoutube = () => {
};
export class YoutubeProvider extends SocialAbstract implements SocialProvider {
override maxConcurrentJob = 1; // YouTube has strict upload quotas
identifier = 'youtube';
name = 'YouTube';
isBetweenSteps = false;

View File

@ -3,7 +3,7 @@ import { EmailInterface } from '@gitroom/nestjs-libraries/emails/email.interface
import { ResendProvider } from '@gitroom/nestjs-libraries/emails/resend.provider';
import { EmptyProvider } from '@gitroom/nestjs-libraries/emails/empty.provider';
import { NodeMailerProvider } from '@gitroom/nestjs-libraries/emails/node.mailer.provider';
import { concurrencyService } from '@gitroom/helpers/utils/concurrency.service';
import { concurrency } from '@gitroom/helpers/utils/concurrency.service';
@Injectable()
export class EmailService {
@ -96,7 +96,7 @@ export class EmailService {
</div>
`;
const sends = await concurrencyService('send-email', () =>
const sends = await concurrency('send-email', 1, () =>
this.emailService.sendEmail(
to,
subject,