diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts index dbeb9dfc1..a9b37bbc4 100644 --- a/packages/peertube-runner/server/process/shared/common.ts +++ b/packages/peertube-runner/server/process/shared/common.ts @@ -35,49 +35,48 @@ export async function downloadInputFile (options: { return destination } -export async function updateTranscodingProgress (options: { +export function scheduleTranscodingProgress (options: { server: PeerTubeServer runnerToken: string job: JobWithToken - progress: number + progressGetter: () => number }) { - const { server, job, runnerToken, progress } = options - - return server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress }) -} - -// --------------------------------------------------------------------------- - -export function buildFFmpegVOD (options: { - server: PeerTubeServer - runnerToken: string - job: JobWithToken -}) { - const { server, job, runnerToken } = options + const { job, server, progressGetter, runnerToken } = options const updateInterval = ConfigManager.Instance.isTestInstance() ? 500 : 60000 - let progress: number + const update = () => { + server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress: progressGetter() }) + .catch(err => logger.error({ err }, 'Cannot send job progress')) + } const interval = setInterval(() => { - updateTranscodingProgress({ server, job, runnerToken, progress }) - .catch(err => logger.error({ err }, 'Cannot send job progress')) + update() }, updateInterval) + update() + + return interval +} + +// --------------------------------------------------------------------------- + +export function buildFFmpegVOD (options: { + onJobProgress: (progress: number) => void +}) { + const { onJobProgress } = options + return new FFmpegVOD({ ...getCommonFFmpegOptions(), - onError: () => clearInterval(interval), - onEnd: () => clearInterval(interval), - updateJobProgress: arg => { - if (arg < 0 || arg > 100) { - progress = undefined - } else { - progress = arg - } + const progress = arg < 0 || arg > 100 + ? undefined + : arg + + onJobProgress(progress) } }) } diff --git a/packages/peertube-runner/server/process/shared/process-studio.ts b/packages/peertube-runner/server/process/shared/process-studio.ts index ce014495e..afd9347fe 100644 --- a/packages/peertube-runner/server/process/shared/process-studio.ts +++ b/packages/peertube-runner/server/process/shared/process-studio.ts @@ -5,30 +5,42 @@ import { join } from 'path' import { buildUUID } from '@shared/extra-utils' import { RunnerJobStudioTranscodingPayload, - VideoStudioTranscodingSuccess, VideoStudioTask, VideoStudioTaskCutPayload, VideoStudioTaskIntroPayload, VideoStudioTaskOutroPayload, VideoStudioTaskPayload, - VideoStudioTaskWatermarkPayload + VideoStudioTaskWatermarkPayload, + VideoStudioTranscodingSuccess } from '@shared/models' import { ConfigManager } from '../../../shared/config-manager' -import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions } from './common' +import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions, scheduleTranscodingProgress } from './common' export async function processStudioTranscoding (options: ProcessOptions) { const { server, job, runnerToken } = options const payload = job.payload - logger.info(`Downloading input file ${payload.input.videoFileUrl} for job ${job.jobToken}`) - + let inputPath: string let outputPath: string - const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) - let tmpInputFilePath = inputPath + let tmpInputFilePath: string - logger.info(`Input file ${payload.input.videoFileUrl} downloaded for job ${job.jobToken}. Running studio transcoding tasks.`) + let tasksProgress = 0 + + const updateProgressInterval = scheduleTranscodingProgress({ + job, + server, + runnerToken, + progressGetter: () => tasksProgress + }) try { + logger.info(`Downloading input file ${payload.input.videoFileUrl} for job ${job.jobToken}`) + + inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) + tmpInputFilePath = inputPath + + logger.info(`Input file ${payload.input.videoFileUrl} downloaded for job ${job.jobToken}. Running studio transcoding tasks.`) + for (const task of payload.tasks) { const outputFilename = 'output-edition-' + buildUUID() + '.mp4' outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename) @@ -45,6 +57,8 @@ export async function processStudioTranscoding (options: ProcessOptions) { const { server, job, runnerToken } = options + const payload = job.payload - logger.info(`Downloading input file ${payload.input.videoFileUrl} for web video transcoding job ${job.jobToken}`) - - const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) - - logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running web video transcoding.`) - - const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) + let ffmpegProgress: number + let inputPath: string const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) + const updateProgressInterval = scheduleTranscodingProgress({ + job, + server, + runnerToken, + progressGetter: () => ffmpegProgress + }) + try { + logger.info(`Downloading input file ${payload.input.videoFileUrl} for web video transcoding job ${job.jobToken}`) + + inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) + + logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running web video transcoding.`) + + const ffmpegVod = buildFFmpegVOD({ + onJobProgress: progress => { ffmpegProgress = progress } + }) + await ffmpegVod.transcode({ type: 'video', @@ -52,8 +65,9 @@ export async function processWebVideoTranscoding (options: ProcessOptions ffmpegProgress + }) try { + logger.info(`Downloading input file ${payload.input.videoFileUrl} for HLS transcoding job ${job.jobToken}`) + + inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) + + logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running HLS transcoding.`) + + const ffmpegVod = buildFFmpegVOD({ + onJobProgress: progress => { ffmpegProgress = progress } + }) + await ffmpegVod.transcode({ type: 'hls', copyCodecs: false, @@ -101,9 +126,10 @@ export async function processHLSTranscoding (options: ProcessOptions ffmpegProgress + }) try { + logger.info( + `Downloading input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` + + `for audio merge transcoding job ${job.jobToken}` + ) + + audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job }) + inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job }) + + logger.info( + `Downloaded input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` + + `for job ${job.jobToken}. Running audio merge transcoding.` + ) + + const ffmpegVod = buildFFmpegVOD({ + onJobProgress: progress => { ffmpegProgress = progress } + }) + await ffmpegVod.transcode({ type: 'merge-audio', @@ -154,8 +193,9 @@ export async function processAudioMergeTranscoding (options: ProcessOptions