import { FSWatcher, watch } from 'chokidar' import { FfmpegCommand } from 'fluent-ffmpeg' import { ensureDir, remove } from 'fs-extra' import { logger } from 'packages/peertube-runner/shared' import { basename, join } from 'path' import { wait } from '@shared/core-utils' import { buildUUID } from '@shared/extra-utils' import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, hasAudioStream } from '@shared/ffmpeg' import { LiveRTMPHLSTranscodingSuccess, LiveRTMPHLSTranscodingUpdatePayload, PeerTubeProblemDocument, RunnerJobLiveRTMPHLSTranscodingPayload, ServerErrorCode } from '@shared/models' import { ConfigManager } from '../../../shared/config-manager' import { buildFFmpegLive, ProcessOptions } from './common' export class ProcessLiveRTMPHLSTranscoding { private readonly outputPath: string private readonly fsWatchers: FSWatcher[] = [] // Playlist name -> chunks private readonly pendingChunksPerPlaylist = new Map() private readonly playlistsCreated = new Set() private allPlaylistsCreated = false private ffmpegCommand: FfmpegCommand private ended = false private errored = false constructor (private readonly options: ProcessOptions) { this.outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), buildUUID()) logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`) } process () { const job = this.options.job const payload = job.payload return new Promise(async (res, rej) => { try { await ensureDir(this.outputPath) logger.info(`Probing ${payload.input.rtmpUrl}`) const probe = await ffprobePromise(payload.input.rtmpUrl) logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`) const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe) const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe) const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe) const m3u8Watcher = watch(this.outputPath + '/*.m3u8') this.fsWatchers.push(m3u8Watcher) const tsWatcher = watch(this.outputPath + '/*.ts') this.fsWatchers.push(tsWatcher) m3u8Watcher.on('change', p => { logger.debug(`${p} m3u8 playlist changed`) }) m3u8Watcher.on('add', p => { this.playlistsCreated.add(p) if (this.playlistsCreated.size === this.options.job.payload.output.toTranscode.length + 1) { this.allPlaylistsCreated = true logger.info('All m3u8 playlists are created.') } }) tsWatcher.on('add', async p => { try { await this.sendPendingChunks() } catch (err) { this.onUpdateError({ err, rej, res }) } const playlistName = this.getPlaylistIdFromTS(p) const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || [] pendingChunks.push(p) this.pendingChunksPerPlaylist.set(playlistName, pendingChunks) }) tsWatcher.on('unlink', p => { this.sendDeletedChunkUpdate(p) .catch(err => this.onUpdateError({ err, rej, res })) }) this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({ inputUrl: payload.input.rtmpUrl, outPath: this.outputPath, masterPlaylistName: 'master.m3u8', segmentListSize: payload.output.segmentListSize, segmentDuration: payload.output.segmentDuration, toTranscode: payload.output.toTranscode, bitrate, ratio, hasAudio }) logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`) this.ffmpegCommand.on('error', (err, stdout, stderr) => { this.onFFmpegError({ err, stdout, stderr }) res() }) this.ffmpegCommand.on('end', () => { this.onFFmpegEnded() .catch(err => logger.error({ err }, 'Error in FFmpeg end handler')) res() }) this.ffmpegCommand.run() } catch (err) { rej(err) } }) } // --------------------------------------------------------------------------- private onUpdateError (options: { err: Error res: () => void rej: (reason?: any) => void }) { const { err, res, rej } = options if (this.errored) return if (this.ended) return this.errored = true this.ffmpegCommand.kill('SIGINT') const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore') res() } else { logger.error({ err }, 'Cannot send update after added/deleted chunk, stopping live transcoding') this.sendError(err) .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) rej(err) } this.cleanup() } // --------------------------------------------------------------------------- private onFFmpegError (options: { err: any stdout: string stderr: string }) { const { err, stdout, stderr } = options // Don't care that we killed the ffmpeg process if (err?.message?.includes('Exiting normally')) return if (this.errored) return if (this.ended) return this.errored = true logger.error({ err, stdout, stderr }, 'FFmpeg transcoding error.') this.sendError(err) .catch(subErr => logger.error({ err: subErr }, 'Cannot send error')) this.cleanup() } private async sendError (err: Error) { await this.options.server.runnerJobs.error({ jobToken: this.options.job.jobToken, jobUUID: this.options.job.uuid, runnerToken: this.options.runnerToken, message: err.message }) } // --------------------------------------------------------------------------- private async onFFmpegEnded () { if (this.ended) return this.ended = true logger.info('FFmpeg ended, sending success to server') // Wait last ffmpeg chunks generation await wait(1500) this.sendSuccess() .catch(err => logger.error({ err }, 'Cannot send success')) this.cleanup() } private async sendSuccess () { const successBody: LiveRTMPHLSTranscodingSuccess = {} await this.options.server.runnerJobs.success({ jobToken: this.options.job.jobToken, jobUUID: this.options.job.uuid, runnerToken: this.options.runnerToken, payload: successBody }) } // --------------------------------------------------------------------------- private sendDeletedChunkUpdate (deletedChunk: string): Promise { if (this.ended) return Promise.resolve() logger.debug(`Sending removed live chunk ${deletedChunk} update`) const videoChunkFilename = basename(deletedChunk) let payload: LiveRTMPHLSTranscodingUpdatePayload = { type: 'remove-chunk', videoChunkFilename } if (this.allPlaylistsCreated) { const playlistName = this.getPlaylistName(videoChunkFilename) payload = { ...payload, masterPlaylistFile: join(this.outputPath, 'master.m3u8'), resolutionPlaylistFilename: playlistName, resolutionPlaylistFile: join(this.outputPath, playlistName) } } return this.updateWithRetry(payload) } private async sendPendingChunks (): Promise { if (this.ended) return Promise.resolve() const promises: Promise[] = [] for (const playlist of this.pendingChunksPerPlaylist.keys()) { for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) { logger.debug(`Sending added live chunk ${chunk} update`) const videoChunkFilename = basename(chunk) let payload: LiveRTMPHLSTranscodingUpdatePayload = { type: 'add-chunk', videoChunkFilename, videoChunkFile: chunk } if (this.allPlaylistsCreated) { const playlistName = this.getPlaylistName(videoChunkFilename) payload = { ...payload, masterPlaylistFile: join(this.outputPath, 'master.m3u8'), resolutionPlaylistFilename: playlistName, resolutionPlaylistFile: join(this.outputPath, playlistName) } } promises.push(this.updateWithRetry(payload)) } this.pendingChunksPerPlaylist.set(playlist, []) } await Promise.all(promises) } private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise { if (this.ended || this.errored) return try { await this.options.server.runnerJobs.update({ jobToken: this.options.job.jobToken, jobUUID: this.options.job.uuid, runnerToken: this.options.runnerToken, payload }) } catch (err) { if (currentTry >= 3) throw err if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) throw err logger.warn({ err }, 'Will retry update after error') await wait(250) return this.updateWithRetry(payload, currentTry + 1) } } private getPlaylistName (videoChunkFilename: string) { return `${videoChunkFilename.split('-')[0]}.m3u8` } private getPlaylistIdFromTS (segmentPath: string) { const playlistIdMatcher = /^([\d+])-/ return basename(segmentPath).match(playlistIdMatcher)[1] } // --------------------------------------------------------------------------- private cleanup () { logger.debug(`Cleaning up job ${this.options.job.uuid}`) for (const fsWatcher of this.fsWatchers) { fsWatcher.close() .catch(err => logger.error({ err }, 'Cannot close watcher')) } remove(this.outputPath) .catch(err => logger.error({ err }, `Cannot remove ${this.outputPath}`)) } }