diff --git a/libraries/helpers/src/utils/concurrency.service.ts b/libraries/helpers/src/utils/concurrency.service.ts index 3a3a80b8..e737d937 100644 --- a/libraries/helpers/src/utils/concurrency.service.ts +++ b/libraries/helpers/src/utils/concurrency.service.ts @@ -6,26 +6,31 @@ const connection = new Bottleneck.IORedisConnection({ client: ioRedis, }); -const bottleneck = new Bottleneck.Group({ - maxConcurrent: 1, - datastore: 'ioredis', - connection, -}); +const mapper = {} as Record; -export async function concurrencyService( +export const concurrency = async ( identifier: string, + maxConcurrent = 1, func: (...args: any[]) => Promise -): Promise { +) => { + const strippedIdentifier = identifier.toLowerCase().split('-')[0]; + mapper[strippedIdentifier] ??= new Bottleneck({ + id: strippedIdentifier + '-concurrency', + maxConcurrent, + datastore: 'ioredis', + connection, + }); let load: T; try { - load = await bottleneck - .key(identifier.split('-')[0]) - .schedule({ expiration: 120_000 }, async () => { + load = await mapper[strippedIdentifier].schedule( + { expiration: 120_000 }, + async () => { const res = await func(); await timer(2000); return res; - }); + } + ); } catch (err) {} return load; -} +}; diff --git a/libraries/nestjs-libraries/src/integrations/social.abstract.ts b/libraries/nestjs-libraries/src/integrations/social.abstract.ts index cd14c6e8..0d46f04b 100644 --- a/libraries/nestjs-libraries/src/integrations/social.abstract.ts +++ b/libraries/nestjs-libraries/src/integrations/social.abstract.ts @@ -1,5 +1,5 @@ import { timer } from '@gitroom/helpers/utils/timer'; -import { concurrencyService } from '@gitroom/helpers/utils/concurrency.service'; +import { concurrency } from '@gitroom/helpers/utils/concurrency.service'; import { Integration } from '@prisma/client'; export class RefreshToken { @@ -25,6 +25,7 @@ export class NotEnoughScopes { export abstract class SocialAbstract { abstract identifier: string; + maxConcurrentJob = 1; public handleErrors( body: string @@ -42,8 +43,9 @@ export abstract class SocialAbstract { } async runInConcurrent(func: (...args: any[]) => Promise) { - const value = await concurrencyService( - this.identifier.split('-')[0], + const value = await concurrency( + this.identifier, + this.maxConcurrentJob, async () => { try { return await func(); @@ -67,8 +69,9 @@ export abstract class SocialAbstract { identifier = '', totalRetries = 0 ): Promise { - const request = await concurrencyService( - this.identifier.split('-')[0], + const request = await concurrency( + this.identifier, + this.maxConcurrentJob, () => fetch(url, options) ); diff --git a/libraries/nestjs-libraries/src/integrations/social/bluesky.provider.ts b/libraries/nestjs-libraries/src/integrations/social/bluesky.provider.ts index 83f437e1..df739a5e 100644 --- a/libraries/nestjs-libraries/src/integrations/social/bluesky.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/bluesky.provider.ts @@ -132,6 +132,7 @@ async function uploadVideo( } export class BlueskyProvider extends SocialAbstract implements SocialProvider { + override maxConcurrentJob = 2; // Bluesky has moderate rate limits identifier = 'bluesky'; name = 'Bluesky'; isBetweenSteps = false; diff --git a/libraries/nestjs-libraries/src/integrations/social/dev.to.provider.ts b/libraries/nestjs-libraries/src/integrations/social/dev.to.provider.ts index 76f9c29c..a8e422b4 100644 --- a/libraries/nestjs-libraries/src/integrations/social/dev.to.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/dev.to.provider.ts @@ -10,6 +10,7 @@ import { Integration } from '@prisma/client'; import { makeId } from '@gitroom/nestjs-libraries/services/make.is'; export class DevToProvider extends SocialAbstract implements SocialProvider { + override maxConcurrentJob = 3; // Dev.to has moderate publishing limits identifier = 'devto'; name = 'Dev.to'; isBetweenSteps = false; diff --git a/libraries/nestjs-libraries/src/integrations/social/discord.provider.ts b/libraries/nestjs-libraries/src/integrations/social/discord.provider.ts index 4c685d5e..54728649 100644 --- a/libraries/nestjs-libraries/src/integrations/social/discord.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/discord.provider.ts @@ -9,6 +9,7 @@ import { SocialAbstract } from '@gitroom/nestjs-libraries/integrations/social.ab import { Integration } from '@prisma/client'; export class DiscordProvider extends SocialAbstract implements SocialProvider { + override maxConcurrentJob = 5; // Discord has generous rate limits for webhook posting identifier = 'discord'; name = 'Discord'; isBetweenSteps = false; diff --git a/libraries/nestjs-libraries/src/integrations/social/dribbble.provider.ts b/libraries/nestjs-libraries/src/integrations/social/dribbble.provider.ts index 4dcab6b9..a8b76758 100644 --- a/libraries/nestjs-libraries/src/integrations/social/dribbble.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/dribbble.provider.ts @@ -13,6 +13,7 @@ import { DribbbleDto } from '@gitroom/nestjs-libraries/dtos/posts/providers-sett import mime from 'mime-types'; export class DribbbleProvider extends SocialAbstract implements SocialProvider { + override maxConcurrentJob = 3; // Dribbble has moderate API limits identifier = 'dribbble'; name = 'Dribbble'; isBetweenSteps = false; diff --git a/libraries/nestjs-libraries/src/integrations/social/facebook.provider.ts b/libraries/nestjs-libraries/src/integrations/social/facebook.provider.ts index 491db49d..be62e6d3 100644 --- a/libraries/nestjs-libraries/src/integrations/social/facebook.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/facebook.provider.ts @@ -22,6 +22,7 @@ export class FacebookProvider extends SocialAbstract implements SocialProvider { 'pages_read_engagement', 'read_insights', ]; + override maxConcurrentJob = 3; // Facebook has reasonable rate limits editor = 'normal' as const; override handleErrors(body: string): diff --git a/libraries/nestjs-libraries/src/integrations/social/farcaster.provider.ts b/libraries/nestjs-libraries/src/integrations/social/farcaster.provider.ts index 18b89a86..4bb81bdd 100644 --- a/libraries/nestjs-libraries/src/integrations/social/farcaster.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/farcaster.provider.ts @@ -25,6 +25,7 @@ export class FarcasterProvider isBetweenSteps = false; isWeb3 = true; scopes = [] as string[]; + override maxConcurrentJob = 3; // Farcaster has moderate limits editor = 'normal' as const; async refreshToken(refresh_token: string): Promise { diff --git a/libraries/nestjs-libraries/src/integrations/social/hashnode.provider.ts b/libraries/nestjs-libraries/src/integrations/social/hashnode.provider.ts index 55e097f4..eee8978e 100644 --- a/libraries/nestjs-libraries/src/integrations/social/hashnode.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/hashnode.provider.ts @@ -13,6 +13,7 @@ import { Integration } from '@prisma/client'; import { makeId } from '@gitroom/nestjs-libraries/services/make.is'; export class HashnodeProvider extends SocialAbstract implements SocialProvider { + override maxConcurrentJob = 3; // Hashnode has lenient publishing limits identifier = 'hashnode'; name = 'Hashnode'; isBetweenSteps = false; diff --git a/libraries/nestjs-libraries/src/integrations/social/instagram.provider.ts b/libraries/nestjs-libraries/src/integrations/social/instagram.provider.ts index 6f80ff83..131a0f72 100644 --- a/libraries/nestjs-libraries/src/integrations/social/instagram.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/instagram.provider.ts @@ -29,6 +29,7 @@ export class InstagramProvider 'instagram_manage_comments', 'instagram_manage_insights', ]; + override maxConcurrentJob = 2; // Instagram/Facebook has moderate rate limits editor = 'normal' as const; async refreshToken(refresh_token: string): Promise { diff --git a/libraries/nestjs-libraries/src/integrations/social/instagram.standalone.provider.ts b/libraries/nestjs-libraries/src/integrations/social/instagram.standalone.provider.ts index 8e1ec25d..24c5e09a 100644 --- a/libraries/nestjs-libraries/src/integrations/social/instagram.standalone.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/instagram.standalone.provider.ts @@ -26,6 +26,7 @@ export class InstagramStandaloneProvider 'instagram_business_manage_comments', 'instagram_business_manage_insights', ]; + override maxConcurrentJob = 1; // Instagram standalone has stricter limits editor = 'normal' as const; diff --git a/libraries/nestjs-libraries/src/integrations/social/lemmy.provider.ts b/libraries/nestjs-libraries/src/integrations/social/lemmy.provider.ts index 56d46aee..5f2e0e6d 100644 --- a/libraries/nestjs-libraries/src/integrations/social/lemmy.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/lemmy.provider.ts @@ -13,6 +13,7 @@ import { LemmySettingsDto } from '@gitroom/nestjs-libraries/dtos/posts/providers import { groupBy } from 'lodash'; export class LemmyProvider extends SocialAbstract implements SocialProvider { + override maxConcurrentJob = 3; // Lemmy instances typically have moderate limits identifier = 'lemmy'; name = 'Lemmy'; isBetweenSteps = false; diff --git a/libraries/nestjs-libraries/src/integrations/social/linkedin.page.provider.ts b/libraries/nestjs-libraries/src/integrations/social/linkedin.page.provider.ts index 70c731ef..52cf8aff 100644 --- a/libraries/nestjs-libraries/src/integrations/social/linkedin.page.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/linkedin.page.provider.ts @@ -20,6 +20,7 @@ export class LinkedinPageProvider override name = 'LinkedIn Page'; override isBetweenSteps = true; override refreshWait = true; + override maxConcurrentJob = 2; // LinkedIn Page has professional posting limits override scopes = [ 'openid', 'profile', diff --git a/libraries/nestjs-libraries/src/integrations/social/linkedin.provider.ts b/libraries/nestjs-libraries/src/integrations/social/linkedin.provider.ts index 639fde39..aac124f7 100644 --- a/libraries/nestjs-libraries/src/integrations/social/linkedin.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/linkedin.provider.ts @@ -30,6 +30,7 @@ export class LinkedinProvider extends SocialAbstract implements SocialProvider { 'w_organization_social', 'r_organization_social', ]; + override maxConcurrentJob = 2; // LinkedIn has professional posting limits refreshWait = true; editor = 'normal' as const; diff --git a/libraries/nestjs-libraries/src/integrations/social/mastodon.custom.provider.ts b/libraries/nestjs-libraries/src/integrations/social/mastodon.custom.provider.ts index d38c1b2b..f1c078a6 100644 --- a/libraries/nestjs-libraries/src/integrations/social/mastodon.custom.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/mastodon.custom.provider.ts @@ -9,6 +9,7 @@ import { makeId } from '@gitroom/nestjs-libraries/services/make.is'; export class MastodonCustomProvider extends MastodonProvider { override identifier = 'mastodon-custom'; override name = 'M. Instance'; + override maxConcurrentJob = 5; // Custom Mastodon instances typically have generous limits editor = 'normal' as const; async externalUrl(url: string) { diff --git a/libraries/nestjs-libraries/src/integrations/social/mastodon.provider.ts b/libraries/nestjs-libraries/src/integrations/social/mastodon.provider.ts index b9f1da7e..413f637c 100644 --- a/libraries/nestjs-libraries/src/integrations/social/mastodon.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/mastodon.provider.ts @@ -9,6 +9,7 @@ import { SocialAbstract } from '@gitroom/nestjs-libraries/integrations/social.ab import dayjs from 'dayjs'; export class MastodonProvider extends SocialAbstract implements SocialProvider { + override maxConcurrentJob = 5; // Mastodon instances typically have generous limits identifier = 'mastodon'; name = 'Mastodon'; isBetweenSteps = false; diff --git a/libraries/nestjs-libraries/src/integrations/social/medium.provider.ts b/libraries/nestjs-libraries/src/integrations/social/medium.provider.ts index cf7d05ee..db838978 100644 --- a/libraries/nestjs-libraries/src/integrations/social/medium.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/medium.provider.ts @@ -10,6 +10,7 @@ import { Integration } from '@prisma/client'; import { makeId } from '@gitroom/nestjs-libraries/services/make.is'; export class MediumProvider extends SocialAbstract implements SocialProvider { + override maxConcurrentJob = 3; // Medium has lenient publishing limits identifier = 'medium'; name = 'Medium'; isBetweenSteps = false; diff --git a/libraries/nestjs-libraries/src/integrations/social/nostr.provider.ts b/libraries/nestjs-libraries/src/integrations/social/nostr.provider.ts index e165a31f..ccfd2871 100644 --- a/libraries/nestjs-libraries/src/integrations/social/nostr.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/nostr.provider.ts @@ -24,6 +24,7 @@ const list = [ ]; export class NostrProvider extends SocialAbstract implements SocialProvider { + override maxConcurrentJob = 5; // Nostr relays typically have generous limits identifier = 'nostr'; name = 'Nostr'; isBetweenSteps = false; diff --git a/libraries/nestjs-libraries/src/integrations/social/pinterest.provider.ts b/libraries/nestjs-libraries/src/integrations/social/pinterest.provider.ts index 61104316..0c574dc5 100644 --- a/libraries/nestjs-libraries/src/integrations/social/pinterest.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/pinterest.provider.ts @@ -27,6 +27,7 @@ export class PinterestProvider 'pins:write', 'user_accounts:read', ]; + override maxConcurrentJob = 3; // Pinterest has more lenient rate limits editor = 'normal' as const; diff --git a/libraries/nestjs-libraries/src/integrations/social/reddit.provider.ts b/libraries/nestjs-libraries/src/integrations/social/reddit.provider.ts index 1dfd9aaf..f3b962ba 100644 --- a/libraries/nestjs-libraries/src/integrations/social/reddit.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/reddit.provider.ts @@ -11,6 +11,7 @@ import { groupBy } from 'lodash'; import { SocialAbstract } from '@gitroom/nestjs-libraries/integrations/social.abstract'; export class RedditProvider extends SocialAbstract implements SocialProvider { + override maxConcurrentJob = 1; // Reddit has strict rate limits (1 request per second) identifier = 'reddit'; name = 'Reddit'; isBetweenSteps = false; diff --git a/libraries/nestjs-libraries/src/integrations/social/slack.provider.ts b/libraries/nestjs-libraries/src/integrations/social/slack.provider.ts index ad45c12a..459acff2 100644 --- a/libraries/nestjs-libraries/src/integrations/social/slack.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/slack.provider.ts @@ -10,6 +10,7 @@ import dayjs from 'dayjs'; import { Integration } from '@prisma/client'; export class SlackProvider extends SocialAbstract implements SocialProvider { + override maxConcurrentJob = 3; // Slack has moderate API limits identifier = 'slack'; name = 'Slack'; isBetweenSteps = false; diff --git a/libraries/nestjs-libraries/src/integrations/social/telegram.provider.ts b/libraries/nestjs-libraries/src/integrations/social/telegram.provider.ts index ed6f5027..0fc8e8c7 100644 --- a/libraries/nestjs-libraries/src/integrations/social/telegram.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/telegram.provider.ts @@ -19,6 +19,7 @@ const frontendURL = process.env.FRONTEND_URL || 'http://localhost:5000'; const mediaStorage = process.env.STORAGE_PROVIDER || 'local'; export class TelegramProvider extends SocialAbstract implements SocialProvider { + override maxConcurrentJob = 3; // Telegram has moderate bot API limits identifier = 'telegram'; name = 'Telegram'; isBetweenSteps = false; diff --git a/libraries/nestjs-libraries/src/integrations/social/threads.provider.ts b/libraries/nestjs-libraries/src/integrations/social/threads.provider.ts index 6ae04186..932466b1 100644 --- a/libraries/nestjs-libraries/src/integrations/social/threads.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/threads.provider.ts @@ -26,6 +26,7 @@ export class ThreadsProvider extends SocialAbstract implements SocialProvider { 'threads_manage_insights', // 'threads_profile_discovery', ]; + override maxConcurrentJob = 2; // Threads has moderate rate limits editor = 'normal' as const; diff --git a/libraries/nestjs-libraries/src/integrations/social/tiktok.provider.ts b/libraries/nestjs-libraries/src/integrations/social/tiktok.provider.ts index fcb26d5d..fe0700f0 100644 --- a/libraries/nestjs-libraries/src/integrations/social/tiktok.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/tiktok.provider.ts @@ -24,6 +24,7 @@ export class TiktokProvider extends SocialAbstract implements SocialProvider { 'video.upload', 'user.info.profile', ]; + override maxConcurrentJob = 1; // TikTok has strict video upload limits editor = 'normal' as const; diff --git a/libraries/nestjs-libraries/src/integrations/social/vk.provider.ts b/libraries/nestjs-libraries/src/integrations/social/vk.provider.ts index e5b45abb..12811204 100644 --- a/libraries/nestjs-libraries/src/integrations/social/vk.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/vk.provider.ts @@ -13,6 +13,7 @@ import FormDataNew from 'form-data'; import mime from 'mime-types'; export class VkProvider extends SocialAbstract implements SocialProvider { + override maxConcurrentJob = 2; // VK has moderate API limits identifier = 'vk'; name = 'VK'; isBetweenSteps = false; diff --git a/libraries/nestjs-libraries/src/integrations/social/wordpress.provider.ts b/libraries/nestjs-libraries/src/integrations/social/wordpress.provider.ts index 0341b8f9..f7422e06 100644 --- a/libraries/nestjs-libraries/src/integrations/social/wordpress.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/wordpress.provider.ts @@ -22,6 +22,7 @@ export class WordpressProvider isBetweenSteps = false; editor = 'html' as const; scopes = [] as string[]; + override maxConcurrentJob = 5; // WordPress self-hosted typically has generous limits async generateAuthUrl() { const state = makeId(6); diff --git a/libraries/nestjs-libraries/src/integrations/social/x.provider.ts b/libraries/nestjs-libraries/src/integrations/social/x.provider.ts index 72d1210a..a92f2271 100644 --- a/libraries/nestjs-libraries/src/integrations/social/x.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/x.provider.ts @@ -23,6 +23,7 @@ export class XProvider extends SocialAbstract implements SocialProvider { name = 'X'; isBetweenSteps = false; scopes = [] as string[]; + override maxConcurrentJob = 1; // X has strict rate limits (300 posts per 3 hours) toolTip = 'You will be logged in into your current account, if you would like a different account, change it first on X'; diff --git a/libraries/nestjs-libraries/src/integrations/social/youtube.provider.ts b/libraries/nestjs-libraries/src/integrations/social/youtube.provider.ts index 4bc8c0ef..d8327ba3 100644 --- a/libraries/nestjs-libraries/src/integrations/social/youtube.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/youtube.provider.ts @@ -48,6 +48,7 @@ const clientAndYoutube = () => { }; export class YoutubeProvider extends SocialAbstract implements SocialProvider { + override maxConcurrentJob = 1; // YouTube has strict upload quotas identifier = 'youtube'; name = 'YouTube'; isBetweenSteps = false; diff --git a/libraries/nestjs-libraries/src/services/email.service.ts b/libraries/nestjs-libraries/src/services/email.service.ts index c78326b5..0e0943dc 100644 --- a/libraries/nestjs-libraries/src/services/email.service.ts +++ b/libraries/nestjs-libraries/src/services/email.service.ts @@ -3,7 +3,7 @@ import { EmailInterface } from '@gitroom/nestjs-libraries/emails/email.interface import { ResendProvider } from '@gitroom/nestjs-libraries/emails/resend.provider'; import { EmptyProvider } from '@gitroom/nestjs-libraries/emails/empty.provider'; import { NodeMailerProvider } from '@gitroom/nestjs-libraries/emails/node.mailer.provider'; -import { concurrencyService } from '@gitroom/helpers/utils/concurrency.service'; +import { concurrency } from '@gitroom/helpers/utils/concurrency.service'; @Injectable() export class EmailService { @@ -96,7 +96,7 @@ export class EmailService { `; - const sends = await concurrencyService('send-email', () => + const sends = await concurrency('send-email', 1, () => this.emailService.sendEmail( to, subject,