diff --git a/apps/peertube-runner/src/server/process/shared/process-live.ts b/apps/peertube-runner/src/server/process/shared/process-live.ts index d372d493d..4530d10ec 100644 --- a/apps/peertube-runner/src/server/process/shared/process-live.ts +++ b/apps/peertube-runner/src/server/process/shared/process-live.ts @@ -17,11 +17,15 @@ import { buildUUID } from '@peertube/peertube-node-utils' import { FSWatcher, watch } from 'chokidar' import { FfmpegCommand } from 'fluent-ffmpeg' import { ensureDir, remove } from 'fs-extra/esm' +import { readFile } from 'fs/promises' import { basename, join } from 'path' import { ConfigManager } from '../../../shared/config-manager.js' import { logger } from '../../../shared/index.js' import { buildFFmpegLive, ProcessOptions } from './common.js' +type CustomLiveRTMPHLSTranscodingUpdatePayload = + Omit & { resolutionPlaylistFile?: [ Buffer, string ] | Blob | string } + export class ProcessLiveRTMPHLSTranscoding { private readonly outputPath: string @@ -33,6 +37,8 @@ export class ProcessLiveRTMPHLSTranscoding { private readonly playlistsCreated = new Set() private allPlaylistsCreated = false + private latestFilteredPlaylistContent: { [name: string]: string } = {} + private ffmpegCommand: FfmpegCommand private ended = false @@ -248,7 +254,7 @@ export class ProcessLiveRTMPHLSTranscoding { const videoChunkFilename = basename(deletedChunk) - let payload: LiveRTMPHLSTranscodingUpdatePayload = { + let payload: CustomLiveRTMPHLSTranscodingUpdatePayload = { type: 'remove-chunk', videoChunkFilename } @@ -258,9 +264,10 @@ export class ProcessLiveRTMPHLSTranscoding { payload = { ...payload, + masterPlaylistFile: join(this.outputPath, 'master.m3u8'), resolutionPlaylistFilename: playlistName, - resolutionPlaylistFile: join(this.outputPath, playlistName) + resolutionPlaylistFile: this.buildPlaylistFileParam(playlistName) } } @@ -278,7 +285,7 @@ export class ProcessLiveRTMPHLSTranscoding { const videoChunkFilename = basename(chunk) - let payload: LiveRTMPHLSTranscodingUpdatePayload = { + let payload: CustomLiveRTMPHLSTranscodingUpdatePayload = { type: 'add-chunk', videoChunkFilename, videoChunkFile: chunk @@ -287,11 +294,14 @@ export class ProcessLiveRTMPHLSTranscoding { if (this.allPlaylistsCreated) { const playlistName = this.getPlaylistName(videoChunkFilename) + await this.updatePlaylistContent(playlistName, videoChunkFilename) + payload = { ...payload, + masterPlaylistFile: join(this.outputPath, 'master.m3u8'), resolutionPlaylistFilename: playlistName, - resolutionPlaylistFile: join(this.outputPath, playlistName) + resolutionPlaylistFile: this.buildPlaylistFileParam(playlistName) } } @@ -304,7 +314,7 @@ export class ProcessLiveRTMPHLSTranscoding { await Promise.all(promises) } - private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise { + private async updateWithRetry (payload: CustomLiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise { if (this.ended || this.errored) return try { @@ -312,7 +322,7 @@ export class ProcessLiveRTMPHLSTranscoding { jobToken: this.options.job.jobToken, jobUUID: this.options.job.uuid, runnerToken: this.options.runnerToken, - payload + payload: payload as any }) } catch (err) { if (currentTry >= 3) throw err @@ -335,6 +345,22 @@ export class ProcessLiveRTMPHLSTranscoding { return basename(segmentPath).match(playlistIdMatcher)[1] } + private async updatePlaylistContent (playlistName: string, latestChunkFilename: string) { + const m3u8Path = join(this.outputPath, playlistName) + const playlistContent = await readFile(m3u8Path, 'utf-8') + + // Remove new chunk references, that will be processed later + this.latestFilteredPlaylistContent[playlistName] = playlistContent + .substring(0, playlistContent.lastIndexOf(latestChunkFilename) + latestChunkFilename.length) + '\n' + } + + private buildPlaylistFileParam (playlistName: string) { + return [ + Buffer.from(this.latestFilteredPlaylistContent[playlistName], 'utf-8'), + join(this.outputPath, 'master.m3u8') + ] as [ Buffer, string ] + } + // --------------------------------------------------------------------------- private cleanup () { diff --git a/packages/server-commands/src/requests/requests.ts b/packages/server-commands/src/requests/requests.ts index 49b7f9ce6..93d9028a6 100644 --- a/packages/server-commands/src/requests/requests.ts +++ b/packages/server-commands/src/requests/requests.ts @@ -141,9 +141,20 @@ export function makeUploadRequest (options: CommonRequestParams & { if (!value) return if (Array.isArray(value)) { - req.attach(attach, buildAbsoluteFixturePath(value[0]), value[1]) + req.attach( + attach, + value[0] instanceof Buffer + ? value[0] + : buildAbsoluteFixturePath(value[0]), + value[1] + ) } else { - req.attach(attach, buildAbsoluteFixturePath(value)) + req.attach( + attach, + value instanceof Buffer + ? value + : buildAbsoluteFixturePath(value) + ) } })