Fix broadcasting in parallel views

pull/5441/head
Chocobozzz 2022-11-16 14:40:10 +01:00
parent 2198bb5a19
commit f240fb4bea
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
3 changed files with 9 additions and 15 deletions

View File

@ -786,14 +786,6 @@ const WORKER_THREADS = {
PROCESS_IMAGE: { PROCESS_IMAGE: {
CONCURRENCY: 1, CONCURRENCY: 1,
MAX_THREADS: 5 MAX_THREADS: 5
},
SEQUENTIAL_HTTP_BROADCAST: {
CONCURRENCY: 1,
MAX_THREADS: 1
},
PARALLEL_HTTP_BROADCAST: {
CONCURRENCY: JOB_CONCURRENCY['activitypub-http-broadcast-parallel'],
MAX_THREADS: 1
} }
} }

View File

@ -1,7 +1,7 @@
import { Job } from 'bullmq' import { Job } from 'bullmq'
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
import { sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process' import { parallelHTTPBroadcastFromWorker, sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process'
import { ActivitypubHttpBroadcastPayload } from '@shared/models' import { ActivitypubHttpBroadcastPayload } from '@shared/models'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
@ -22,7 +22,7 @@ async function processActivityPubParallelHttpBroadcast (job: Job<ActivitypubHttp
const requestOptions = await buildRequestOptions(job.data) const requestOptions = await buildRequestOptions(job.data)
const { badUrls, goodUrls } = await sequentialHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions }) const { badUrls, goodUrls } = await parallelHTTPBroadcastFromWorker({ uris: job.data.uris, requestOptions })
return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
} }

View File

@ -1,7 +1,7 @@
import { join } from 'path' import { join } from 'path'
import Piscina from 'piscina' import Piscina from 'piscina'
import { processImage } from '@server/helpers/image-utils' import { processImage } from '@server/helpers/image-utils'
import { WORKER_THREADS } from '@server/initializers/constants' import { JOB_CONCURRENCY, WORKER_THREADS } from '@server/initializers/constants'
import { httpBroadcast } from './workers/http-broadcast' import { httpBroadcast } from './workers/http-broadcast'
import { downloadImage } from './workers/image-downloader' import { downloadImage } from './workers/image-downloader'
@ -43,8 +43,9 @@ function parallelHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadca
if (!parallelHTTPBroadcastWorker) { if (!parallelHTTPBroadcastWorker) {
parallelHTTPBroadcastWorker = new Piscina({ parallelHTTPBroadcastWorker = new Piscina({
filename: join(__dirname, 'workers', 'http-broadcast.js'), filename: join(__dirname, 'workers', 'http-broadcast.js'),
concurrentTasksPerWorker: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.CONCURRENCY, // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
maxThreads: WORKER_THREADS.PARALLEL_HTTP_BROADCAST.MAX_THREADS concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-broadcast-parallel'],
maxThreads: 1
}) })
} }
@ -59,8 +60,9 @@ function sequentialHTTPBroadcastFromWorker (options: Parameters<typeof httpBroad
if (!sequentialHTTPBroadcastWorker) { if (!sequentialHTTPBroadcastWorker) {
sequentialHTTPBroadcastWorker = new Piscina({ sequentialHTTPBroadcastWorker = new Piscina({
filename: join(__dirname, 'workers', 'http-broadcast.js'), filename: join(__dirname, 'workers', 'http-broadcast.js'),
concurrentTasksPerWorker: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.CONCURRENCY, // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
maxThreads: WORKER_THREADS.SEQUENTIAL_HTTP_BROADCAST.MAX_THREADS concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-broadcast'],
maxThreads: 1
}) })
} }