Force stop remote live transcoding

pull/5817/head
Chocobozzz 2023-05-22 13:44:22 +02:00
parent f3bc1b5416
commit 17ecdf61ce
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
10 changed files with 45 additions and 5 deletions

View File

@ -34,6 +34,8 @@ export class ProcessLiveRTMPHLSTranscoding {
constructor (private readonly options: ProcessOptions<RunnerJobLiveRTMPHLSTranscodingPayload>) {
this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID())
logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`)
}
process () {
@ -289,6 +291,7 @@ export class ProcessLiveRTMPHLSTranscoding {
})
} catch (err) {
if (currentTry >= 3) throw err
if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) throw err
logger.warn({ err }, 'Will retry update after error')
await wait(250)
@ -310,6 +313,8 @@ export class ProcessLiveRTMPHLSTranscoding {
// ---------------------------------------------------------------------------
private cleanup () {
logger.debug(`Cleaning up job ${this.options.job.uuid}`)
for (const fsWatcher of this.fsWatchers) {
fsWatcher.close()
.catch(err => logger.error({ err }, 'Cannot close watcher'))

View File

@ -178,6 +178,10 @@ class LiveManager {
return !!this.rtmpServer
}
hasSession (sessionId: string) {
return this.getContext().sessions.has(sessionId)
}
stopSessionOf (videoUUID: string, error: LiveVideoError | null) {
const sessionId = this.videoSessions.get(videoUUID)
if (!sessionId) {

View File

@ -477,6 +477,7 @@ class MuxingSession extends EventEmitter {
lTags: this.lTags,
sessionId: this.sessionId,
inputLocalUrl: this.inputLocalUrl,
inputPublicUrl: this.inputPublicUrl,

View File

@ -25,6 +25,7 @@ interface AbstractTranscodingWrapperOptions {
lTags: LoggerTagsFn
sessionId: string
inputLocalUrl: string
inputPublicUrl: string
@ -52,6 +53,7 @@ abstract class AbstractTranscodingWrapper extends EventEmitter {
fps: number
}[]
protected readonly sessionId: string
protected readonly inputLocalUrl: string
protected readonly inputPublicUrl: string
@ -80,6 +82,7 @@ abstract class AbstractTranscodingWrapper extends EventEmitter {
this.videoUUID = options.videoLive.Video.uuid
this.streamingPlaylist = options.streamingPlaylist
this.sessionId = options.sessionId
this.inputLocalUrl = options.inputLocalUrl
this.inputPublicUrl = options.inputPublicUrl

View File

@ -5,6 +5,7 @@ export class RemoteTranscodingWrapper extends AbstractTranscodingWrapper {
async run () {
await new LiveRTMPHLSTranscodingJobHandler().create({
rtmpUrl: this.inputPublicUrl,
sessionId: this.sessionId,
toTranscode: this.toTranscode,
video: this.videoLive.Video,
outputDirectory: this.outDirectory,

View File

@ -20,6 +20,7 @@ type CreateOptions = {
video: MVideo
playlist: MStreamingPlaylist
sessionId: string
rtmpUrl: string
toTranscode: {
@ -37,7 +38,7 @@ type CreateOptions = {
export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateOptions, LiveRTMPHLSTranscodingUpdatePayload, LiveRTMPHLSTranscodingSuccess> {
async create (options: CreateOptions) {
const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory } = options
const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory, sessionId } = options
const jobUUID = buildUUID()
const payload: RunnerJobLiveRTMPHLSTranscodingPayload = {
@ -54,6 +55,7 @@ export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateO
const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = {
videoUUID: video.uuid,
masterPlaylistName: playlist.playlistFilename,
sessionId,
outputDirectory
}

View File

@ -11,8 +11,16 @@ import {
} from '@server/helpers/custom-validators/runners/jobs'
import { isRunnerTokenValid } from '@server/helpers/custom-validators/runners/runners'
import { cleanUpReqFiles } from '@server/helpers/express-utils'
import { LiveManager } from '@server/lib/live'
import { RunnerJobModel } from '@server/models/runner/runner-job'
import { HttpStatusCode, RunnerJobState, RunnerJobSuccessBody, RunnerJobUpdateBody, ServerErrorCode } from '@shared/models'
import {
HttpStatusCode,
RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
RunnerJobState,
RunnerJobSuccessBody,
RunnerJobUpdateBody,
ServerErrorCode
} from '@shared/models'
import { areValidationErrors } from '../shared'
const tags = [ 'runner' ]
@ -48,8 +56,9 @@ export const updateRunnerJobValidator = [
if (areValidationErrors(req, res, { tags })) return cleanUpReqFiles(req)
const body = req.body as RunnerJobUpdateBody
const job = res.locals.runnerJob
if (isRunnerJobUpdatePayloadValid(body.payload, res.locals.runnerJob.type, req.files) !== true) {
if (isRunnerJobUpdatePayloadValid(body.payload, job.type, req.files) !== true) {
cleanUpReqFiles(req)
return res.fail({
@ -59,6 +68,20 @@ export const updateRunnerJobValidator = [
})
}
if (res.locals.runnerJob.type === 'live-rtmp-hls-transcoding') {
const privatePayload = job.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload
if (!LiveManager.Instance.hasSession(privatePayload.sessionId)) {
cleanUpReqFiles(req)
return res.fail({
status: HttpStatusCode.BAD_REQUEST_400,
message: 'Session of this live ended',
tags
})
}
}
return next()
}
]

View File

@ -197,7 +197,7 @@ describe('Test live constraints', function () {
live: {
enabled: true,
allowReplay: true,
maxDuration: 1,
maxDuration: 3,
transcoding: {
enabled: true,
resolutions: ConfigCommand.getCustomConfigResolutions(true)

View File

@ -525,7 +525,7 @@ describe('Test videos search', function () {
})
it('Should search by live', async function () {
this.timeout(60000)
this.timeout(120000)
{
const newConfig = {

View File

@ -34,6 +34,7 @@ export interface RunnerJobLiveRTMPHLSTranscodingPrivatePayload {
videoUUID: string
masterPlaylistName: string
outputDirectory: string
sessionId: string
}
// ---------------------------------------------------------------------------