Introduce jobs command

pull/4271/head
Chocobozzz 2021-07-07 09:34:56 +02:00
parent c3d29f694b
commit 9c6327f803
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
8 changed files with 75 additions and 98 deletions

View File

@ -10,7 +10,6 @@ import {
closeAllSequelize,
completeVideoCheck,
flushAndRunMultipleServers,
getJobsListPaginationAndSort,
getVideo,
getVideoCommentThreads,
getVideosList,
@ -181,15 +180,13 @@ describe('Test handle downs', function () {
const states: JobState[] = [ 'waiting', 'active' ]
for (const state of states) {
const res = await getJobsListPaginationAndSort({
url: servers[0].url,
accessToken: servers[0].accessToken,
const body = await servers[0].jobsCommand.getJobsList({
state: state,
start: 0,
count: 50,
sort: '-createdAt'
})
expect(res.body.data).to.have.length(0)
expect(body.data).to.have.length(0)
}
})

View File

@ -2,13 +2,16 @@
import 'mocha'
import * as chai from 'chai'
import { cleanupTests, ServerInfo, setAccessTokensToServers } from '../../../../shared/extra-utils/index'
import { dateIsValid } from '../../../../shared/extra-utils/miscs/miscs'
import { doubleFollow } from '../../../../shared/extra-utils/server/follows'
import { getJobsList, getJobsListPaginationAndSort, waitJobs } from '../../../../shared/extra-utils/server/jobs'
import { flushAndRunMultipleServers } from '../../../../shared/extra-utils/server/servers'
import { uploadVideo } from '../../../../shared/extra-utils/videos/videos'
import { Job } from '../../../../shared/models/server'
import {
cleanupTests,
dateIsValid,
doubleFollow,
flushAndRunMultipleServers,
ServerInfo,
setAccessTokensToServers,
uploadVideo,
waitJobs
} from '@shared/extra-utils'
const expect = chai.expect
@ -36,27 +39,25 @@ describe('Test jobs', function () {
})
it('Should list jobs', async function () {
const res = await getJobsList(servers[1].url, servers[1].accessToken, 'completed')
expect(res.body.total).to.be.above(2)
expect(res.body.data).to.have.length.above(2)
const body = await servers[1].jobsCommand.getJobsList({ state: 'completed' })
expect(body.total).to.be.above(2)
expect(body.data).to.have.length.above(2)
})
it('Should list jobs with sort, pagination and job type', async function () {
{
const res = await getJobsListPaginationAndSort({
url: servers[1].url,
accessToken: servers[1].accessToken,
const body = await servers[1].jobsCommand.getJobsList({
state: 'completed',
start: 1,
count: 2,
sort: 'createdAt'
})
expect(res.body.total).to.be.above(2)
expect(res.body.data).to.have.lengthOf(2)
expect(body.total).to.be.above(2)
expect(body.data).to.have.lengthOf(2)
let job: Job = res.body.data[0]
let job = body.data[0]
// Skip repeat jobs
if (job.type === 'videos-views') job = res.body.data[1]
if (job.type === 'videos-views') job = body.data[1]
expect(job.state).to.equal('completed')
expect(job.type.startsWith('activitypub-')).to.be.true
@ -66,29 +67,26 @@ describe('Test jobs', function () {
}
{
const res = await getJobsListPaginationAndSort({
url: servers[1].url,
accessToken: servers[1].accessToken,
const body = await servers[1].jobsCommand.getJobsList({
state: 'completed',
start: 0,
count: 100,
sort: 'createdAt',
jobType: 'activitypub-http-broadcast'
})
expect(res.body.total).to.be.above(2)
expect(body.total).to.be.above(2)
for (const j of res.body.data as Job[]) {
for (const j of body.data) {
expect(j.type).to.equal('activitypub-http-broadcast')
}
}
})
it('Should list all jobs', async function () {
const res = await getJobsList(servers[1].url, servers[1].accessToken)
const body = await servers[1].jobsCommand.getJobsList()
expect(body.total).to.be.above(2)
const jobs = res.body.data as Job[]
expect(res.body.total).to.be.above(2)
const jobs = body.data
expect(jobs).to.have.length.above(2)
// We know there are a least 1 delayed job (video views) and 1 completed job (broadcast)

View File

@ -16,7 +16,6 @@ import {
flushAndRunMultipleServers,
generateHighBitrateVideo,
generateVideoWithFramerate,
getJobsListPaginationAndSort,
getMyVideos,
getServerFileSize,
getVideo,
@ -709,17 +708,14 @@ describe('Test video transcoding', function () {
describe('Transcoding job queue', function () {
it('Should have the appropriate priorities for transcoding jobs', async function () {
const res = await getJobsListPaginationAndSort({
url: servers[1].url,
accessToken: servers[1].accessToken,
const body = await servers[1].jobsCommand.getJobsList({
start: 0,
count: 100,
sort: '-createdAt',
jobType: 'video-transcoding'
})
const jobs = res.body.data as Job[]
const jobs = body.data
const transcodingJobs = jobs.filter(j => j.data.videoUUID === video4k)
expect(transcodingJobs).to.have.lengthOf(14)

View File

@ -15,7 +15,6 @@ export * from './requests/requests'
export * from './server/clients'
export * from './server/config'
export * from './server/jobs'
export * from './server/plugins'
export * from './server/servers'

View File

@ -2,3 +2,5 @@ export * from './contact-form-command'
export * from './debug-command'
export * from './follows-command'
export * from './follows'
export * from './jobs'
export * from './jobs-command'

View File

@ -0,0 +1,35 @@
import { pick } from 'lodash'
import { HttpStatusCode } from '../../core-utils/miscs/http-error-codes'
import { Job, JobState, JobType, ResultList } from '../../models'
import { AbstractCommand, OverrideCommandOptions } from '../shared'
export class JobsCommand extends AbstractCommand {
getJobsList (options: OverrideCommandOptions & {
state?: JobState
jobType?: JobType
start?: number
count?: number
sort?: string
} = {}) {
const path = this.buildJobsUrl(options.state)
const query = pick(options, [ 'start', 'count', 'sort', 'jobType' ])
return this.getRequestBody<ResultList<Job>>({
...options,
path,
query,
defaultExpectedStatus: HttpStatusCode.OK_200
})
}
private buildJobsUrl (state?: JobState) {
let path = '/api/v1/jobs'
if (state) path += '/' + state
return path
}
}

View File

@ -1,57 +1,8 @@
import * as request from 'supertest'
import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes'
import { makeGetRequest } from '../../../shared/extra-utils'
import { Job, JobState, JobType } from '../../models'
import { JobState } from '../../models'
import { wait } from '../miscs/miscs'
import { ServerInfo } from './servers'
function buildJobsUrl (state?: JobState) {
let path = '/api/v1/jobs'
if (state) path += '/' + state
return path
}
function getJobsList (url: string, accessToken: string, state?: JobState) {
const path = buildJobsUrl(state)
return request(url)
.get(path)
.set('Accept', 'application/json')
.set('Authorization', 'Bearer ' + accessToken)
.expect(HttpStatusCode.OK_200)
.expect('Content-Type', /json/)
}
function getJobsListPaginationAndSort (options: {
url: string
accessToken: string
start: number
count: number
sort: string
state?: JobState
jobType?: JobType
}) {
const { url, accessToken, state, start, count, sort, jobType } = options
const path = buildJobsUrl(state)
const query = {
start,
count,
sort,
jobType
}
return makeGetRequest({
url,
path,
token: accessToken,
statusCodeExpected: HttpStatusCode.OK_200,
query
})
}
async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
const pendingJobWait = process.env.NODE_PENDING_JOB_WAIT
? parseInt(process.env.NODE_PENDING_JOB_WAIT, 10)
@ -72,15 +23,13 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
// Check if each server has pending request
for (const server of servers) {
for (const state of states) {
const p = getJobsListPaginationAndSort({
url: server.url,
accessToken: server.accessToken,
state: state,
const p = server.jobsCommand.getJobsList({
state,
start: 0,
count: 10,
sort: '-createdAt'
}).then(res => res.body.data)
.then((jobs: Job[]) => jobs.filter(j => !repeatableJobs.includes(j.type)))
}).then(body => body.data)
.then(jobs => jobs.filter(j => !repeatableJobs.includes(j.type)))
.then(jobs => {
if (jobs.length !== 0) {
pendingRequests = true
@ -122,7 +71,5 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
// ---------------------------------------------------------------------------
export {
getJobsList,
waitJobs,
getJobsListPaginationAndSort
waitJobs
}

View File

@ -19,6 +19,7 @@ import { SearchCommand } from '../search'
import { ContactFormCommand } from './contact-form-command'
import { DebugCommand } from './debug-command'
import { FollowsCommand } from './follows-command'
import { JobsCommand } from './jobs-command'
interface ServerInfo {
app: ChildProcess
@ -83,6 +84,7 @@ interface ServerInfo {
contactFormCommand?: ContactFormCommand
debugCommand?: DebugCommand
followsCommand?: FollowsCommand
jobsCommand?: JobsCommand
}
function parallelTests () {
@ -299,6 +301,7 @@ async function runServer (server: ServerInfo, configOverrideArg?: any, args = []
server.contactFormCommand = new ContactFormCommand(server)
server.debugCommand = new DebugCommand(server)
server.followsCommand = new FollowsCommand(server)
server.jobsCommand = new JobsCommand(server)
res(server)
})