Refactor live manager

pull/4197/head
Chocobozzz 2021-06-16 15:14:41 +02:00
parent fd6584844b
commit 8ebf2a5d5d
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
23 changed files with 1255 additions and 886 deletions

View File

@ -63,9 +63,9 @@ elif [ "$1" = "api-1" ]; then
elif [ "$1" = "api-2" ]; then
npm run build:server
liveFiles=$(findTestFiles ./dist/server/tests/api/live)
serverFiles=$(findTestFiles ./dist/server/tests/api/server)
usersFiles=$(findTestFiles ./dist/server/tests/api/users)
liveFiles=$(findTestFiles ./dist/server/tests/api/live)
MOCHA_PARALLEL=true runTest "$1" 3 $serverFiles $usersFiles $liveFiles
elif [ "$1" = "api-3" ]; then

View File

@ -85,6 +85,8 @@ function run () {
const files = await getFiles()
for (const file of files) {
if (file === 'peertube-audit.log') continue
console.log('Opening %s.', file)
const stream = createReadStream(file)

View File

@ -124,7 +124,7 @@ import { PluginsCheckScheduler } from './server/lib/schedulers/plugins-check-sch
import { PeerTubeVersionCheckScheduler } from './server/lib/schedulers/peertube-version-check-scheduler'
import { Hooks } from './server/lib/plugins/hooks'
import { PluginManager } from './server/lib/plugins/plugin-manager'
import { LiveManager } from './server/lib/live-manager'
import { LiveManager } from './server/lib/live'
import { HttpStatusCode } from './shared/core-utils/miscs/http-error-codes'
import { VideosTorrentCache } from '@server/lib/files-cache/videos-torrent-cache'
import { ServerConfigManager } from '@server/lib/server-config-manager'

View File

@ -1,7 +1,7 @@
import * as express from 'express'
import toInt from 'validator/lib/toInt'
import { doJSONRequest } from '@server/helpers/requests'
import { LiveManager } from '@server/lib/live-manager'
import { LiveManager } from '@server/lib/live'
import { openapiOperationDoc } from '@server/middlewares/doc'
import { getServerActor } from '@server/models/application/application'
import { MVideoAccountLight } from '@server/types/models'

View File

@ -1,7 +1,7 @@
import * as cors from 'cors'
import * as express from 'express'
import { mapToJSON } from '@server/helpers/core-utils'
import { LiveManager } from '@server/lib/live-manager'
import { LiveSegmentShaStore } from '@server/lib/live'
import { HttpStatusCode } from '@shared/core-utils/miscs/http-error-codes'
const liveRouter = express.Router()
@ -22,7 +22,7 @@ export {
function getSegmentsSha256 (req: express.Request, res: express.Response) {
const videoUUID = req.params.videoUUID
const result = LiveManager.Instance.getSegmentsSha256(videoUUID)
const result = LiveSegmentShaStore.Instance.getSegmentsSha256(videoUUID)
if (!result) {
return res.status(HttpStatusCode.NOT_FOUND_404).end()

View File

@ -4,7 +4,7 @@ import { Redis } from '../../redis'
import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub'
import { APProcessorOptions } from '../../../types/activitypub-processor.model'
import { MActorSignature } from '../../../types/models'
import { LiveManager } from '@server/lib/live-manager'
import { LiveManager } from '@server/lib/live/live-manager'
async function processViewActivity (options: APProcessorOptions<ActivityCreate | ActivityView>) {
const { activity, byActor } = options

View File

@ -3,7 +3,7 @@ import { pathExists, readdir, remove } from 'fs-extra'
import { join } from 'path'
import { ffprobePromise, getAudioStream, getDurationFromVideoFile, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
import { VIDEO_LIVE } from '@server/initializers/constants'
import { LiveManager } from '@server/lib/live-manager'
import { buildConcatenatedName, cleanupLive, LiveSegmentShaStore } from '@server/lib/live'
import { generateVideoMiniature } from '@server/lib/thumbnail'
import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/video-transcoding'
import { publishAndFederateIfNeeded } from '@server/lib/video'
@ -12,7 +12,7 @@ import { VideoModel } from '@server/models/video/video'
import { VideoFileModel } from '@server/models/video/video-file'
import { VideoLiveModel } from '@server/models/video/video-live'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MStreamingPlaylist, MVideo, MVideoLive } from '@server/types/models'
import { MVideo, MVideoLive } from '@server/types/models'
import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
import { logger } from '../../../helpers/logger'
@ -37,7 +37,7 @@ async function processVideoLiveEnding (job: Bull.Job) {
return
}
LiveManager.Instance.cleanupShaSegments(video.uuid)
LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid)
if (live.saveReplay !== true) {
return cleanupLive(video, streamingPlaylist)
@ -46,19 +46,10 @@ async function processVideoLiveEnding (job: Bull.Job) {
return saveLive(video, live)
}
async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
const hlsDirectory = getHLSDirectory(video)
await remove(hlsDirectory)
await streamingPlaylist.destroy()
}
// ---------------------------------------------------------------------------
export {
processVideoLiveEnding,
cleanupLive
processVideoLiveEnding
}
// ---------------------------------------------------------------------------
@ -94,7 +85,7 @@ async function saveLive (video: MVideo, live: MVideoLive) {
let durationDone = false
for (const playlistFile of playlistFiles) {
const concatenatedTsFile = LiveManager.Instance.buildConcatenatedName(playlistFile)
const concatenatedTsFile = buildConcatenatedName(playlistFile)
const concatenatedTsFilePath = join(replayDirectory, concatenatedTsFile)
const probe = await ffprobePromise(concatenatedTsFilePath)

View File

@ -1,633 +0,0 @@
import * as Bluebird from 'bluebird'
import * as chokidar from 'chokidar'
import { FfmpegCommand } from 'fluent-ffmpeg'
import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
import { createServer, Server } from 'net'
import { basename, join } from 'path'
import { isTestInstance } from '@server/helpers/core-utils'
import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants'
import { UserModel } from '@server/models/user/user'
import { VideoModel } from '@server/models/video/video'
import { VideoFileModel } from '@server/models/video/video-file'
import { VideoLiveModel } from '@server/models/video/video-live'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MStreamingPlaylist, MStreamingPlaylistVideo, MUserId, MVideoLive, MVideoLiveVideo } from '@server/types/models'
import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
import { federateVideoIfNeeded } from './activitypub/videos'
import { buildSha256Segment } from './hls'
import { JobQueue } from './job-queue'
import { cleanupLive } from './job-queue/handlers/video-live-ending'
import { PeerTubeSocket } from './peertube-socket'
import { VideoTranscodingProfilesManager } from './transcoding/video-transcoding-profiles'
import { isAbleToUploadVideo } from './user'
import { getHLSDirectory } from './video-paths'
import memoizee = require('memoizee')
const NodeRtmpSession = require('node-media-server/node_rtmp_session')
const context = require('node-media-server/node_core_ctx')
const nodeMediaServerLogger = require('node-media-server/node_core_logger')
// Disable node media server logs
nodeMediaServerLogger.setLogType(0)
const config = {
rtmp: {
port: CONFIG.LIVE.RTMP.PORT,
chunk_size: VIDEO_LIVE.RTMP.CHUNK_SIZE,
gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE,
ping: VIDEO_LIVE.RTMP.PING,
ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT
},
transcoding: {
ffmpeg: 'ffmpeg'
}
}
const lTags = loggerTagsFactory('live')
class LiveManager {
private static instance: LiveManager
private readonly transSessions = new Map<string, FfmpegCommand>()
private readonly videoSessions = new Map<number, string>()
// Values are Date().getTime()
private readonly watchersPerVideo = new Map<number, number[]>()
private readonly segmentsSha256 = new Map<string, Map<string, string>>()
private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()
private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
return isAbleToUploadVideo(userId, 1000)
}, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
private readonly hasClientSocketsInBadHealthWithCache = memoizee((sessionId: string) => {
return this.hasClientSocketsInBadHealth(sessionId)
}, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH })
private rtmpServer: Server
private constructor () {
}
init () {
const events = this.getContext().nodeEvent
events.on('postPublish', (sessionId: string, streamPath: string) => {
logger.debug('RTMP received stream', { id: sessionId, streamPath, ...lTags(sessionId) })
const splittedPath = streamPath.split('/')
if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) {
logger.warn('Live path is incorrect.', { streamPath, ...lTags(sessionId) })
return this.abortSession(sessionId)
}
this.handleSession(sessionId, streamPath, splittedPath[2])
.catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) }))
})
events.on('donePublish', sessionId => {
logger.info('Live session ended.', { sessionId, ...lTags(sessionId) })
})
registerConfigChangedHandler(() => {
if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) {
this.run()
return
}
if (this.rtmpServer && CONFIG.LIVE.ENABLED === false) {
this.stop()
}
})
// Cleanup broken lives, that were terminated by a server restart for example
this.handleBrokenLives()
.catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
}
run () {
logger.info('Running RTMP server on port %d', config.rtmp.port, lTags())
this.rtmpServer = createServer(socket => {
const session = new NodeRtmpSession(config, socket)
session.run()
})
this.rtmpServer.on('error', err => {
logger.error('Cannot run RTMP server.', { err, ...lTags() })
})
this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT)
}
stop () {
logger.info('Stopping RTMP server.', lTags())
this.rtmpServer.close()
this.rtmpServer = undefined
// Sessions is an object
this.getContext().sessions.forEach((session: any) => {
if (session instanceof NodeRtmpSession) {
session.stop()
}
})
}
isRunning () {
return !!this.rtmpServer
}
getSegmentsSha256 (videoUUID: string) {
return this.segmentsSha256.get(videoUUID)
}
stopSessionOf (videoId: number) {
const sessionId = this.videoSessions.get(videoId)
if (!sessionId) return
this.videoSessions.delete(videoId)
this.abortSession(sessionId)
}
getLiveQuotaUsedByUser (userId: number) {
const currentLives = this.livesPerUser.get(userId)
if (!currentLives) return 0
return currentLives.reduce((sum, obj) => sum + obj.size, 0)
}
addViewTo (videoId: number) {
if (this.videoSessions.has(videoId) === false) return
let watchers = this.watchersPerVideo.get(videoId)
if (!watchers) {
watchers = []
this.watchersPerVideo.set(videoId, watchers)
}
watchers.push(new Date().getTime())
}
cleanupShaSegments (videoUUID: string) {
this.segmentsSha256.delete(videoUUID)
}
addSegmentToReplay (hlsVideoPath: string, segmentPath: string) {
const segmentName = basename(segmentPath)
const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, this.buildConcatenatedName(segmentName))
return readFile(segmentPath)
.then(data => appendFile(dest, data))
.catch(err => logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...lTags() }))
}
buildConcatenatedName (segmentOrPlaylistPath: string) {
const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/)
return 'concat-' + num[1] + '.ts'
}
private processSegments (hlsVideoPath: string, videoUUID: string, videoLive: MVideoLive, segmentPaths: string[]) {
Bluebird.mapSeries(segmentPaths, async previousSegment => {
// Add sha hash of previous segments, because ffmpeg should have finished generating them
await this.addSegmentSha(videoUUID, previousSegment)
if (videoLive.saveReplay) {
await this.addSegmentToReplay(hlsVideoPath, previousSegment)
}
}).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...lTags(videoUUID) }))
}
private getContext () {
return context
}
private abortSession (id: string) {
const session = this.getContext().sessions.get(id)
if (session) {
session.stop()
this.getContext().sessions.delete(id)
}
const transSession = this.transSessions.get(id)
if (transSession) {
transSession.kill('SIGINT')
this.transSessions.delete(id)
}
}
private async handleSession (sessionId: string, streamPath: string, streamKey: string) {
const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
if (!videoLive) {
logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId))
return this.abortSession(sessionId)
}
const video = videoLive.Video
if (video.isBlacklisted()) {
logger.warn('Video is blacklisted. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid))
return this.abortSession(sessionId)
}
// Cleanup old potential live files (could happen with a permanent live)
this.cleanupShaSegments(video.uuid)
const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
if (oldStreamingPlaylist) {
await cleanupLive(video, oldStreamingPlaylist)
}
this.videoSessions.set(video.id, sessionId)
const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid)
const session = this.getContext().sessions.get(sessionId)
const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath
const [ resolutionResult, fps ] = await Promise.all([
getVideoFileResolution(rtmpUrl),
getVideoFileFPS(rtmpUrl)
])
const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
? computeResolutionsToTranscode(resolutionResult.videoFileResolution, 'live')
: []
const allResolutions = resolutionsEnabled.concat([ session.videoHeight ])
logger.info(
'Will mux/transcode live video of original resolution %d.', session.videoHeight,
{ allResolutions, ...lTags(sessionId, video.uuid) }
)
const [ videoStreamingPlaylist ] = await VideoStreamingPlaylistModel.upsert({
videoId: video.id,
playlistUrl,
segmentsSha256Url: WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsSha256SegmentsStaticPath(video.uuid, video.isLive),
p2pMediaLoaderInfohashes: VideoStreamingPlaylistModel.buildP2PMediaLoaderInfoHashes(playlistUrl, allResolutions),
p2pMediaLoaderPeerVersion: P2P_MEDIA_LOADER_PEER_VERSION,
type: VideoStreamingPlaylistType.HLS
}, { returning: true }) as [ MStreamingPlaylist, boolean ]
return this.runMuxing({
sessionId,
videoLive,
playlist: Object.assign(videoStreamingPlaylist, { Video: video }),
rtmpUrl,
fps,
allResolutions
})
}
private async runMuxing (options: {
sessionId: string
videoLive: MVideoLiveVideo
playlist: MStreamingPlaylistVideo
rtmpUrl: string
fps: number
allResolutions: number[]
}) {
const { sessionId, videoLive, playlist, allResolutions, fps, rtmpUrl } = options
const startStreamDateTime = new Date().getTime()
const user = await UserModel.loadByLiveId(videoLive.id)
if (!this.livesPerUser.has(user.id)) {
this.livesPerUser.set(user.id, [])
}
const currentUserLive = { liveId: videoLive.id, videoId: videoLive.videoId, size: 0 }
const livesOfUser = this.livesPerUser.get(user.id)
livesOfUser.push(currentUserLive)
for (let i = 0; i < allResolutions.length; i++) {
const resolution = allResolutions[i]
const file = new VideoFileModel({
resolution,
size: -1,
extname: '.ts',
infoHash: null,
fps,
videoStreamingPlaylistId: playlist.id
})
VideoFileModel.customUpsert(file, 'streaming-playlist', null)
.catch(err => logger.error('Cannot create file for live streaming.', { err, ...lTags(sessionId, videoLive.Video.uuid) }))
}
const outPath = getHLSDirectory(videoLive.Video)
await ensureDir(outPath)
const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY)
if (videoLive.saveReplay === true) {
await ensureDir(replayDirectory)
}
const videoUUID = videoLive.Video.uuid
const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED
? await getLiveTranscodingCommand({
rtmpUrl,
outPath,
resolutions: allResolutions,
fps,
availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
profile: CONFIG.LIVE.TRANSCODING.PROFILE
})
: getLiveMuxingCommand(rtmpUrl, outPath)
logger.info('Running live muxing/transcoding for %s.', videoUUID, lTags(sessionId, videoUUID))
this.transSessions.set(sessionId, ffmpegExec)
const tsWatcher = chokidar.watch(outPath + '/*.ts')
const segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
const playlistIdMatcher = /^([\d+])-/
const addHandler = segmentPath => {
logger.debug('Live add handler of %s.', segmentPath, lTags(sessionId, videoUUID))
const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
const segmentsToProcess = segmentsToProcessPerPlaylist[playlistId] || []
this.processSegments(outPath, videoUUID, videoLive, segmentsToProcess)
segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
if (this.hasClientSocketsInBadHealthWithCache(sessionId)) {
logger.error(
'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
' Stopping session of video %s.', videoUUID,
lTags(sessionId, videoUUID)
)
this.stopSessionOf(videoLive.videoId)
return
}
// Duration constraint check
if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
logger.info('Stopping session of %s: max duration exceeded.', videoUUID, lTags(sessionId, videoUUID))
this.stopSessionOf(videoLive.videoId)
return
}
// Check user quota if the user enabled replay saving
if (videoLive.saveReplay === true) {
stat(segmentPath)
.then(segmentStat => {
currentUserLive.size += segmentStat.size
})
.then(() => this.isQuotaConstraintValid(user, videoLive))
.then(quotaValid => {
if (quotaValid !== true) {
logger.info('Stopping session of %s: user quota exceeded.', videoUUID, lTags(sessionId, videoUUID))
this.stopSessionOf(videoLive.videoId)
}
})
.catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err, ...lTags(sessionId, videoUUID) }))
}
}
const deleteHandler = segmentPath => this.removeSegmentSha(videoUUID, segmentPath)
tsWatcher.on('add', p => addHandler(p))
tsWatcher.on('unlink', p => deleteHandler(p))
const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
masterWatcher.on('add', async () => {
try {
const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId)
video.state = VideoState.PUBLISHED
await video.save()
videoLive.Video = video
setTimeout(() => {
federateVideoIfNeeded(video, false)
.catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...lTags(sessionId, videoUUID) }))
PeerTubeSocket.Instance.sendVideoLiveNewState(video)
}, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
} catch (err) {
logger.error('Cannot save/federate live video %d.', videoLive.videoId, { err, ...lTags(sessionId, videoUUID) })
} finally {
masterWatcher.close()
.catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...lTags(sessionId, videoUUID) }))
}
})
const onFFmpegEnded = () => {
logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl, lTags(sessionId, videoUUID))
this.transSessions.delete(sessionId)
this.watchersPerVideo.delete(videoLive.videoId)
this.videoSessions.delete(videoLive.videoId)
const newLivesPerUser = this.livesPerUser.get(user.id)
.filter(o => o.liveId !== videoLive.id)
this.livesPerUser.set(user.id, newLivesPerUser)
setTimeout(() => {
// Wait latest segments generation, and close watchers
Promise.all([ tsWatcher.close(), masterWatcher.close() ])
.then(() => {
// Process remaining segments hash
for (const key of Object.keys(segmentsToProcessPerPlaylist)) {
this.processSegments(outPath, videoUUID, videoLive, segmentsToProcessPerPlaylist[key])
}
})
.catch(err => {
logger.error(
'Cannot close watchers of %s or process remaining hash segments.', outPath,
{ err, ...lTags(sessionId, videoUUID) }
)
})
this.onEndTransmuxing(videoLive.Video.id)
.catch(err => logger.error('Error in closed transmuxing.', { err, ...lTags(sessionId, videoUUID) }))
}, 1000)
}
ffmpegExec.on('error', (err, stdout, stderr) => {
onFFmpegEnded()
// Don't care that we killed the ffmpeg process
if (err?.message?.includes('Exiting normally')) return
logger.error('Live transcoding error.', { err, stdout, stderr, ...lTags(sessionId, videoUUID) })
this.abortSession(sessionId)
})
ffmpegExec.on('end', () => onFFmpegEnded())
ffmpegExec.run()
}
private async onEndTransmuxing (videoUUID: string, cleanupNow = false) {
try {
const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID)
if (!fullVideo) return
const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
if (!live.permanentLive) {
JobQueue.Instance.createJob({
type: 'video-live-ending',
payload: {
videoId: fullVideo.id
}
}, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
fullVideo.state = VideoState.LIVE_ENDED
} else {
fullVideo.state = VideoState.WAITING_FOR_LIVE
}
await fullVideo.save()
PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)
await federateVideoIfNeeded(fullVideo, false)
} catch (err) {
logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) })
}
}
private async addSegmentSha (videoUUID: string, segmentPath: string) {
const segmentName = basename(segmentPath)
logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID))
const shaResult = await buildSha256Segment(segmentPath)
if (!this.segmentsSha256.has(videoUUID)) {
this.segmentsSha256.set(videoUUID, new Map())
}
const filesMap = this.segmentsSha256.get(videoUUID)
filesMap.set(segmentName, shaResult)
}
private removeSegmentSha (videoUUID: string, segmentPath: string) {
const segmentName = basename(segmentPath)
logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID))
const filesMap = this.segmentsSha256.get(videoUUID)
if (!filesMap) {
logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID))
return
}
if (!filesMap.has(segmentName)) {
logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID))
return
}
filesMap.delete(segmentName)
}
private isDurationConstraintValid (streamingStartTime: number) {
const maxDuration = CONFIG.LIVE.MAX_DURATION
// No limit
if (maxDuration < 0) return true
const now = new Date().getTime()
const max = streamingStartTime + maxDuration
return now <= max
}
private hasClientSocketsInBadHealth (sessionId: string) {
const rtmpSession = this.getContext().sessions.get(sessionId)
if (!rtmpSession) {
logger.warn('Cannot get session %s to check players socket health.', sessionId, lTags(sessionId))
return
}
for (const playerSessionId of rtmpSession.players) {
const playerSession = this.getContext().sessions.get(playerSessionId)
if (!playerSession) {
logger.error('Cannot get player session %s to check socket health.', playerSession, lTags(sessionId))
continue
}
if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) {
return true
}
}
return false
}
private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) {
if (live.saveReplay !== true) return true
return this.isAbleToUploadVideoWithCache(user.id)
}
private async updateLiveViews () {
if (!this.isRunning()) return
if (!isTestInstance()) logger.info('Updating live video views.', lTags())
for (const videoId of this.watchersPerVideo.keys()) {
const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
const watchers = this.watchersPerVideo.get(videoId)
const numWatchers = watchers.length
const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
video.views = numWatchers
await video.save()
await federateVideoIfNeeded(video, false)
PeerTubeSocket.Instance.sendVideoViewsUpdate(video)
// Only keep not expired watchers
const newWatchers = watchers.filter(w => w > notBefore)
this.watchersPerVideo.set(videoId, newWatchers)
logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags())
}
}
private async handleBrokenLives () {
const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
for (const uuid of videoUUIDs) {
await this.onEndTransmuxing(uuid, true)
}
}
static get Instance () {
return this.instance || (this.instance = new this())
}
}
// ---------------------------------------------------------------------------
export {
LiveManager
}

4
server/lib/live/index.ts Normal file
View File

@ -0,0 +1,4 @@
export * from './live-manager'
export * from './live-quota-store'
export * from './live-segment-sha-store'
export * from './live-utils'

View File

@ -0,0 +1,412 @@
import { createServer, Server } from 'net'
import { isTestInstance } from '@server/helpers/core-utils'
import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants'
import { UserModel } from '@server/models/user/user'
import { VideoModel } from '@server/models/video/video'
import { VideoLiveModel } from '@server/models/video/video-live'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models'
import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
import { federateVideoIfNeeded } from '../activitypub/videos'
import { JobQueue } from '../job-queue'
import { PeerTubeSocket } from '../peertube-socket'
import { LiveQuotaStore } from './live-quota-store'
import { LiveSegmentShaStore } from './live-segment-sha-store'
import { cleanupLive } from './live-utils'
import { MuxingSession } from './shared'
const NodeRtmpSession = require('node-media-server/node_rtmp_session')
const context = require('node-media-server/node_core_ctx')
const nodeMediaServerLogger = require('node-media-server/node_core_logger')
// Disable node media server logs
nodeMediaServerLogger.setLogType(0)
const config = {
rtmp: {
port: CONFIG.LIVE.RTMP.PORT,
chunk_size: VIDEO_LIVE.RTMP.CHUNK_SIZE,
gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE,
ping: VIDEO_LIVE.RTMP.PING,
ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT
},
transcoding: {
ffmpeg: 'ffmpeg'
}
}
const lTags = loggerTagsFactory('live')
class LiveManager {
private static instance: LiveManager
private readonly muxingSessions = new Map<string, MuxingSession>()
private readonly videoSessions = new Map<number, string>()
// Values are Date().getTime()
private readonly watchersPerVideo = new Map<number, number[]>()
private rtmpServer: Server
private constructor () {
}
init () {
const events = this.getContext().nodeEvent
events.on('postPublish', (sessionId: string, streamPath: string) => {
logger.debug('RTMP received stream', { id: sessionId, streamPath, ...lTags(sessionId) })
const splittedPath = streamPath.split('/')
if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) {
logger.warn('Live path is incorrect.', { streamPath, ...lTags(sessionId) })
return this.abortSession(sessionId)
}
this.handleSession(sessionId, streamPath, splittedPath[2])
.catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) }))
})
events.on('donePublish', sessionId => {
logger.info('Live session ended.', { sessionId, ...lTags(sessionId) })
})
registerConfigChangedHandler(() => {
if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) {
this.run()
return
}
if (this.rtmpServer && CONFIG.LIVE.ENABLED === false) {
this.stop()
}
})
// Cleanup broken lives, that were terminated by a server restart for example
this.handleBrokenLives()
.catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
}
run () {
logger.info('Running RTMP server on port %d', config.rtmp.port, lTags())
this.rtmpServer = createServer(socket => {
const session = new NodeRtmpSession(config, socket)
session.run()
})
this.rtmpServer.on('error', err => {
logger.error('Cannot run RTMP server.', { err, ...lTags() })
})
this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT)
}
stop () {
logger.info('Stopping RTMP server.', lTags())
this.rtmpServer.close()
this.rtmpServer = undefined
// Sessions is an object
this.getContext().sessions.forEach((session: any) => {
if (session instanceof NodeRtmpSession) {
session.stop()
}
})
}
isRunning () {
return !!this.rtmpServer
}
stopSessionOf (videoId: number) {
const sessionId = this.videoSessions.get(videoId)
if (!sessionId) return
this.videoSessions.delete(videoId)
this.abortSession(sessionId)
}
addViewTo (videoId: number) {
if (this.videoSessions.has(videoId) === false) return
let watchers = this.watchersPerVideo.get(videoId)
if (!watchers) {
watchers = []
this.watchersPerVideo.set(videoId, watchers)
}
watchers.push(new Date().getTime())
}
private getContext () {
return context
}
private abortSession (sessionId: string) {
const session = this.getContext().sessions.get(sessionId)
if (session) {
session.stop()
this.getContext().sessions.delete(sessionId)
}
const muxingSession = this.muxingSessions.get(sessionId)
if (muxingSession) muxingSession.abort()
}
private async handleSession (sessionId: string, streamPath: string, streamKey: string) {
const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
if (!videoLive) {
logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId))
return this.abortSession(sessionId)
}
const video = videoLive.Video
if (video.isBlacklisted()) {
logger.warn('Video is blacklisted. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid))
return this.abortSession(sessionId)
}
// Cleanup old potential live files (could happen with a permanent live)
LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid)
const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
if (oldStreamingPlaylist) {
await cleanupLive(video, oldStreamingPlaylist)
}
this.videoSessions.set(video.id, sessionId)
const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath
const [ { videoFileResolution }, fps ] = await Promise.all([
getVideoFileResolution(rtmpUrl),
getVideoFileFPS(rtmpUrl)
])
const allResolutions = this.buildAllResolutionsToTranscode(videoFileResolution)
logger.info(
'Will mux/transcode live video of original resolution %d.', videoFileResolution,
{ allResolutions, ...lTags(sessionId, video.uuid) }
)
const streamingPlaylist = await this.createLivePlaylist(video, allResolutions)
return this.runMuxingSession({
sessionId,
videoLive,
streamingPlaylist,
rtmpUrl,
fps,
allResolutions
})
}
private async runMuxingSession (options: {
sessionId: string
videoLive: MVideoLiveVideo
streamingPlaylist: MStreamingPlaylistVideo
rtmpUrl: string
fps: number
allResolutions: number[]
}) {
const { sessionId, videoLive, streamingPlaylist, allResolutions, fps, rtmpUrl } = options
const videoUUID = videoLive.Video.uuid
const localLTags = lTags(sessionId, videoUUID)
const user = await UserModel.loadByLiveId(videoLive.id)
LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id)
const muxingSession = new MuxingSession({
context: this.getContext(),
user,
sessionId,
videoLive,
streamingPlaylist,
rtmpUrl,
fps,
allResolutions
})
muxingSession.on('master-playlist-created', () => this.publishAndFederateLive(videoLive, localLTags))
muxingSession.on('bad-socket-health', ({ videoId }) => {
logger.error(
'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
' Stopping session of video %s.', videoUUID,
localLTags
)
this.stopSessionOf(videoId)
})
muxingSession.on('duration-exceeded', ({ videoId }) => {
logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags)
this.stopSessionOf(videoId)
})
muxingSession.on('quota-exceeded', ({ videoId }) => {
logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags)
this.stopSessionOf(videoId)
})
muxingSession.on('ffmpeg-error', ({ sessionId }) => this.abortSession(sessionId))
muxingSession.on('ffmpeg-end', ({ videoId }) => {
this.onMuxingFFmpegEnd(videoId)
})
muxingSession.on('after-cleanup', ({ videoId }) => {
this.muxingSessions.delete(sessionId)
return this.onAfterMuxingCleanup(videoId)
.catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
})
this.muxingSessions.set(sessionId, muxingSession)
muxingSession.runMuxing()
.catch(err => {
logger.error('Cannot run muxing.', { err, ...localLTags })
this.abortSession(sessionId)
})
}
private async publishAndFederateLive (live: MVideoLiveVideo, localLTags: { tags: string[] }) {
const videoId = live.videoId
try {
const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
logger.info('Will publish and federate live %s.', video.url, localLTags)
video.state = VideoState.PUBLISHED
await video.save()
live.Video = video
setTimeout(() => {
federateVideoIfNeeded(video, false)
.catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags }))
PeerTubeSocket.Instance.sendVideoLiveNewState(video)
}, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
} catch (err) {
logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags })
}
}
private onMuxingFFmpegEnd (videoId: number) {
this.watchersPerVideo.delete(videoId)
this.videoSessions.delete(videoId)
}
private async onAfterMuxingCleanup (videoUUID: string, cleanupNow = false) {
try {
const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID)
if (!fullVideo) return
const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
if (!live.permanentLive) {
JobQueue.Instance.createJob({
type: 'video-live-ending',
payload: {
videoId: fullVideo.id
}
}, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
fullVideo.state = VideoState.LIVE_ENDED
} else {
fullVideo.state = VideoState.WAITING_FOR_LIVE
}
await fullVideo.save()
PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)
await federateVideoIfNeeded(fullVideo, false)
} catch (err) {
logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) })
}
}
private async updateLiveViews () {
if (!this.isRunning()) return
if (!isTestInstance()) logger.info('Updating live video views.', lTags())
for (const videoId of this.watchersPerVideo.keys()) {
const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
const watchers = this.watchersPerVideo.get(videoId)
const numWatchers = watchers.length
const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
video.views = numWatchers
await video.save()
await federateVideoIfNeeded(video, false)
PeerTubeSocket.Instance.sendVideoViewsUpdate(video)
// Only keep not expired watchers
const newWatchers = watchers.filter(w => w > notBefore)
this.watchersPerVideo.set(videoId, newWatchers)
logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags())
}
}
private async handleBrokenLives () {
const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
for (const uuid of videoUUIDs) {
await this.onAfterMuxingCleanup(uuid, true)
}
}
private buildAllResolutionsToTranscode (originResolution: number) {
const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
? computeResolutionsToTranscode(originResolution, 'live')
: []
return resolutionsEnabled.concat([ originResolution ])
}
private async createLivePlaylist (video: MVideo, allResolutions: number[]) {
const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid)
const [ videoStreamingPlaylist ] = await VideoStreamingPlaylistModel.upsert({
videoId: video.id,
playlistUrl,
segmentsSha256Url: WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsSha256SegmentsStaticPath(video.uuid, video.isLive),
p2pMediaLoaderInfohashes: VideoStreamingPlaylistModel.buildP2PMediaLoaderInfoHashes(playlistUrl, allResolutions),
p2pMediaLoaderPeerVersion: P2P_MEDIA_LOADER_PEER_VERSION,
type: VideoStreamingPlaylistType.HLS
}, { returning: true }) as [ MStreamingPlaylist, boolean ]
return Object.assign(videoStreamingPlaylist, { Video: video })
}
static get Instance () {
return this.instance || (this.instance = new this())
}
}
// ---------------------------------------------------------------------------
export {
LiveManager
}

View File

@ -0,0 +1,48 @@
class LiveQuotaStore {
private static instance: LiveQuotaStore
private readonly livesPerUser = new Map<number, { liveId: number, size: number }[]>()
private constructor () {
}
addNewLive (userId: number, liveId: number) {
if (!this.livesPerUser.has(userId)) {
this.livesPerUser.set(userId, [])
}
const currentUserLive = { liveId, size: 0 }
const livesOfUser = this.livesPerUser.get(userId)
livesOfUser.push(currentUserLive)
}
removeLive (userId: number, liveId: number) {
const newLivesPerUser = this.livesPerUser.get(userId)
.filter(o => o.liveId !== liveId)
this.livesPerUser.set(userId, newLivesPerUser)
}
addQuotaTo (userId: number, liveId: number, size: number) {
const lives = this.livesPerUser.get(userId)
const live = lives.find(l => l.liveId === liveId)
live.size += size
}
getLiveQuotaOf (userId: number) {
const currentLives = this.livesPerUser.get(userId)
if (!currentLives) return 0
return currentLives.reduce((sum, obj) => sum + obj.size, 0)
}
static get Instance () {
return this.instance || (this.instance = new this())
}
}
export {
LiveQuotaStore
}

View File

@ -0,0 +1,64 @@
import { basename } from 'path'
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { buildSha256Segment } from '../hls'
const lTags = loggerTagsFactory('live')
class LiveSegmentShaStore {
private static instance: LiveSegmentShaStore
private readonly segmentsSha256 = new Map<string, Map<string, string>>()
private constructor () {
}
getSegmentsSha256 (videoUUID: string) {
return this.segmentsSha256.get(videoUUID)
}
async addSegmentSha (videoUUID: string, segmentPath: string) {
const segmentName = basename(segmentPath)
logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID))
const shaResult = await buildSha256Segment(segmentPath)
if (!this.segmentsSha256.has(videoUUID)) {
this.segmentsSha256.set(videoUUID, new Map())
}
const filesMap = this.segmentsSha256.get(videoUUID)
filesMap.set(segmentName, shaResult)
}
removeSegmentSha (videoUUID: string, segmentPath: string) {
const segmentName = basename(segmentPath)
logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID))
const filesMap = this.segmentsSha256.get(videoUUID)
if (!filesMap) {
logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID))
return
}
if (!filesMap.has(segmentName)) {
logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID))
return
}
filesMap.delete(segmentName)
}
cleanupShaSegments (videoUUID: string) {
this.segmentsSha256.delete(videoUUID)
}
static get Instance () {
return this.instance || (this.instance = new this())
}
}
export {
LiveSegmentShaStore
}

View File

@ -0,0 +1,23 @@
import { remove } from 'fs-extra'
import { basename } from 'path'
import { MStreamingPlaylist, MVideo } from '@server/types/models'
import { getHLSDirectory } from '../video-paths'
function buildConcatenatedName (segmentOrPlaylistPath: string) {
const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/)
return 'concat-' + num[1] + '.ts'
}
async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
const hlsDirectory = getHLSDirectory(video)
await remove(hlsDirectory)
await streamingPlaylist.destroy()
}
export {
cleanupLive,
buildConcatenatedName
}

View File

@ -0,0 +1 @@
export * from './muxing-session'

View File

@ -0,0 +1,341 @@
import * as Bluebird from 'bluebird'
import * as chokidar from 'chokidar'
import { FfmpegCommand } from 'fluent-ffmpeg'
import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
import { basename, join } from 'path'
import { EventEmitter } from 'stream'
import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
import { VideoFileModel } from '@server/models/video/video-file'
import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
import { VideoTranscodingProfilesManager } from '../../transcoding/video-transcoding-profiles'
import { isAbleToUploadVideo } from '../../user'
import { getHLSDirectory } from '../../video-paths'
import { LiveQuotaStore } from '../live-quota-store'
import { LiveSegmentShaStore } from '../live-segment-sha-store'
import { buildConcatenatedName } from '../live-utils'
import memoizee = require('memoizee')
interface MuxingSessionEvents {
'master-playlist-created': ({ videoId: number }) => void
'bad-socket-health': ({ videoId: number }) => void
'duration-exceeded': ({ videoId: number }) => void
'quota-exceeded': ({ videoId: number }) => void
'ffmpeg-end': ({ videoId: number }) => void
'ffmpeg-error': ({ sessionId: string }) => void
'after-cleanup': ({ videoId: number }) => void
}
declare interface MuxingSession {
on<U extends keyof MuxingSessionEvents>(
event: U, listener: MuxingSessionEvents[U]
): this
emit<U extends keyof MuxingSessionEvents>(
event: U, ...args: Parameters<MuxingSessionEvents[U]>
): boolean
}
class MuxingSession extends EventEmitter {
private ffmpegCommand: FfmpegCommand
private readonly context: any
private readonly user: MUserId
private readonly sessionId: string
private readonly videoLive: MVideoLiveVideo
private readonly streamingPlaylist: MStreamingPlaylistVideo
private readonly rtmpUrl: string
private readonly fps: number
private readonly allResolutions: number[]
private readonly videoId: number
private readonly videoUUID: string
private readonly saveReplay: boolean
private readonly lTags: LoggerTagsFn
private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
private tsWatcher: chokidar.FSWatcher
private masterWatcher: chokidar.FSWatcher
private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
return isAbleToUploadVideo(userId, 1000)
}, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
private readonly hasClientSocketInBadHealthWithCache = memoizee((sessionId: string) => {
return this.hasClientSocketInBadHealth(sessionId)
}, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH })
constructor (options: {
context: any
user: MUserId
sessionId: string
videoLive: MVideoLiveVideo
streamingPlaylist: MStreamingPlaylistVideo
rtmpUrl: string
fps: number
allResolutions: number[]
}) {
super()
this.context = options.context
this.user = options.user
this.sessionId = options.sessionId
this.videoLive = options.videoLive
this.streamingPlaylist = options.streamingPlaylist
this.rtmpUrl = options.rtmpUrl
this.fps = options.fps
this.allResolutions = options.allResolutions
this.videoId = this.videoLive.Video.id
this.videoUUID = this.videoLive.Video.uuid
this.saveReplay = this.videoLive.saveReplay
this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
}
async runMuxing () {
this.createFiles()
const outPath = await this.prepareDirectories()
this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED
? await getLiveTranscodingCommand({
rtmpUrl: this.rtmpUrl,
outPath,
resolutions: this.allResolutions,
fps: this.fps,
availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
profile: CONFIG.LIVE.TRANSCODING.PROFILE
})
: getLiveMuxingCommand(this.rtmpUrl, outPath)
logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags)
this.watchTSFiles(outPath)
this.watchMasterFile(outPath)
this.ffmpegCommand.on('error', (err, stdout, stderr) => {
this.onFFmpegError(err, stdout, stderr, outPath)
})
this.ffmpegCommand.on('end', () => this.onFFmpegEnded(outPath))
this.ffmpegCommand.run()
}
abort () {
if (!this.ffmpegCommand) return false
this.ffmpegCommand.kill('SIGINT')
return true
}
private onFFmpegError (err: any, stdout: string, stderr: string, outPath: string) {
this.onFFmpegEnded(outPath)
// Don't care that we killed the ffmpeg process
if (err?.message?.includes('Exiting normally')) return
logger.error('Live transcoding error.', { err, stdout, stderr, ...this.lTags })
this.emit('ffmpeg-error', ({ sessionId: this.sessionId }))
}
private onFFmpegEnded (outPath: string) {
logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.rtmpUrl, this.lTags)
setTimeout(() => {
// Wait latest segments generation, and close watchers
Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ])
.then(() => {
// Process remaining segments hash
for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
this.processSegments(outPath, this.segmentsToProcessPerPlaylist[key])
}
})
.catch(err => {
logger.error(
'Cannot close watchers of %s or process remaining hash segments.', outPath,
{ err, ...this.lTags }
)
})
this.emit('after-cleanup', { videoId: this.videoId })
}, 1000)
}
private watchMasterFile (outPath: string) {
this.masterWatcher = chokidar.watch(outPath + '/master.m3u8')
this.masterWatcher.on('add', async () => {
this.emit('master-playlist-created', { videoId: this.videoId })
this.masterWatcher.close()
.catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...this.lTags }))
})
}
private watchTSFiles (outPath: string) {
const startStreamDateTime = new Date().getTime()
this.tsWatcher = chokidar.watch(outPath + '/*.ts')
const playlistIdMatcher = /^([\d+])-/
const addHandler = async segmentPath => {
logger.debug('Live add handler of %s.', segmentPath, this.lTags)
const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || []
this.processSegments(outPath, segmentsToProcess)
this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) {
this.emit('bad-socket-health', { videoId: this.videoId })
return
}
// Duration constraint check
if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
this.emit('duration-exceeded', { videoId: this.videoId })
return
}
// Check user quota if the user enabled replay saving
if (await this.isQuotaExceeded(segmentPath) === true) {
this.emit('quota-exceeded', { videoId: this.videoId })
}
}
const deleteHandler = segmentPath => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath)
this.tsWatcher.on('add', p => addHandler(p))
this.tsWatcher.on('unlink', p => deleteHandler(p))
}
private async isQuotaExceeded (segmentPath: string) {
if (this.saveReplay !== true) return false
try {
const segmentStat = await stat(segmentPath)
LiveQuotaStore.Instance.addQuotaTo(this.user.id, this.videoLive.id, segmentStat.size)
const canUpload = await this.isAbleToUploadVideoWithCache(this.user.id)
return canUpload !== true
} catch (err) {
logger.error('Cannot stat %s or check quota of %d.', segmentPath, this.user.id, { err, ...this.lTags })
}
}
private createFiles () {
for (let i = 0; i < this.allResolutions.length; i++) {
const resolution = this.allResolutions[i]
const file = new VideoFileModel({
resolution,
size: -1,
extname: '.ts',
infoHash: null,
fps: this.fps,
videoStreamingPlaylistId: this.streamingPlaylist.id
})
VideoFileModel.customUpsert(file, 'streaming-playlist', null)
.catch(err => logger.error('Cannot create file for live streaming.', { err, ...this.lTags }))
}
}
private async prepareDirectories () {
const outPath = getHLSDirectory(this.videoLive.Video)
await ensureDir(outPath)
const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY)
if (this.videoLive.saveReplay === true) {
await ensureDir(replayDirectory)
}
return outPath
}
private isDurationConstraintValid (streamingStartTime: number) {
const maxDuration = CONFIG.LIVE.MAX_DURATION
// No limit
if (maxDuration < 0) return true
const now = new Date().getTime()
const max = streamingStartTime + maxDuration
return now <= max
}
private processSegments (hlsVideoPath: string, segmentPaths: string[]) {
Bluebird.mapSeries(segmentPaths, async previousSegment => {
// Add sha hash of previous segments, because ffmpeg should have finished generating them
await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment)
if (this.saveReplay) {
await this.addSegmentToReplay(hlsVideoPath, previousSegment)
}
}).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...this.lTags }))
}
private hasClientSocketInBadHealth (sessionId: string) {
const rtmpSession = this.context.sessions.get(sessionId)
if (!rtmpSession) {
logger.warn('Cannot get session %s to check players socket health.', sessionId, this.lTags)
return
}
for (const playerSessionId of rtmpSession.players) {
const playerSession = this.context.sessions.get(playerSessionId)
if (!playerSession) {
logger.error('Cannot get player session %s to check socket health.', playerSession, this.lTags)
continue
}
if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) {
return true
}
}
return false
}
private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) {
const segmentName = basename(segmentPath)
const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, buildConcatenatedName(segmentName))
try {
const data = await readFile(segmentPath)
await appendFile(dest, data)
} catch (err) {
logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...this.lTags })
}
}
}
// ---------------------------------------------------------------------------
export {
MuxingSession
}

View File

@ -14,7 +14,7 @@ import { MUser, MUserDefault, MUserId } from '../types/models/user'
import { generateAndSaveActorKeys } from './activitypub/actors'
import { getLocalAccountActivityPubUrl } from './activitypub/url'
import { Emailer } from './emailer'
import { LiveManager } from './live-manager'
import { LiveQuotaStore } from './live/live-quota-store'
import { buildActorInstance } from './local-actor'
import { Redis } from './redis'
import { createLocalVideoChannel } from './video-channel'
@ -129,7 +129,7 @@ async function getOriginalVideoFileTotalFromUser (user: MUserId) {
const base = await UserModel.getTotalRawQuery(query, user.id)
return base + LiveManager.Instance.getLiveQuotaUsedByUser(user.id)
return base + LiveQuotaStore.Instance.getLiveQuotaOf(user.id)
}
// Returns cumulative size of all video files uploaded in the last 24 hours.
@ -143,10 +143,10 @@ async function getOriginalVideoFileTotalDailyFromUser (user: MUserId) {
const base = await UserModel.getTotalRawQuery(query, user.id)
return base + LiveManager.Instance.getLiveQuotaUsedByUser(user.id)
return base + LiveQuotaStore.Instance.getLiveQuotaOf(user.id)
}
async function isAbleToUploadVideo (userId: number, size: number) {
async function isAbleToUploadVideo (userId: number, newVideoSize: number) {
const user = await UserModel.loadById(userId)
if (user.videoQuota === -1 && user.videoQuotaDaily === -1) return Promise.resolve(true)
@ -156,8 +156,8 @@ async function isAbleToUploadVideo (userId: number, size: number) {
getOriginalVideoFileTotalDailyFromUser(user)
])
const uploadedTotal = size + totalBytes
const uploadedDaily = size + totalBytesDaily
const uploadedTotal = newVideoSize + totalBytes
const uploadedDaily = newVideoSize + totalBytesDaily
if (user.videoQuotaDaily === -1) return uploadedTotal < user.videoQuota
if (user.videoQuota === -1) return uploadedDaily < user.videoQuotaDaily

View File

@ -16,7 +16,7 @@ import { CONFIG } from '../initializers/config'
import { VideoBlacklistModel } from '../models/video/video-blacklist'
import { sendDeleteVideo } from './activitypub/send'
import { federateVideoIfNeeded } from './activitypub/videos'
import { LiveManager } from './live-manager'
import { LiveManager } from './live/live-manager'
import { Notifier } from './notifier'
import { Hooks } from './plugins/hooks'

View File

@ -27,7 +27,7 @@ import {
import { setAsUpdated } from '@server/helpers/database-utils'
import { buildNSFWFilter } from '@server/helpers/express-utils'
import { getPrivaciesForFederation, isPrivacyForFederation, isStateForFederation } from '@server/helpers/video'
import { LiveManager } from '@server/lib/live-manager'
import { LiveManager } from '@server/lib/live/live-manager'
import { getHLSDirectory, getVideoFilePath } from '@server/lib/video-paths'
import { getServerActor } from '@server/models/application/application'
import { ModelCache } from '@server/models/model-cache'

View File

@ -1,4 +1,6 @@
import './live-constraints'
import './live-socket-messages'
import './live-permanent'
import './live-save-replay'
import './live-views'
import './live'

View File

@ -0,0 +1,196 @@
/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
import 'mocha'
import * as chai from 'chai'
import { getLiveNotificationSocket } from '@shared/extra-utils/socket/socket-io'
import { VideoPrivacy, VideoState } from '@shared/models'
import {
cleanupTests,
createLive,
doubleFollow,
flushAndRunMultipleServers,
getVideoIdFromUUID,
sendRTMPStreamInVideo,
ServerInfo,
setAccessTokensToServers,
setDefaultVideoChannel,
stopFfmpeg,
updateCustomSubConfig,
viewVideo,
wait,
waitJobs,
waitUntilLiveEnded,
waitUntilLivePublishedOnAllServers
} from '../../../../shared/extra-utils'
const expect = chai.expect
describe('Test live', function () {
let servers: ServerInfo[] = []
before(async function () {
this.timeout(120000)
servers = await flushAndRunMultipleServers(2)
// Get the access tokens
await setAccessTokensToServers(servers)
await setDefaultVideoChannel(servers)
await updateCustomSubConfig(servers[0].url, servers[0].accessToken, {
live: {
enabled: true,
allowReplay: true,
transcoding: {
enabled: false
}
}
})
// Server 1 and server 2 follow each other
await doubleFollow(servers[0], servers[1])
})
describe('Live socket messages', function () {
async function createLiveWrapper () {
const liveAttributes = {
name: 'live video',
channelId: servers[0].videoChannel.id,
privacy: VideoPrivacy.PUBLIC
}
const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes)
return res.body.video.uuid
}
it('Should correctly send a message when the live starts and ends', async function () {
this.timeout(60000)
const localStateChanges: VideoState[] = []
const remoteStateChanges: VideoState[] = []
const liveVideoUUID = await createLiveWrapper()
await waitJobs(servers)
{
const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID)
const localSocket = getLiveNotificationSocket(servers[0].url)
localSocket.on('state-change', data => localStateChanges.push(data.state))
localSocket.emit('subscribe', { videoId })
}
{
const videoId = await getVideoIdFromUUID(servers[1].url, liveVideoUUID)
const remoteSocket = getLiveNotificationSocket(servers[1].url)
remoteSocket.on('state-change', data => remoteStateChanges.push(data.state))
remoteSocket.emit('subscribe', { videoId })
}
const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID)
await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
await waitJobs(servers)
for (const stateChanges of [ localStateChanges, remoteStateChanges ]) {
expect(stateChanges).to.have.length.at.least(1)
expect(stateChanges[stateChanges.length - 1]).to.equal(VideoState.PUBLISHED)
}
await stopFfmpeg(command)
for (const server of servers) {
await waitUntilLiveEnded(server.url, server.accessToken, liveVideoUUID)
}
await waitJobs(servers)
for (const stateChanges of [ localStateChanges, remoteStateChanges ]) {
expect(stateChanges).to.have.length.at.least(2)
expect(stateChanges[stateChanges.length - 1]).to.equal(VideoState.LIVE_ENDED)
}
})
it('Should correctly send views change notification', async function () {
this.timeout(60000)
let localLastVideoViews = 0
let remoteLastVideoViews = 0
const liveVideoUUID = await createLiveWrapper()
await waitJobs(servers)
{
const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID)
const localSocket = getLiveNotificationSocket(servers[0].url)
localSocket.on('views-change', data => { localLastVideoViews = data.views })
localSocket.emit('subscribe', { videoId })
}
{
const videoId = await getVideoIdFromUUID(servers[1].url, liveVideoUUID)
const remoteSocket = getLiveNotificationSocket(servers[1].url)
remoteSocket.on('views-change', data => { remoteLastVideoViews = data.views })
remoteSocket.emit('subscribe', { videoId })
}
const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID)
await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
await waitJobs(servers)
expect(localLastVideoViews).to.equal(0)
expect(remoteLastVideoViews).to.equal(0)
await viewVideo(servers[0].url, liveVideoUUID)
await viewVideo(servers[1].url, liveVideoUUID)
await waitJobs(servers)
await wait(5000)
await waitJobs(servers)
expect(localLastVideoViews).to.equal(2)
expect(remoteLastVideoViews).to.equal(2)
await stopFfmpeg(command)
})
it('Should not receive a notification after unsubscribe', async function () {
this.timeout(120000)
const stateChanges: VideoState[] = []
const liveVideoUUID = await createLiveWrapper()
await waitJobs(servers)
const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID)
const socket = getLiveNotificationSocket(servers[0].url)
socket.on('state-change', data => stateChanges.push(data.state))
socket.emit('subscribe', { videoId })
const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID)
await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
await waitJobs(servers)
// Notifier waits before sending a notification
await wait(10000)
expect(stateChanges).to.have.lengthOf(1)
socket.emit('unsubscribe', { videoId })
await stopFfmpeg(command)
await waitJobs(servers)
expect(stateChanges).to.have.lengthOf(1)
})
})
after(async function () {
await cleanupTests(servers)
})
})

View File

@ -0,0 +1,130 @@
/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
import 'mocha'
import * as chai from 'chai'
import { FfmpegCommand } from 'fluent-ffmpeg'
import { VideoDetails, VideoPrivacy } from '@shared/models'
import {
cleanupTests,
createLive,
doubleFollow,
flushAndRunMultipleServers,
getVideo,
sendRTMPStreamInVideo,
ServerInfo,
setAccessTokensToServers,
setDefaultVideoChannel,
stopFfmpeg,
updateCustomSubConfig,
viewVideo,
wait,
waitJobs,
waitUntilLivePublishedOnAllServers
} from '../../../../shared/extra-utils'
const expect = chai.expect
describe('Test live', function () {
let servers: ServerInfo[] = []
before(async function () {
this.timeout(120000)
servers = await flushAndRunMultipleServers(2)
// Get the access tokens
await setAccessTokensToServers(servers)
await setDefaultVideoChannel(servers)
await updateCustomSubConfig(servers[0].url, servers[0].accessToken, {
live: {
enabled: true,
allowReplay: true,
transcoding: {
enabled: false
}
}
})
// Server 1 and server 2 follow each other
await doubleFollow(servers[0], servers[1])
})
describe('Live views', function () {
let liveVideoId: string
let command: FfmpegCommand
async function countViews (expected: number) {
for (const server of servers) {
const res = await getVideo(server.url, liveVideoId)
const video: VideoDetails = res.body
expect(video.views).to.equal(expected)
}
}
before(async function () {
this.timeout(30000)
const liveAttributes = {
name: 'live video',
channelId: servers[0].videoChannel.id,
privacy: VideoPrivacy.PUBLIC
}
const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes)
liveVideoId = res.body.video.uuid
command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId)
await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
await waitJobs(servers)
})
it('Should display no views for a live', async function () {
await countViews(0)
})
it('Should view a live twice and display 1 view', async function () {
this.timeout(30000)
await viewVideo(servers[0].url, liveVideoId)
await viewVideo(servers[0].url, liveVideoId)
await wait(7000)
await waitJobs(servers)
await countViews(1)
})
it('Should wait and display 0 views', async function () {
this.timeout(30000)
await wait(12000)
await waitJobs(servers)
await countViews(0)
})
it('Should view a live on a remote and on local and display 2 views', async function () {
this.timeout(30000)
await viewVideo(servers[0].url, liveVideoId)
await viewVideo(servers[1].url, liveVideoId)
await viewVideo(servers[1].url, liveVideoId)
await wait(7000)
await waitJobs(servers)
await countViews(2)
})
after(async function () {
await stopFfmpeg(command)
})
})
after(async function () {
await cleanupTests(servers)
})
})

View File

@ -2,10 +2,8 @@
import 'mocha'
import * as chai from 'chai'
import { FfmpegCommand } from 'fluent-ffmpeg'
import { join } from 'path'
import { ffprobePromise, getVideoStreamFromFile } from '@server/helpers/ffprobe-utils'
import { getLiveNotificationSocket } from '@shared/extra-utils/socket/socket-io'
import { LiveVideo, LiveVideoCreate, Video, VideoDetails, VideoPrivacy, VideoState, VideoStreamingPlaylistType } from '@shared/models'
import { HttpStatusCode } from '../../../../shared/core-utils/miscs/http-error-codes'
import {
@ -22,7 +20,6 @@ import {
getMyVideosWithFilter,
getPlaylist,
getVideo,
getVideoIdFromUUID,
getVideosList,
getVideosWithFilters,
killallServers,
@ -40,11 +37,11 @@ import {
updateCustomSubConfig,
updateLive,
uploadVideoAndGetId,
viewVideo,
wait,
waitJobs,
waitUntilLiveEnded,
waitUntilLivePublished,
waitUntilLivePublishedOnAllServers,
waitUntilLiveSegmentGeneration
} from '../../../../shared/extra-utils'
@ -53,12 +50,6 @@ const expect = chai.expect
describe('Test live', function () {
let servers: ServerInfo[] = []
async function waitUntilLivePublishedOnAllServers (videoId: string) {
for (const server of servers) {
await waitUntilLivePublished(server.url, server.accessToken, videoId)
}
}
before(async function () {
this.timeout(120000)
@ -247,7 +238,7 @@ describe('Test live', function () {
liveVideoId = resLive.body.video.uuid
command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId)
await waitUntilLivePublishedOnAllServers(liveVideoId)
await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
await waitJobs(servers)
})
@ -461,7 +452,7 @@ describe('Test live', function () {
liveVideoId = await createLiveWrapper(false)
const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId)
await waitUntilLivePublishedOnAllServers(liveVideoId)
await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
await waitJobs(servers)
await testVideoResolutions(liveVideoId, [ 720 ])
@ -477,7 +468,7 @@ describe('Test live', function () {
liveVideoId = await createLiveWrapper(false)
const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId)
await waitUntilLivePublishedOnAllServers(liveVideoId)
await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
await waitJobs(servers)
await testVideoResolutions(liveVideoId, resolutions)
@ -494,7 +485,7 @@ describe('Test live', function () {
liveVideoId = await createLiveWrapper(true)
const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId, 'video_short2.webm')
await waitUntilLivePublishedOnAllServers(liveVideoId)
await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
await waitJobs(servers)
await testVideoResolutions(liveVideoId, resolutions)
@ -504,7 +495,7 @@ describe('Test live', function () {
await waitJobs(servers)
await waitUntilLivePublishedOnAllServers(liveVideoId)
await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
const bitrateLimits = {
720: 5000 * 1000, // 60FPS
@ -559,216 +550,6 @@ describe('Test live', function () {
})
})
describe('Live views', function () {
let liveVideoId: string
let command: FfmpegCommand
async function countViews (expected: number) {
for (const server of servers) {
const res = await getVideo(server.url, liveVideoId)
const video: VideoDetails = res.body
expect(video.views).to.equal(expected)
}
}
before(async function () {
this.timeout(30000)
const liveAttributes = {
name: 'live video',
channelId: servers[0].videoChannel.id,
privacy: VideoPrivacy.PUBLIC
}
const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes)
liveVideoId = res.body.video.uuid
command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId)
await waitUntilLivePublishedOnAllServers(liveVideoId)
await waitJobs(servers)
})
it('Should display no views for a live', async function () {
await countViews(0)
})
it('Should view a live twice and display 1 view', async function () {
this.timeout(30000)
await viewVideo(servers[0].url, liveVideoId)
await viewVideo(servers[0].url, liveVideoId)
await wait(7000)
await waitJobs(servers)
await countViews(1)
})
it('Should wait and display 0 views', async function () {
this.timeout(30000)
await wait(7000)
await waitJobs(servers)
await countViews(0)
})
it('Should view a live on a remote and on local and display 2 views', async function () {
this.timeout(30000)
await viewVideo(servers[0].url, liveVideoId)
await viewVideo(servers[1].url, liveVideoId)
await viewVideo(servers[1].url, liveVideoId)
await wait(7000)
await waitJobs(servers)
await countViews(2)
})
after(async function () {
await stopFfmpeg(command)
})
})
describe('Live socket messages', function () {
async function createLiveWrapper () {
const liveAttributes = {
name: 'live video',
channelId: servers[0].videoChannel.id,
privacy: VideoPrivacy.PUBLIC
}
const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes)
return res.body.video.uuid
}
it('Should correctly send a message when the live starts and ends', async function () {
this.timeout(60000)
const localStateChanges: VideoState[] = []
const remoteStateChanges: VideoState[] = []
const liveVideoUUID = await createLiveWrapper()
await waitJobs(servers)
{
const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID)
const localSocket = getLiveNotificationSocket(servers[0].url)
localSocket.on('state-change', data => localStateChanges.push(data.state))
localSocket.emit('subscribe', { videoId })
}
{
const videoId = await getVideoIdFromUUID(servers[1].url, liveVideoUUID)
const remoteSocket = getLiveNotificationSocket(servers[1].url)
remoteSocket.on('state-change', data => remoteStateChanges.push(data.state))
remoteSocket.emit('subscribe', { videoId })
}
const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID)
await waitUntilLivePublishedOnAllServers(liveVideoUUID)
await waitJobs(servers)
for (const stateChanges of [ localStateChanges, remoteStateChanges ]) {
expect(stateChanges).to.have.length.at.least(1)
expect(stateChanges[stateChanges.length - 1]).to.equal(VideoState.PUBLISHED)
}
await stopFfmpeg(command)
for (const server of servers) {
await waitUntilLiveEnded(server.url, server.accessToken, liveVideoUUID)
}
await waitJobs(servers)
for (const stateChanges of [ localStateChanges, remoteStateChanges ]) {
expect(stateChanges).to.have.length.at.least(2)
expect(stateChanges[stateChanges.length - 1]).to.equal(VideoState.LIVE_ENDED)
}
})
it('Should correctly send views change notification', async function () {
this.timeout(60000)
let localLastVideoViews = 0
let remoteLastVideoViews = 0
const liveVideoUUID = await createLiveWrapper()
await waitJobs(servers)
{
const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID)
const localSocket = getLiveNotificationSocket(servers[0].url)
localSocket.on('views-change', data => { localLastVideoViews = data.views })
localSocket.emit('subscribe', { videoId })
}
{
const videoId = await getVideoIdFromUUID(servers[1].url, liveVideoUUID)
const remoteSocket = getLiveNotificationSocket(servers[1].url)
remoteSocket.on('views-change', data => { remoteLastVideoViews = data.views })
remoteSocket.emit('subscribe', { videoId })
}
const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID)
await waitUntilLivePublishedOnAllServers(liveVideoUUID)
await waitJobs(servers)
expect(localLastVideoViews).to.equal(0)
expect(remoteLastVideoViews).to.equal(0)
await viewVideo(servers[0].url, liveVideoUUID)
await viewVideo(servers[1].url, liveVideoUUID)
await waitJobs(servers)
await wait(5000)
await waitJobs(servers)
expect(localLastVideoViews).to.equal(2)
expect(remoteLastVideoViews).to.equal(2)
await stopFfmpeg(command)
})
it('Should not receive a notification after unsubscribe', async function () {
this.timeout(60000)
const stateChanges: VideoState[] = []
const liveVideoUUID = await createLiveWrapper()
await waitJobs(servers)
const videoId = await getVideoIdFromUUID(servers[0].url, liveVideoUUID)
const socket = getLiveNotificationSocket(servers[0].url)
socket.on('state-change', data => stateChanges.push(data.state))
socket.emit('subscribe', { videoId })
const command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoUUID)
await waitUntilLivePublishedOnAllServers(liveVideoUUID)
await waitJobs(servers)
expect(stateChanges).to.have.lengthOf(1)
socket.emit('unsubscribe', { videoId })
await stopFfmpeg(command)
await waitJobs(servers)
expect(stateChanges).to.have.lengthOf(1)
})
})
describe('After a server restart', function () {
let liveVideoId: string
let liveVideoReplayId: string

View File

@ -175,6 +175,12 @@ async function waitUntilLiveSaved (url: string, token: string, videoId: number |
} while (video.isLive === true && video.state.id !== VideoState.PUBLISHED)
}
async function waitUntilLivePublishedOnAllServers (servers: ServerInfo[], videoId: string) {
for (const server of servers) {
await waitUntilLivePublished(server.url, server.accessToken, videoId)
}
}
async function checkLiveCleanup (server: ServerInfo, videoUUID: string, resolutions: number[] = []) {
const basePath = buildServerDirectory(server, 'streaming-playlists')
const hlsPath = join(basePath, 'hls', videoUUID)
@ -226,6 +232,7 @@ export {
sendRTMPStreamInVideo,
waitUntilLiveEnded,
waitFfmpegUntilError,
waitUntilLivePublishedOnAllServers,
sendRTMPStream,
testFfmpegStreamError
}