mirror of https://github.com/Chocobozzz/PeerTube
				
				
				
			
		
			
				
	
	
		
			287 lines
		
	
	
		
			8.6 KiB
		
	
	
	
		
			TypeScript
		
	
	
			
		
		
	
	
			287 lines
		
	
	
		
			8.6 KiB
		
	
	
	
		
			TypeScript
		
	
	
| import { omit, pick, wait } from '@shared/core-utils'
 | |
| import {
 | |
|   AbortRunnerJobBody,
 | |
|   AcceptRunnerJobBody,
 | |
|   AcceptRunnerJobResult,
 | |
|   ErrorRunnerJobBody,
 | |
|   HttpStatusCode,
 | |
|   isHLSTranscodingPayloadSuccess,
 | |
|   isLiveRTMPHLSTranscodingUpdatePayload,
 | |
|   isWebVideoOrAudioMergeTranscodingPayloadSuccess,
 | |
|   RequestRunnerJobBody,
 | |
|   RequestRunnerJobResult,
 | |
|   ResultList,
 | |
|   RunnerJobAdmin,
 | |
|   RunnerJobLiveRTMPHLSTranscodingPayload,
 | |
|   RunnerJobPayload,
 | |
|   RunnerJobState,
 | |
|   RunnerJobSuccessBody,
 | |
|   RunnerJobSuccessPayload,
 | |
|   RunnerJobType,
 | |
|   RunnerJobUpdateBody,
 | |
|   RunnerJobVODPayload
 | |
| } from '@shared/models'
 | |
| import { unwrapBody } from '../requests'
 | |
| import { waitJobs } from '../server'
 | |
| import { AbstractCommand, OverrideCommandOptions } from '../shared'
 | |
| 
 | |
| export class RunnerJobsCommand extends AbstractCommand {
 | |
| 
 | |
|   list (options: OverrideCommandOptions & {
 | |
|     start?: number
 | |
|     count?: number
 | |
|     sort?: string
 | |
|     search?: string
 | |
|   } = {}) {
 | |
|     const path = '/api/v1/runners/jobs'
 | |
| 
 | |
|     return this.getRequestBody<ResultList<RunnerJobAdmin>>({
 | |
|       ...options,
 | |
| 
 | |
|       path,
 | |
|       query: pick(options, [ 'start', 'count', 'sort', 'search' ]),
 | |
|       implicitToken: true,
 | |
|       defaultExpectedStatus: HttpStatusCode.OK_200
 | |
|     })
 | |
|   }
 | |
| 
 | |
|   cancelByAdmin (options: OverrideCommandOptions & { jobUUID: string }) {
 | |
|     const path = '/api/v1/runners/jobs/' + options.jobUUID + '/cancel'
 | |
| 
 | |
|     return this.postBodyRequest({
 | |
|       ...options,
 | |
| 
 | |
|       path,
 | |
|       implicitToken: true,
 | |
|       defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
 | |
|     })
 | |
|   }
 | |
| 
 | |
|   // ---------------------------------------------------------------------------
 | |
| 
 | |
|   request (options: OverrideCommandOptions & RequestRunnerJobBody) {
 | |
|     const path = '/api/v1/runners/jobs/request'
 | |
| 
 | |
|     return unwrapBody<RequestRunnerJobResult>(this.postBodyRequest({
 | |
|       ...options,
 | |
| 
 | |
|       path,
 | |
|       fields: pick(options, [ 'runnerToken' ]),
 | |
|       implicitToken: false,
 | |
|       defaultExpectedStatus: HttpStatusCode.OK_200
 | |
|     }))
 | |
|   }
 | |
| 
 | |
|   async requestVOD (options: OverrideCommandOptions & RequestRunnerJobBody) {
 | |
|     const vodTypes = new Set<RunnerJobType>([ 'vod-audio-merge-transcoding', 'vod-hls-transcoding', 'vod-web-video-transcoding' ])
 | |
| 
 | |
|     const { availableJobs } = await this.request(options)
 | |
| 
 | |
|     return {
 | |
|       availableJobs: availableJobs.filter(j => vodTypes.has(j.type))
 | |
|     } as RequestRunnerJobResult<RunnerJobVODPayload>
 | |
|   }
 | |
| 
 | |
|   async requestLive (options: OverrideCommandOptions & RequestRunnerJobBody) {
 | |
|     const vodTypes = new Set<RunnerJobType>([ 'live-rtmp-hls-transcoding' ])
 | |
| 
 | |
|     const { availableJobs } = await this.request(options)
 | |
| 
 | |
|     return {
 | |
|       availableJobs: availableJobs.filter(j => vodTypes.has(j.type))
 | |
|     } as RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>
 | |
|   }
 | |
| 
 | |
|   // ---------------------------------------------------------------------------
 | |
| 
 | |
|   accept <T extends RunnerJobPayload = RunnerJobPayload> (options: OverrideCommandOptions & AcceptRunnerJobBody & { jobUUID: string }) {
 | |
|     const path = '/api/v1/runners/jobs/' + options.jobUUID + '/accept'
 | |
| 
 | |
|     return unwrapBody<AcceptRunnerJobResult<T>>(this.postBodyRequest({
 | |
|       ...options,
 | |
| 
 | |
|       path,
 | |
|       fields: pick(options, [ 'runnerToken' ]),
 | |
|       implicitToken: false,
 | |
|       defaultExpectedStatus: HttpStatusCode.OK_200
 | |
|     }))
 | |
|   }
 | |
| 
 | |
|   abort (options: OverrideCommandOptions & AbortRunnerJobBody & { jobUUID: string }) {
 | |
|     const path = '/api/v1/runners/jobs/' + options.jobUUID + '/abort'
 | |
| 
 | |
|     return this.postBodyRequest({
 | |
|       ...options,
 | |
| 
 | |
|       path,
 | |
|       fields: pick(options, [ 'reason', 'jobToken', 'runnerToken' ]),
 | |
|       implicitToken: false,
 | |
|       defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
 | |
|     })
 | |
|   }
 | |
| 
 | |
|   update (options: OverrideCommandOptions & RunnerJobUpdateBody & { jobUUID: string }) {
 | |
|     const path = '/api/v1/runners/jobs/' + options.jobUUID + '/update'
 | |
| 
 | |
|     const { payload } = options
 | |
|     const attaches: { [id: string]: any } = {}
 | |
|     let payloadWithoutFiles = payload
 | |
| 
 | |
|     if (isLiveRTMPHLSTranscodingUpdatePayload(payload)) {
 | |
|       if (payload.masterPlaylistFile) {
 | |
|         attaches[`payload[masterPlaylistFile]`] = payload.masterPlaylistFile
 | |
|       }
 | |
| 
 | |
|       attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile
 | |
|       attaches[`payload[videoChunkFile]`] = payload.videoChunkFile
 | |
| 
 | |
|       payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'masterPlaylistFile', 'resolutionPlaylistFile', 'videoChunkFile' ])
 | |
|     }
 | |
| 
 | |
|     return this.postUploadRequest({
 | |
|       ...options,
 | |
| 
 | |
|       path,
 | |
|       fields: {
 | |
|         ...pick(options, [ 'progress', 'jobToken', 'runnerToken' ]),
 | |
| 
 | |
|         payload: payloadWithoutFiles
 | |
|       },
 | |
|       attaches,
 | |
|       implicitToken: false,
 | |
|       defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
 | |
|     })
 | |
|   }
 | |
| 
 | |
|   error (options: OverrideCommandOptions & ErrorRunnerJobBody & { jobUUID: string }) {
 | |
|     const path = '/api/v1/runners/jobs/' + options.jobUUID + '/error'
 | |
| 
 | |
|     return this.postBodyRequest({
 | |
|       ...options,
 | |
| 
 | |
|       path,
 | |
|       fields: pick(options, [ 'message', 'jobToken', 'runnerToken' ]),
 | |
|       implicitToken: false,
 | |
|       defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
 | |
|     })
 | |
|   }
 | |
| 
 | |
|   success (options: OverrideCommandOptions & RunnerJobSuccessBody & { jobUUID: string }) {
 | |
|     const { payload } = options
 | |
| 
 | |
|     const path = '/api/v1/runners/jobs/' + options.jobUUID + '/success'
 | |
|     const attaches: { [id: string]: any } = {}
 | |
|     let payloadWithoutFiles = payload
 | |
| 
 | |
|     if ((isWebVideoOrAudioMergeTranscodingPayloadSuccess(payload) || isHLSTranscodingPayloadSuccess(payload)) && payload.videoFile) {
 | |
|       attaches[`payload[videoFile]`] = payload.videoFile
 | |
| 
 | |
|       payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'videoFile' ])
 | |
|     }
 | |
| 
 | |
|     if (isHLSTranscodingPayloadSuccess(payload) && payload.resolutionPlaylistFile) {
 | |
|       attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile
 | |
| 
 | |
|       payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'resolutionPlaylistFile' ])
 | |
|     }
 | |
| 
 | |
|     return this.postUploadRequest({
 | |
|       ...options,
 | |
| 
 | |
|       path,
 | |
|       attaches,
 | |
|       fields: {
 | |
|         ...pick(options, [ 'jobToken', 'runnerToken' ]),
 | |
| 
 | |
|         payload: payloadWithoutFiles
 | |
|       },
 | |
|       implicitToken: false,
 | |
|       defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
 | |
|     })
 | |
|   }
 | |
| 
 | |
|   getJobFile (options: OverrideCommandOptions & { url: string, jobToken: string, runnerToken: string }) {
 | |
|     const { host, protocol, pathname } = new URL(options.url)
 | |
| 
 | |
|     return this.postBodyRequest({
 | |
|       url: `${protocol}//${host}`,
 | |
|       path: pathname,
 | |
| 
 | |
|       fields: pick(options, [ 'jobToken', 'runnerToken' ]),
 | |
|       implicitToken: false,
 | |
|       defaultExpectedStatus: HttpStatusCode.OK_200
 | |
|     })
 | |
|   }
 | |
| 
 | |
|   // ---------------------------------------------------------------------------
 | |
| 
 | |
|   async autoAccept (options: OverrideCommandOptions & RequestRunnerJobBody & { type?: RunnerJobType }) {
 | |
|     const { availableJobs } = await this.request(options)
 | |
| 
 | |
|     const job = options.type
 | |
|       ? availableJobs.find(j => j.type === options.type)
 | |
|       : availableJobs[0]
 | |
| 
 | |
|     return this.accept({ ...options, jobUUID: job.uuid })
 | |
|   }
 | |
| 
 | |
|   async autoProcessWebVideoJob (runnerToken: string, jobUUIDToProcess?: string) {
 | |
|     let jobUUID = jobUUIDToProcess
 | |
| 
 | |
|     if (!jobUUID) {
 | |
|       const { availableJobs } = await this.request({ runnerToken })
 | |
|       jobUUID = availableJobs[0].uuid
 | |
|     }
 | |
| 
 | |
|     const { job } = await this.accept({ runnerToken, jobUUID })
 | |
|     const jobToken = job.jobToken
 | |
| 
 | |
|     const payload: RunnerJobSuccessPayload = { videoFile: 'video_short.mp4' }
 | |
|     await this.success({ runnerToken, jobUUID, jobToken, payload })
 | |
| 
 | |
|     await waitJobs([ this.server ])
 | |
| 
 | |
|     return job
 | |
|   }
 | |
| 
 | |
|   async cancelAllJobs (options: { state?: RunnerJobState } = {}) {
 | |
|     const { state } = options
 | |
| 
 | |
|     const { data } = await this.list({ count: 100 })
 | |
| 
 | |
|     const allowedStates = new Set<RunnerJobState>([
 | |
|       RunnerJobState.PENDING,
 | |
|       RunnerJobState.PROCESSING,
 | |
|       RunnerJobState.WAITING_FOR_PARENT_JOB
 | |
|     ])
 | |
| 
 | |
|     for (const job of data) {
 | |
|       if (state && job.state.id !== state) continue
 | |
|       else if (allowedStates.has(job.state.id) !== true) continue
 | |
| 
 | |
|       await this.cancelByAdmin({ jobUUID: job.uuid })
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   async getJob (options: OverrideCommandOptions & { uuid: string }) {
 | |
|     const { data } = await this.list({ ...options, count: 100, sort: '-updatedAt' })
 | |
| 
 | |
|     return data.find(j => j.uuid === options.uuid)
 | |
|   }
 | |
| 
 | |
|   async requestLiveJob (runnerToken: string) {
 | |
|     let availableJobs: RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>['availableJobs'] = []
 | |
| 
 | |
|     while (availableJobs.length === 0) {
 | |
|       const result = await this.requestLive({ runnerToken })
 | |
|       availableJobs = result.availableJobs
 | |
| 
 | |
|       if (availableJobs.length === 1) break
 | |
| 
 | |
|       await wait(150)
 | |
|     }
 | |
| 
 | |
|     return availableJobs[0]
 | |
|   }
 | |
| }
 |