Fix live RAM usage when ffmpeg is too slow

pull/3383/head
Chocobozzz 2021-01-27 10:51:03 +01:00
parent a4a8cd3971
commit 00b87c5791
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
2 changed files with 60 additions and 8 deletions

View File

@ -644,6 +644,7 @@ const VIDEO_LIVE = {
SEGMENTS_LIST_SIZE: 15, // 15 maximum segments in live playlist SEGMENTS_LIST_SIZE: 15, // 15 maximum segments in live playlist
REPLAY_DIRECTORY: 'replay', REPLAY_DIRECTORY: 'replay',
EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION: 4, EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION: 4,
MAX_SOCKET_WAITING_DATA: 1024 * 1000 * 100, // 100MB
RTMP: { RTMP: {
CHUNK_SIZE: 60000, CHUNK_SIZE: 60000,
GOP_CACHE: true, GOP_CACHE: true,
@ -656,7 +657,8 @@ const VIDEO_LIVE = {
const MEMOIZE_TTL = { const MEMOIZE_TTL = {
OVERVIEWS_SAMPLE: 1000 * 3600 * 4, // 4 hours OVERVIEWS_SAMPLE: 1000 * 3600 * 4, // 4 hours
INFO_HASH_EXISTS: 1000 * 3600 * 12, // 12 hours INFO_HASH_EXISTS: 1000 * 3600 * 12, // 12 hours
LIVE_ABLE_TO_UPLOAD: 1000 * 60 // 1 minute LIVE_ABLE_TO_UPLOAD: 1000 * 60, // 1 minute
LIVE_CHECK_SOCKET_HEALTH: 1000 * 60 // 1 minute
} }
const MEMOIZE_LENGTH = { const MEMOIZE_LENGTH = {

View File

@ -3,6 +3,7 @@ import * as Bluebird from 'bluebird'
import * as chokidar from 'chokidar' import * as chokidar from 'chokidar'
import { FfmpegCommand } from 'fluent-ffmpeg' import { FfmpegCommand } from 'fluent-ffmpeg'
import { appendFile, ensureDir, readFile, stat } from 'fs-extra' import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
import { createServer, Server } from 'net'
import { basename, join } from 'path' import { basename, join } from 'path'
import { isTestInstance } from '@server/helpers/core-utils' import { isTestInstance } from '@server/helpers/core-utils'
import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils' import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
@ -27,8 +28,7 @@ import { getHLSDirectory } from './video-paths'
import { availableEncoders } from './video-transcoding-profiles' import { availableEncoders } from './video-transcoding-profiles'
import memoizee = require('memoizee') import memoizee = require('memoizee')
const NodeRtmpSession = require('node-media-server/node_rtmp_session')
const NodeRtmpServer = require('node-media-server/node_rtmp_server')
const context = require('node-media-server/node_core_ctx') const context = require('node-media-server/node_core_ctx')
const nodeMediaServerLogger = require('node-media-server/node_core_logger') const nodeMediaServerLogger = require('node-media-server/node_core_logger')
@ -63,7 +63,11 @@ class LiveManager {
return isAbleToUploadVideo(userId, 1000) return isAbleToUploadVideo(userId, 1000)
}, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD }) }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
private rtmpServer: any private readonly hasClientSocketsInBadHealthWithCache = memoizee((sessionId: string) => {
return this.hasClientSocketsInBadHealth(sessionId)
}, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH })
private rtmpServer: Server
private constructor () { private constructor () {
} }
@ -108,19 +112,31 @@ class LiveManager {
run () { run () {
logger.info('Running RTMP server on port %d', config.rtmp.port) logger.info('Running RTMP server on port %d', config.rtmp.port)
this.rtmpServer = new NodeRtmpServer(config) this.rtmpServer = createServer(socket => {
this.rtmpServer.tcpServer.on('error', err => { const session = new NodeRtmpSession(config, socket)
session.run()
})
this.rtmpServer.on('error', err => {
logger.error('Cannot run RTMP server.', { err }) logger.error('Cannot run RTMP server.', { err })
}) })
this.rtmpServer.run() this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT)
} }
stop () { stop () {
logger.info('Stopping RTMP server.') logger.info('Stopping RTMP server.')
this.rtmpServer.stop() this.rtmpServer.close()
this.rtmpServer = undefined this.rtmpServer = undefined
// Sessions is an object
this.getContext().sessions.forEach((session: any) => {
if (session instanceof NodeRtmpSession) {
session.stop()
}
})
} }
isRunning () { isRunning () {
@ -344,11 +360,21 @@ class LiveManager {
segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
if (this.hasClientSocketsInBadHealthWithCache(sessionId)) {
logger.error(
'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
' Stopping session of video %s.', videoUUID)
this.stopSessionOf(videoLive.videoId)
return
}
// Duration constraint check // Duration constraint check
if (this.isDurationConstraintValid(startStreamDateTime) !== true) { if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
logger.info('Stopping session of %s: max duration exceeded.', videoUUID) logger.info('Stopping session of %s: max duration exceeded.', videoUUID)
this.stopSessionOf(videoLive.videoId) this.stopSessionOf(videoLive.videoId)
return
} }
// Check user quota if the user enabled replay saving // Check user quota if the user enabled replay saving
@ -517,6 +543,30 @@ class LiveManager {
return now <= max return now <= max
} }
private hasClientSocketsInBadHealth (sessionId: string) {
const rtmpSession = this.getContext().sessions.get(sessionId)
if (!rtmpSession) {
logger.warn('Cannot get session %s to check players socket health.', sessionId)
return
}
for (const playerSessionId of rtmpSession.players) {
const playerSession = this.getContext().sessions.get(playerSessionId)
if (!playerSession) {
logger.error('Cannot get player session %s to check socket health.', playerSession)
continue
}
if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) {
return true
}
}
return false
}
private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) { private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) {
if (live.saveReplay !== true) return true if (live.saveReplay !== true) return true