diff --git a/apps/peertube-runner/src/server/server.ts b/apps/peertube-runner/src/server/server.ts index c10d7c4c1..948356eec 100644 --- a/apps/peertube-runner/src/server/server.ts +++ b/apps/peertube-runner/src/server/server.ts @@ -237,8 +237,15 @@ export class RunnerServer { private async requestJob (server: PeerTubeServer) { logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`) - const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken }) + const { availableJobs } = await server.runnerJobs.request({ + runnerToken: server.runnerToken, + jobTypes: this.enabledJobs.size !== getSupportedJobsList().length + ? Array.from(this.enabledJobs) + : undefined + }) + + // FIXME: remove in PeerTube v8: jobTypes has been introduced in PeerTube v7, so do the filter here too const filtered = availableJobs.filter(j => isJobSupported(j, this.enabledJobs)) if (filtered.length === 0) { diff --git a/packages/models/src/runners/request-runner-job-body.model.ts b/packages/models/src/runners/request-runner-job-body.model.ts index 0970d9007..16f59a022 100644 --- a/packages/models/src/runners/request-runner-job-body.model.ts +++ b/packages/models/src/runners/request-runner-job-body.model.ts @@ -1,3 +1,6 @@ +import { RunnerJobType } from './runner-job-type.type.js' + export interface RequestRunnerJobBody { runnerToken: string + jobTypes?: RunnerJobType[] } diff --git a/packages/server-commands/src/runners/runner-jobs-command.ts b/packages/server-commands/src/runners/runner-jobs-command.ts index 64bcc5655..0aca00cec 100644 --- a/packages/server-commands/src/runners/runner-jobs-command.ts +++ b/packages/server-commands/src/runners/runner-jobs-command.ts @@ -79,30 +79,30 @@ export class RunnerJobsCommand extends AbstractCommand { ...options, path, - fields: pick(options, [ 'runnerToken' ]), + fields: pick(options, [ 'runnerToken', 'jobTypes' ]), implicitToken: false, defaultExpectedStatus: HttpStatusCode.OK_200 })) } async requestVOD (options: OverrideCommandOptions & RequestRunnerJobBody) { - const vodTypes = new Set([ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ]) + const { availableJobs } = await this.request({ + ...options, - const { availableJobs } = await this.request(options) + jobTypes: [ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ] + }) - return { - availableJobs: availableJobs.filter(j => vodTypes.has(j.type)) - } as RequestRunnerJobResult + return { availableJobs } as RequestRunnerJobResult } async requestLive (options: OverrideCommandOptions & RequestRunnerJobBody) { - const vodTypes = new Set([ 'live-rtmp-hls-transcoding' ]) + const { availableJobs } = await this.request({ + ...options, - const { availableJobs } = await this.request(options) + jobTypes: [ 'live-rtmp-hls-transcoding' ] + }) - return { - availableJobs: availableJobs.filter(j => vodTypes.has(j.type)) - } as RequestRunnerJobResult + return { availableJobs } as RequestRunnerJobResult } // --------------------------------------------------------------------------- diff --git a/packages/tests/src/api/check-params/runners.ts b/packages/tests/src/api/check-params/runners.ts index 04ed8b5cb..bc7259660 100644 --- a/packages/tests/src/api/check-params/runners.ts +++ b/packages/tests/src/api/check-params/runners.ts @@ -624,6 +624,14 @@ describe('Test managing runners', function () { it('Should fail with an unknown runner token', async function () { await server.runnerJobs.request({ runnerToken: badUUID, expectedStatus: HttpStatusCode.NOT_FOUND_404 }) }) + + it('Should fail with a bad jobTypes token', async function () { + await server.runnerJobs.request({ runnerToken, jobTypes: 'toto' as any, expectedStatus: HttpStatusCode.BAD_REQUEST_400 }) + }) + + it('Should succeed with the correct params', async function () { + await server.runnerJobs.request({ runnerToken, jobTypes: [] }) + }) }) describe('Accept', function () { diff --git a/packages/tests/src/api/runners/runner-common.ts b/packages/tests/src/api/runners/runner-common.ts index f0234b0e6..2119593c4 100644 --- a/packages/tests/src/api/runners/runner-common.ts +++ b/packages/tests/src/api/runners/runner-common.ts @@ -395,6 +395,18 @@ describe('Test runner common actions', function () { jobUUID = webVideoJobs[0].uuid }) + it('Should filter requested jobs', async function () { + { + const { availableJobs } = await server.runnerJobs.request({ runnerToken, jobTypes: [ 'vod-web-video-transcoding' ] }) + expect(availableJobs).to.have.lengthOf(2) + } + + { + const { availableJobs } = await server.runnerJobs.request({ runnerToken, jobTypes: [ 'vod-hls-transcoding' ] }) + expect(availableJobs).to.have.lengthOf(0) + } + }) + it('Should have sorted available jobs by priority', async function () { const { availableJobs } = await server.runnerJobs.request({ runnerToken }) diff --git a/server/core/controllers/api/runners/jobs.ts b/server/core/controllers/api/runners/jobs.ts index 5b81062d8..cb66acd52 100644 --- a/server/core/controllers/api/runners/jobs.ts +++ b/server/core/controllers/api/runners/jobs.ts @@ -5,6 +5,7 @@ import { HttpStatusCode, ListRunnerJobsQuery, LiveRTMPHLSTranscodingUpdatePayload, + RequestRunnerJobBody, RequestRunnerJobResult, RunnerJobState, RunnerJobSuccessBody, @@ -158,7 +159,8 @@ export { async function requestRunnerJob (req: express.Request, res: express.Response) { const runner = res.locals.runner - const availableJobs = await RunnerJobModel.listAvailableJobs() + const body = req.body as RequestRunnerJobBody + const availableJobs = await RunnerJobModel.listAvailableJobs(body.jobTypes) logger.debug('Runner %s requests for a job.', runner.name, { availableJobs, ...lTags(runner.name) }) diff --git a/server/core/middlewares/validators/runners/runners.ts b/server/core/middlewares/validators/runners/runners.ts index 80525c609..c58009a5a 100644 --- a/server/core/middlewares/validators/runners/runners.ts +++ b/server/core/middlewares/validators/runners/runners.ts @@ -74,6 +74,7 @@ const deleteRunnerValidator = [ const getRunnerFromTokenValidator = [ body('runnerToken').custom(isRunnerTokenValid), + body('jobTypes').optional().isArray(), async (req: express.Request, res: express.Response, next: express.NextFunction) => { if (areValidationErrors(req, res, { tags })) return diff --git a/server/core/models/runner/runner-job.ts b/server/core/models/runner/runner-job.ts index e4d4c09f7..5fc421312 100644 --- a/server/core/models/runner/runner-job.ts +++ b/server/core/models/runner/runner-job.ts @@ -185,16 +185,24 @@ export class RunnerJobModel extends SequelizeModel { return RunnerJobModel.findOne(query) } - static listAvailableJobs () { - const query = { + static listAvailableJobs (jobTypes?: string[]) { + const jobTypesWhere = jobTypes + ? { + type: { + [Op.in]: jobTypes + } + } + : {} + + return RunnerJobModel.findAll({ limit: 10, order: getSort('priority'), where: { - state: RunnerJobState.PENDING - } - } + state: RunnerJobState.PENDING, - return RunnerJobModel.findAll(query) + ...jobTypesWhere + } + }) } static listStalledJobs (options: { diff --git a/support/doc/api/openapi.yaml b/support/doc/api/openapi.yaml index 86006bb53..8e1597079 100644 --- a/support/doc/api/openapi.yaml +++ b/support/doc/api/openapi.yaml @@ -6422,6 +6422,11 @@ paths: properties: runnerToken: type: string + jobTypes: + type: array + description: Filter jobs depending on their types + items: + type: string required: - runnerToken responses: