diff --git a/scripts/ci.sh b/scripts/ci.sh index 7360a03ce..b068deeb4 100755 --- a/scripts/ci.sh +++ b/scripts/ci.sh @@ -63,9 +63,9 @@ elif [ "$1" = "api-1" ]; then elif [ "$1" = "api-2" ]; then npm run build:server + liveFiles=$(findTestFiles ./dist/server/tests/api/live) serverFiles=$(findTestFiles ./dist/server/tests/api/server) usersFiles=$(findTestFiles ./dist/server/tests/api/users) - liveFiles=$(findTestFiles ./dist/server/tests/api/live) MOCHA_PARALLEL=true runTest "$1" 3 $serverFiles $usersFiles $liveFiles elif [ "$1" = "api-3" ]; then diff --git a/scripts/parse-log.ts b/scripts/parse-log.ts index eb3851085..1617e4446 100755 --- a/scripts/parse-log.ts +++ b/scripts/parse-log.ts @@ -85,6 +85,8 @@ function run () { const files = await getFiles() for (const file of files) { + if (file === 'peertube-audit.log') continue + console.log('Opening %s.', file) const stream = createReadStream(file) diff --git a/server.ts b/server.ts index 2caee18e7..9376a0875 100644 --- a/server.ts +++ b/server.ts @@ -124,7 +124,7 @@ import { PluginsCheckScheduler } from './server/lib/schedulers/plugins-check-sch import { PeerTubeVersionCheckScheduler } from './server/lib/schedulers/peertube-version-check-scheduler' import { Hooks } from './server/lib/plugins/hooks' import { PluginManager } from './server/lib/plugins/plugin-manager' -import { LiveManager } from './server/lib/live-manager' +import { LiveManager } from './server/lib/live' import { HttpStatusCode } from './shared/core-utils/miscs/http-error-codes' import { VideosTorrentCache } from '@server/lib/files-cache/videos-torrent-cache' import { ServerConfigManager } from '@server/lib/server-config-manager' diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index 5fdb7d5bc..74b100e59 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts @@ -1,7 +1,7 @@ import * as express from 'express' import toInt from 'validator/lib/toInt' import { doJSONRequest } from '@server/helpers/requests' -import { LiveManager } from '@server/lib/live-manager' +import { LiveManager } from '@server/lib/live' import { openapiOperationDoc } from '@server/middlewares/doc' import { getServerActor } from '@server/models/application/application' import { MVideoAccountLight } from '@server/types/models' diff --git a/server/controllers/live.ts b/server/controllers/live.ts index cfb4741b7..f2686fb23 100644 --- a/server/controllers/live.ts +++ b/server/controllers/live.ts @@ -1,7 +1,7 @@ import * as cors from 'cors' import * as express from 'express' import { mapToJSON } from '@server/helpers/core-utils' -import { LiveManager } from '@server/lib/live-manager' +import { LiveSegmentShaStore } from '@server/lib/live' import { HttpStatusCode } from '@shared/core-utils/miscs/http-error-codes' const liveRouter = express.Router() @@ -22,7 +22,7 @@ export { function getSegmentsSha256 (req: express.Request, res: express.Response) { const videoUUID = req.params.videoUUID - const result = LiveManager.Instance.getSegmentsSha256(videoUUID) + const result = LiveSegmentShaStore.Instance.getSegmentsSha256(videoUUID) if (!result) { return res.status(HttpStatusCode.NOT_FOUND_404).end() diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts index 0a0231a3a..5593ee257 100644 --- a/server/lib/activitypub/process/process-view.ts +++ b/server/lib/activitypub/process/process-view.ts @@ -4,7 +4,7 @@ import { Redis } from '../../redis' import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub' import { APProcessorOptions } from '../../../types/activitypub-processor.model' import { MActorSignature } from '../../../types/models' -import { LiveManager } from '@server/lib/live-manager' +import { LiveManager } from '@server/lib/live/live-manager' async function processViewActivity (options: APProcessorOptions) { const { activity, byActor } = options diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 517b90abc..9eba41bf8 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -3,7 +3,7 @@ import { pathExists, readdir, remove } from 'fs-extra' import { join } from 'path' import { ffprobePromise, getAudioStream, getDurationFromVideoFile, getVideoFileResolution } from '@server/helpers/ffprobe-utils' import { VIDEO_LIVE } from '@server/initializers/constants' -import { LiveManager } from '@server/lib/live-manager' +import { buildConcatenatedName, cleanupLive, LiveSegmentShaStore } from '@server/lib/live' import { generateVideoMiniature } from '@server/lib/thumbnail' import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/video-transcoding' import { publishAndFederateIfNeeded } from '@server/lib/video' @@ -12,7 +12,7 @@ import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' import { VideoLiveModel } from '@server/models/video/video-live' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' -import { MStreamingPlaylist, MVideo, MVideoLive } from '@server/types/models' +import { MVideo, MVideoLive } from '@server/types/models' import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' import { logger } from '../../../helpers/logger' @@ -37,7 +37,7 @@ async function processVideoLiveEnding (job: Bull.Job) { return } - LiveManager.Instance.cleanupShaSegments(video.uuid) + LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) if (live.saveReplay !== true) { return cleanupLive(video, streamingPlaylist) @@ -46,19 +46,10 @@ async function processVideoLiveEnding (job: Bull.Job) { return saveLive(video, live) } -async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { - const hlsDirectory = getHLSDirectory(video) - - await remove(hlsDirectory) - - await streamingPlaylist.destroy() -} - // --------------------------------------------------------------------------- export { - processVideoLiveEnding, - cleanupLive + processVideoLiveEnding } // --------------------------------------------------------------------------- @@ -94,7 +85,7 @@ async function saveLive (video: MVideo, live: MVideoLive) { let durationDone = false for (const playlistFile of playlistFiles) { - const concatenatedTsFile = LiveManager.Instance.buildConcatenatedName(playlistFile) + const concatenatedTsFile = buildConcatenatedName(playlistFile) const concatenatedTsFilePath = join(replayDirectory, concatenatedTsFile) const probe = await ffprobePromise(concatenatedTsFilePath) diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts deleted file mode 100644 index 563ba2578..000000000 --- a/server/lib/live-manager.ts +++ /dev/null @@ -1,633 +0,0 @@ - -import * as Bluebird from 'bluebird' -import * as chokidar from 'chokidar' -import { FfmpegCommand } from 'fluent-ffmpeg' -import { appendFile, ensureDir, readFile, stat } from 'fs-extra' -import { createServer, Server } from 'net' -import { basename, join } from 'path' -import { isTestInstance } from '@server/helpers/core-utils' -import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils' -import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils' -import { logger, loggerTagsFactory } from '@server/helpers/logger' -import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' -import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants' -import { UserModel } from '@server/models/user/user' -import { VideoModel } from '@server/models/video/video' -import { VideoFileModel } from '@server/models/video/video-file' -import { VideoLiveModel } from '@server/models/video/video-live' -import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' -import { MStreamingPlaylist, MStreamingPlaylistVideo, MUserId, MVideoLive, MVideoLiveVideo } from '@server/types/models' -import { VideoState, VideoStreamingPlaylistType } from '@shared/models' -import { federateVideoIfNeeded } from './activitypub/videos' -import { buildSha256Segment } from './hls' -import { JobQueue } from './job-queue' -import { cleanupLive } from './job-queue/handlers/video-live-ending' -import { PeerTubeSocket } from './peertube-socket' -import { VideoTranscodingProfilesManager } from './transcoding/video-transcoding-profiles' -import { isAbleToUploadVideo } from './user' -import { getHLSDirectory } from './video-paths' - -import memoizee = require('memoizee') -const NodeRtmpSession = require('node-media-server/node_rtmp_session') -const context = require('node-media-server/node_core_ctx') -const nodeMediaServerLogger = require('node-media-server/node_core_logger') - -// Disable node media server logs -nodeMediaServerLogger.setLogType(0) - -const config = { - rtmp: { - port: CONFIG.LIVE.RTMP.PORT, - chunk_size: VIDEO_LIVE.RTMP.CHUNK_SIZE, - gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE, - ping: VIDEO_LIVE.RTMP.PING, - ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT - }, - transcoding: { - ffmpeg: 'ffmpeg' - } -} - -const lTags = loggerTagsFactory('live') - -class LiveManager { - - private static instance: LiveManager - - private readonly transSessions = new Map() - private readonly videoSessions = new Map() - // Values are Date().getTime() - private readonly watchersPerVideo = new Map() - private readonly segmentsSha256 = new Map>() - private readonly livesPerUser = new Map() - - private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => { - return isAbleToUploadVideo(userId, 1000) - }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD }) - - private readonly hasClientSocketsInBadHealthWithCache = memoizee((sessionId: string) => { - return this.hasClientSocketsInBadHealth(sessionId) - }, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH }) - - private rtmpServer: Server - - private constructor () { - } - - init () { - const events = this.getContext().nodeEvent - events.on('postPublish', (sessionId: string, streamPath: string) => { - logger.debug('RTMP received stream', { id: sessionId, streamPath, ...lTags(sessionId) }) - - const splittedPath = streamPath.split('/') - if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) { - logger.warn('Live path is incorrect.', { streamPath, ...lTags(sessionId) }) - return this.abortSession(sessionId) - } - - this.handleSession(sessionId, streamPath, splittedPath[2]) - .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) })) - }) - - events.on('donePublish', sessionId => { - logger.info('Live session ended.', { sessionId, ...lTags(sessionId) }) - }) - - registerConfigChangedHandler(() => { - if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) { - this.run() - return - } - - if (this.rtmpServer && CONFIG.LIVE.ENABLED === false) { - this.stop() - } - }) - - // Cleanup broken lives, that were terminated by a server restart for example - this.handleBrokenLives() - .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) - - setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) - } - - run () { - logger.info('Running RTMP server on port %d', config.rtmp.port, lTags()) - - this.rtmpServer = createServer(socket => { - const session = new NodeRtmpSession(config, socket) - - session.run() - }) - - this.rtmpServer.on('error', err => { - logger.error('Cannot run RTMP server.', { err, ...lTags() }) - }) - - this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT) - } - - stop () { - logger.info('Stopping RTMP server.', lTags()) - - this.rtmpServer.close() - this.rtmpServer = undefined - - // Sessions is an object - this.getContext().sessions.forEach((session: any) => { - if (session instanceof NodeRtmpSession) { - session.stop() - } - }) - } - - isRunning () { - return !!this.rtmpServer - } - - getSegmentsSha256 (videoUUID: string) { - return this.segmentsSha256.get(videoUUID) - } - - stopSessionOf (videoId: number) { - const sessionId = this.videoSessions.get(videoId) - if (!sessionId) return - - this.videoSessions.delete(videoId) - this.abortSession(sessionId) - } - - getLiveQuotaUsedByUser (userId: number) { - const currentLives = this.livesPerUser.get(userId) - if (!currentLives) return 0 - - return currentLives.reduce((sum, obj) => sum + obj.size, 0) - } - - addViewTo (videoId: number) { - if (this.videoSessions.has(videoId) === false) return - - let watchers = this.watchersPerVideo.get(videoId) - - if (!watchers) { - watchers = [] - this.watchersPerVideo.set(videoId, watchers) - } - - watchers.push(new Date().getTime()) - } - - cleanupShaSegments (videoUUID: string) { - this.segmentsSha256.delete(videoUUID) - } - - addSegmentToReplay (hlsVideoPath: string, segmentPath: string) { - const segmentName = basename(segmentPath) - const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, this.buildConcatenatedName(segmentName)) - - return readFile(segmentPath) - .then(data => appendFile(dest, data)) - .catch(err => logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...lTags() })) - } - - buildConcatenatedName (segmentOrPlaylistPath: string) { - const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/) - - return 'concat-' + num[1] + '.ts' - } - - private processSegments (hlsVideoPath: string, videoUUID: string, videoLive: MVideoLive, segmentPaths: string[]) { - Bluebird.mapSeries(segmentPaths, async previousSegment => { - // Add sha hash of previous segments, because ffmpeg should have finished generating them - await this.addSegmentSha(videoUUID, previousSegment) - - if (videoLive.saveReplay) { - await this.addSegmentToReplay(hlsVideoPath, previousSegment) - } - }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...lTags(videoUUID) })) - } - - private getContext () { - return context - } - - private abortSession (id: string) { - const session = this.getContext().sessions.get(id) - if (session) { - session.stop() - this.getContext().sessions.delete(id) - } - - const transSession = this.transSessions.get(id) - if (transSession) { - transSession.kill('SIGINT') - this.transSessions.delete(id) - } - } - - private async handleSession (sessionId: string, streamPath: string, streamKey: string) { - const videoLive = await VideoLiveModel.loadByStreamKey(streamKey) - if (!videoLive) { - logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId)) - return this.abortSession(sessionId) - } - - const video = videoLive.Video - if (video.isBlacklisted()) { - logger.warn('Video is blacklisted. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid)) - return this.abortSession(sessionId) - } - - // Cleanup old potential live files (could happen with a permanent live) - this.cleanupShaSegments(video.uuid) - - const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) - if (oldStreamingPlaylist) { - await cleanupLive(video, oldStreamingPlaylist) - } - - this.videoSessions.set(video.id, sessionId) - - const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid) - - const session = this.getContext().sessions.get(sessionId) - const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath - - const [ resolutionResult, fps ] = await Promise.all([ - getVideoFileResolution(rtmpUrl), - getVideoFileFPS(rtmpUrl) - ]) - - const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED - ? computeResolutionsToTranscode(resolutionResult.videoFileResolution, 'live') - : [] - - const allResolutions = resolutionsEnabled.concat([ session.videoHeight ]) - - logger.info( - 'Will mux/transcode live video of original resolution %d.', session.videoHeight, - { allResolutions, ...lTags(sessionId, video.uuid) } - ) - - const [ videoStreamingPlaylist ] = await VideoStreamingPlaylistModel.upsert({ - videoId: video.id, - playlistUrl, - segmentsSha256Url: WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsSha256SegmentsStaticPath(video.uuid, video.isLive), - p2pMediaLoaderInfohashes: VideoStreamingPlaylistModel.buildP2PMediaLoaderInfoHashes(playlistUrl, allResolutions), - p2pMediaLoaderPeerVersion: P2P_MEDIA_LOADER_PEER_VERSION, - - type: VideoStreamingPlaylistType.HLS - }, { returning: true }) as [ MStreamingPlaylist, boolean ] - - return this.runMuxing({ - sessionId, - videoLive, - playlist: Object.assign(videoStreamingPlaylist, { Video: video }), - rtmpUrl, - fps, - allResolutions - }) - } - - private async runMuxing (options: { - sessionId: string - videoLive: MVideoLiveVideo - playlist: MStreamingPlaylistVideo - rtmpUrl: string - fps: number - allResolutions: number[] - }) { - const { sessionId, videoLive, playlist, allResolutions, fps, rtmpUrl } = options - const startStreamDateTime = new Date().getTime() - - const user = await UserModel.loadByLiveId(videoLive.id) - if (!this.livesPerUser.has(user.id)) { - this.livesPerUser.set(user.id, []) - } - - const currentUserLive = { liveId: videoLive.id, videoId: videoLive.videoId, size: 0 } - const livesOfUser = this.livesPerUser.get(user.id) - livesOfUser.push(currentUserLive) - - for (let i = 0; i < allResolutions.length; i++) { - const resolution = allResolutions[i] - - const file = new VideoFileModel({ - resolution, - size: -1, - extname: '.ts', - infoHash: null, - fps, - videoStreamingPlaylistId: playlist.id - }) - - VideoFileModel.customUpsert(file, 'streaming-playlist', null) - .catch(err => logger.error('Cannot create file for live streaming.', { err, ...lTags(sessionId, videoLive.Video.uuid) })) - } - - const outPath = getHLSDirectory(videoLive.Video) - await ensureDir(outPath) - - const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY) - - if (videoLive.saveReplay === true) { - await ensureDir(replayDirectory) - } - - const videoUUID = videoLive.Video.uuid - - const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED - ? await getLiveTranscodingCommand({ - rtmpUrl, - outPath, - resolutions: allResolutions, - fps, - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.LIVE.TRANSCODING.PROFILE - }) - : getLiveMuxingCommand(rtmpUrl, outPath) - - logger.info('Running live muxing/transcoding for %s.', videoUUID, lTags(sessionId, videoUUID)) - this.transSessions.set(sessionId, ffmpegExec) - - const tsWatcher = chokidar.watch(outPath + '/*.ts') - - const segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} - const playlistIdMatcher = /^([\d+])-/ - - const addHandler = segmentPath => { - logger.debug('Live add handler of %s.', segmentPath, lTags(sessionId, videoUUID)) - - const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] - - const segmentsToProcess = segmentsToProcessPerPlaylist[playlistId] || [] - this.processSegments(outPath, videoUUID, videoLive, segmentsToProcess) - - segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] - - if (this.hasClientSocketsInBadHealthWithCache(sessionId)) { - logger.error( - 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + - ' Stopping session of video %s.', videoUUID, - lTags(sessionId, videoUUID) - ) - - this.stopSessionOf(videoLive.videoId) - return - } - - // Duration constraint check - if (this.isDurationConstraintValid(startStreamDateTime) !== true) { - logger.info('Stopping session of %s: max duration exceeded.', videoUUID, lTags(sessionId, videoUUID)) - - this.stopSessionOf(videoLive.videoId) - return - } - - // Check user quota if the user enabled replay saving - if (videoLive.saveReplay === true) { - stat(segmentPath) - .then(segmentStat => { - currentUserLive.size += segmentStat.size - }) - .then(() => this.isQuotaConstraintValid(user, videoLive)) - .then(quotaValid => { - if (quotaValid !== true) { - logger.info('Stopping session of %s: user quota exceeded.', videoUUID, lTags(sessionId, videoUUID)) - - this.stopSessionOf(videoLive.videoId) - } - }) - .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err, ...lTags(sessionId, videoUUID) })) - } - } - - const deleteHandler = segmentPath => this.removeSegmentSha(videoUUID, segmentPath) - - tsWatcher.on('add', p => addHandler(p)) - tsWatcher.on('unlink', p => deleteHandler(p)) - - const masterWatcher = chokidar.watch(outPath + '/master.m3u8') - masterWatcher.on('add', async () => { - try { - const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId) - - video.state = VideoState.PUBLISHED - await video.save() - videoLive.Video = video - - setTimeout(() => { - federateVideoIfNeeded(video, false) - .catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...lTags(sessionId, videoUUID) })) - - PeerTubeSocket.Instance.sendVideoLiveNewState(video) - }, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) - - } catch (err) { - logger.error('Cannot save/federate live video %d.', videoLive.videoId, { err, ...lTags(sessionId, videoUUID) }) - } finally { - masterWatcher.close() - .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...lTags(sessionId, videoUUID) })) - } - }) - - const onFFmpegEnded = () => { - logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl, lTags(sessionId, videoUUID)) - - this.transSessions.delete(sessionId) - - this.watchersPerVideo.delete(videoLive.videoId) - this.videoSessions.delete(videoLive.videoId) - - const newLivesPerUser = this.livesPerUser.get(user.id) - .filter(o => o.liveId !== videoLive.id) - this.livesPerUser.set(user.id, newLivesPerUser) - - setTimeout(() => { - // Wait latest segments generation, and close watchers - - Promise.all([ tsWatcher.close(), masterWatcher.close() ]) - .then(() => { - // Process remaining segments hash - for (const key of Object.keys(segmentsToProcessPerPlaylist)) { - this.processSegments(outPath, videoUUID, videoLive, segmentsToProcessPerPlaylist[key]) - } - }) - .catch(err => { - logger.error( - 'Cannot close watchers of %s or process remaining hash segments.', outPath, - { err, ...lTags(sessionId, videoUUID) } - ) - }) - - this.onEndTransmuxing(videoLive.Video.id) - .catch(err => logger.error('Error in closed transmuxing.', { err, ...lTags(sessionId, videoUUID) })) - }, 1000) - } - - ffmpegExec.on('error', (err, stdout, stderr) => { - onFFmpegEnded() - - // Don't care that we killed the ffmpeg process - if (err?.message?.includes('Exiting normally')) return - - logger.error('Live transcoding error.', { err, stdout, stderr, ...lTags(sessionId, videoUUID) }) - - this.abortSession(sessionId) - }) - - ffmpegExec.on('end', () => onFFmpegEnded()) - - ffmpegExec.run() - } - - private async onEndTransmuxing (videoUUID: string, cleanupNow = false) { - try { - const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID) - if (!fullVideo) return - - const live = await VideoLiveModel.loadByVideoId(fullVideo.id) - - if (!live.permanentLive) { - JobQueue.Instance.createJob({ - type: 'video-live-ending', - payload: { - videoId: fullVideo.id - } - }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) - - fullVideo.state = VideoState.LIVE_ENDED - } else { - fullVideo.state = VideoState.WAITING_FOR_LIVE - } - - await fullVideo.save() - - PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) - - await federateVideoIfNeeded(fullVideo, false) - } catch (err) { - logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) }) - } - } - - private async addSegmentSha (videoUUID: string, segmentPath: string) { - const segmentName = basename(segmentPath) - logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID)) - - const shaResult = await buildSha256Segment(segmentPath) - - if (!this.segmentsSha256.has(videoUUID)) { - this.segmentsSha256.set(videoUUID, new Map()) - } - - const filesMap = this.segmentsSha256.get(videoUUID) - filesMap.set(segmentName, shaResult) - } - - private removeSegmentSha (videoUUID: string, segmentPath: string) { - const segmentName = basename(segmentPath) - - logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID)) - - const filesMap = this.segmentsSha256.get(videoUUID) - if (!filesMap) { - logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID)) - return - } - - if (!filesMap.has(segmentName)) { - logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID)) - return - } - - filesMap.delete(segmentName) - } - - private isDurationConstraintValid (streamingStartTime: number) { - const maxDuration = CONFIG.LIVE.MAX_DURATION - // No limit - if (maxDuration < 0) return true - - const now = new Date().getTime() - const max = streamingStartTime + maxDuration - - return now <= max - } - - private hasClientSocketsInBadHealth (sessionId: string) { - const rtmpSession = this.getContext().sessions.get(sessionId) - - if (!rtmpSession) { - logger.warn('Cannot get session %s to check players socket health.', sessionId, lTags(sessionId)) - return - } - - for (const playerSessionId of rtmpSession.players) { - const playerSession = this.getContext().sessions.get(playerSessionId) - - if (!playerSession) { - logger.error('Cannot get player session %s to check socket health.', playerSession, lTags(sessionId)) - continue - } - - if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) { - return true - } - } - - return false - } - - private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) { - if (live.saveReplay !== true) return true - - return this.isAbleToUploadVideoWithCache(user.id) - } - - private async updateLiveViews () { - if (!this.isRunning()) return - - if (!isTestInstance()) logger.info('Updating live video views.', lTags()) - - for (const videoId of this.watchersPerVideo.keys()) { - const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE - - const watchers = this.watchersPerVideo.get(videoId) - - const numWatchers = watchers.length - - const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) - video.views = numWatchers - await video.save() - - await federateVideoIfNeeded(video, false) - - PeerTubeSocket.Instance.sendVideoViewsUpdate(video) - - // Only keep not expired watchers - const newWatchers = watchers.filter(w => w > notBefore) - this.watchersPerVideo.set(videoId, newWatchers) - - logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags()) - } - } - - private async handleBrokenLives () { - const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() - - for (const uuid of videoUUIDs) { - await this.onEndTransmuxing(uuid, true) - } - } - - static get Instance () { - return this.instance || (this.instance = new this()) - } -} - -// --------------------------------------------------------------------------- - -export { - LiveManager -} diff --git a/server/lib/live/index.ts b/server/lib/live/index.ts new file mode 100644 index 000000000..8b46800da --- /dev/null +++ b/server/lib/live/index.ts @@ -0,0 +1,4 @@ +export * from './live-manager' +export * from './live-quota-store' +export * from './live-segment-sha-store' +export * from './live-utils' diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts new file mode 100644 index 000000000..d7199cc89 --- /dev/null +++ b/server/lib/live/live-manager.ts @@ -0,0 +1,412 @@ + +import { createServer, Server } from 'net' +import { isTestInstance } from '@server/helpers/core-utils' +import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' +import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants' +import { UserModel } from '@server/models/user/user' +import { VideoModel } from '@server/models/video/video' +import { VideoLiveModel } from '@server/models/video/video-live' +import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' +import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' +import { VideoState, VideoStreamingPlaylistType } from '@shared/models' +import { federateVideoIfNeeded } from '../activitypub/videos' +import { JobQueue } from '../job-queue' +import { PeerTubeSocket } from '../peertube-socket' +import { LiveQuotaStore } from './live-quota-store' +import { LiveSegmentShaStore } from './live-segment-sha-store' +import { cleanupLive } from './live-utils' +import { MuxingSession } from './shared' + +const NodeRtmpSession = require('node-media-server/node_rtmp_session') +const context = require('node-media-server/node_core_ctx') +const nodeMediaServerLogger = require('node-media-server/node_core_logger') + +// Disable node media server logs +nodeMediaServerLogger.setLogType(0) + +const config = { + rtmp: { + port: CONFIG.LIVE.RTMP.PORT, + chunk_size: VIDEO_LIVE.RTMP.CHUNK_SIZE, + gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE, + ping: VIDEO_LIVE.RTMP.PING, + ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT + }, + transcoding: { + ffmpeg: 'ffmpeg' + } +} + +const lTags = loggerTagsFactory('live') + +class LiveManager { + + private static instance: LiveManager + + private readonly muxingSessions = new Map() + private readonly videoSessions = new Map() + // Values are Date().getTime() + private readonly watchersPerVideo = new Map() + + private rtmpServer: Server + + private constructor () { + } + + init () { + const events = this.getContext().nodeEvent + events.on('postPublish', (sessionId: string, streamPath: string) => { + logger.debug('RTMP received stream', { id: sessionId, streamPath, ...lTags(sessionId) }) + + const splittedPath = streamPath.split('/') + if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) { + logger.warn('Live path is incorrect.', { streamPath, ...lTags(sessionId) }) + return this.abortSession(sessionId) + } + + this.handleSession(sessionId, streamPath, splittedPath[2]) + .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) })) + }) + + events.on('donePublish', sessionId => { + logger.info('Live session ended.', { sessionId, ...lTags(sessionId) }) + }) + + registerConfigChangedHandler(() => { + if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) { + this.run() + return + } + + if (this.rtmpServer && CONFIG.LIVE.ENABLED === false) { + this.stop() + } + }) + + // Cleanup broken lives, that were terminated by a server restart for example + this.handleBrokenLives() + .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) + + setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) + } + + run () { + logger.info('Running RTMP server on port %d', config.rtmp.port, lTags()) + + this.rtmpServer = createServer(socket => { + const session = new NodeRtmpSession(config, socket) + + session.run() + }) + + this.rtmpServer.on('error', err => { + logger.error('Cannot run RTMP server.', { err, ...lTags() }) + }) + + this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT) + } + + stop () { + logger.info('Stopping RTMP server.', lTags()) + + this.rtmpServer.close() + this.rtmpServer = undefined + + // Sessions is an object + this.getContext().sessions.forEach((session: any) => { + if (session instanceof NodeRtmpSession) { + session.stop() + } + }) + } + + isRunning () { + return !!this.rtmpServer + } + + stopSessionOf (videoId: number) { + const sessionId = this.videoSessions.get(videoId) + if (!sessionId) return + + this.videoSessions.delete(videoId) + this.abortSession(sessionId) + } + + addViewTo (videoId: number) { + if (this.videoSessions.has(videoId) === false) return + + let watchers = this.watchersPerVideo.get(videoId) + + if (!watchers) { + watchers = [] + this.watchersPerVideo.set(videoId, watchers) + } + + watchers.push(new Date().getTime()) + } + + private getContext () { + return context + } + + private abortSession (sessionId: string) { + const session = this.getContext().sessions.get(sessionId) + if (session) { + session.stop() + this.getContext().sessions.delete(sessionId) + } + + const muxingSession = this.muxingSessions.get(sessionId) + if (muxingSession) muxingSession.abort() + } + + private async handleSession (sessionId: string, streamPath: string, streamKey: string) { + const videoLive = await VideoLiveModel.loadByStreamKey(streamKey) + if (!videoLive) { + logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId)) + return this.abortSession(sessionId) + } + + const video = videoLive.Video + if (video.isBlacklisted()) { + logger.warn('Video is blacklisted. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid)) + return this.abortSession(sessionId) + } + + // Cleanup old potential live files (could happen with a permanent live) + LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) + + const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) + if (oldStreamingPlaylist) { + await cleanupLive(video, oldStreamingPlaylist) + } + + this.videoSessions.set(video.id, sessionId) + + const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath + + const [ { videoFileResolution }, fps ] = await Promise.all([ + getVideoFileResolution(rtmpUrl), + getVideoFileFPS(rtmpUrl) + ]) + + const allResolutions = this.buildAllResolutionsToTranscode(videoFileResolution) + + logger.info( + 'Will mux/transcode live video of original resolution %d.', videoFileResolution, + { allResolutions, ...lTags(sessionId, video.uuid) } + ) + + const streamingPlaylist = await this.createLivePlaylist(video, allResolutions) + + return this.runMuxingSession({ + sessionId, + videoLive, + streamingPlaylist, + rtmpUrl, + fps, + allResolutions + }) + } + + private async runMuxingSession (options: { + sessionId: string + videoLive: MVideoLiveVideo + streamingPlaylist: MStreamingPlaylistVideo + rtmpUrl: string + fps: number + allResolutions: number[] + }) { + const { sessionId, videoLive, streamingPlaylist, allResolutions, fps, rtmpUrl } = options + const videoUUID = videoLive.Video.uuid + const localLTags = lTags(sessionId, videoUUID) + + const user = await UserModel.loadByLiveId(videoLive.id) + LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id) + + const muxingSession = new MuxingSession({ + context: this.getContext(), + user, + sessionId, + videoLive, + streamingPlaylist, + rtmpUrl, + fps, + allResolutions + }) + + muxingSession.on('master-playlist-created', () => this.publishAndFederateLive(videoLive, localLTags)) + + muxingSession.on('bad-socket-health', ({ videoId }) => { + logger.error( + 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + + ' Stopping session of video %s.', videoUUID, + localLTags + ) + + this.stopSessionOf(videoId) + }) + + muxingSession.on('duration-exceeded', ({ videoId }) => { + logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) + + this.stopSessionOf(videoId) + }) + + muxingSession.on('quota-exceeded', ({ videoId }) => { + logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) + + this.stopSessionOf(videoId) + }) + + muxingSession.on('ffmpeg-error', ({ sessionId }) => this.abortSession(sessionId)) + muxingSession.on('ffmpeg-end', ({ videoId }) => { + this.onMuxingFFmpegEnd(videoId) + }) + + muxingSession.on('after-cleanup', ({ videoId }) => { + this.muxingSessions.delete(sessionId) + + return this.onAfterMuxingCleanup(videoId) + .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) + }) + + this.muxingSessions.set(sessionId, muxingSession) + + muxingSession.runMuxing() + .catch(err => { + logger.error('Cannot run muxing.', { err, ...localLTags }) + this.abortSession(sessionId) + }) + } + + private async publishAndFederateLive (live: MVideoLiveVideo, localLTags: { tags: string[] }) { + const videoId = live.videoId + + try { + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) + + logger.info('Will publish and federate live %s.', video.url, localLTags) + + video.state = VideoState.PUBLISHED + await video.save() + + live.Video = video + + setTimeout(() => { + federateVideoIfNeeded(video, false) + .catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags })) + + PeerTubeSocket.Instance.sendVideoLiveNewState(video) + }, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) + } catch (err) { + logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags }) + } + } + + private onMuxingFFmpegEnd (videoId: number) { + this.watchersPerVideo.delete(videoId) + this.videoSessions.delete(videoId) + } + + private async onAfterMuxingCleanup (videoUUID: string, cleanupNow = false) { + try { + const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID) + if (!fullVideo) return + + const live = await VideoLiveModel.loadByVideoId(fullVideo.id) + + if (!live.permanentLive) { + JobQueue.Instance.createJob({ + type: 'video-live-ending', + payload: { + videoId: fullVideo.id + } + }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) + + fullVideo.state = VideoState.LIVE_ENDED + } else { + fullVideo.state = VideoState.WAITING_FOR_LIVE + } + + await fullVideo.save() + + PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) + + await federateVideoIfNeeded(fullVideo, false) + } catch (err) { + logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) }) + } + } + + private async updateLiveViews () { + if (!this.isRunning()) return + + if (!isTestInstance()) logger.info('Updating live video views.', lTags()) + + for (const videoId of this.watchersPerVideo.keys()) { + const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE + + const watchers = this.watchersPerVideo.get(videoId) + + const numWatchers = watchers.length + + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) + video.views = numWatchers + await video.save() + + await federateVideoIfNeeded(video, false) + + PeerTubeSocket.Instance.sendVideoViewsUpdate(video) + + // Only keep not expired watchers + const newWatchers = watchers.filter(w => w > notBefore) + this.watchersPerVideo.set(videoId, newWatchers) + + logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags()) + } + } + + private async handleBrokenLives () { + const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() + + for (const uuid of videoUUIDs) { + await this.onAfterMuxingCleanup(uuid, true) + } + } + + private buildAllResolutionsToTranscode (originResolution: number) { + const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED + ? computeResolutionsToTranscode(originResolution, 'live') + : [] + + return resolutionsEnabled.concat([ originResolution ]) + } + + private async createLivePlaylist (video: MVideo, allResolutions: number[]) { + const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid) + const [ videoStreamingPlaylist ] = await VideoStreamingPlaylistModel.upsert({ + videoId: video.id, + playlistUrl, + segmentsSha256Url: WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsSha256SegmentsStaticPath(video.uuid, video.isLive), + p2pMediaLoaderInfohashes: VideoStreamingPlaylistModel.buildP2PMediaLoaderInfoHashes(playlistUrl, allResolutions), + p2pMediaLoaderPeerVersion: P2P_MEDIA_LOADER_PEER_VERSION, + + type: VideoStreamingPlaylistType.HLS + }, { returning: true }) as [ MStreamingPlaylist, boolean ] + + return Object.assign(videoStreamingPlaylist, { Video: video }) + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} + +// --------------------------------------------------------------------------- + +export { + LiveManager +} diff --git a/server/lib/live/live-quota-store.ts b/server/lib/live/live-quota-store.ts new file mode 100644 index 000000000..8ceccde98 --- /dev/null +++ b/server/lib/live/live-quota-store.ts @@ -0,0 +1,48 @@ +class LiveQuotaStore { + + private static instance: LiveQuotaStore + + private readonly livesPerUser = new Map() + + private constructor () { + } + + addNewLive (userId: number, liveId: number) { + if (!this.livesPerUser.has(userId)) { + this.livesPerUser.set(userId, []) + } + + const currentUserLive = { liveId, size: 0 } + const livesOfUser = this.livesPerUser.get(userId) + livesOfUser.push(currentUserLive) + } + + removeLive (userId: number, liveId: number) { + const newLivesPerUser = this.livesPerUser.get(userId) + .filter(o => o.liveId !== liveId) + + this.livesPerUser.set(userId, newLivesPerUser) + } + + addQuotaTo (userId: number, liveId: number, size: number) { + const lives = this.livesPerUser.get(userId) + const live = lives.find(l => l.liveId === liveId) + + live.size += size + } + + getLiveQuotaOf (userId: number) { + const currentLives = this.livesPerUser.get(userId) + if (!currentLives) return 0 + + return currentLives.reduce((sum, obj) => sum + obj.size, 0) + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} + +export { + LiveQuotaStore +} diff --git a/server/lib/live/live-segment-sha-store.ts b/server/lib/live/live-segment-sha-store.ts new file mode 100644 index 000000000..4af6f3ebf --- /dev/null +++ b/server/lib/live/live-segment-sha-store.ts @@ -0,0 +1,64 @@ +import { basename } from 'path' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { buildSha256Segment } from '../hls' + +const lTags = loggerTagsFactory('live') + +class LiveSegmentShaStore { + + private static instance: LiveSegmentShaStore + + private readonly segmentsSha256 = new Map>() + + private constructor () { + } + + getSegmentsSha256 (videoUUID: string) { + return this.segmentsSha256.get(videoUUID) + } + + async addSegmentSha (videoUUID: string, segmentPath: string) { + const segmentName = basename(segmentPath) + logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID)) + + const shaResult = await buildSha256Segment(segmentPath) + + if (!this.segmentsSha256.has(videoUUID)) { + this.segmentsSha256.set(videoUUID, new Map()) + } + + const filesMap = this.segmentsSha256.get(videoUUID) + filesMap.set(segmentName, shaResult) + } + + removeSegmentSha (videoUUID: string, segmentPath: string) { + const segmentName = basename(segmentPath) + + logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID)) + + const filesMap = this.segmentsSha256.get(videoUUID) + if (!filesMap) { + logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID)) + return + } + + if (!filesMap.has(segmentName)) { + logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID)) + return + } + + filesMap.delete(segmentName) + } + + cleanupShaSegments (videoUUID: string) { + this.segmentsSha256.delete(videoUUID) + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} + +export { + LiveSegmentShaStore +} diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts new file mode 100644 index 000000000..e4526c7a5 --- /dev/null +++ b/server/lib/live/live-utils.ts @@ -0,0 +1,23 @@ +import { remove } from 'fs-extra' +import { basename } from 'path' +import { MStreamingPlaylist, MVideo } from '@server/types/models' +import { getHLSDirectory } from '../video-paths' + +function buildConcatenatedName (segmentOrPlaylistPath: string) { + const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/) + + return 'concat-' + num[1] + '.ts' +} + +async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { + const hlsDirectory = getHLSDirectory(video) + + await remove(hlsDirectory) + + await streamingPlaylist.destroy() +} + +export { + cleanupLive, + buildConcatenatedName +} diff --git a/server/lib/live/shared/index.ts b/server/lib/live/shared/index.ts new file mode 100644 index 000000000..c4d1b59ec --- /dev/null +++ b/server/lib/live/shared/index.ts @@ -0,0 +1 @@ +export * from './muxing-session' diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts new file mode 100644 index 000000000..96f6c2c89 --- /dev/null +++ b/server/lib/live/shared/muxing-session.ts @@ -0,0 +1,341 @@ + +import * as Bluebird from 'bluebird' +import * as chokidar from 'chokidar' +import { FfmpegCommand } from 'fluent-ffmpeg' +import { appendFile, ensureDir, readFile, stat } from 'fs-extra' +import { basename, join } from 'path' +import { EventEmitter } from 'stream' +import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils' +import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' +import { CONFIG } from '@server/initializers/config' +import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' +import { VideoFileModel } from '@server/models/video/video-file' +import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' +import { VideoTranscodingProfilesManager } from '../../transcoding/video-transcoding-profiles' +import { isAbleToUploadVideo } from '../../user' +import { getHLSDirectory } from '../../video-paths' +import { LiveQuotaStore } from '../live-quota-store' +import { LiveSegmentShaStore } from '../live-segment-sha-store' +import { buildConcatenatedName } from '../live-utils' + +import memoizee = require('memoizee') + +interface MuxingSessionEvents { + 'master-playlist-created': ({ videoId: number }) => void + + 'bad-socket-health': ({ videoId: number }) => void + 'duration-exceeded': ({ videoId: number }) => void + 'quota-exceeded': ({ videoId: number }) => void + + 'ffmpeg-end': ({ videoId: number }) => void + 'ffmpeg-error': ({ sessionId: string }) => void + + 'after-cleanup': ({ videoId: number }) => void +} + +declare interface MuxingSession { + on( + event: U, listener: MuxingSessionEvents[U] + ): this + + emit( + event: U, ...args: Parameters + ): boolean +} + +class MuxingSession extends EventEmitter { + + private ffmpegCommand: FfmpegCommand + + private readonly context: any + private readonly user: MUserId + private readonly sessionId: string + private readonly videoLive: MVideoLiveVideo + private readonly streamingPlaylist: MStreamingPlaylistVideo + private readonly rtmpUrl: string + private readonly fps: number + private readonly allResolutions: number[] + + private readonly videoId: number + private readonly videoUUID: string + private readonly saveReplay: boolean + + private readonly lTags: LoggerTagsFn + + private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} + + private tsWatcher: chokidar.FSWatcher + private masterWatcher: chokidar.FSWatcher + + private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => { + return isAbleToUploadVideo(userId, 1000) + }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD }) + + private readonly hasClientSocketInBadHealthWithCache = memoizee((sessionId: string) => { + return this.hasClientSocketInBadHealth(sessionId) + }, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH }) + + constructor (options: { + context: any + user: MUserId + sessionId: string + videoLive: MVideoLiveVideo + streamingPlaylist: MStreamingPlaylistVideo + rtmpUrl: string + fps: number + allResolutions: number[] + }) { + super() + + this.context = options.context + this.user = options.user + this.sessionId = options.sessionId + this.videoLive = options.videoLive + this.streamingPlaylist = options.streamingPlaylist + this.rtmpUrl = options.rtmpUrl + this.fps = options.fps + this.allResolutions = options.allResolutions + + this.videoId = this.videoLive.Video.id + this.videoUUID = this.videoLive.Video.uuid + + this.saveReplay = this.videoLive.saveReplay + + this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) + } + + async runMuxing () { + this.createFiles() + + const outPath = await this.prepareDirectories() + + this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED + ? await getLiveTranscodingCommand({ + rtmpUrl: this.rtmpUrl, + outPath, + resolutions: this.allResolutions, + fps: this.fps, + availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), + profile: CONFIG.LIVE.TRANSCODING.PROFILE + }) + : getLiveMuxingCommand(this.rtmpUrl, outPath) + + logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags) + + this.watchTSFiles(outPath) + this.watchMasterFile(outPath) + + this.ffmpegCommand.on('error', (err, stdout, stderr) => { + this.onFFmpegError(err, stdout, stderr, outPath) + }) + + this.ffmpegCommand.on('end', () => this.onFFmpegEnded(outPath)) + + this.ffmpegCommand.run() + } + + abort () { + if (!this.ffmpegCommand) return false + + this.ffmpegCommand.kill('SIGINT') + return true + } + + private onFFmpegError (err: any, stdout: string, stderr: string, outPath: string) { + this.onFFmpegEnded(outPath) + + // Don't care that we killed the ffmpeg process + if (err?.message?.includes('Exiting normally')) return + + logger.error('Live transcoding error.', { err, stdout, stderr, ...this.lTags }) + + this.emit('ffmpeg-error', ({ sessionId: this.sessionId })) + } + + private onFFmpegEnded (outPath: string) { + logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.rtmpUrl, this.lTags) + + setTimeout(() => { + // Wait latest segments generation, and close watchers + + Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ]) + .then(() => { + // Process remaining segments hash + for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { + this.processSegments(outPath, this.segmentsToProcessPerPlaylist[key]) + } + }) + .catch(err => { + logger.error( + 'Cannot close watchers of %s or process remaining hash segments.', outPath, + { err, ...this.lTags } + ) + }) + + this.emit('after-cleanup', { videoId: this.videoId }) + }, 1000) + } + + private watchMasterFile (outPath: string) { + this.masterWatcher = chokidar.watch(outPath + '/master.m3u8') + + this.masterWatcher.on('add', async () => { + this.emit('master-playlist-created', { videoId: this.videoId }) + + this.masterWatcher.close() + .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...this.lTags })) + }) + } + + private watchTSFiles (outPath: string) { + const startStreamDateTime = new Date().getTime() + + this.tsWatcher = chokidar.watch(outPath + '/*.ts') + + const playlistIdMatcher = /^([\d+])-/ + + const addHandler = async segmentPath => { + logger.debug('Live add handler of %s.', segmentPath, this.lTags) + + const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] + + const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] + this.processSegments(outPath, segmentsToProcess) + + this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] + + if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) { + this.emit('bad-socket-health', { videoId: this.videoId }) + return + } + + // Duration constraint check + if (this.isDurationConstraintValid(startStreamDateTime) !== true) { + this.emit('duration-exceeded', { videoId: this.videoId }) + return + } + + // Check user quota if the user enabled replay saving + if (await this.isQuotaExceeded(segmentPath) === true) { + this.emit('quota-exceeded', { videoId: this.videoId }) + } + } + + const deleteHandler = segmentPath => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath) + + this.tsWatcher.on('add', p => addHandler(p)) + this.tsWatcher.on('unlink', p => deleteHandler(p)) + } + + private async isQuotaExceeded (segmentPath: string) { + if (this.saveReplay !== true) return false + + try { + const segmentStat = await stat(segmentPath) + + LiveQuotaStore.Instance.addQuotaTo(this.user.id, this.videoLive.id, segmentStat.size) + + const canUpload = await this.isAbleToUploadVideoWithCache(this.user.id) + + return canUpload !== true + } catch (err) { + logger.error('Cannot stat %s or check quota of %d.', segmentPath, this.user.id, { err, ...this.lTags }) + } + } + + private createFiles () { + for (let i = 0; i < this.allResolutions.length; i++) { + const resolution = this.allResolutions[i] + + const file = new VideoFileModel({ + resolution, + size: -1, + extname: '.ts', + infoHash: null, + fps: this.fps, + videoStreamingPlaylistId: this.streamingPlaylist.id + }) + + VideoFileModel.customUpsert(file, 'streaming-playlist', null) + .catch(err => logger.error('Cannot create file for live streaming.', { err, ...this.lTags })) + } + } + + private async prepareDirectories () { + const outPath = getHLSDirectory(this.videoLive.Video) + await ensureDir(outPath) + + const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY) + + if (this.videoLive.saveReplay === true) { + await ensureDir(replayDirectory) + } + + return outPath + } + + private isDurationConstraintValid (streamingStartTime: number) { + const maxDuration = CONFIG.LIVE.MAX_DURATION + // No limit + if (maxDuration < 0) return true + + const now = new Date().getTime() + const max = streamingStartTime + maxDuration + + return now <= max + } + + private processSegments (hlsVideoPath: string, segmentPaths: string[]) { + Bluebird.mapSeries(segmentPaths, async previousSegment => { + // Add sha hash of previous segments, because ffmpeg should have finished generating them + await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) + + if (this.saveReplay) { + await this.addSegmentToReplay(hlsVideoPath, previousSegment) + } + }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...this.lTags })) + } + + private hasClientSocketInBadHealth (sessionId: string) { + const rtmpSession = this.context.sessions.get(sessionId) + + if (!rtmpSession) { + logger.warn('Cannot get session %s to check players socket health.', sessionId, this.lTags) + return + } + + for (const playerSessionId of rtmpSession.players) { + const playerSession = this.context.sessions.get(playerSessionId) + + if (!playerSession) { + logger.error('Cannot get player session %s to check socket health.', playerSession, this.lTags) + continue + } + + if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) { + return true + } + } + + return false + } + + private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) { + const segmentName = basename(segmentPath) + const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, buildConcatenatedName(segmentName)) + + try { + const data = await readFile(segmentPath) + + await appendFile(dest, data) + } catch (err) { + logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...this.lTags }) + } + } +} + +// --------------------------------------------------------------------------- + +export { + MuxingSession +} diff --git a/server/lib/user.ts b/server/lib/user.ts index 91a682a7e..8820e8243 100644 --- a/server/lib/user.ts +++ b/server/lib/user.ts @@ -14,7 +14,7 @@ import { MUser, MUserDefault, MUserId } from '../types/models/user' import { generateAndSaveActorKeys } from './activitypub/actors' import { getLocalAccountActivityPubUrl } from './activitypub/url' import { Emailer } from './emailer' -import { LiveManager } from './live-manager' +import { LiveQuotaStore } from './live/live-quota-store' import { buildActorInstance } from './local-actor' import { Redis } from './redis' import { createLocalVideoChannel } from './video-channel' @@ -129,7 +129,7 @@ async function getOriginalVideoFileTotalFromUser (user: MUserId) { const base = await UserModel.getTotalRawQuery(query, user.id) - return base + LiveManager.Instance.getLiveQuotaUsedByUser(user.id) + return base + LiveQuotaStore.Instance.getLiveQuotaOf(user.id) } // Returns cumulative size of all video files uploaded in the last 24 hours. @@ -143,10 +143,10 @@ async function getOriginalVideoFileTotalDailyFromUser (user: MUserId) { const base = await UserModel.getTotalRawQuery(query, user.id) - return base + LiveManager.Instance.getLiveQuotaUsedByUser(user.id) + return base + LiveQuotaStore.Instance.getLiveQuotaOf(user.id) } -async function isAbleToUploadVideo (userId: number, size: number) { +async function isAbleToUploadVideo (userId: number, newVideoSize: number) { const user = await UserModel.loadById(userId) if (user.videoQuota === -1 && user.videoQuotaDaily === -1) return Promise.resolve(true) @@ -156,8 +156,8 @@ async function isAbleToUploadVideo (userId: number, size: number) { getOriginalVideoFileTotalDailyFromUser(user) ]) - const uploadedTotal = size + totalBytes - const uploadedDaily = size + totalBytesDaily + const uploadedTotal = newVideoSize + totalBytes + const uploadedDaily = newVideoSize + totalBytesDaily if (user.videoQuotaDaily === -1) return uploadedTotal < user.videoQuota if (user.videoQuota === -1) return uploadedDaily < user.videoQuotaDaily diff --git a/server/lib/video-blacklist.ts b/server/lib/video-blacklist.ts index 37c43c3b0..0984c0d7a 100644 --- a/server/lib/video-blacklist.ts +++ b/server/lib/video-blacklist.ts @@ -16,7 +16,7 @@ import { CONFIG } from '../initializers/config' import { VideoBlacklistModel } from '../models/video/video-blacklist' import { sendDeleteVideo } from './activitypub/send' import { federateVideoIfNeeded } from './activitypub/videos' -import { LiveManager } from './live-manager' +import { LiveManager } from './live/live-manager' import { Notifier } from './notifier' import { Hooks } from './plugins/hooks' diff --git a/server/models/video/video.ts b/server/models/video/video.ts index ca839b0d4..d2efc2553 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts @@ -27,7 +27,7 @@ import { import { setAsUpdated } from '@server/helpers/database-utils' import { buildNSFWFilter } from '@server/helpers/express-utils' import { getPrivaciesForFederation, isPrivacyForFederation, isStateForFederation } from '@server/helpers/video' -import { LiveManager } from '@server/lib/live-manager' +import { LiveManager } from '@server/lib/live/live-manager' import { getHLSDirectory, getVideoFilePath } from '@server/lib/video-paths' import { getServerActor } from '@server/models/application/application' import { ModelCache } from '@server/models/model-cache' diff --git a/server/tests/api/live/index.ts b/server/tests/api/live/index.ts index c733f564e..e6bcef49f 100644 --- a/server/tests/api/live/index.ts +++ b/server/tests/api/live/index.ts @@ -1,4 +1,6 @@ import './live-constraints' +import './live-socket-messages' import './live-permanent' import './live-save-replay' +import './live-views' import './live' diff --git a/server/tests/api/live/live-socket-messages.ts b/server/tests/api/live/live-socket-messages.ts new file mode 100644 index 000000000..e00909ade --- /dev/null +++ b/server/tests/api/live/live-socket-messages.ts @@ -0,0 +1,196 @@ +/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ + +import 'mocha' +import * as chai from 'chai' +import { getLiveNotificationSocket } from '@shared/extra-utils/socket/socket-io' +import { VideoPrivacy, VideoState } from '@shared/models' +import { + cleanupTests, + createLive, + doubleFollow, + flushAndRunMultipleServers, + getVideoIdFromUUID, + sendRTMPStreamInVideo, + ServerInfo, + setAccessTokensToServers, + setDefaultVideoChannel, + stopFfmpeg, + updateCustomSubConfig, + viewVideo, + wait, + waitJobs, + waitUntilLiveEnded, + waitUntilLivePublishedOnAllServers +} from '../../../../shared/extra-utils' + +const expect = chai.expect + +describe('Test live', function () { + let servers: ServerInfo[] = [] + + before(async function () { + this.timeout(120000) + + servers = await flushAndRunMultipleServers(2) + + // Get the access tokens + await setAccessTokensToServers(servers) + await setDefaultVideoChannel(servers) + + await updateCustomSubConfig(servers[0].url, servers[0].accessToken, { + live: { + enabled: true, + allowReplay: true, + transcoding: { + enabled: false + } + } + }) + + // Server 1 and server 2 follow each other + await doubleFollow(servers[0], servers[1]) + }) + + describe('Live socket messages', function () { + + async function createLiveWrapper () { + const liveAttributes = { + name: 'live video', + channelId: servers[0].videoChannel.id, + privacy: VideoPrivacy.PUBLIC + } + + const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes) + return res.body.video.uuid + } + + it('Should correctly send a message when the live starts and ends', async function () { + this.timeout(60000) + + const localStateChanges: VideoState[] = [] + const remoteStateChanges: VideoState[] = [] + + const liveVideoUUID = await createLiveWrapper() + await waitJobs(servers) + + { + const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID) + + const localSocket = getLiveNotificationSocket(servers[0].url) + localSocket.on('state-change', data => localStateChanges.push(data.state)) + localSocket.emit('subscribe', { videoId }) + } + + { + const videoId = await getVideoIdFromUUID(servers[1].url, liveVideoUUID) + + const remoteSocket = getLiveNotificationSocket(servers[1].url) + remoteSocket.on('state-change', data => remoteStateChanges.push(data.state)) + remoteSocket.emit('subscribe', { videoId }) + } + + const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) + + await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID) + await waitJobs(servers) + + for (const stateChanges of [ localStateChanges, remoteStateChanges ]) { + expect(stateChanges).to.have.length.at.least(1) + expect(stateChanges[stateChanges.length - 1]).to.equal(VideoState.PUBLISHED) + } + + await stopFfmpeg(command) + + for (const server of servers) { + await waitUntilLiveEnded(server.url, server.accessToken, liveVideoUUID) + } + await waitJobs(servers) + + for (const stateChanges of [ localStateChanges, remoteStateChanges ]) { + expect(stateChanges).to.have.length.at.least(2) + expect(stateChanges[stateChanges.length - 1]).to.equal(VideoState.LIVE_ENDED) + } + }) + + it('Should correctly send views change notification', async function () { + this.timeout(60000) + + let localLastVideoViews = 0 + let remoteLastVideoViews = 0 + + const liveVideoUUID = await createLiveWrapper() + await waitJobs(servers) + + { + const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID) + + const localSocket = getLiveNotificationSocket(servers[0].url) + localSocket.on('views-change', data => { localLastVideoViews = data.views }) + localSocket.emit('subscribe', { videoId }) + } + + { + const videoId = await getVideoIdFromUUID(servers[1].url, liveVideoUUID) + + const remoteSocket = getLiveNotificationSocket(servers[1].url) + remoteSocket.on('views-change', data => { remoteLastVideoViews = data.views }) + remoteSocket.emit('subscribe', { videoId }) + } + + const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) + + await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID) + await waitJobs(servers) + + expect(localLastVideoViews).to.equal(0) + expect(remoteLastVideoViews).to.equal(0) + + await viewVideo(servers[0].url, liveVideoUUID) + await viewVideo(servers[1].url, liveVideoUUID) + + await waitJobs(servers) + await wait(5000) + await waitJobs(servers) + + expect(localLastVideoViews).to.equal(2) + expect(remoteLastVideoViews).to.equal(2) + + await stopFfmpeg(command) + }) + + it('Should not receive a notification after unsubscribe', async function () { + this.timeout(120000) + + const stateChanges: VideoState[] = [] + + const liveVideoUUID = await createLiveWrapper() + await waitJobs(servers) + + const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID) + + const socket = getLiveNotificationSocket(servers[0].url) + socket.on('state-change', data => stateChanges.push(data.state)) + socket.emit('subscribe', { videoId }) + + const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) + + await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID) + await waitJobs(servers) + + // Notifier waits before sending a notification + await wait(10000) + + expect(stateChanges).to.have.lengthOf(1) + socket.emit('unsubscribe', { videoId }) + + await stopFfmpeg(command) + await waitJobs(servers) + + expect(stateChanges).to.have.lengthOf(1) + }) + }) + + after(async function () { + await cleanupTests(servers) + }) +}) diff --git a/server/tests/api/live/live-views.ts b/server/tests/api/live/live-views.ts new file mode 100644 index 000000000..a44d21ffa --- /dev/null +++ b/server/tests/api/live/live-views.ts @@ -0,0 +1,130 @@ +/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ + +import 'mocha' +import * as chai from 'chai' +import { FfmpegCommand } from 'fluent-ffmpeg' +import { VideoDetails, VideoPrivacy } from '@shared/models' +import { + cleanupTests, + createLive, + doubleFollow, + flushAndRunMultipleServers, + getVideo, + sendRTMPStreamInVideo, + ServerInfo, + setAccessTokensToServers, + setDefaultVideoChannel, + stopFfmpeg, + updateCustomSubConfig, + viewVideo, + wait, + waitJobs, + waitUntilLivePublishedOnAllServers +} from '../../../../shared/extra-utils' + +const expect = chai.expect + +describe('Test live', function () { + let servers: ServerInfo[] = [] + + before(async function () { + this.timeout(120000) + + servers = await flushAndRunMultipleServers(2) + + // Get the access tokens + await setAccessTokensToServers(servers) + await setDefaultVideoChannel(servers) + + await updateCustomSubConfig(servers[0].url, servers[0].accessToken, { + live: { + enabled: true, + allowReplay: true, + transcoding: { + enabled: false + } + } + }) + + // Server 1 and server 2 follow each other + await doubleFollow(servers[0], servers[1]) + }) + + describe('Live views', function () { + let liveVideoId: string + let command: FfmpegCommand + + async function countViews (expected: number) { + for (const server of servers) { + const res = await getVideo(server.url, liveVideoId) + const video: VideoDetails = res.body + + expect(video.views).to.equal(expected) + } + } + + before(async function () { + this.timeout(30000) + + const liveAttributes = { + name: 'live video', + channelId: servers[0].videoChannel.id, + privacy: VideoPrivacy.PUBLIC + } + + const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes) + liveVideoId = res.body.video.uuid + + command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId) + await waitUntilLivePublishedOnAllServers(servers, liveVideoId) + await waitJobs(servers) + }) + + it('Should display no views for a live', async function () { + await countViews(0) + }) + + it('Should view a live twice and display 1 view', async function () { + this.timeout(30000) + + await viewVideo(servers[0].url, liveVideoId) + await viewVideo(servers[0].url, liveVideoId) + + await wait(7000) + + await waitJobs(servers) + + await countViews(1) + }) + + it('Should wait and display 0 views', async function () { + this.timeout(30000) + + await wait(12000) + await waitJobs(servers) + + await countViews(0) + }) + + it('Should view a live on a remote and on local and display 2 views', async function () { + this.timeout(30000) + + await viewVideo(servers[0].url, liveVideoId) + await viewVideo(servers[1].url, liveVideoId) + await viewVideo(servers[1].url, liveVideoId) + + await wait(7000) + await waitJobs(servers) + + await countViews(2) + }) + + after(async function () { + await stopFfmpeg(command) + }) + }) + + after(async function () { + await cleanupTests(servers) + }) +}) diff --git a/server/tests/api/live/live.ts b/server/tests/api/live/live.ts index 57fb58150..50397924e 100644 --- a/server/tests/api/live/live.ts +++ b/server/tests/api/live/live.ts @@ -2,10 +2,8 @@ import 'mocha' import * as chai from 'chai' -import { FfmpegCommand } from 'fluent-ffmpeg' import { join } from 'path' import { ffprobePromise, getVideoStreamFromFile } from '@server/helpers/ffprobe-utils' -import { getLiveNotificationSocket } from '@shared/extra-utils/socket/socket-io' import { LiveVideo, LiveVideoCreate, Video, VideoDetails, VideoPrivacy, VideoState, VideoStreamingPlaylistType } from '@shared/models' import { HttpStatusCode } from '../../../../shared/core-utils/miscs/http-error-codes' import { @@ -22,7 +20,6 @@ import { getMyVideosWithFilter, getPlaylist, getVideo, - getVideoIdFromUUID, getVideosList, getVideosWithFilters, killallServers, @@ -40,11 +37,11 @@ import { updateCustomSubConfig, updateLive, uploadVideoAndGetId, - viewVideo, wait, waitJobs, waitUntilLiveEnded, waitUntilLivePublished, + waitUntilLivePublishedOnAllServers, waitUntilLiveSegmentGeneration } from '../../../../shared/extra-utils' @@ -53,12 +50,6 @@ const expect = chai.expect describe('Test live', function () { let servers: ServerInfo[] = [] - async function waitUntilLivePublishedOnAllServers (videoId: string) { - for (const server of servers) { - await waitUntilLivePublished(server.url, server.accessToken, videoId) - } - } - before(async function () { this.timeout(120000) @@ -247,7 +238,7 @@ describe('Test live', function () { liveVideoId = resLive.body.video.uuid command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId) - await waitUntilLivePublishedOnAllServers(liveVideoId) + await waitUntilLivePublishedOnAllServers(servers, liveVideoId) await waitJobs(servers) }) @@ -461,7 +452,7 @@ describe('Test live', function () { liveVideoId = await createLiveWrapper(false) const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId) - await waitUntilLivePublishedOnAllServers(liveVideoId) + await waitUntilLivePublishedOnAllServers(servers, liveVideoId) await waitJobs(servers) await testVideoResolutions(liveVideoId, [ 720 ]) @@ -477,7 +468,7 @@ describe('Test live', function () { liveVideoId = await createLiveWrapper(false) const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId) - await waitUntilLivePublishedOnAllServers(liveVideoId) + await waitUntilLivePublishedOnAllServers(servers, liveVideoId) await waitJobs(servers) await testVideoResolutions(liveVideoId, resolutions) @@ -494,7 +485,7 @@ describe('Test live', function () { liveVideoId = await createLiveWrapper(true) const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId, 'video_short2.webm') - await waitUntilLivePublishedOnAllServers(liveVideoId) + await waitUntilLivePublishedOnAllServers(servers, liveVideoId) await waitJobs(servers) await testVideoResolutions(liveVideoId, resolutions) @@ -504,7 +495,7 @@ describe('Test live', function () { await waitJobs(servers) - await waitUntilLivePublishedOnAllServers(liveVideoId) + await waitUntilLivePublishedOnAllServers(servers, liveVideoId) const bitrateLimits = { 720: 5000 * 1000, // 60FPS @@ -559,216 +550,6 @@ describe('Test live', function () { }) }) - describe('Live views', function () { - let liveVideoId: string - let command: FfmpegCommand - - async function countViews (expected: number) { - for (const server of servers) { - const res = await getVideo(server.url, liveVideoId) - const video: VideoDetails = res.body - - expect(video.views).to.equal(expected) - } - } - - before(async function () { - this.timeout(30000) - - const liveAttributes = { - name: 'live video', - channelId: servers[0].videoChannel.id, - privacy: VideoPrivacy.PUBLIC - } - - const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes) - liveVideoId = res.body.video.uuid - - command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId) - await waitUntilLivePublishedOnAllServers(liveVideoId) - await waitJobs(servers) - }) - - it('Should display no views for a live', async function () { - await countViews(0) - }) - - it('Should view a live twice and display 1 view', async function () { - this.timeout(30000) - - await viewVideo(servers[0].url, liveVideoId) - await viewVideo(servers[0].url, liveVideoId) - - await wait(7000) - - await waitJobs(servers) - - await countViews(1) - }) - - it('Should wait and display 0 views', async function () { - this.timeout(30000) - - await wait(7000) - await waitJobs(servers) - - await countViews(0) - }) - - it('Should view a live on a remote and on local and display 2 views', async function () { - this.timeout(30000) - - await viewVideo(servers[0].url, liveVideoId) - await viewVideo(servers[1].url, liveVideoId) - await viewVideo(servers[1].url, liveVideoId) - - await wait(7000) - await waitJobs(servers) - - await countViews(2) - }) - - after(async function () { - await stopFfmpeg(command) - }) - }) - - describe('Live socket messages', function () { - - async function createLiveWrapper () { - const liveAttributes = { - name: 'live video', - channelId: servers[0].videoChannel.id, - privacy: VideoPrivacy.PUBLIC - } - - const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes) - return res.body.video.uuid - } - - it('Should correctly send a message when the live starts and ends', async function () { - this.timeout(60000) - - const localStateChanges: VideoState[] = [] - const remoteStateChanges: VideoState[] = [] - - const liveVideoUUID = await createLiveWrapper() - await waitJobs(servers) - - { - const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID) - - const localSocket = getLiveNotificationSocket(servers[0].url) - localSocket.on('state-change', data => localStateChanges.push(data.state)) - localSocket.emit('subscribe', { videoId }) - } - - { - const videoId = await getVideoIdFromUUID(servers[1].url, liveVideoUUID) - - const remoteSocket = getLiveNotificationSocket(servers[1].url) - remoteSocket.on('state-change', data => remoteStateChanges.push(data.state)) - remoteSocket.emit('subscribe', { videoId }) - } - - const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) - - await waitUntilLivePublishedOnAllServers(liveVideoUUID) - await waitJobs(servers) - - for (const stateChanges of [ localStateChanges, remoteStateChanges ]) { - expect(stateChanges).to.have.length.at.least(1) - expect(stateChanges[stateChanges.length - 1]).to.equal(VideoState.PUBLISHED) - } - - await stopFfmpeg(command) - - for (const server of servers) { - await waitUntilLiveEnded(server.url, server.accessToken, liveVideoUUID) - } - await waitJobs(servers) - - for (const stateChanges of [ localStateChanges, remoteStateChanges ]) { - expect(stateChanges).to.have.length.at.least(2) - expect(stateChanges[stateChanges.length - 1]).to.equal(VideoState.LIVE_ENDED) - } - }) - - it('Should correctly send views change notification', async function () { - this.timeout(60000) - - let localLastVideoViews = 0 - let remoteLastVideoViews = 0 - - const liveVideoUUID = await createLiveWrapper() - await waitJobs(servers) - - { - const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID) - - const localSocket = getLiveNotificationSocket(servers[0].url) - localSocket.on('views-change', data => { localLastVideoViews = data.views }) - localSocket.emit('subscribe', { videoId }) - } - - { - const videoId = await getVideoIdFromUUID(servers[1].url, liveVideoUUID) - - const remoteSocket = getLiveNotificationSocket(servers[1].url) - remoteSocket.on('views-change', data => { remoteLastVideoViews = data.views }) - remoteSocket.emit('subscribe', { videoId }) - } - - const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) - - await waitUntilLivePublishedOnAllServers(liveVideoUUID) - await waitJobs(servers) - - expect(localLastVideoViews).to.equal(0) - expect(remoteLastVideoViews).to.equal(0) - - await viewVideo(servers[0].url, liveVideoUUID) - await viewVideo(servers[1].url, liveVideoUUID) - - await waitJobs(servers) - await wait(5000) - await waitJobs(servers) - - expect(localLastVideoViews).to.equal(2) - expect(remoteLastVideoViews).to.equal(2) - - await stopFfmpeg(command) - }) - - it('Should not receive a notification after unsubscribe', async function () { - this.timeout(60000) - - const stateChanges: VideoState[] = [] - - const liveVideoUUID = await createLiveWrapper() - await waitJobs(servers) - - const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID) - - const socket = getLiveNotificationSocket(servers[0].url) - socket.on('state-change', data => stateChanges.push(data.state)) - socket.emit('subscribe', { videoId }) - - const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID) - - await waitUntilLivePublishedOnAllServers(liveVideoUUID) - await waitJobs(servers) - - expect(stateChanges).to.have.lengthOf(1) - socket.emit('unsubscribe', { videoId }) - - await stopFfmpeg(command) - await waitJobs(servers) - - expect(stateChanges).to.have.lengthOf(1) - }) - }) - describe('After a server restart', function () { let liveVideoId: string let liveVideoReplayId: string diff --git a/shared/extra-utils/videos/live.ts b/shared/extra-utils/videos/live.ts index d3cd974de..c0384769b 100644 --- a/shared/extra-utils/videos/live.ts +++ b/shared/extra-utils/videos/live.ts @@ -175,6 +175,12 @@ async function waitUntilLiveSaved (url: string, token: string, videoId: number | } while (video.isLive === true && video.state.id !== VideoState.PUBLISHED) } +async function waitUntilLivePublishedOnAllServers (servers: ServerInfo[], videoId: string) { + for (const server of servers) { + await waitUntilLivePublished(server.url, server.accessToken, videoId) + } +} + async function checkLiveCleanup (server: ServerInfo, videoUUID: string, resolutions: number[] = []) { const basePath = buildServerDirectory(server, 'streaming-playlists') const hlsPath = join(basePath, 'hls', videoUUID) @@ -226,6 +232,7 @@ export { sendRTMPStreamInVideo, waitUntilLiveEnded, waitFfmpegUntilError, + waitUntilLivePublishedOnAllServers, sendRTMPStream, testFfmpegStreamError }