import { join } from 'path' import Piscina from 'piscina' import { JOB_CONCURRENCY, WORKER_THREADS } from '@server/initializers/constants.js' import type httpBroadcast from './workers/http-broadcast.js' import type downloadImage from './workers/image-downloader.js' import type processImage from './workers/image-processor.js' import type getImageSize from './workers/get-image-size.js' import type signJsonLDObject from './workers/sign-json-ld-object.js' import type buildDigest from './workers/build-digest.js' import type httpUnicast from './workers/http-unicast.js' import { logger } from '@server/helpers/logger.js' let downloadImageWorker: Piscina export function downloadImageFromWorker (options: Parameters[0]): Promise> { if (!downloadImageWorker) { downloadImageWorker = new Piscina({ filename: new URL(join('workers', 'image-downloader.js'), import.meta.url).href, concurrentTasksPerWorker: WORKER_THREADS.DOWNLOAD_IMAGE.CONCURRENCY, maxThreads: WORKER_THREADS.DOWNLOAD_IMAGE.MAX_THREADS, minThreads: 1 }) downloadImageWorker.on('error', err => logger.error('Error in download image worker', { err })) } return downloadImageWorker.run(options) } // --------------------------------------------------------------------------- let processImageWorker: Piscina export function processImageFromWorker (options: Parameters[0]): Promise> { if (!processImageWorker) { processImageWorker = new Piscina({ filename: new URL(join('workers', 'image-processor.js'), import.meta.url).href, concurrentTasksPerWorker: WORKER_THREADS.PROCESS_IMAGE.CONCURRENCY, maxThreads: WORKER_THREADS.PROCESS_IMAGE.MAX_THREADS, minThreads: 1 }) processImageWorker.on('error', err => logger.error('Error in process image worker', { err })) } return processImageWorker.run(options) } // --------------------------------------------------------------------------- let getImageSizeWorker: Piscina export function getImageSizeFromWorker (options: Parameters[0]): Promise> { if (!getImageSizeWorker) { getImageSizeWorker = new Piscina({ filename: new URL(join('workers', 'get-image-size.js'), import.meta.url).href, concurrentTasksPerWorker: WORKER_THREADS.GET_IMAGE_SIZE.CONCURRENCY, maxThreads: WORKER_THREADS.GET_IMAGE_SIZE.MAX_THREADS, minThreads: 1 }) getImageSizeWorker.on('error', err => logger.error('Error in get image size worker', { err })) } return getImageSizeWorker.run(options) } // --------------------------------------------------------------------------- let parallelHTTPBroadcastWorker: Piscina export function parallelHTTPBroadcastFromWorker (options: Parameters[0]): Promise> { if (!parallelHTTPBroadcastWorker) { parallelHTTPBroadcastWorker = new Piscina({ filename: new URL(join('workers', 'http-broadcast.js'), import.meta.url).href, // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-broadcast-parallel'], maxThreads: 1, minThreads: 1 }) parallelHTTPBroadcastWorker.on('error', err => logger.error('Error in parallel HTTP broadcast worker', { err })) } return parallelHTTPBroadcastWorker.run(options) } // --------------------------------------------------------------------------- let sequentialHTTPBroadcastWorker: Piscina export function sequentialHTTPBroadcastFromWorker ( options: Parameters[0] ): Promise> { if (!sequentialHTTPBroadcastWorker) { sequentialHTTPBroadcastWorker = new Piscina({ filename: new URL(join('workers', 'http-broadcast.js'), import.meta.url).href, // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-broadcast'], maxThreads: 1, minThreads: 1 }) sequentialHTTPBroadcastWorker.on('error', err => logger.error('Error in sequential HTTP broadcast image worker', { err })) } return sequentialHTTPBroadcastWorker.run(options) } // --------------------------------------------------------------------------- let httpUnicastWorker: Piscina export function httpUnicastFromWorker ( options: Parameters[0] ): Promise> { if (!httpUnicastWorker) { httpUnicastWorker = new Piscina({ filename: new URL(join('workers', 'http-unicast.js'), import.meta.url).href, // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-unicast'], maxThreads: 1, minThreads: 1 }) httpUnicastWorker.on('error', err => logger.error('Error in HTTP unicast worker', { err })) } return httpUnicastWorker.run(options) } // --------------------------------------------------------------------------- let signJsonLDObjectWorker: Piscina export function signJsonLDObjectFromWorker ( options: Parameters>[0] ): ReturnType> { if (!signJsonLDObjectWorker) { signJsonLDObjectWorker = new Piscina({ filename: new URL(join('workers', 'sign-json-ld-object.js'), import.meta.url).href, concurrentTasksPerWorker: WORKER_THREADS.SIGN_JSON_LD_OBJECT.CONCURRENCY, maxThreads: WORKER_THREADS.SIGN_JSON_LD_OBJECT.MAX_THREADS, minThreads: 1 }) signJsonLDObjectWorker.on('error', err => logger.error('Error in sign JSONLD object worker', { err })) } return signJsonLDObjectWorker.run(options) } // --------------------------------------------------------------------------- let buildDigestWorker: Piscina export function buildDigestFromWorker ( options: Parameters[0] ): Promise> { if (!buildDigestWorker) { buildDigestWorker = new Piscina({ filename: new URL(join('workers', 'build-digest.js'), import.meta.url).href, // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs concurrentTasksPerWorker: WORKER_THREADS.BUILD_DIGEST.CONCURRENCY, maxThreads: WORKER_THREADS.BUILD_DIGEST.MAX_THREADS, minThreads: 1 }) buildDigestWorker.on('error', err => logger.error('Error in build digest worker', { err })) } return buildDigestWorker.run(options) } // --------------------------------------------------------------------------- export function getWorkersStats () { return [ { label: 'downloadImage', queueSize: downloadImageWorker?.queueSize || 0, completed: downloadImageWorker?.completed || 0 }, { label: 'processImageWorker', queueSize: processImageWorker?.queueSize || 0, completed: processImageWorker?.completed || 0 }, { label: 'getImageSizeWorker', queueSize: getImageSizeWorker?.queueSize || 0, completed: getImageSizeWorker?.completed || 0 }, { label: 'parallelHTTPBroadcastWorker', queueSize: parallelHTTPBroadcastWorker?.queueSize || 0, completed: parallelHTTPBroadcastWorker?.completed || 0 }, { label: 'sequentialHTTPBroadcastWorker', queueSize: sequentialHTTPBroadcastWorker?.queueSize || 0, completed: sequentialHTTPBroadcastWorker?.completed || 0 }, { label: 'httpUnicastWorker', queueSize: httpUnicastWorker?.queueSize || 0, completed: httpUnicastWorker?.completed || 0 }, { label: 'signJsonLDObjectWorker', queueSize: signJsonLDObjectWorker?.queueSize || 0, completed: signJsonLDObjectWorker?.completed || 0 }, { label: 'buildDigestWorker', queueSize: buildDigestWorker?.queueSize || 0, completed: buildDigestWorker?.completed || 0 } ] }