Avoid concurrency issue on transcoding

pull/5190/head
Chocobozzz 2022-08-09 09:09:31 +02:00
parent bd911b54b5
commit b42c2c7e89
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
8 changed files with 120 additions and 54 deletions

View File

@ -2,7 +2,7 @@ import { program } from 'commander'
import { isUUIDValid, toCompleteUUID } from '@server/helpers/custom-validators/misc'
import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg'
import { CONFIG } from '@server/initializers/config'
import { addTranscodingJob } from '@server/lib/video'
import { buildTranscodingJob } from '@server/lib/video'
import { VideoState, VideoTranscodingPayload } from '@shared/models'
import { initDatabaseModels } from '../server/initializers/database'
import { JobQueue } from '../server/lib/job-queue'
@ -57,7 +57,7 @@ async function run () {
for (const resolution of resolutionsEnabled) {
dataInput.push({
type: 'new-resolution-to-hls',
type: 'new-resolution-to-hls' as 'new-resolution-to-hls',
videoUUID: video.uuid,
resolution,
@ -72,7 +72,7 @@ async function run () {
} else {
if (options.resolution !== undefined) {
dataInput.push({
type: 'new-resolution-to-webtorrent',
type: 'new-resolution-to-webtorrent' as 'new-resolution-to-webtorrent',
videoUUID: video.uuid,
createHLSIfNeeded: true,
@ -90,7 +90,7 @@ async function run () {
}
dataInput.push({
type: 'optimize-to-webtorrent',
type: 'optimize-to-webtorrent' as 'optimize-to-webtorrent',
videoUUID: video.uuid,
isNewVideo: false
})
@ -103,7 +103,8 @@ async function run () {
await video.save()
for (const d of dataInput) {
await addTranscodingJob(d, {})
await JobQueue.Instance.createJob(await buildTranscodingJob(d))
console.log('Transcoding job for video %s created.', video.uuid)
}
}

View File

@ -1,10 +1,12 @@
import Bluebird from 'bluebird'
import express from 'express'
import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg'
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { addTranscodingJob } from '@server/lib/video'
import { JobQueue } from '@server/lib/job-queue'
import { Hooks } from '@server/lib/plugins/hooks'
import { buildTranscodingJob } from '@server/lib/video'
import { HttpStatusCode, UserRight, VideoState, VideoTranscodingCreate } from '@shared/models'
import { asyncMiddleware, authenticate, createTranscodingValidator, ensureUserHasRight } from '../../../middlewares'
import { Hooks } from '@server/lib/plugins/hooks'
const lTags = loggerTagsFactory('api', 'video')
const transcodingRouter = express.Router()
@ -44,29 +46,81 @@ async function createTranscoding (req: express.Request, res: express.Response) {
video.state = VideoState.TO_TRANSCODE
await video.save()
for (const resolution of resolutions) {
const hasAudio = !!audioStream
const childrenResolutions = resolutions.filter(r => r !== maxResolution)
const children = await Bluebird.mapSeries(childrenResolutions, resolution => {
if (body.transcodingType === 'hls') {
await addTranscodingJob({
type: 'new-resolution-to-hls',
return buildHLSJobOption({
videoUUID: video.uuid,
hasAudio,
resolution,
hasAudio: !!audioStream,
copyCodecs: false,
isNewVideo: false,
autoDeleteWebTorrentIfNeeded: false,
isMaxQuality: maxResolution === resolution
})
} else if (body.transcodingType === 'webtorrent') {
await addTranscodingJob({
type: 'new-resolution-to-webtorrent',
videoUUID: video.uuid,
isNewVideo: false,
resolution,
hasAudio: !!audioStream,
createHLSIfNeeded: false
isMaxQuality: false
})
}
}
if (body.transcodingType === 'webtorrent') {
return buildWebTorrentJobOption({
videoUUID: video.uuid,
hasAudio,
resolution
})
}
})
const parent = body.transcodingType === 'hls'
? await buildHLSJobOption({
videoUUID: video.uuid,
hasAudio,
resolution: maxResolution,
isMaxQuality: false
})
: await buildWebTorrentJobOption({
videoUUID: video.uuid,
hasAudio,
resolution: maxResolution
})
// Porcess the last resolution after the other ones to prevent concurrency issue
// Because low resolutions use the biggest one as ffmpeg input
await JobQueue.Instance.createJobWithChildren(parent, children)
return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
function buildHLSJobOption (options: {
videoUUID: string
hasAudio: boolean
resolution: number
isMaxQuality: boolean
}) {
const { videoUUID, hasAudio, resolution, isMaxQuality } = options
return buildTranscodingJob({
type: 'new-resolution-to-hls',
videoUUID,
resolution,
hasAudio,
copyCodecs: false,
isNewVideo: false,
autoDeleteWebTorrentIfNeeded: false,
isMaxQuality
})
}
function buildWebTorrentJobOption (options: {
videoUUID: string
hasAudio: boolean
resolution: number
}) {
const { videoUUID, hasAudio, resolution } = options
return buildTranscodingJob({
type: 'new-resolution-to-webtorrent',
videoUUID,
isNewVideo: false,
resolution,
hasAudio,
createHLSIfNeeded: false
})
}

View File

@ -73,10 +73,6 @@ async function getRatesCount (type: 'like' | 'dislike', video: MVideo, fetchedVi
return totalItems
}
function createJob (payload: ActivitypubHttpFetcherPayload) {
return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
}
function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) {
const uri = fetchedVideo.shares
@ -104,3 +100,7 @@ function syncComments (video: MVideo, fetchedVideo: VideoObject, isSync: boolean
return crawlCollectionPage<string>(uri, handler, cleaner)
.catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err, rootUrl: uri, ...lTags(video.uuid, video.url) }))
}
function createJob (payload: ActivitypubHttpFetcherPayload) {
return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
}

View File

@ -1,7 +1,7 @@
import { Job } from 'bullmq'
import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg'
import { Hooks } from '@server/lib/plugins/hooks'
import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
import { VideoPathManager } from '@server/lib/video-path-manager'
import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state'
import { UserModel } from '@server/models/user/user'
@ -27,6 +27,7 @@ import {
optimizeOriginalVideofile,
transcodeNewWebTorrentResolution
} from '../../transcoding/transcoding'
import { JobQueue } from '../job-queue'
type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
@ -248,7 +249,7 @@ async function createHlsJobIfEnabled (user: MUserId, payload: {
...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ])
}
await addTranscodingJob(hlsTranscodingPayload, jobOptions)
await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions))
return true
}
@ -312,7 +313,7 @@ async function createLowerResolutionsJobs (options: {
priority: await getTranscodingJobPriority(user)
}
await addTranscodingJob(dataInput, jobOptions)
await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions))
}
if (resolutionCreated.length === 0) {

View File

@ -325,10 +325,8 @@ class JobQueue {
if (!job) continue
lastJob = {
name: 'job',
data: job.payload,
queueName: job.type,
opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
...this.buildJobFlowOption(job),
children: lastJob
? [ lastJob ]
: []
@ -338,6 +336,23 @@ class JobQueue {
return this.flowProducer.add(lastJob)
}
async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
return this.flowProducer.add({
...this.buildJobFlowOption(parent),
children: children.map(c => this.buildJobFlowOption(c))
})
}
private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) {
return {
name: 'job',
data: job.payload,
queueName: job.type,
opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
}
}
private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
return {
backoff: { delay: 60 * 1000, type: 'exponential' },
@ -425,10 +440,6 @@ class JobQueue {
}
}
waitJob (job: Job) {
return job.waitUntilFinished(this.queueEvents[job.queueName])
}
private addRepeatableJobs () {
this.queues['videos-views-stats'].add('job', {}, {
repeat: REPEAT_JOBS['videos-views-stats']

View File

@ -9,7 +9,7 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info'
import { FilteredModelAttributes } from '@server/types'
import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models'
import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models'
import { CreateJobOptions, JobQueue } from './job-queue/job-queue'
import { CreateJobOptions } from './job-queue/job-queue'
import { updateVideoMiniatureFromExisting } from './thumbnail'
function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> {
@ -121,10 +121,10 @@ async function buildOptimizeOrMergeAudioJob (options: {
}
}
async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) {
async function buildTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) {
await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options })
return { type: 'video-transcoding' as 'video-transcoding', payload, ...options }
}
async function getTranscodingJobPriority (user: MUserId) {
@ -182,7 +182,7 @@ export {
buildVideoThumbnailsFromReq,
setVideoTags,
buildOptimizeOrMergeAudioJob,
addTranscodingJob,
buildTranscodingJob,
buildMoveToObjectStorageJob,
getTranscodingJobPriority,
getCachedVideoDuration

View File

@ -1592,22 +1592,21 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> {
}
getQualityFileBy<T extends MVideoWithFile> (this: T, fun: (files: MVideoFile[], it: (file: MVideoFile) => number) => MVideoFile) {
// We first transcode to WebTorrent format, so try this array first
if (Array.isArray(this.VideoFiles) && this.VideoFiles.length !== 0) {
const file = fun(this.VideoFiles, file => file.resolution)
const files = this.getAllFiles()
const file = fun(files, file => file.resolution)
if (!file) return undefined
if (file.videoId) {
return Object.assign(file, { Video: this })
}
// No webtorrent files, try with streaming playlist files
if (Array.isArray(this.VideoStreamingPlaylists) && this.VideoStreamingPlaylists.length !== 0) {
if (file.videoStreamingPlaylistId) {
const streamingPlaylistWithVideo = Object.assign(this.VideoStreamingPlaylists[0], { Video: this })
const file = fun(streamingPlaylistWithVideo.VideoFiles, file => file.resolution)
return Object.assign(file, { VideoStreamingPlaylist: streamingPlaylistWithVideo })
}
return undefined
throw new Error('File is not associated to a video of a playlist')
}
getMaxQualityFile<T extends MVideoWithFile> (this: T): MVideoFileVideo | MVideoFileStreamingPlaylistVideo {

View File

@ -165,7 +165,7 @@ async function processVideo (parameters: {
const youtubeDLBinary = await YoutubeDLCLI.safeGet()
const output = await youtubeDLBinary.download({
url: videoInfo.url,
format: YoutubeDLCLI.getYoutubeDLVideoFormat([]),
format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false),
output: path,
additionalYoutubeDLArgs: command.args,
processOptions
@ -251,7 +251,7 @@ async function fetchObject (info: any) {
const youtubeDLCLI = await YoutubeDLCLI.safeGet()
const result = await youtubeDLCLI.getInfo({
url,
format: YoutubeDLCLI.getYoutubeDLVideoFormat([]),
format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false),
processOptions
})
@ -336,7 +336,7 @@ function exitError (message: string, ...meta: any[]) {
function getYoutubeDLInfo (youtubeDLCLI: YoutubeDLCLI, url: string, args: string[]) {
return youtubeDLCLI.getInfo({
url,
format: YoutubeDLCLI.getYoutubeDLVideoFormat([]),
format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false),
additionalYoutubeDLArgs: [ '-j', '--flat-playlist', '--playlist-reverse', ...args ],
processOptions
})