diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index ef4ecb83e..57e28aabf 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -8,7 +8,12 @@ import { computeOutputFPS } from '@server/helpers/ffmpeg' import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' import { CONFIG } from '@server/initializers/config' import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' -import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage' +import { + removeHLSFileObjectStorageByPath, + storeHLSFileFromContent, + storeHLSFileFromFilename, + storeHLSFileFromPath +} from '@server/lib/object-storage' import { VideoFileModel } from '@server/models/video/video-file' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' @@ -74,6 +79,9 @@ class MuxingSession extends EventEmitter { private readonly lTags: LoggerTagsFn + // Path -> Queue + private readonly objectStorageSendQueues = new Map() + private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} private streamingPlaylist: MStreamingPlaylistVideo @@ -151,7 +159,6 @@ class MuxingSession extends EventEmitter { this.watchMasterFile() this.watchTSFiles() - this.watchM3U8File() } abort () { @@ -192,41 +199,15 @@ class MuxingSession extends EventEmitter { }) } - private watchM3U8File () { - const sendQueues = new Map() - - const onChange = async (m3u8Path: string) => { - if (m3u8Path.endsWith('.m3u8') !== true) return - if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return - - logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags()) - - try { - if (!sendQueues.has(m3u8Path)) { - sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) - } - - const queue = sendQueues.get(m3u8Path) - await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)) - } catch (err) { - logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) - } - } - - this.filesWatcher.on('change', onChange) - } - private watchTSFiles () { const startStreamDateTime = new Date().getTime() - const playlistIdMatcher = /^([\d+])-/ - const addHandler = async (segmentPath: string) => { if (segmentPath.endsWith('.ts') !== true) return logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags()) - const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] + const playlistId = this.getPlaylistIdFromTS(segmentPath) const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] this.processSegments(segmentsToProcess) @@ -349,6 +330,8 @@ class MuxingSession extends EventEmitter { if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { try { await storeHLSFileFromPath(this.streamingPlaylist, segmentPath) + + await this.processM3U8ToObjectStorage(segmentPath) } catch (err) { logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() }) } @@ -362,6 +345,29 @@ class MuxingSession extends EventEmitter { } } + private async processM3U8ToObjectStorage (segmentPath: string) { + const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath)) + + logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags()) + + const segmentName = basename(segmentPath) + + const playlistContent = await readFile(m3u8Path, 'utf-8') + // Remove new chunk references, that will be processed later + const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n' + + try { + if (!this.objectStorageSendQueues.has(m3u8Path)) { + this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) + } + + const queue = this.objectStorageSendQueues.get(m3u8Path) + await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent)) + } catch (err) { + logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) + } + } + private onTranscodingError () { this.emit('transcoding-error', ({ videoUUID: this.videoUUID })) } @@ -484,6 +490,16 @@ class MuxingSession extends EventEmitter { ? new RemoteTranscodingWrapper(options) : new FFmpegTranscodingWrapper(options) } + + private getPlaylistIdFromTS (segmentPath: string) { + const playlistIdMatcher = /^([\d+])-/ + + return basename(segmentPath).match(playlistIdMatcher)[1] + } + + private getPlaylistNameFromTS (segmentPath: string) { + return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8` + } } // --------------------------------------------------------------------------- diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts index be94b01a8..f517c5f69 100644 --- a/server/lib/object-storage/shared/object-storage-helpers.ts +++ b/server/lib/object-storage/shared/object-storage-helpers.ts @@ -59,6 +59,20 @@ async function storeObject (options: { return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate }) } +async function storeContent (options: { + content: string + inputPath: string + objectStorageKey: string + bucketInfo: BucketInfo + isPrivate: boolean +}): Promise { + const { content, objectStorageKey, bucketInfo, inputPath, isPrivate } = options + + logger.debug('Uploading %s content to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) + + return uploadToStorage({ objectStorageKey, content, bucketInfo, isPrivate }) +} + // --------------------------------------------------------------------------- async function updateObjectACL (options: { @@ -206,6 +220,7 @@ export { buildKey, storeObject, + storeContent, removeObject, removeObjectByFullKey, @@ -223,7 +238,7 @@ export { // --------------------------------------------------------------------------- async function uploadToStorage (options: { - content: ReadStream + content: ReadStream | string objectStorageKey: string bucketInfo: BucketInfo isPrivate: boolean diff --git a/server/lib/object-storage/videos.ts b/server/lib/object-storage/videos.ts index bfdef94fd..57d978e4c 100644 --- a/server/lib/object-storage/videos.ts +++ b/server/lib/object-storage/videos.ts @@ -42,6 +42,15 @@ function storeHLSFileFromPath (playlist: MStreamingPlaylistVideo, path: string) }) } +function storeHLSFileFromContent (playlist: MStreamingPlaylistVideo, path: string, content: string) { + return storeObject({ + inputPath: path, + objectStorageKey: generateHLSObjectStorageKey(playlist, basename(path)), + bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS, + isPrivate: playlist.Video.hasPrivateStaticPath() + }) +} + // --------------------------------------------------------------------------- function storeWebTorrentFile (video: MVideo, file: MVideoFile) { @@ -166,6 +175,7 @@ export { storeWebTorrentFile, storeHLSFileFromFilename, storeHLSFileFromPath, + storeHLSFileFromContent, updateWebTorrentFileACL, updateHLSFilesACL, diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts index 48a70d891..c3b0a95cc 100644 --- a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts +++ b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts @@ -80,6 +80,7 @@ export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler