diff --git a/package.json b/package.json index 15e55e978..0de785618 100644 --- a/package.json +++ b/package.json @@ -189,6 +189,7 @@ "@types/redis": "^2.8.5", "@types/request": "^2.0.3", "@types/socket.io": "^2.1.2", + "@types/socket.io-client": "^1.4.34", "@types/supertest": "^2.0.3", "@types/validator": "^13.0.0", "@types/webtorrent": "^0.107.0", @@ -211,6 +212,7 @@ "marked-man": "^0.7.0", "mocha": "^8.0.1", "nodemon": "^2.0.1", + "socket.io-client": "^2.3.1", "source-map-support": "^0.5.0", "supertest": "^4.0.2", "swagger-cli": "^4.0.2", diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts index 6eb05c9d6..d253d06fc 100644 --- a/server/lib/live-manager.ts +++ b/server/lib/live-manager.ts @@ -244,7 +244,7 @@ class LiveManager { size: -1, extname: '.ts', infoHash: null, - fps: -1, + fps, videoStreamingPlaylistId: playlist.id }).catch(err => { logger.error('Cannot create file for live streaming.', { err }) diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts index c918a8685..c4df399ca 100644 --- a/server/lib/peertube-socket.ts +++ b/server/lib/peertube-socket.ts @@ -6,6 +6,7 @@ import { UserNotificationModelForApi } from '@server/types/models/user' import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' import { logger } from '../helpers/logger' import { authenticateSocket } from '../middlewares' +import { isIdValid } from '@server/helpers/custom-validators/misc' class PeerTubeSocket { @@ -39,8 +40,17 @@ class PeerTubeSocket { this.liveVideosNamespace = io.of('/live-videos') .on('connection', socket => { - socket.on('subscribe', ({ videoId }) => socket.join(videoId)) - socket.on('unsubscribe', ({ videoId }) => socket.leave(videoId)) + socket.on('subscribe', ({ videoId }) => { + if (!isIdValid(videoId)) return + + socket.join(videoId) + }) + + socket.on('unsubscribe', ({ videoId }) => { + if (!isIdValid(videoId)) return + + socket.leave(videoId) + }) }) } diff --git a/server/models/video/video-file.ts b/server/models/video/video-file.ts index 8c8fc0b51..5048cf9b7 100644 --- a/server/models/video/video-file.ts +++ b/server/models/video/video-file.ts @@ -329,6 +329,10 @@ export class VideoFileModel extends Model { return !!MIMETYPES.AUDIO.EXT_MIMETYPE[this.extname] } + isLive () { + return this.size === -1 + } + hasSameUniqueKeysThan (other: MVideoFile) { return this.fps === other.fps && this.resolution === other.resolution && diff --git a/server/models/video/video-format-utils.ts b/server/models/video/video-format-utils.ts index 04e636a15..d4b213686 100644 --- a/server/models/video/video-format-utils.ts +++ b/server/models/video/video-format-utils.ts @@ -199,6 +199,7 @@ function videoFilesModelToFormattedJSON ( const video = extractVideo(model) return [ ...videoFiles ] + .filter(f => !f.isLive()) .sort(sortByResolutionDesc) .map(videoFile => { return { @@ -225,7 +226,9 @@ function addVideoFilesInAPAcc ( baseUrlWs: string, files: MVideoFile[] ) { - const sortedFiles = [ ...files ].sort(sortByResolutionDesc) + const sortedFiles = [ ...files ] + .filter(f => !f.isLive()) + .sort(sortByResolutionDesc) for (const file of sortedFiles) { acc.push({ diff --git a/server/tests/api/live/index.ts b/server/tests/api/live/index.ts index ee77af286..32219969a 100644 --- a/server/tests/api/live/index.ts +++ b/server/tests/api/live/index.ts @@ -1,3 +1,3 @@ -export * from './live-constraints' -export * from './live-save-replay' -export * from './live' +import './live-constraints' +import './live-save-replay' +import './live' diff --git a/server/tests/api/live/live.ts b/server/tests/api/live/live.ts index f7ccb453d..c795f201a 100644 --- a/server/tests/api/live/live.ts +++ b/server/tests/api/live/live.ts @@ -2,9 +2,12 @@ import 'mocha' import * as chai from 'chai' -import { LiveVideo, LiveVideoCreate, User, VideoDetails, VideoPrivacy } from '@shared/models' +import { getLiveNotificationSocket } from '@shared/extra-utils/socket/socket-io' +import { LiveVideo, LiveVideoCreate, User, Video, VideoDetails, VideoPrivacy, VideoState, VideoStreamingPlaylistType } from '@shared/models' import { addVideoToBlacklist, + checkLiveCleanup, + checkResolutionsInMasterPlaylist, cleanupTests, createLive, createUser, @@ -13,19 +16,23 @@ import { getLive, getMyUserInformation, getVideo, + getVideoIdFromUUID, getVideosList, makeRawRequest, removeVideo, sendRTMPStream, + sendRTMPStreamInVideo, ServerInfo, setAccessTokensToServers, setDefaultVideoChannel, + stopFfmpeg, testFfmpegStreamError, testImage, updateCustomSubConfig, updateLive, userLogin, - waitJobs + waitJobs, + waitUntilLiveStarts } from '../../../../shared/extra-utils' const expect = chai.expect @@ -234,12 +241,12 @@ describe('Test live', function () { async function createLiveWrapper () { const liveAttributes = { name: 'user live', - channelId: userChannelId, + channelId: servers[0].videoChannel.id, privacy: VideoPrivacy.PUBLIC, saveReplay: false } - const res = await createLive(servers[0].url, userAccessToken, liveAttributes) + const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes) const uuid = res.body.video.uuid const resLive = await getLive(servers[0].url, servers[0].accessToken, uuid) @@ -295,42 +302,226 @@ describe('Test live', function () { }) describe('Live transcoding', function () { + let liveVideoId: string + + async function createLiveWrapper (saveReplay: boolean) { + const liveAttributes = { + name: 'live video', + channelId: servers[0].videoChannel.id, + privacy: VideoPrivacy.PUBLIC, + saveReplay + } + + const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes) + return res.body.video.uuid + } + + async function testVideoResolutions (liveVideoId: string, resolutions: number[]) { + for (const server of servers) { + const resList = await getVideosList(server.url) + const videos: Video[] = resList.body.data + + expect(videos.find(v => v.uuid === liveVideoId)).to.exist + + const resVideo = await getVideo(server.url, liveVideoId) + const video: VideoDetails = resVideo.body + + expect(video.streamingPlaylists).to.have.lengthOf(1) + + const hlsPlaylist = video.streamingPlaylists.find(s => s.type === VideoStreamingPlaylistType.HLS) + expect(hlsPlaylist).to.exist + + // Only finite files are displayed + expect(hlsPlaylist.files).to.have.lengthOf(0) + + await checkResolutionsInMasterPlaylist(hlsPlaylist.playlistUrl, resolutions) + } + } + + function updateConf (resolutions: number[]) { + return updateCustomSubConfig(servers[0].url, servers[0].accessToken, { + live: { + enabled: true, + allowReplay: true, + maxDuration: null, + transcoding: { + enabled: true, + resolutions: { + '240p': resolutions.includes(240), + '360p': resolutions.includes(360), + '480p': resolutions.includes(480), + '720p': resolutions.includes(720), + '1080p': resolutions.includes(1080), + '2160p': resolutions.includes(2160) + } + } + } + }) + } + + before(async function () { + await updateConf([]) + }) it('Should enable transcoding without additional resolutions', async function () { - // enable - // stream - // wait federation + test + this.timeout(30000) + liveVideoId = await createLiveWrapper(false) + + const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId) + await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoId) + await waitJobs(servers) + + await testVideoResolutions(liveVideoId, [ 720 ]) + + await stopFfmpeg(command) }) it('Should enable transcoding with some resolutions', async function () { - // enable - // stream - // wait federation + test + this.timeout(30000) + + const resolutions = [ 240, 480 ] + await updateConf(resolutions) + liveVideoId = await createLiveWrapper(false) + + const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId) + await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoId) + await waitJobs(servers) + + await testVideoResolutions(liveVideoId, resolutions) + + await stopFfmpeg(command) }) it('Should enable transcoding with some resolutions and correctly save them', async function () { - // enable - // stream - // end stream - // wait federation + test + this.timeout(60000) + + const resolutions = [ 240, 360, 720 ] + await updateConf(resolutions) + liveVideoId = await createLiveWrapper(true) + + const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId) + await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoId) + await waitJobs(servers) + + await testVideoResolutions(liveVideoId, resolutions) + + await stopFfmpeg(command) + + await waitJobs(servers) + + for (const server of servers) { + const resVideo = await getVideo(server.url, liveVideoId) + const video: VideoDetails = resVideo.body + + expect(video.duration).to.be.greaterThan(1) + expect(video.files).to.have.lengthOf(0) + + const hlsPlaylist = video.streamingPlaylists.find(s => s.type === VideoStreamingPlaylistType.HLS) + + expect(hlsPlaylist.files).to.have.lengthOf(resolutions.length) + + for (const resolution of resolutions) { + const file = hlsPlaylist.files.find(f => f.resolution.id === resolution) + + expect(file).to.exist + expect(file.fps).to.equal(25) + expect(file.size).to.be.greaterThan(1) + + await makeRawRequest(file.torrentUrl, 200) + await makeRawRequest(file.fileUrl, 200) + } + } }) it('Should correctly have cleaned up the live files', async function () { - // check files + this.timeout(30000) + + await checkLiveCleanup(servers[0], liveVideoId, [ 240, 360, 720 ]) }) }) describe('Live socket messages', function () { - it('Should correctly send a message when the live starts', async function () { - // local - // federation + async function createLiveWrapper () { + const liveAttributes = { + name: 'live video', + channelId: servers[0].videoChannel.id, + privacy: VideoPrivacy.PUBLIC + } + + const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes) + return res.body.video.uuid + } + + it('Should correctly send a message when the live starts and ends', async function () { + this.timeout(60000) + + const localStateChanges: VideoState[] = [] + const remoteStateChanges: VideoState[] = [] + + const liveVideoUUID = await createLiveWrapper() + await waitJobs(servers) + + { + const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID) + + const localSocket = getLiveNotificationSocket(servers[0].url) + localSocket.on('state-change', data => localStateChanges.push(data.state)) + localSocket.emit('subscribe', { videoId }) + } + + { + const videoId = await getVideoIdFromUUID(servers[1].url, liveVideoUUID) + + const remoteSocket = getLiveNotificationSocket(servers[1].url) + remoteSocket.on('state-change', data => remoteStateChanges.push(data.state)) + remoteSocket.emit('subscribe', { videoId }) + } + + const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) + await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoUUID) + await waitJobs(servers) + + for (const stateChanges of [ localStateChanges, remoteStateChanges ]) { + expect(stateChanges).to.have.lengthOf(1) + expect(stateChanges[0]).to.equal(VideoState.PUBLISHED) + } + + await stopFfmpeg(command) + await waitJobs(servers) + + for (const stateChanges of [ localStateChanges, remoteStateChanges ]) { + expect(stateChanges).to.have.lengthOf(2) + expect(stateChanges[1]).to.equal(VideoState.LIVE_ENDED) + } }) - it('Should correctly send a message when the live ends', async function () { - // local - // federation + it('Should not receive a notification after unsubscribe', async function () { + this.timeout(60000) + + const stateChanges: VideoState[] = [] + + const liveVideoUUID = await createLiveWrapper() + await waitJobs(servers) + + const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID) + + const socket = getLiveNotificationSocket(servers[0].url) + socket.on('state-change', data => stateChanges.push(data.state)) + socket.emit('subscribe', { videoId }) + + const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) + await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoUUID) + await waitJobs(servers) + + expect(stateChanges).to.have.lengthOf(1) + socket.emit('unsubscribe', { videoId }) + + await stopFfmpeg(command) + await waitJobs(servers) + + expect(stateChanges).to.have.lengthOf(1) }) }) diff --git a/server/tests/api/videos/video-hls.ts b/server/tests/api/videos/video-hls.ts index 6555bc8b6..3a65cc1d2 100644 --- a/server/tests/api/videos/video-hls.ts +++ b/server/tests/api/videos/video-hls.ts @@ -1,9 +1,11 @@ /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ -import * as chai from 'chai' import 'mocha' +import * as chai from 'chai' +import { join } from 'path' import { checkDirectoryIsEmpty, + checkResolutionsInMasterPlaylist, checkSegmentHash, checkTmpIsEmpty, cleanupTests, @@ -23,7 +25,6 @@ import { } from '../../../../shared/extra-utils' import { VideoDetails } from '../../../../shared/models/videos' import { VideoStreamingPlaylistType } from '../../../../shared/models/videos/video-streaming-playlist.type' -import { join } from 'path' import { DEFAULT_AUDIO_RESOLUTION } from '../../../initializers/constants' const expect = chai.expect @@ -66,16 +67,12 @@ async function checkHlsPlaylist (servers: ServerInfo[], videoUUID: string, hlsOn } { - const res = await getPlaylist(hlsPlaylist.playlistUrl) + await checkResolutionsInMasterPlaylist(hlsPlaylist.playlistUrl, resolutions) + const res = await getPlaylist(hlsPlaylist.playlistUrl) const masterPlaylist = res.text for (const resolution of resolutions) { - const reg = new RegExp( - '#EXT-X-STREAM-INF:BANDWIDTH=\\d+,RESOLUTION=\\d+x' + resolution + ',FRAME-RATE=\\d+,CODECS="avc1.64001f,mp4a.40.2"' - ) - - expect(masterPlaylist).to.match(reg) expect(masterPlaylist).to.contain(`${resolution}.m3u8`) expect(masterPlaylist).to.contain(`${resolution}.m3u8`) } diff --git a/shared/extra-utils/socket/socket-io.ts b/shared/extra-utils/socket/socket-io.ts index 854ab71af..66099464f 100644 --- a/shared/extra-utils/socket/socket-io.ts +++ b/shared/extra-utils/socket/socket-io.ts @@ -6,8 +6,13 @@ function getUserNotificationSocket (serverUrl: string, accessToken: string) { }) } +function getLiveNotificationSocket (serverUrl: string) { + return io(serverUrl + '/live-videos') +} + // --------------------------------------------------------------------------- export { - getUserNotificationSocket + getUserNotificationSocket, + getLiveNotificationSocket } diff --git a/shared/extra-utils/videos/video-streaming-playlists.ts b/shared/extra-utils/videos/video-streaming-playlists.ts index e54da84aa..8cf0e4930 100644 --- a/shared/extra-utils/videos/video-streaming-playlists.ts +++ b/shared/extra-utils/videos/video-streaming-playlists.ts @@ -41,11 +41,26 @@ async function checkSegmentHash ( expect(sha256(res2.body)).to.equal(sha256Server) } +async function checkResolutionsInMasterPlaylist (playlistUrl: string, resolutions: number[]) { + const res = await getPlaylist(playlistUrl) + + const masterPlaylist = res.text + + for (const resolution of resolutions) { + const reg = new RegExp( + '#EXT-X-STREAM-INF:BANDWIDTH=\\d+,RESOLUTION=\\d+x' + resolution + ',(FRAME-RATE=\\d+,)?CODECS="avc1.64001f,mp4a.40.2"' + ) + + expect(masterPlaylist).to.match(reg) + } +} + // --------------------------------------------------------------------------- export { getPlaylist, getSegment, + checkResolutionsInMasterPlaylist, getSegmentSha256, checkSegmentHash } diff --git a/yarn.lock b/yarn.lock index 00ce9e1c3..43eb6e9b0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -796,6 +796,11 @@ dependencies: "@types/node" "*" +"@types/socket.io-client@^1.4.34": + version "1.4.34" + resolved "https://registry.yarnpkg.com/@types/socket.io-client/-/socket.io-client-1.4.34.tgz#8ca5f5732a9ad92b79aba71083cda5e5821e3ed9" + integrity sha512-Lzia5OTQFJZJ5R4HsEEldywiiqT9+W2rDbyHJiiTGqOcju89sCsQ8aUXDljY6Ls33wKZZGC0bfMhr/VpOyjtXg== + "@types/socket.io@^2.1.2": version "2.1.11" resolved "https://registry.yarnpkg.com/@types/socket.io/-/socket.io-2.1.11.tgz#e0d6759880e5f9818d5297a3328b36641bae996b" @@ -6910,6 +6915,23 @@ socket.io-client@2.3.0: socket.io-parser "~3.3.0" to-array "0.1.4" +socket.io-client@^2.3.1: + version "2.3.1" + resolved "https://registry.yarnpkg.com/socket.io-client/-/socket.io-client-2.3.1.tgz#91a4038ef4d03c19967bb3c646fec6e0eaa78cff" + integrity sha512-YXmXn3pA8abPOY//JtYxou95Ihvzmg8U6kQyolArkIyLd0pgVhrfor/iMsox8cn07WCOOvvuJ6XKegzIucPutQ== + dependencies: + backo2 "1.0.2" + component-bind "1.0.0" + component-emitter "~1.3.0" + debug "~3.1.0" + engine.io-client "~3.4.0" + has-binary2 "~1.0.2" + indexof "0.0.1" + parseqs "0.0.6" + parseuri "0.0.6" + socket.io-parser "~3.3.0" + to-array "0.1.4" + socket.io-parser@~3.3.0: version "3.3.1" resolved "https://registry.yarnpkg.com/socket.io-parser/-/socket.io-parser-3.3.1.tgz#f07d9c8cb3fb92633aa93e76d98fd3a334623199"