Add ability to filter requested runner jobs

pull/6711/head
Chocobozzz 2024-10-31 10:22:35 +01:00
parent e0f39d7995
commit a91bd80087
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
9 changed files with 65 additions and 19 deletions

View File

@ -237,8 +237,15 @@ export class RunnerServer {
private async requestJob (server: PeerTubeServer) {
logger.debug(`Requesting jobs on ${server.url} for runner ${server.runnerName}`)
const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })
const { availableJobs } = await server.runnerJobs.request({
runnerToken: server.runnerToken,
jobTypes: this.enabledJobs.size !== getSupportedJobsList().length
? Array.from(this.enabledJobs)
: undefined
})
// FIXME: remove in PeerTube v8: jobTypes has been introduced in PeerTube v7, so do the filter here too
const filtered = availableJobs.filter(j => isJobSupported(j, this.enabledJobs))
if (filtered.length === 0) {

View File

@ -1,3 +1,6 @@
import { RunnerJobType } from './runner-job-type.type.js'
export interface RequestRunnerJobBody {
runnerToken: string
jobTypes?: RunnerJobType[]
}

View File

@ -79,30 +79,30 @@ export class RunnerJobsCommand extends AbstractCommand {
...options,
path,
fields: pick(options, [ 'runnerToken' ]),
fields: pick(options, [ 'runnerToken', 'jobTypes' ]),
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.OK_200
}))
}
async requestVOD (options: OverrideCommandOptions & RequestRunnerJobBody) {
const vodTypes = new Set<RunnerJobType>([ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ])
const { availableJobs } = await this.request({
...options,
const { availableJobs } = await this.request(options)
jobTypes: [ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ]
})
return {
availableJobs: availableJobs.filter(j => vodTypes.has(j.type))
} as RequestRunnerJobResult<RunnerJobVODPayload>
return { availableJobs } as RequestRunnerJobResult<RunnerJobVODPayload>
}
async requestLive (options: OverrideCommandOptions & RequestRunnerJobBody) {
const vodTypes = new Set<RunnerJobType>([ 'live-rtmp-hls-transcoding' ])
const { availableJobs } = await this.request({
...options,
const { availableJobs } = await this.request(options)
jobTypes: [ 'live-rtmp-hls-transcoding' ]
})
return {
availableJobs: availableJobs.filter(j => vodTypes.has(j.type))
} as RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>
return { availableJobs } as RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>
}
// ---------------------------------------------------------------------------

View File

@ -624,6 +624,14 @@ describe('Test managing runners', function () {
it('Should fail with an unknown runner token', async function () {
await server.runnerJobs.request({ runnerToken: badUUID, expectedStatus: HttpStatusCode.NOT_FOUND_404 })
})
it('Should fail with a bad jobTypes token', async function () {
await server.runnerJobs.request({ runnerToken, jobTypes: 'toto' as any, expectedStatus: HttpStatusCode.BAD_REQUEST_400 })
})
it('Should succeed with the correct params', async function () {
await server.runnerJobs.request({ runnerToken, jobTypes: [] })
})
})
describe('Accept', function () {

View File

@ -395,6 +395,18 @@ describe('Test runner common actions', function () {
jobUUID = webVideoJobs[0].uuid
})
it('Should filter requested jobs', async function () {
{
const { availableJobs } = await server.runnerJobs.request({ runnerToken, jobTypes: [ 'vod-web-video-transcoding' ] })
expect(availableJobs).to.have.lengthOf(2)
}
{
const { availableJobs } = await server.runnerJobs.request({ runnerToken, jobTypes: [ 'vod-hls-transcoding' ] })
expect(availableJobs).to.have.lengthOf(0)
}
})
it('Should have sorted available jobs by priority', async function () {
const { availableJobs } = await server.runnerJobs.request({ runnerToken })

View File

@ -5,6 +5,7 @@ import {
HttpStatusCode,
ListRunnerJobsQuery,
LiveRTMPHLSTranscodingUpdatePayload,
RequestRunnerJobBody,
RequestRunnerJobResult,
RunnerJobState,
RunnerJobSuccessBody,
@ -158,7 +159,8 @@ export {
async function requestRunnerJob (req: express.Request, res: express.Response) {
const runner = res.locals.runner
const availableJobs = await RunnerJobModel.listAvailableJobs()
const body = req.body as RequestRunnerJobBody
const availableJobs = await RunnerJobModel.listAvailableJobs(body.jobTypes)
logger.debug('Runner %s requests for a job.', runner.name, { availableJobs, ...lTags(runner.name) })

View File

@ -74,6 +74,7 @@ const deleteRunnerValidator = [
const getRunnerFromTokenValidator = [
body('runnerToken').custom(isRunnerTokenValid),
body('jobTypes').optional().isArray(),
async (req: express.Request, res: express.Response, next: express.NextFunction) => {
if (areValidationErrors(req, res, { tags })) return

View File

@ -185,16 +185,24 @@ export class RunnerJobModel extends SequelizeModel<RunnerJobModel> {
return RunnerJobModel.findOne<MRunnerJobRunner>(query)
}
static listAvailableJobs () {
const query = {
static listAvailableJobs (jobTypes?: string[]) {
const jobTypesWhere = jobTypes
? {
type: {
[Op.in]: jobTypes
}
}
: {}
return RunnerJobModel.findAll<MRunnerJob>({
limit: 10,
order: getSort('priority'),
where: {
state: RunnerJobState.PENDING
}
}
state: RunnerJobState.PENDING,
return RunnerJobModel.findAll<MRunnerJob>(query)
...jobTypesWhere
}
})
}
static listStalledJobs (options: {

View File

@ -6422,6 +6422,11 @@ paths:
properties:
runnerToken:
type: string
jobTypes:
type: array
description: Filter jobs depending on their types
items:
type: string
required:
- runnerToken
responses: