Prevent stalled jobs

pull/5870/head
Chocobozzz 2023-06-22 15:25:39 +02:00
parent bc3918b2ae
commit d68b88bac4
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
3 changed files with 126 additions and 72 deletions

View File

@ -35,49 +35,48 @@ export async function downloadInputFile (options: {
return destination return destination
} }
export async function updateTranscodingProgress (options: { export function scheduleTranscodingProgress (options: {
server: PeerTubeServer server: PeerTubeServer
runnerToken: string runnerToken: string
job: JobWithToken job: JobWithToken
progress: number progressGetter: () => number
}) { }) {
const { server, job, runnerToken, progress } = options const { job, server, progressGetter, runnerToken } = options
return server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress })
}
// ---------------------------------------------------------------------------
export function buildFFmpegVOD (options: {
server: PeerTubeServer
runnerToken: string
job: JobWithToken
}) {
const { server, job, runnerToken } = options
const updateInterval = ConfigManager.Instance.isTestInstance() const updateInterval = ConfigManager.Instance.isTestInstance()
? 500 ? 500
: 60000 : 60000
let progress: number const update = () => {
server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress: progressGetter() })
.catch(err => logger.error({ err }, 'Cannot send job progress'))
}
const interval = setInterval(() => { const interval = setInterval(() => {
updateTranscodingProgress({ server, job, runnerToken, progress }) update()
.catch(err => logger.error({ err }, 'Cannot send job progress'))
}, updateInterval) }, updateInterval)
update()
return interval
}
// ---------------------------------------------------------------------------
export function buildFFmpegVOD (options: {
onJobProgress: (progress: number) => void
}) {
const { onJobProgress } = options
return new FFmpegVOD({ return new FFmpegVOD({
...getCommonFFmpegOptions(), ...getCommonFFmpegOptions(),
onError: () => clearInterval(interval),
onEnd: () => clearInterval(interval),
updateJobProgress: arg => { updateJobProgress: arg => {
if (arg < 0 || arg > 100) { const progress = arg < 0 || arg > 100
progress = undefined ? undefined
} else { : arg
progress = arg
} onJobProgress(progress)
} }
}) })
} }

View File

@ -5,30 +5,42 @@ import { join } from 'path'
import { buildUUID } from '@shared/extra-utils' import { buildUUID } from '@shared/extra-utils'
import { import {
RunnerJobStudioTranscodingPayload, RunnerJobStudioTranscodingPayload,
VideoStudioTranscodingSuccess,
VideoStudioTask, VideoStudioTask,
VideoStudioTaskCutPayload, VideoStudioTaskCutPayload,
VideoStudioTaskIntroPayload, VideoStudioTaskIntroPayload,
VideoStudioTaskOutroPayload, VideoStudioTaskOutroPayload,
VideoStudioTaskPayload, VideoStudioTaskPayload,
VideoStudioTaskWatermarkPayload VideoStudioTaskWatermarkPayload,
VideoStudioTranscodingSuccess
} from '@shared/models' } from '@shared/models'
import { ConfigManager } from '../../../shared/config-manager' import { ConfigManager } from '../../../shared/config-manager'
import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions } from './common' import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions, scheduleTranscodingProgress } from './common'
export async function processStudioTranscoding (options: ProcessOptions<RunnerJobStudioTranscodingPayload>) { export async function processStudioTranscoding (options: ProcessOptions<RunnerJobStudioTranscodingPayload>) {
const { server, job, runnerToken } = options const { server, job, runnerToken } = options
const payload = job.payload const payload = job.payload
let inputPath: string
let outputPath: string
let tmpInputFilePath: string
let tasksProgress = 0
const updateProgressInterval = scheduleTranscodingProgress({
job,
server,
runnerToken,
progressGetter: () => tasksProgress
})
try {
logger.info(`Downloading input file ${payload.input.videoFileUrl} for job ${job.jobToken}`) logger.info(`Downloading input file ${payload.input.videoFileUrl} for job ${job.jobToken}`)
let outputPath: string inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) tmpInputFilePath = inputPath
let tmpInputFilePath = inputPath
logger.info(`Input file ${payload.input.videoFileUrl} downloaded for job ${job.jobToken}. Running studio transcoding tasks.`) logger.info(`Input file ${payload.input.videoFileUrl} downloaded for job ${job.jobToken}. Running studio transcoding tasks.`)
try {
for (const task of payload.tasks) { for (const task of payload.tasks) {
const outputFilename = 'output-edition-' + buildUUID() + '.mp4' const outputFilename = 'output-edition-' + buildUUID() + '.mp4'
outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename) outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename)
@ -45,6 +57,8 @@ export async function processStudioTranscoding (options: ProcessOptions<RunnerJo
// For the next iteration // For the next iteration
tmpInputFilePath = outputPath tmpInputFilePath = outputPath
tasksProgress += Math.floor(100 / payload.tasks.length)
} }
const successBody: VideoStudioTranscodingSuccess = { const successBody: VideoStudioTranscodingSuccess = {
@ -58,8 +72,9 @@ export async function processStudioTranscoding (options: ProcessOptions<RunnerJo
payload: successBody payload: successBody
}) })
} finally { } finally {
await remove(tmpInputFilePath) if (tmpInputFilePath) await remove(tmpInputFilePath)
await remove(outputPath) if (outputPath) await remove(outputPath)
if (updateProgressInterval) clearInterval(updateProgressInterval)
} }
} }

View File

@ -11,23 +11,36 @@ import {
VODWebVideoTranscodingSuccess VODWebVideoTranscodingSuccess
} from '@shared/models' } from '@shared/models'
import { ConfigManager } from '../../../shared/config-manager' import { ConfigManager } from '../../../shared/config-manager'
import { buildFFmpegVOD, downloadInputFile, ProcessOptions } from './common' import { buildFFmpegVOD, downloadInputFile, ProcessOptions, scheduleTranscodingProgress } from './common'
export async function processWebVideoTranscoding (options: ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) { export async function processWebVideoTranscoding (options: ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) {
const { server, job, runnerToken } = options const { server, job, runnerToken } = options
const payload = job.payload const payload = job.payload
logger.info(`Downloading input file ${payload.input.videoFileUrl} for web video transcoding job ${job.jobToken}`) let ffmpegProgress: number
let inputPath: string
const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running web video transcoding.`)
const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken })
const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`)
const updateProgressInterval = scheduleTranscodingProgress({
job,
server,
runnerToken,
progressGetter: () => ffmpegProgress
})
try { try {
logger.info(`Downloading input file ${payload.input.videoFileUrl} for web video transcoding job ${job.jobToken}`)
inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running web video transcoding.`)
const ffmpegVod = buildFFmpegVOD({
onJobProgress: progress => { ffmpegProgress = progress }
})
await ffmpegVod.transcode({ await ffmpegVod.transcode({
type: 'video', type: 'video',
@ -52,8 +65,9 @@ export async function processWebVideoTranscoding (options: ProcessOptions<Runner
payload: successBody payload: successBody
}) })
} finally { } finally {
await remove(inputPath) if (inputPath) await remove(inputPath)
await remove(outputPath) if (outputPath) await remove(outputPath)
if (updateProgressInterval) clearInterval(updateProgressInterval)
} }
} }
@ -61,21 +75,32 @@ export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVO
const { server, job, runnerToken } = options const { server, job, runnerToken } = options
const payload = job.payload const payload = job.payload
logger.info(`Downloading input file ${payload.input.videoFileUrl} for HLS transcoding job ${job.jobToken}`) let ffmpegProgress: number
let inputPath: string
const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running HLS transcoding.`)
const uuid = buildUUID() const uuid = buildUUID()
const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `${uuid}-${payload.output.resolution}.m3u8`) const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `${uuid}-${payload.output.resolution}.m3u8`)
const videoFilename = `${uuid}-${payload.output.resolution}-fragmented.mp4` const videoFilename = `${uuid}-${payload.output.resolution}-fragmented.mp4`
const videoPath = join(join(ConfigManager.Instance.getTranscodingDirectory(), videoFilename)) const videoPath = join(join(ConfigManager.Instance.getTranscodingDirectory(), videoFilename))
const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) const updateProgressInterval = scheduleTranscodingProgress({
job,
server,
runnerToken,
progressGetter: () => ffmpegProgress
})
try { try {
logger.info(`Downloading input file ${payload.input.videoFileUrl} for HLS transcoding job ${job.jobToken}`)
inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running HLS transcoding.`)
const ffmpegVod = buildFFmpegVOD({
onJobProgress: progress => { ffmpegProgress = progress }
})
await ffmpegVod.transcode({ await ffmpegVod.transcode({
type: 'hls', type: 'hls',
copyCodecs: false, copyCodecs: false,
@ -101,9 +126,10 @@ export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVO
payload: successBody payload: successBody
}) })
} finally { } finally {
await remove(inputPath) if (inputPath) await remove(inputPath)
await remove(outputPath) if (outputPath) await remove(outputPath)
await remove(videoPath) if (videoPath) await remove(videoPath)
if (updateProgressInterval) clearInterval(updateProgressInterval)
} }
} }
@ -111,24 +137,37 @@ export async function processAudioMergeTranscoding (options: ProcessOptions<Runn
const { server, job, runnerToken } = options const { server, job, runnerToken } = options
const payload = job.payload const payload = job.payload
let ffmpegProgress: number
let audioPath: string
let inputPath: string
const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`)
const updateProgressInterval = scheduleTranscodingProgress({
job,
server,
runnerToken,
progressGetter: () => ffmpegProgress
})
try {
logger.info( logger.info(
`Downloading input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` + `Downloading input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` +
`for audio merge transcoding job ${job.jobToken}` `for audio merge transcoding job ${job.jobToken}`
) )
const audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job }) audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job })
const inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job }) inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job })
logger.info( logger.info(
`Downloaded input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` + `Downloaded input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` +
`for job ${job.jobToken}. Running audio merge transcoding.` `for job ${job.jobToken}. Running audio merge transcoding.`
) )
const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) const ffmpegVod = buildFFmpegVOD({
onJobProgress: progress => { ffmpegProgress = progress }
})
const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken })
try {
await ffmpegVod.transcode({ await ffmpegVod.transcode({
type: 'merge-audio', type: 'merge-audio',
@ -154,8 +193,9 @@ export async function processAudioMergeTranscoding (options: ProcessOptions<Runn
payload: successBody payload: successBody
}) })
} finally { } finally {
await remove(audioPath) if (audioPath) await remove(audioPath)
await remove(inputPath) if (inputPath) await remove(inputPath)
await remove(outputPath) if (outputPath) await remove(outputPath)
if (updateProgressInterval) clearInterval(updateProgressInterval)
} }
} }