diff --git a/server/core/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/core/lib/job-queue/handlers/activitypub-http-unicast.ts index f804aa06a..aa7ba1f7e 100644 --- a/server/core/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/core/lib/job-queue/handlers/activitypub-http-unicast.ts @@ -2,8 +2,8 @@ import { Job } from 'bullmq' import { ActivitypubHttpUnicastPayload } from '@peertube/peertube-models' import { buildGlobalHTTPHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send/http.js' import { logger } from '../../../helpers/logger.js' -import { doRequest } from '../../../helpers/requests.js' import { ActorFollowHealthCache } from '../../actor-follow-health-cache.js' +import { httpUnicastFromWorker } from '@server/lib/worker/parent-process.js' async function processActivityPubHttpUnicast (job: Job) { logger.info('Processing ActivityPub unicast in job %s.', job.id) @@ -22,7 +22,7 @@ async function processActivityPubHttpUnicast (job: Job) { } try { - await doRequest(uri, options) + await httpUnicastFromWorker({ uri, requestOptions: options }) ActorFollowHealthCache.Instance.updateActorFollowsHealth([ uri ], []) } catch (err) { ActorFollowHealthCache.Instance.updateActorFollowsHealth([], [ uri ]) diff --git a/server/core/lib/worker/parent-process.ts b/server/core/lib/worker/parent-process.ts index 3e97fb7e9..64d264202 100644 --- a/server/core/lib/worker/parent-process.ts +++ b/server/core/lib/worker/parent-process.ts @@ -7,6 +7,7 @@ import type processImage from './workers/image-processor.js' import type getImageSize from './workers/get-image-size.js' import type signJsonLDObject from './workers/sign-json-ld-object.js' import type buildDigest from './workers/build-digest.js' +import type httpUnicast from './workers/http-unicast.js' let downloadImageWorker: Piscina @@ -92,6 +93,25 @@ export function sequentialHTTPBroadcastFromWorker ( // --------------------------------------------------------------------------- +let httpUnicastWorker: Piscina + +export function httpUnicastFromWorker ( + options: Parameters[0] +): Promise> { + if (!httpUnicastWorker) { + httpUnicastWorker = new Piscina({ + filename: new URL(join('workers', 'http-unicast.js'), import.meta.url).href, + // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs + concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-unicast'], + maxThreads: 1 + }) + } + + return httpUnicastWorker.run(options) +} + +// --------------------------------------------------------------------------- + let signJsonLDObjectWorker: Piscina export function signJsonLDObjectFromWorker ( @@ -100,7 +120,6 @@ export function signJsonLDObjectFromWorker ( if (!signJsonLDObjectWorker) { signJsonLDObjectWorker = new Piscina({ filename: new URL(join('workers', 'sign-json-ld-object.js'), import.meta.url).href, - // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs concurrentTasksPerWorker: WORKER_THREADS.SIGN_JSON_LD_OBJECT.CONCURRENCY, maxThreads: WORKER_THREADS.SIGN_JSON_LD_OBJECT.MAX_THREADS }) diff --git a/server/core/lib/worker/workers/http-unicast.ts b/server/core/lib/worker/workers/http-unicast.ts new file mode 100644 index 000000000..5c8e4793e --- /dev/null +++ b/server/core/lib/worker/workers/http-unicast.ts @@ -0,0 +1,10 @@ +import { doRequest, PeerTubeRequestOptions } from '@server/helpers/requests.js' + +async function httpUnicast (payload: { + uri: string + requestOptions: PeerTubeRequestOptions +}) { + await doRequest(payload.uri, payload.requestOptions) +} + +export default httpUnicast