diff --git a/libraries/nestjs-libraries/src/integrations/social.abstract.ts b/libraries/nestjs-libraries/src/integrations/social.abstract.ts index 3d4e9802..9d807236 100644 --- a/libraries/nestjs-libraries/src/integrations/social.abstract.ts +++ b/libraries/nestjs-libraries/src/integrations/social.abstract.ts @@ -31,13 +31,32 @@ export abstract class SocialAbstract { return undefined; } + async runInConcurrent(func: (...args: any[]) => Promise) { + const value = await concurrencyService(this.identifier.split('-')[0], async () => { + try { + return await func(); + } catch (err) { + return {type: 'error', value: err}; + } + }); + + if (value && value.type === 'error') { + throw value.value; + } + + return value; + } + async fetch( url: string, options: RequestInit = {}, identifier = '', totalRetries = 0 ): Promise { - const request = await concurrencyService(this.identifier.split('-')[0], () => fetch(url, options)); + const request = await concurrencyService( + this.identifier.split('-')[0], + () => fetch(url, options) + ); if (request.status === 200 || request.status === 201) { return request; @@ -64,7 +83,10 @@ export abstract class SocialAbstract { const handleError = this.handleErrors(json || '{}'); - if (request.status === 401 && (handleError?.type === 'refresh-token' || !handleError)) { + if ( + request.status === 401 && + (handleError?.type === 'refresh-token' || !handleError) + ) { console.log('refresh token', json); throw new RefreshToken( identifier, @@ -74,7 +96,12 @@ export abstract class SocialAbstract { ); } - throw new BadBody(identifier, json, options.body!, handleError?.value || ''); + throw new BadBody( + identifier, + json, + options.body!, + handleError?.value || '' + ); } checkScopes(required: string[], got: string | string[]) { diff --git a/libraries/nestjs-libraries/src/integrations/social/x.provider.ts b/libraries/nestjs-libraries/src/integrations/social/x.provider.ts index 67f3b7f3..c0637035 100644 --- a/libraries/nestjs-libraries/src/integrations/social/x.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/x.provider.ts @@ -262,9 +262,11 @@ export class XProvider extends SocialAbstract implements SocialProvider { }); const { data: { username }, - } = await client.v2.me({ - 'user.fields': 'username', - }); + } = await this.runInConcurrent(async () => + client.v2.me({ + 'user.fields': 'username', + }) + ); // upload everything before, you don't want it to fail between the posts const uploadAll = ( @@ -272,20 +274,22 @@ export class XProvider extends SocialAbstract implements SocialProvider { postDetails.flatMap((p) => p?.media?.flatMap(async (m) => { return { - id: await client.v1.uploadMedia( - m.path.indexOf('mp4') > -1 - ? Buffer.from(await readOrFetch(m.path)) - : await sharp(await readOrFetch(m.path), { - animated: lookup(m.path) === 'image/gif', - }) - .resize({ - width: 1000, + id: await this.runInConcurrent(async () => + client.v1.uploadMedia( + m.path.indexOf('mp4') > -1 + ? Buffer.from(await readOrFetch(m.path)) + : await sharp(await readOrFetch(m.path), { + animated: lookup(m.path) === 'image/gif', }) - .gif() - .toBuffer(), - { - mimeType: lookup(m.path) || '', - } + .resize({ + width: 1000, + }) + .gif() + .toBuffer(), + { + mimeType: lookup(m.path) || '', + } + ) ), postId: p.id, }; @@ -308,25 +312,28 @@ export class XProvider extends SocialAbstract implements SocialProvider { const media_ids = (uploadAll[post.id] || []).filter((f) => f); // @ts-ignore - const { data }: { data: { id: string } } = await client.v2.tweet({ - ...(!postDetails?.[0]?.settings?.who_can_reply_post || - postDetails?.[0]?.settings?.who_can_reply_post === 'everyone' - ? {} - : { - reply_settings: postDetails?.[0]?.settings?.who_can_reply_post, - }), - ...(postDetails?.[0]?.settings?.community - ? { - community_id: - postDetails?.[0]?.settings?.community?.split('/').pop() || '', - } - : {}), - text: post.message, - ...(media_ids.length ? { media: { media_ids } } : {}), - ...(ids.length - ? { reply: { in_reply_to_tweet_id: ids[ids.length - 1].postId } } - : {}), - }); + const { data }: { data: { id: string } } = await this.runInConcurrent( async () => client.v2.tweet({ + ...(!postDetails?.[0]?.settings?.who_can_reply_post || + postDetails?.[0]?.settings?.who_can_reply_post === 'everyone' + ? {} + : { + reply_settings: + postDetails?.[0]?.settings?.who_can_reply_post, + }), + ...(postDetails?.[0]?.settings?.community + ? { + community_id: + postDetails?.[0]?.settings?.community?.split('/').pop() || + '', + } + : {}), + text: post.message, + ...(media_ids.length ? { media: { media_ids } } : {}), + ...(ids.length + ? { reply: { in_reply_to_tweet_id: ids[ids.length - 1].postId } } + : {}), + }) + ); ids.push({ postId: data.id, @@ -337,13 +344,15 @@ export class XProvider extends SocialAbstract implements SocialProvider { if (postDetails?.[0]?.settings?.active_thread_finisher) { try { - await client.v2.tweet({ - text: - postDetails?.[0]?.settings?.thread_finisher! + - '\n' + - ids[0].releaseURL, - reply: { in_reply_to_tweet_id: ids[ids.length - 1].postId }, - }); + await this.runInConcurrent(async () => + client.v2.tweet({ + text: + postDetails?.[0]?.settings?.thread_finisher! + + '\n' + + ids[0].releaseURL, + reply: { in_reply_to_tweet_id: ids[ids.length - 1].postId }, + }) + ); } catch (err) {} } diff --git a/libraries/nestjs-libraries/src/integrations/social/youtube.provider.ts b/libraries/nestjs-libraries/src/integrations/social/youtube.provider.ts index 39a1f502..1dbc6003 100644 --- a/libraries/nestjs-libraries/src/integrations/social/youtube.provider.ts +++ b/libraries/nestjs-libraries/src/integrations/social/youtube.provider.ts @@ -153,25 +153,27 @@ export class YoutubeProvider extends SocialAbstract implements SocialProvider { let all: GaxiosResponse; try { - all = await youtubeClient.videos.insert({ - part: ['id', 'snippet', 'status'], - notifySubscribers: true, - requestBody: { - snippet: { - title: settings.title, - description: firstPost?.message, - ...(settings?.tags?.length - ? { tags: settings.tags.map((p) => p.label) } - : {}), + all = await this.runInConcurrent(async () => + youtubeClient.videos.insert({ + part: ['id', 'snippet', 'status'], + notifySubscribers: true, + requestBody: { + snippet: { + title: settings.title, + description: firstPost?.message, + ...(settings?.tags?.length + ? { tags: settings.tags.map((p) => p.label) } + : {}), + }, + status: { + privacyStatus: settings.type, + }, }, - status: { - privacyStatus: settings.type, + media: { + body: response.data, }, - }, - media: { - body: response.data, - }, - }); + }) + ); } catch (err: any) { if ( err.response?.data?.error?.errors?.[0]?.reason === 'failedPrecondition' @@ -215,18 +217,20 @@ export class YoutubeProvider extends SocialAbstract implements SocialProvider { if (settings?.thumbnail?.path) { try { - await youtubeClient.thumbnails.set({ - videoId: all?.data?.id!, - media: { - body: ( - await axios({ - url: settings?.thumbnail?.path, - method: 'GET', - responseType: 'stream', - }) - ).data, - }, - }); + await this.runInConcurrent(async () => + youtubeClient.thumbnails.set({ + videoId: all?.data?.id!, + media: { + body: ( + await axios({ + url: settings?.thumbnail?.path, + method: 'GET', + responseType: 'stream', + }) + ).data, + }, + }) + ); } catch (err: any) { if ( err.response?.data?.error?.errors?.[0]?.domain === 'youtube.thumbnail'