From e7b9311e927f6ddd8bb7b2b0298a15c046ddd037 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 5 Aug 2024 16:32:55 +0200 Subject: [PATCH] Don't send m3u8 containing non existing chunks --- .../src/server/process/shared/process-live.ts | 46 +++++++++++++++---- .../server-commands/src/requests/requests.ts | 15 +++++- 2 files changed, 49 insertions(+), 12 deletions(-) diff --git a/apps/peertube-runner/src/server/process/shared/process-live.ts b/apps/peertube-runner/src/server/process/shared/process-live.ts index ffb3c0c5a..ba8cf1401 100644 --- a/apps/peertube-runner/src/server/process/shared/process-live.ts +++ b/apps/peertube-runner/src/server/process/shared/process-live.ts @@ -1,7 +1,3 @@ -import { FSWatcher, watch } from 'chokidar' -import { FfmpegCommand } from 'fluent-ffmpeg' -import { ensureDir, remove } from 'fs-extra/esm' -import { basename, join } from 'path' import { wait } from '@peertube/peertube-core-utils' import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@peertube/peertube-ffmpeg' import { @@ -12,10 +8,18 @@ import { ServerErrorCode } from '@peertube/peertube-models' import { buildUUID } from '@peertube/peertube-node-utils' +import { FSWatcher, watch } from 'chokidar' +import { FfmpegCommand } from 'fluent-ffmpeg' +import { ensureDir, remove } from 'fs-extra/esm' +import { readFile } from 'fs/promises' +import { basename, join } from 'path' import { ConfigManager } from '../../../shared/config-manager.js' import { logger } from '../../../shared/index.js' import { buildFFmpegLive, ProcessOptions } from './common.js' +type CustomLiveRTMPHLSTranscodingUpdatePayload = + Omit & { resolutionPlaylistFile?: [ Buffer, string ] | Blob | string } + export class ProcessLiveRTMPHLSTranscoding { private readonly outputPath: string @@ -27,6 +31,8 @@ export class ProcessLiveRTMPHLSTranscoding { private readonly playlistsCreated = new Set() private allPlaylistsCreated = false + private latestFilteredPlaylistContent: { [name: string]: string } = {} + private ffmpegCommand: FfmpegCommand private ended = false @@ -239,7 +245,7 @@ export class ProcessLiveRTMPHLSTranscoding { const videoChunkFilename = basename(deletedChunk) - let payload: LiveRTMPHLSTranscodingUpdatePayload = { + let payload: CustomLiveRTMPHLSTranscodingUpdatePayload = { type: 'remove-chunk', videoChunkFilename } @@ -249,9 +255,10 @@ export class ProcessLiveRTMPHLSTranscoding { payload = { ...payload, + masterPlaylistFile: join(this.outputPath, 'master.m3u8'), resolutionPlaylistFilename: playlistName, - resolutionPlaylistFile: join(this.outputPath, playlistName) + resolutionPlaylistFile: this.buildPlaylistFileParam(playlistName) } } @@ -269,7 +276,7 @@ export class ProcessLiveRTMPHLSTranscoding { const videoChunkFilename = basename(chunk) - let payload: LiveRTMPHLSTranscodingUpdatePayload = { + let payload: CustomLiveRTMPHLSTranscodingUpdatePayload = { type: 'add-chunk', videoChunkFilename, videoChunkFile: chunk @@ -278,11 +285,14 @@ export class ProcessLiveRTMPHLSTranscoding { if (this.allPlaylistsCreated) { const playlistName = this.getPlaylistName(videoChunkFilename) + await this.updatePlaylistContent(playlistName, videoChunkFilename) + payload = { ...payload, + masterPlaylistFile: join(this.outputPath, 'master.m3u8'), resolutionPlaylistFilename: playlistName, - resolutionPlaylistFile: join(this.outputPath, playlistName) + resolutionPlaylistFile: this.buildPlaylistFileParam(playlistName) } } @@ -295,7 +305,7 @@ export class ProcessLiveRTMPHLSTranscoding { await Promise.all(promises) } - private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise { + private async updateWithRetry (payload: CustomLiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise { if (this.ended || this.errored) return try { @@ -303,7 +313,7 @@ export class ProcessLiveRTMPHLSTranscoding { jobToken: this.options.job.jobToken, jobUUID: this.options.job.uuid, runnerToken: this.options.runnerToken, - payload + payload: payload as any }) } catch (err) { if (currentTry >= 3) throw err @@ -326,6 +336,22 @@ export class ProcessLiveRTMPHLSTranscoding { return basename(segmentPath).match(playlistIdMatcher)[1] } + private async updatePlaylistContent (playlistName: string, latestChunkFilename: string) { + const m3u8Path = join(this.outputPath, playlistName) + const playlistContent = await readFile(m3u8Path, 'utf-8') + + // Remove new chunk references, that will be processed later + this.latestFilteredPlaylistContent[playlistName] = playlistContent + .substring(0, playlistContent.lastIndexOf(latestChunkFilename) + latestChunkFilename.length) + '\n' + } + + private buildPlaylistFileParam (playlistName: string) { + return [ + Buffer.from(this.latestFilteredPlaylistContent[playlistName], 'utf-8'), + join(this.outputPath, 'master.m3u8') + ] as [ Buffer, string ] + } + // --------------------------------------------------------------------------- private cleanup () { diff --git a/packages/server-commands/src/requests/requests.ts b/packages/server-commands/src/requests/requests.ts index 49b7f9ce6..93d9028a6 100644 --- a/packages/server-commands/src/requests/requests.ts +++ b/packages/server-commands/src/requests/requests.ts @@ -141,9 +141,20 @@ export function makeUploadRequest (options: CommonRequestParams & { if (!value) return if (Array.isArray(value)) { - req.attach(attach, buildAbsoluteFixturePath(value[0]), value[1]) + req.attach( + attach, + value[0] instanceof Buffer + ? value[0] + : buildAbsoluteFixturePath(value[0]), + value[1] + ) } else { - req.attach(attach, buildAbsoluteFixturePath(value)) + req.attach( + attach, + value instanceof Buffer + ? value + : buildAbsoluteFixturePath(value) + ) } })