diff --git a/server/helpers/ffmpeg-utils.ts b/server/helpers/ffmpeg-utils.ts index 268ed7624..3b794b8a2 100644 --- a/server/helpers/ffmpeg-utils.ts +++ b/server/helpers/ffmpeg-utils.ts @@ -353,7 +353,7 @@ function convertWebPToJPG (path: string, destination: string): Promise { }) } -function runLiveTranscoding (rtmpUrl: string, outPath: string, resolutions: number[], deleteSegments: boolean) { +function runLiveTranscoding (rtmpUrl: string, outPath: string, resolutions: number[], fps, deleteSegments: boolean) { const command = getFFmpeg(rtmpUrl) command.inputOption('-fflags nobuffer') @@ -375,10 +375,6 @@ function runLiveTranscoding (rtmpUrl: string, outPath: string, resolutions: numb })) ]) - const liveFPS = VIDEO_TRANSCODING_FPS.AVERAGE - - command.withFps(liveFPS) - command.outputOption('-b_strategy 1') command.outputOption('-bf 16') command.outputOption('-preset superfast') @@ -386,13 +382,14 @@ function runLiveTranscoding (rtmpUrl: string, outPath: string, resolutions: numb command.outputOption('-map_metadata -1') command.outputOption('-pix_fmt yuv420p') command.outputOption('-max_muxing_queue_size 1024') + command.outputOption('-g ' + (fps * 2)) for (let i = 0; i < resolutions.length; i++) { const resolution = resolutions[i] command.outputOption(`-map [vout${resolution}]`) command.outputOption(`-c:v:${i} libx264`) - command.outputOption(`-b:v:${i} ${getTargetBitrate(resolution, liveFPS, VIDEO_TRANSCODING_FPS)}`) + command.outputOption(`-b:v:${i} ${getTargetBitrate(resolution, fps, VIDEO_TRANSCODING_FPS)}`) command.outputOption(`-map a:0`) command.outputOption(`-c:a:${i} aac`) @@ -443,8 +440,8 @@ async function hlsPlaylistToFragmentedMP4 (hlsDirectory: string, segmentFiles: s command.run() function cleaner () { - remove(concatFile) - .catch(err => logger.error('Cannot remove concat file in %s.', hlsDirectory, { err })) + remove(concatFilePath) + .catch(err => logger.error('Cannot remove concat file in %s.', hlsDirectory, { err })) } return new Promise((res, rej) => { @@ -497,7 +494,7 @@ function addDefaultX264Params (command: ffmpeg.FfmpegCommand) { } function addDefaultLiveHLSParams (command: ffmpeg.FfmpegCommand, outPath: string, deleteSegments: boolean) { - command.outputOption('-hls_time ' + VIDEO_LIVE.SEGMENT_TIME) + command.outputOption('-hls_time ' + VIDEO_LIVE.SEGMENT_TIME_SECONDS) command.outputOption('-hls_list_size ' + VIDEO_LIVE.SEGMENTS_LIST_SIZE) if (deleteSegments === true) { diff --git a/server/helpers/video.ts b/server/helpers/video.ts index 488b4da17..999137c6d 100644 --- a/server/helpers/video.ts +++ b/server/helpers/video.ts @@ -15,7 +15,7 @@ import { MVideoThumbnail, MVideoWithRights } from '@server/types/models' -import { VideoPrivacy, VideoTranscodingPayload } from '@shared/models' +import { VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' import { VideoModel } from '../models/video/video' type VideoFetchType = 'all' | 'only-video' | 'only-video-with-rights' | 'id' | 'none' | 'only-immutable-attributes' @@ -104,6 +104,13 @@ function isPrivacyForFederation (privacy: VideoPrivacy) { (CONFIG.FEDERATION.VIDEOS.FEDERATE_UNLISTED === true && castedPrivacy === VideoPrivacy.UNLISTED) } +function isStateForFederation (state: VideoState) { + const castedState = parseInt(state + '', 10) + + return castedState === VideoState.PUBLISHED || castedState === VideoState.WAITING_FOR_LIVE || castedState === VideoState.LIVE_ENDED + +} + function getPrivaciesForFederation () { return (CONFIG.FEDERATION.VIDEOS.FEDERATE_UNLISTED === true) ? [ { privacy: VideoPrivacy.PUBLIC }, { privacy: VideoPrivacy.UNLISTED } ] @@ -127,6 +134,7 @@ export { addOptimizeOrMergeAudioJob, extractVideo, getExtFromMimetype, + isStateForFederation, isPrivacyForFederation, getPrivaciesForFederation } diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index f8380eaa0..d1f94e6e6 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -609,7 +609,7 @@ const HLS_REDUNDANCY_DIRECTORY = join(CONFIG.STORAGE.REDUNDANCY_DIR, 'hls') const VIDEO_LIVE = { EXTENSION: '.ts', CLEANUP_DELAY: 1000 * 60 * 5, // 5 minutes - SEGMENT_TIME: 4, // 4 seconds + SEGMENT_TIME_SECONDS: 4, // 4 seconds SEGMENTS_LIST_SIZE: 15, // 15 maximum segments in live playlist RTMP: { CHUNK_SIZE: 60000, @@ -738,7 +738,8 @@ if (isTestInstance() === true) { PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000 - VIDEO_LIVE.CLEANUP_DELAY = 10000 + VIDEO_LIVE.CLEANUP_DELAY = 5000 + VIDEO_LIVE.SEGMENT_TIME_SECONDS = 2 } updateWebserverUrls() diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts index ea1e6a38f..ab4aac0a1 100644 --- a/server/lib/activitypub/videos.ts +++ b/server/lib/activitypub/videos.ts @@ -85,7 +85,7 @@ async function federateVideoIfNeeded (videoArg: MVideoAPWithoutCaption, isNewVid // Check this is not a blacklisted video, or unfederated blacklisted video (video.isBlacklisted() === false || (isNewVideo === false && video.VideoBlacklist.unfederated === false)) && // Check the video is public/unlisted and published - video.hasPrivacyForFederation() && (video.state === VideoState.PUBLISHED || video.state === VideoState.WAITING_FOR_LIVE) + video.hasPrivacyForFederation() && video.hasStateForFederation() ) { // Fetch more attributes that we will need to serialize in AP object if (isArray(video.VideoCaptions) === false) { @@ -302,7 +302,7 @@ async function updateVideoFromAP (options: { }) { const { video, videoObject, account, channel, overrideTo } = options - logger.debug('Updating remote video "%s".', options.videoObject.uuid, { account, channel }) + logger.debug('Updating remote video "%s".', options.videoObject.uuid, { videoObject: options.videoObject, account, channel }) let videoFieldsSave: any const wasPrivateVideo = video.privacy === VideoPrivacy.PRIVATE @@ -562,6 +562,8 @@ function isAPHashTagObject (url: any): url is ActivityHashTagObject { return url && url.type === 'Hashtag' } + + async function createVideo (videoObject: VideoObject, channel: MChannelAccountLight, waitThumbnail = false) { logger.debug('Adding remote video %s.', videoObject.id) diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 1e964726e..2b900998a 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -6,22 +6,31 @@ import { publishAndFederateIfNeeded } from '@server/lib/video' import { getHLSDirectory } from '@server/lib/video-paths' import { generateHlsPlaylist } from '@server/lib/video-transcoding' import { VideoModel } from '@server/models/video/video' +import { VideoFileModel } from '@server/models/video/video-file' import { VideoLiveModel } from '@server/models/video/video-live' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' -import { MStreamingPlaylist, MVideo, MVideoLive, MVideoWithFile } from '@server/types/models' +import { MStreamingPlaylist, MVideo, MVideoLive } from '@server/types/models' import { VideoLiveEndingPayload, VideoState } from '@shared/models' import { logger } from '../../../helpers/logger' -import { VideoFileModel } from '@server/models/video/video-file' async function processVideoLiveEnding (job: Bull.Job) { const payload = job.data as VideoLiveEndingPayload + function logError () { + logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId) + } + const video = await VideoModel.load(payload.videoId) const live = await VideoLiveModel.loadByVideoId(payload.videoId) + if (!video || !live) { + logError() + return + } + const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) - if (!video || !streamingPlaylist || !live) { - logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId) + if (!streamingPlaylist) { + logError() return } @@ -52,21 +61,21 @@ async function saveLive (video: MVideo, live: MVideoLive) { const playlistPath = join(hlsDirectory, playlistFile) const { videoFileResolution } = await getVideoFileResolution(playlistPath) - const mp4TmpName = buildMP4TmpName(videoFileResolution) + const mp4TmpPath = buildMP4TmpPath(hlsDirectory, videoFileResolution) // Playlist name is for example 3.m3u8 // Segments names are 3-0.ts 3-1.ts etc const shouldStartWith = playlistFile.replace(/\.m3u8$/, '') + '-' const segmentFiles = files.filter(f => f.startsWith(shouldStartWith) && f.endsWith('.ts')) - await hlsPlaylistToFragmentedMP4(hlsDirectory, segmentFiles, mp4TmpName) + await hlsPlaylistToFragmentedMP4(hlsDirectory, segmentFiles, mp4TmpPath) for (const file of segmentFiles) { await remove(join(hlsDirectory, file)) } if (!duration) { - duration = await getDurationFromVideoFile(mp4TmpName) + duration = await getDurationFromVideoFile(mp4TmpPath) } resolutions.push(videoFileResolution) @@ -90,7 +99,7 @@ async function saveLive (video: MVideo, live: MVideoLive) { hlsPlaylist.VideoFiles = [] for (const resolution of resolutions) { - const videoInputPath = buildMP4TmpName(resolution) + const videoInputPath = buildMP4TmpPath(hlsDirectory, resolution) const { isPortraitMode } = await getVideoFileResolution(videoInputPath) await generateHlsPlaylist({ @@ -101,7 +110,7 @@ async function saveLive (video: MVideo, live: MVideoLive) { isPortraitMode }) - await remove(join(hlsDirectory, videoInputPath)) + await remove(videoInputPath) } await publishAndFederateIfNeeded(video, true) @@ -110,7 +119,7 @@ async function saveLive (video: MVideo, live: MVideoLive) { async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { const hlsDirectory = getHLSDirectory(video, false) - await cleanupLiveFiles(hlsDirectory) + await remove(hlsDirectory) streamingPlaylist.destroy() .catch(err => logger.error('Cannot remove live streaming playlist.', { err })) @@ -135,6 +144,6 @@ async function cleanupLiveFiles (hlsDirectory: string) { } } -function buildMP4TmpName (resolution: number) { - return resolution + '-tmp.mp4' +function buildMP4TmpPath (basePath: string, resolution: number) { + return join(basePath, resolution + '-tmp.mp4') } diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts index 2d8f906e9..6eb05c9d6 100644 --- a/server/lib/live-manager.ts +++ b/server/lib/live-manager.ts @@ -4,7 +4,7 @@ import * as chokidar from 'chokidar' import { FfmpegCommand } from 'fluent-ffmpeg' import { ensureDir, stat } from 'fs-extra' import { basename } from 'path' -import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils' +import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution, getVideoStreamCodec, getVideoStreamSize, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils' import { logger } from '@server/helpers/logger' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' @@ -137,6 +137,13 @@ class LiveManager { this.abortSession(sessionId) } + getLiveQuotaUsedByUser (userId: number) { + const currentLives = this.livesPerUser.get(userId) + if (!currentLives) return 0 + + return currentLives.reduce((sum, obj) => sum + obj.size, 0) + } + private getContext () { return context } @@ -173,8 +180,15 @@ class LiveManager { const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid) const session = this.getContext().sessions.get(sessionId) + const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath + + const [ resolutionResult, fps ] = await Promise.all([ + getVideoFileResolution(rtmpUrl), + getVideoFileFPS(rtmpUrl) + ]) + const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED - ? computeResolutionsToTranscode(session.videoHeight, 'live') + ? computeResolutionsToTranscode(resolutionResult.videoFileResolution, 'live') : [] logger.info('Will mux/transcode live video of original resolution %d.', session.videoHeight, { resolutionsEnabled }) @@ -193,8 +207,9 @@ class LiveManager { sessionId, videoLive, playlist: videoStreamingPlaylist, - streamPath, originalResolution: session.videoHeight, + rtmpUrl, + fps, resolutionsEnabled }) } @@ -203,11 +218,12 @@ class LiveManager { sessionId: string videoLive: MVideoLiveVideo playlist: MStreamingPlaylist - streamPath: string + rtmpUrl: string + fps: number resolutionsEnabled: number[] originalResolution: number }) { - const { sessionId, videoLive, playlist, streamPath, resolutionsEnabled, originalResolution } = options + const { sessionId, videoLive, playlist, resolutionsEnabled, originalResolution, fps, rtmpUrl } = options const startStreamDateTime = new Date().getTime() const allResolutions = resolutionsEnabled.concat([ originalResolution ]) @@ -238,17 +254,16 @@ class LiveManager { const outPath = getHLSDirectory(videoLive.Video) await ensureDir(outPath) + const videoUUID = videoLive.Video.uuid const deleteSegments = videoLive.saveReplay === false - const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED - ? runLiveTranscoding(rtmpUrl, outPath, allResolutions, deleteSegments) + ? runLiveTranscoding(rtmpUrl, outPath, allResolutions, fps, deleteSegments) : runLiveMuxing(rtmpUrl, outPath, deleteSegments) - logger.info('Running live muxing/transcoding.') + logger.info('Running live muxing/transcoding for %s.', videoUUID) this.transSessions.set(sessionId, ffmpegExec) - const videoUUID = videoLive.Video.uuid const tsWatcher = chokidar.watch(outPath + '/*.ts') const updateSegment = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID }) @@ -307,7 +322,7 @@ class LiveManager { }) const onFFmpegEnded = () => { - logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', streamPath) + logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl) this.transSessions.delete(sessionId) @@ -332,13 +347,6 @@ class LiveManager { ffmpegExec.on('end', () => onFFmpegEnded()) } - getLiveQuotaUsedByUser (userId: number) { - const currentLives = this.livesPerUser.get(userId) - if (!currentLives) return 0 - - return currentLives.reduce((sum, obj) => sum + obj.size, 0) - } - private async onEndTransmuxing (videoId: number, cleanupNow = false) { try { const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) diff --git a/server/models/video/video.ts b/server/models/video/video.ts index 7e008f7ea..8e71f8c32 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts @@ -25,7 +25,7 @@ import { UpdatedAt } from 'sequelize-typescript' import { buildNSFWFilter } from '@server/helpers/express-utils' -import { getPrivaciesForFederation, isPrivacyForFederation } from '@server/helpers/video' +import { getPrivaciesForFederation, isPrivacyForFederation, isStateForFederation } from '@server/helpers/video' import { LiveManager } from '@server/lib/live-manager' import { getHLSDirectory, getTorrentFileName, getTorrentFilePath, getVideoFilename, getVideoFilePath } from '@server/lib/video-paths' import { getServerActor } from '@server/models/application/application' @@ -823,6 +823,8 @@ export class VideoModel extends Model { static stopLiveIfNeeded (instance: VideoModel) { if (!instance.isLive) return + logger.info('Stopping live of video %s after video deletion.', instance.uuid) + return LiveManager.Instance.stopSessionOf(instance.id) } @@ -1921,6 +1923,10 @@ export class VideoModel extends Model { return isPrivacyForFederation(this.privacy) } + hasStateForFederation () { + return isStateForFederation(this.state) + } + isNewVideo (newPrivacy: VideoPrivacy) { return this.hasPrivacyForFederation() === false && isPrivacyForFederation(newPrivacy) === true } diff --git a/server/tests/api/check-params/live.ts b/server/tests/api/check-params/live.ts index 3e97dffdc..2b2d1beec 100644 --- a/server/tests/api/check-params/live.ts +++ b/server/tests/api/check-params/live.ts @@ -1,7 +1,6 @@ /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ import 'mocha' -import * as chai from 'chai' import { omit } from 'lodash' import { join } from 'path' import { LiveVideo, VideoPrivacy } from '@shared/models' @@ -14,11 +13,11 @@ import { immutableAssign, makePostBodyRequest, makeUploadRequest, + runAndTestFfmpegStreamError, sendRTMPStream, ServerInfo, setAccessTokensToServers, stopFfmpeg, - testFfmpegStreamError, updateCustomSubConfig, updateLive, uploadVideoAndGetId, @@ -30,9 +29,7 @@ describe('Test video lives API validator', function () { const path = '/api/v1/videos/live' let server: ServerInfo let userAccessToken = '' - let accountName: string let channelId: number - let channelName: string let videoId: number let videoIdNotLive: number @@ -414,7 +411,7 @@ describe('Test video lives API validator', function () { await waitUntilLiveStarts(server.url, server.accessToken, videoId) - await testFfmpegStreamError(server.url, server.accessToken, videoId, true) + await runAndTestFfmpegStreamError(server.url, server.accessToken, videoId, true) await stopFfmpeg(command) }) diff --git a/server/tests/api/live/index.ts b/server/tests/api/live/index.ts index 280daf423..ee77af286 100644 --- a/server/tests/api/live/index.ts +++ b/server/tests/api/live/index.ts @@ -1 +1,3 @@ +export * from './live-constraints' +export * from './live-save-replay' export * from './live' diff --git a/server/tests/api/live/live-constraints.ts b/server/tests/api/live/live-constraints.ts new file mode 100644 index 000000000..23c8e3b0a --- /dev/null +++ b/server/tests/api/live/live-constraints.ts @@ -0,0 +1,199 @@ +/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ + +import 'mocha' +import * as chai from 'chai' +import { User, VideoDetails, VideoPrivacy } from '@shared/models' +import { + checkLiveCleanup, + cleanupTests, + createLive, + createUser, + doubleFollow, + flushAndRunMultipleServers, + getMyUserInformation, + getVideo, + runAndTestFfmpegStreamError, + ServerInfo, + setAccessTokensToServers, + setDefaultVideoChannel, + updateCustomSubConfig, + updateUser, + userLogin, + wait, + waitJobs +} from '../../../../shared/extra-utils' + +const expect = chai.expect + +describe('Test live constraints', function () { + let servers: ServerInfo[] = [] + let userId: number + let userAccessToken: string + let userChannelId: number + + async function createLiveWrapper (saveReplay: boolean) { + const liveAttributes = { + name: 'user live', + channelId: userChannelId, + privacy: VideoPrivacy.PUBLIC, + saveReplay + } + + const res = await createLive(servers[0].url, userAccessToken, liveAttributes) + return res.body.video.uuid as string + } + + async function checkSaveReplay (videoId: string, resolutions = [ 720 ]) { + for (const server of servers) { + const res = await getVideo(server.url, videoId) + + const video: VideoDetails = res.body + expect(video.isLive).to.be.false + expect(video.duration).to.be.greaterThan(0) + } + + await checkLiveCleanup(servers[0], videoId, resolutions) + } + + before(async function () { + this.timeout(120000) + + servers = await flushAndRunMultipleServers(2) + + // Get the access tokens + await setAccessTokensToServers(servers) + await setDefaultVideoChannel(servers) + + await updateCustomSubConfig(servers[0].url, servers[0].accessToken, { + live: { + enabled: true, + allowReplay: true, + transcoding: { + enabled: false + } + } + }) + + { + const user = { username: 'user1', password: 'superpassword' } + const res = await createUser({ + url: servers[0].url, + accessToken: servers[0].accessToken, + username: user.username, + password: user.password + }) + userId = res.body.user.id + + userAccessToken = await userLogin(servers[0], user) + + const resMe = await getMyUserInformation(servers[0].url, userAccessToken) + userChannelId = (resMe.body as User).videoChannels[0].id + + await updateUser({ + url: servers[0].url, + userId, + accessToken: servers[0].accessToken, + videoQuota: 1, + videoQuotaDaily: -1 + }) + } + + // Server 1 and server 2 follow each other + await doubleFollow(servers[0], servers[1]) + }) + + it('Should not have size limit if save replay is disabled', async function () { + this.timeout(60000) + + const userVideoLiveoId = await createLiveWrapper(false) + await runAndTestFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, false) + }) + + it('Should have size limit depending on user global quota if save replay is enabled', async function () { + this.timeout(60000) + + // Wait for user quota memoize cache invalidation + await wait(5000) + + const userVideoLiveoId = await createLiveWrapper(true) + await runAndTestFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, true) + + await waitJobs(servers) + + await checkSaveReplay(userVideoLiveoId) + }) + + it('Should have size limit depending on user daily quota if save replay is enabled', async function () { + this.timeout(60000) + + // Wait for user quota memoize cache invalidation + await wait(5000) + + await updateUser({ + url: servers[0].url, + userId, + accessToken: servers[0].accessToken, + videoQuota: -1, + videoQuotaDaily: 1 + }) + + const userVideoLiveoId = await createLiveWrapper(true) + await runAndTestFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, true) + + await waitJobs(servers) + + await checkSaveReplay(userVideoLiveoId) + }) + + it('Should succeed without quota limit', async function () { + this.timeout(60000) + + // Wait for user quota memoize cache invalidation + await wait(5000) + + await updateUser({ + url: servers[0].url, + userId, + accessToken: servers[0].accessToken, + videoQuota: 10 * 1000 * 1000, + videoQuotaDaily: -1 + }) + + const userVideoLiveoId = await createLiveWrapper(true) + await runAndTestFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, false) + }) + + it('Should have max duration limit', async function () { + this.timeout(30000) + + await updateCustomSubConfig(servers[0].url, servers[0].accessToken, { + live: { + enabled: true, + allowReplay: true, + maxDuration: 1, + transcoding: { + enabled: true, + resolutions: { + '240p': true, + '360p': true, + '480p': true, + '720p': true, + '1080p': true, + '2160p': true + } + } + } + }) + + const userVideoLiveoId = await createLiveWrapper(true) + await runAndTestFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, true) + + await waitJobs(servers) + + await checkSaveReplay(userVideoLiveoId, [ 720, 480, 360, 240 ]) + }) + + after(async function () { + await cleanupTests(servers) + }) +}) diff --git a/server/tests/api/live/live-save-replay.ts b/server/tests/api/live/live-save-replay.ts new file mode 100644 index 000000000..3ffa0c093 --- /dev/null +++ b/server/tests/api/live/live-save-replay.ts @@ -0,0 +1,307 @@ +/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ + +import 'mocha' +import * as chai from 'chai' +import { FfmpegCommand } from 'fluent-ffmpeg' +import { LiveVideoCreate, VideoDetails, VideoPrivacy, VideoState } from '@shared/models' +import { + addVideoToBlacklist, + checkLiveCleanup, + cleanupTests, + createLive, + doubleFollow, + flushAndRunMultipleServers, + getVideo, + getVideosList, + removeVideo, + sendRTMPStreamInVideo, + ServerInfo, + setAccessTokensToServers, + setDefaultVideoChannel, + stopFfmpeg, + testFfmpegStreamError, + updateCustomSubConfig, + updateVideo, + waitJobs, + waitUntilLiveStarts +} from '../../../../shared/extra-utils' + +const expect = chai.expect + +describe('Save replay setting', function () { + let servers: ServerInfo[] = [] + let liveVideoUUID: string + let ffmpegCommand: FfmpegCommand + + async function createLiveWrapper (saveReplay: boolean) { + if (liveVideoUUID) { + try { + await removeVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) + await waitJobs(servers) + } catch {} + } + + const attributes: LiveVideoCreate = { + channelId: servers[0].videoChannel.id, + privacy: VideoPrivacy.PUBLIC, + name: 'my super live', + saveReplay + } + + const res = await createLive(servers[0].url, servers[0].accessToken, attributes) + return res.body.video.uuid + } + + async function checkVideosExist (videoId: string, existsInList: boolean, getStatus?: number) { + for (const server of servers) { + const length = existsInList ? 1 : 0 + + const resVideos = await getVideosList(server.url) + expect(resVideos.body.data).to.have.lengthOf(length) + expect(resVideos.body.total).to.equal(length) + + if (getStatus) { + await getVideo(server.url, videoId, getStatus) + } + } + } + + async function checkVideoState (videoId: string, state: VideoState) { + for (const server of servers) { + const res = await getVideo(server.url, videoId) + expect((res.body as VideoDetails).state.id).to.equal(state) + } + } + + before(async function () { + this.timeout(120000) + + servers = await flushAndRunMultipleServers(2) + + // Get the access tokens + await setAccessTokensToServers(servers) + await setDefaultVideoChannel(servers) + + // Server 1 and server 2 follow each other + await doubleFollow(servers[0], servers[1]) + + await updateCustomSubConfig(servers[0].url, servers[0].accessToken, { + live: { + enabled: true, + allowReplay: true, + maxDuration: null, + transcoding: { + enabled: false, + resolutions: { + '240p': true, + '360p': true, + '480p': true, + '720p': true, + '1080p': true, + '2160p': true + } + } + } + }) + }) + + describe('With save replay disabled', function () { + + before(async function () { + this.timeout(10000) + }) + + it('Should correctly create and federate the "waiting for stream" live', async function () { + this.timeout(20000) + + liveVideoUUID = await createLiveWrapper(false) + + await waitJobs(servers) + + await checkVideosExist(liveVideoUUID, false, 200) + await checkVideoState(liveVideoUUID, VideoState.WAITING_FOR_LIVE) + }) + + it('Should correctly have updated the live and federated it when streaming in the live', async function () { + this.timeout(20000) + + ffmpegCommand = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) + await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoUUID) + + await waitJobs(servers) + + await checkVideosExist(liveVideoUUID, true, 200) + await checkVideoState(liveVideoUUID, VideoState.PUBLISHED) + }) + + it('Should correctly delete the video files after the stream ended', async function () { + this.timeout(30000) + + await stopFfmpeg(ffmpegCommand) + + await waitJobs(servers) + + // Live still exist, but cannot be played anymore + await checkVideosExist(liveVideoUUID, false, 200) + await checkVideoState(liveVideoUUID, VideoState.LIVE_ENDED) + + // No resolutions saved since we did not save replay + await checkLiveCleanup(servers[0], liveVideoUUID, []) + }) + + it('Should correctly terminate the stream on blacklist and delete the live', async function () { + this.timeout(40000) + + liveVideoUUID = await createLiveWrapper(false) + + ffmpegCommand = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) + await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoUUID) + + await waitJobs(servers) + await checkVideosExist(liveVideoUUID, true, 200) + + await Promise.all([ + addVideoToBlacklist(servers[0].url, servers[0].accessToken, liveVideoUUID, 'bad live', true), + testFfmpegStreamError(ffmpegCommand, true) + ]) + + await waitJobs(servers) + + await checkVideosExist(liveVideoUUID, false) + + await getVideo(servers[0].url, liveVideoUUID, 401) + await getVideo(servers[1].url, liveVideoUUID, 404) + + await checkLiveCleanup(servers[0], liveVideoUUID, []) + }) + + it('Should correctly terminate the stream on delete and delete the video', async function () { + this.timeout(40000) + + liveVideoUUID = await createLiveWrapper(false) + + ffmpegCommand = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) + await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoUUID) + + await waitJobs(servers) + await checkVideosExist(liveVideoUUID, true, 200) + + await Promise.all([ + testFfmpegStreamError(ffmpegCommand, true), + removeVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) + ]) + + await waitJobs(servers) + + await checkVideosExist(liveVideoUUID, false, 404) + await checkLiveCleanup(servers[0], liveVideoUUID, []) + }) + }) + + describe('With save replay enabled', function () { + + it('Should correctly create and federate the "waiting for stream" live', async function () { + this.timeout(20000) + + liveVideoUUID = await createLiveWrapper(true) + + await waitJobs(servers) + + await checkVideosExist(liveVideoUUID, false, 200) + await checkVideoState(liveVideoUUID, VideoState.WAITING_FOR_LIVE) + }) + + it('Should correctly have updated the live and federated it when streaming in the live', async function () { + this.timeout(20000) + + ffmpegCommand = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) + await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoUUID) + + await waitJobs(servers) + + await checkVideosExist(liveVideoUUID, true, 200) + await checkVideoState(liveVideoUUID, VideoState.PUBLISHED) + }) + + it('Should correctly have saved the live and federated it after the streaming', async function () { + this.timeout(30000) + + await stopFfmpeg(ffmpegCommand) + + await waitJobs(servers) + + // Live has been transcoded + await checkVideosExist(liveVideoUUID, true, 200) + await checkVideoState(liveVideoUUID, VideoState.PUBLISHED) + }) + + it('Should update the saved live and correctly federate the updated attributes', async function () { + this.timeout(30000) + + await updateVideo(servers[0].url, servers[0].accessToken, liveVideoUUID, { name: 'video updated' }) + await waitJobs(servers) + + for (const server of servers) { + const res = await getVideo(server.url, liveVideoUUID) + expect(res.body.name).to.equal('video updated') + expect(res.body.isLive).to.be.false + } + }) + + it('Should have cleaned up the live files', async function () { + await checkLiveCleanup(servers[0], liveVideoUUID, [ 720 ]) + }) + + it('Should correctly terminate the stream on blacklist and blacklist the saved replay video', async function () { + this.timeout(40000) + + liveVideoUUID = await createLiveWrapper(true) + + ffmpegCommand = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) + await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoUUID) + + await waitJobs(servers) + await checkVideosExist(liveVideoUUID, true, 200) + + await Promise.all([ + addVideoToBlacklist(servers[0].url, servers[0].accessToken, liveVideoUUID, 'bad live', true), + testFfmpegStreamError(ffmpegCommand, true) + ]) + + await waitJobs(servers) + + await checkVideosExist(liveVideoUUID, false) + + await getVideo(servers[0].url, liveVideoUUID, 401) + await getVideo(servers[1].url, liveVideoUUID, 404) + + await checkLiveCleanup(servers[0], liveVideoUUID, [ 720 ]) + }) + + it('Should correctly terminate the stream on delete and delete the video', async function () { + this.timeout(40000) + + liveVideoUUID = await createLiveWrapper(true) + + ffmpegCommand = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) + await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoUUID) + + await waitJobs(servers) + await checkVideosExist(liveVideoUUID, true, 200) + + await Promise.all([ + removeVideo(servers[0].url, servers[0].accessToken, liveVideoUUID), + testFfmpegStreamError(ffmpegCommand, true) + ]) + + await waitJobs(servers) + + await checkVideosExist(liveVideoUUID, false, 404) + await checkLiveCleanup(servers[0], liveVideoUUID, []) + }) + }) + + after(async function () { + await cleanupTests(servers) + }) +}) diff --git a/server/tests/api/live/live.ts b/server/tests/api/live/live.ts index f351e9650..f7ccb453d 100644 --- a/server/tests/api/live/live.ts +++ b/server/tests/api/live/live.ts @@ -4,6 +4,7 @@ import 'mocha' import * as chai from 'chai' import { LiveVideo, LiveVideoCreate, User, VideoDetails, VideoPrivacy } from '@shared/models' import { + addVideoToBlacklist, cleanupTests, createLive, createUser, @@ -15,6 +16,7 @@ import { getVideosList, makeRawRequest, removeVideo, + sendRTMPStream, ServerInfo, setAccessTokensToServers, setDefaultVideoChannel, @@ -22,9 +24,7 @@ import { testImage, updateCustomSubConfig, updateLive, - updateUser, userLogin, - wait, waitJobs } from '../../../../shared/extra-utils' @@ -32,7 +32,6 @@ const expect = chai.expect describe('Test live', function () { let servers: ServerInfo[] = [] - let liveVideoUUID: string let userId: number let userAccessToken: string let userChannelId: number @@ -49,7 +48,10 @@ describe('Test live', function () { await updateCustomSubConfig(servers[0].url, servers[0].accessToken, { live: { enabled: true, - allowReplay: true + allowReplay: true, + transcoding: { + enabled: false + } } }) @@ -74,6 +76,7 @@ describe('Test live', function () { }) describe('Live creation, update and delete', function () { + let liveVideoUUID: string it('Should create a live with the appropriate parameters', async function () { this.timeout(20000) @@ -220,206 +223,74 @@ describe('Test live', function () { }) }) - describe('Test live constraints', function () { + describe('Stream checks', function () { + let liveVideo: LiveVideo & VideoDetails + let rtmpUrl: string - async function createLiveWrapper (saveReplay: boolean) { + before(function () { + rtmpUrl = 'rtmp://' + servers[0].hostname + ':1936' + }) + + async function createLiveWrapper () { const liveAttributes = { name: 'user live', channelId: userChannelId, privacy: VideoPrivacy.PUBLIC, - saveReplay + saveReplay: false } const res = await createLive(servers[0].url, userAccessToken, liveAttributes) - return res.body.video.uuid as string + const uuid = res.body.video.uuid + + const resLive = await getLive(servers[0].url, servers[0].accessToken, uuid) + const resVideo = await getVideo(servers[0].url, uuid) + + return Object.assign(resVideo.body, resLive.body) as LiveVideo & VideoDetails } - before(async function () { - await updateCustomSubConfig(servers[0].url, servers[0].accessToken, { - live: { - enabled: true, - allowReplay: true - } - }) - - await updateUser({ - url: servers[0].url, - userId, - accessToken: servers[0].accessToken, - videoQuota: 1, - videoQuotaDaily: -1 - }) - }) - - it('Should not have size limit if save replay is disabled', async function () { - this.timeout(30000) - - const userVideoLiveoId = await createLiveWrapper(false) - await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, false) - }) - - it('Should have size limit depending on user global quota if save replay is enabled', async function () { - this.timeout(30000) - - const userVideoLiveoId = await createLiveWrapper(true) - await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, true) - - await waitJobs(servers) - - for (const server of servers) { - const res = await getVideo(server.url, userVideoLiveoId) - - const video: VideoDetails = res.body - expect(video.isLive).to.be.false - expect(video.duration).to.be.greaterThan(0) - } - - // TODO: check stream correctly saved + cleaned - }) - - it('Should have size limit depending on user daily quota if save replay is enabled', async function () { - this.timeout(30000) - - await updateUser({ - url: servers[0].url, - userId, - accessToken: servers[0].accessToken, - videoQuota: -1, - videoQuotaDaily: 1 - }) - - const userVideoLiveoId = await createLiveWrapper(true) - await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, true) - - // TODO: check stream correctly saved + cleaned - }) - - it('Should succeed without quota limit', async function () { - this.timeout(30000) - - // Wait for user quota memoize cache invalidation - await wait(5000) - - await updateUser({ - url: servers[0].url, - userId, - accessToken: servers[0].accessToken, - videoQuota: 10 * 1000 * 1000, - videoQuotaDaily: -1 - }) - - const userVideoLiveoId = await createLiveWrapper(true) - await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, false) - }) - - it('Should have max duration limit', async function () { - this.timeout(30000) - - await updateCustomSubConfig(servers[0].url, servers[0].accessToken, { - live: { - enabled: true, - allowReplay: true, - maxDuration: 1 - } - }) - - const userVideoLiveoId = await createLiveWrapper(true) - await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, true) - - // TODO: check stream correctly saved + cleaned - }) - }) - - describe('With save replay disabled', function () { - - it('Should correctly create and federate the "waiting for stream" live', async function () { - - }) - - it('Should correctly have updated the live and federated it when streaming in the live', async function () { - - }) - - it('Should correctly delete the video and the live after the stream ended', async function () { - // Wait 10 seconds - // get video 404 - // get video federation 404 - - // check cleanup - }) - - it('Should correctly terminate the stream on blacklist and delete the live', async function () { - // Wait 10 seconds - // get video 404 - // get video federation 404 - - // check cleanup - }) - - it('Should correctly terminate the stream on delete and delete the video', async function () { - // Wait 10 seconds - // get video 404 - // get video federation 404 - - // check cleanup - }) - }) - - describe('With save replay enabled', function () { - - it('Should correctly create and federate the "waiting for stream" live', async function () { - - }) - - it('Should correctly have updated the live and federated it when streaming in the live', async function () { - - }) - - it('Should correctly have saved the live and federated it after the streaming', async function () { - - }) - - it('Should update the saved live and correctly federate the updated attributes', async function () { - - }) - - it('Should have cleaned up the live files', async function () { - - }) - - it('Should correctly terminate the stream on blacklist and blacklist the saved replay video', async function () { - // Wait 10 seconds - // get video -> blacklisted - // get video federation -> blacklisted - - // check cleanup live files quand meme - }) - - it('Should correctly terminate the stream on delete and delete the video', async function () { - // Wait 10 seconds - // get video 404 - // get video federation 404 - - // check cleanup - }) - }) - - describe('Stream checks', function () { - it('Should not allow a stream without the appropriate path', async function () { + this.timeout(30000) + liveVideo = await createLiveWrapper() + + const command = sendRTMPStream(rtmpUrl + '/bad-live', liveVideo.streamKey) + await testFfmpegStreamError(command, true) }) it('Should not allow a stream without the appropriate stream key', async function () { + this.timeout(30000) + const command = sendRTMPStream(rtmpUrl + '/live', 'bad-stream-key') + await testFfmpegStreamError(command, true) + }) + + it('Should succeed with the correct params', async function () { + this.timeout(30000) + + const command = sendRTMPStream(rtmpUrl + '/live', liveVideo.streamKey) + await testFfmpegStreamError(command, false) }) it('Should not allow a stream on a live that was blacklisted', async function () { + this.timeout(30000) + liveVideo = await createLiveWrapper() + + await addVideoToBlacklist(servers[0].url, servers[0].accessToken, liveVideo.uuid) + + const command = sendRTMPStream(rtmpUrl + '/live', liveVideo.streamKey) + await testFfmpegStreamError(command, true) }) it('Should not allow a stream on a live that was deleted', async function () { + this.timeout(30000) + liveVideo = await createLiveWrapper() + + await removeVideo(servers[0].url, servers[0].accessToken, liveVideo.uuid) + + const command = sendRTMPStream(rtmpUrl + '/live', liveVideo.streamKey) + await testFfmpegStreamError(command, true) }) }) diff --git a/shared/extra-utils/videos/live.ts b/shared/extra-utils/videos/live.ts index a391565a4..f90dd420d 100644 --- a/shared/extra-utils/videos/live.ts +++ b/shared/extra-utils/videos/live.ts @@ -1,8 +1,14 @@ +/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ + +import { expect } from 'chai' import * as ffmpeg from 'fluent-ffmpeg' +import { pathExists, readdir } from 'fs-extra' import { omit } from 'lodash' +import { join } from 'path' import { LiveVideo, LiveVideoCreate, LiveVideoUpdate, VideoDetails, VideoState } from '@shared/models' -import { buildAbsoluteFixturePath, wait } from '../miscs/miscs' +import { buildAbsoluteFixturePath, buildServerDirectory, wait } from '../miscs/miscs' import { makeGetRequest, makePutBodyRequest, makeUploadRequest } from '../requests/requests' +import { ServerInfo } from '../server/servers' import { getVideoWithToken } from './videos' function getLive (url: string, token: string, videoId: number | string, statusCodeExpected = 200) { @@ -47,21 +53,22 @@ function createLive (url: string, token: string, fields: LiveVideoCreate, status }) } -async function sendRTMPStreamInVideo (url: string, token: string, videoId: number | string, onErrorCb?: Function) { +async function sendRTMPStreamInVideo (url: string, token: string, videoId: number | string) { const res = await getLive(url, token, videoId) const videoLive = res.body as LiveVideo - return sendRTMPStream(videoLive.rtmpUrl, videoLive.streamKey, onErrorCb) + return sendRTMPStream(videoLive.rtmpUrl, videoLive.streamKey) } -function sendRTMPStream (rtmpBaseUrl: string, streamKey: string, onErrorCb?: Function) { +function sendRTMPStream (rtmpBaseUrl: string, streamKey: string) { const fixture = buildAbsoluteFixturePath('video_short.mp4') const command = ffmpeg(fixture) command.inputOption('-stream_loop -1') command.inputOption('-re') - - command.outputOption('-c copy') + command.outputOption('-c:v libx264') + command.outputOption('-g 50') + command.outputOption('-keyint_min 2') command.outputOption('-f flv') const rtmpUrl = rtmpBaseUrl + '/' + streamKey @@ -70,7 +77,7 @@ function sendRTMPStream (rtmpBaseUrl: string, streamKey: string, onErrorCb?: Fun command.on('error', err => { if (err?.message?.includes('Exiting normally')) return - if (onErrorCb) onErrorCb(err) + if (process.env.DEBUG) console.error(err) }) if (process.env.DEBUG) { @@ -94,8 +101,13 @@ function waitFfmpegUntilError (command: ffmpeg.FfmpegCommand, successAfterMS = 1 }) } -async function testFfmpegStreamError (url: string, token: string, videoId: number | string, shouldHaveError: boolean) { +async function runAndTestFfmpegStreamError (url: string, token: string, videoId: number | string, shouldHaveError: boolean) { const command = await sendRTMPStreamInVideo(url, token, videoId) + + return testFfmpegStreamError(command, shouldHaveError) +} + +async function testFfmpegStreamError (command: ffmpeg.FfmpegCommand, shouldHaveError: boolean) { let error: Error try { @@ -127,6 +139,31 @@ async function waitUntilLiveStarts (url: string, token: string, videoId: number } while (video.state.id === VideoState.WAITING_FOR_LIVE) } +async function checkLiveCleanup (server: ServerInfo, videoUUID: string, resolutions: number[] = []) { + const basePath = buildServerDirectory(server.internalServerNumber, 'streaming-playlists') + const hlsPath = join(basePath, 'hls', videoUUID) + + if (resolutions.length === 0) { + const result = await pathExists(hlsPath) + expect(result).to.be.false + + return + } + + const files = await readdir(hlsPath) + + // fragmented file and playlist per resolution + master playlist + segments sha256 json file + expect(files).to.have.lengthOf(resolutions.length * 2 + 2) + + for (const resolution of resolutions) { + expect(files).to.contain(`${videoUUID}-${resolution}-fragmented.mp4`) + expect(files).to.contain(`${resolution}.m3u8`) + } + + expect(files).to.contain('master.m3u8') + expect(files).to.contain('segments-sha256.json') +} + // --------------------------------------------------------------------------- export { @@ -134,9 +171,11 @@ export { updateLive, waitUntilLiveStarts, createLive, - testFfmpegStreamError, + runAndTestFfmpegStreamError, + checkLiveCleanup, stopFfmpeg, sendRTMPStreamInVideo, waitFfmpegUntilError, - sendRTMPStream + sendRTMPStream, + testFfmpegStreamError } diff --git a/shared/extra-utils/videos/videos.ts b/shared/extra-utils/videos/videos.ts index 2f7f2182c..29a646541 100644 --- a/shared/extra-utils/videos/videos.ts +++ b/shared/extra-utils/videos/videos.ts @@ -312,6 +312,14 @@ function removeVideo (url: string, token: string, id: number | string, expectedS .expect(expectedStatus) } +async function removeAllVideos (server: ServerInfo) { + const resVideos = await getVideosList(server.url) + + for (const v of resVideos.body.data) { + await removeVideo(server.url, server.accessToken, v.id) + } +} + async function checkVideoFilesWereRemoved ( videoUUID: string, serverNumber: number, @@ -685,6 +693,7 @@ export { getVideoFileMetadataUrl, getVideoWithToken, getVideosList, + removeAllVideos, getVideosListPagination, getVideosListSort, removeVideo,