From c5255e784ce824c69f774bc944546dc732134fe2 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 6 Aug 2024 10:57:19 +0200 Subject: [PATCH] Fix concurrency issues when sendin chunks --- .../src/server/process/shared/process-live.ts | 50 +++++++++++-------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/apps/peertube-runner/src/server/process/shared/process-live.ts b/apps/peertube-runner/src/server/process/shared/process-live.ts index ba8cf1401..1db0585c5 100644 --- a/apps/peertube-runner/src/server/process/shared/process-live.ts +++ b/apps/peertube-runner/src/server/process/shared/process-live.ts @@ -268,41 +268,51 @@ export class ProcessLiveRTMPHLSTranscoding { private async sendPendingChunks (): Promise { if (this.ended) return Promise.resolve() - const promises: Promise[] = [] + const parallelPromises: Promise[] = [] for (const playlist of this.pendingChunksPerPlaylist.keys()) { + let sequentialPromises: Promise + for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) { logger.debug(`Sending added live chunk ${chunk} update`) const videoChunkFilename = basename(chunk) - let payload: CustomLiveRTMPHLSTranscodingUpdatePayload = { - type: 'add-chunk', - videoChunkFilename, - videoChunkFile: chunk - } - - if (this.allPlaylistsCreated) { - const playlistName = this.getPlaylistName(videoChunkFilename) - - await this.updatePlaylistContent(playlistName, videoChunkFilename) - - payload = { - ...payload, - - masterPlaylistFile: join(this.outputPath, 'master.m3u8'), - resolutionPlaylistFilename: playlistName, - resolutionPlaylistFile: this.buildPlaylistFileParam(playlistName) + const payloadBuilder = async () => { + let payload: CustomLiveRTMPHLSTranscodingUpdatePayload = { + type: 'add-chunk', + videoChunkFilename, + videoChunkFile: chunk } + + if (this.allPlaylistsCreated) { + const playlistName = this.getPlaylistName(videoChunkFilename) + + await this.updatePlaylistContent(playlistName, videoChunkFilename) + + payload = { + ...payload, + + masterPlaylistFile: join(this.outputPath, 'master.m3u8'), + resolutionPlaylistFilename: playlistName, + resolutionPlaylistFile: this.buildPlaylistFileParam(playlistName) + } + } + + return payload } - promises.push(this.updateWithRetry(payload)) + const p = payloadBuilder().then(p => this.updateWithRetry(p)) + + if (!sequentialPromises) sequentialPromises = p + else sequentialPromises = sequentialPromises.then(() => p) } + parallelPromises.push(sequentialPromises) this.pendingChunksPerPlaylist.set(playlist, []) } - await Promise.all(promises) + await Promise.all(parallelPromises) } private async updateWithRetry (payload: CustomLiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise {