mirror of https://github.com/Chocobozzz/PeerTube
				
				
				
			Fix high CPU with long live when save replay is true
							parent
							
								
									d605328a30
								
							
						
					
					
						commit
						937581b8f6
					
				| 
						 | 
				
			
			@ -190,12 +190,11 @@ async function getLiveTranscodingCommand (options: {
 | 
			
		|||
  outPath: string
 | 
			
		||||
  resolutions: number[]
 | 
			
		||||
  fps: number
 | 
			
		||||
  deleteSegments: boolean
 | 
			
		||||
 | 
			
		||||
  availableEncoders: AvailableEncoders
 | 
			
		||||
  profile: string
 | 
			
		||||
}) {
 | 
			
		||||
  const { rtmpUrl, outPath, resolutions, fps, deleteSegments, availableEncoders, profile } = options
 | 
			
		||||
  const { rtmpUrl, outPath, resolutions, fps, availableEncoders, profile } = options
 | 
			
		||||
  const input = rtmpUrl
 | 
			
		||||
 | 
			
		||||
  const command = getFFmpeg(input)
 | 
			
		||||
| 
						 | 
				
			
			@ -272,14 +271,14 @@ async function getLiveTranscodingCommand (options: {
 | 
			
		|||
    varStreamMap.push(`v:${i},a:${i}`)
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  addDefaultLiveHLSParams(command, outPath, deleteSegments)
 | 
			
		||||
  addDefaultLiveHLSParams(command, outPath)
 | 
			
		||||
 | 
			
		||||
  command.outputOption('-var_stream_map', varStreamMap.join(' '))
 | 
			
		||||
 | 
			
		||||
  return command
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
function getLiveMuxingCommand (rtmpUrl: string, outPath: string, deleteSegments: boolean) {
 | 
			
		||||
function getLiveMuxingCommand (rtmpUrl: string, outPath: string) {
 | 
			
		||||
  const command = getFFmpeg(rtmpUrl)
 | 
			
		||||
  command.inputOption('-fflags nobuffer')
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -288,17 +287,17 @@ function getLiveMuxingCommand (rtmpUrl: string, outPath: string, deleteSegments:
 | 
			
		|||
  command.outputOption('-map 0:a?')
 | 
			
		||||
  command.outputOption('-map 0:v?')
 | 
			
		||||
 | 
			
		||||
  addDefaultLiveHLSParams(command, outPath, deleteSegments)
 | 
			
		||||
  addDefaultLiveHLSParams(command, outPath)
 | 
			
		||||
 | 
			
		||||
  return command
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function hlsPlaylistToFragmentedMP4 (hlsDirectory: string, segmentFiles: string[], outputPath: string) {
 | 
			
		||||
  const concatFilePath = join(hlsDirectory, 'concat.txt')
 | 
			
		||||
async function hlsPlaylistToFragmentedMP4 (replayDirectory: string, segmentFiles: string[], outputPath: string) {
 | 
			
		||||
  const concatFilePath = join(replayDirectory, 'concat.txt')
 | 
			
		||||
 | 
			
		||||
  function cleaner () {
 | 
			
		||||
    remove(concatFilePath)
 | 
			
		||||
      .catch(err => logger.error('Cannot remove concat file in %s.', hlsDirectory, { err }))
 | 
			
		||||
      .catch(err => logger.error('Cannot remove concat file in %s.', replayDirectory, { err }))
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  // First concat the ts files to a mp4 file
 | 
			
		||||
| 
						 | 
				
			
			@ -385,14 +384,10 @@ function addDefaultEncoderParams (options: {
 | 
			
		|||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
function addDefaultLiveHLSParams (command: ffmpeg.FfmpegCommand, outPath: string, deleteSegments: boolean) {
 | 
			
		||||
function addDefaultLiveHLSParams (command: ffmpeg.FfmpegCommand, outPath: string) {
 | 
			
		||||
  command.outputOption('-hls_time ' + VIDEO_LIVE.SEGMENT_TIME_SECONDS)
 | 
			
		||||
  command.outputOption('-hls_list_size ' + VIDEO_LIVE.SEGMENTS_LIST_SIZE)
 | 
			
		||||
 | 
			
		||||
  if (deleteSegments === true) {
 | 
			
		||||
    command.outputOption('-hls_flags delete_segments')
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  command.outputOption('-hls_flags delete_segments')
 | 
			
		||||
  command.outputOption(`-hls_segment_filename ${join(outPath, '%v-%06d.ts')}`)
 | 
			
		||||
  command.outputOption('-master_pl_name master.m3u8')
 | 
			
		||||
  command.outputOption(`-f hls`)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -634,6 +634,7 @@ const VIDEO_LIVE = {
 | 
			
		|||
  CLEANUP_DELAY: 1000 * 60 * 5, // 5 minutes
 | 
			
		||||
  SEGMENT_TIME_SECONDS: 4, // 4 seconds
 | 
			
		||||
  SEGMENTS_LIST_SIZE: 15, // 15 maximum segments in live playlist
 | 
			
		||||
  REPLAY_DIRECTORY: 'replay',
 | 
			
		||||
  EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION: 4,
 | 
			
		||||
  RTMP: {
 | 
			
		||||
    CHUNK_SIZE: 60000,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,5 +1,5 @@
 | 
			
		|||
import * as Bull from 'bull'
 | 
			
		||||
import { readdir, remove } from 'fs-extra'
 | 
			
		||||
import { move, readdir, remove } from 'fs-extra'
 | 
			
		||||
import { join } from 'path'
 | 
			
		||||
import { hlsPlaylistToFragmentedMP4 } from '@server/helpers/ffmpeg-utils'
 | 
			
		||||
import { getDurationFromVideoFile, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
 | 
			
		||||
| 
						 | 
				
			
			@ -14,6 +14,7 @@ import { VideoStreamingPlaylistModel } from '@server/models/video/video-streamin
 | 
			
		|||
import { MStreamingPlaylist, MVideo, MVideoLive } from '@server/types/models'
 | 
			
		||||
import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
 | 
			
		||||
import { logger } from '../../../helpers/logger'
 | 
			
		||||
import { VIDEO_LIVE } from '@server/initializers/constants'
 | 
			
		||||
 | 
			
		||||
async function processVideoLiveEnding (job: Bull.Job) {
 | 
			
		||||
  const payload = job.data as VideoLiveEndingPayload
 | 
			
		||||
| 
						 | 
				
			
			@ -53,24 +54,40 @@ export {
 | 
			
		|||
 | 
			
		||||
async function saveLive (video: MVideo, live: MVideoLive) {
 | 
			
		||||
  const hlsDirectory = getHLSDirectory(video, false)
 | 
			
		||||
  const files = await readdir(hlsDirectory)
 | 
			
		||||
  const replayDirectory = join(hlsDirectory, VIDEO_LIVE.REPLAY_DIRECTORY)
 | 
			
		||||
 | 
			
		||||
  const rootFiles = await readdir(hlsDirectory)
 | 
			
		||||
 | 
			
		||||
  const playlistFiles: string[] = []
 | 
			
		||||
 | 
			
		||||
  for (const file of rootFiles) {
 | 
			
		||||
    if (file.endsWith('.m3u8') !== true) continue
 | 
			
		||||
 | 
			
		||||
    await move(join(hlsDirectory, file), join(replayDirectory, file))
 | 
			
		||||
 | 
			
		||||
    if (file !== 'master.m3u8') {
 | 
			
		||||
      playlistFiles.push(file)
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  const replayFiles = await readdir(replayDirectory)
 | 
			
		||||
 | 
			
		||||
  const playlistFiles = files.filter(f => f.endsWith('.m3u8') && f !== 'master.m3u8')
 | 
			
		||||
  const resolutions: number[] = []
 | 
			
		||||
  let duration: number
 | 
			
		||||
 | 
			
		||||
  for (const playlistFile of playlistFiles) {
 | 
			
		||||
    const playlistPath = join(hlsDirectory, playlistFile)
 | 
			
		||||
    const playlistPath = join(replayDirectory, playlistFile)
 | 
			
		||||
    const { videoFileResolution } = await getVideoFileResolution(playlistPath)
 | 
			
		||||
 | 
			
		||||
    // Put the final mp4 in the hls directory, and not in the replay directory
 | 
			
		||||
    const mp4TmpPath = buildMP4TmpPath(hlsDirectory, videoFileResolution)
 | 
			
		||||
 | 
			
		||||
    // Playlist name is for example 3.m3u8
 | 
			
		||||
    // Segments names are 3-0.ts 3-1.ts etc
 | 
			
		||||
    const shouldStartWith = playlistFile.replace(/\.m3u8$/, '') + '-'
 | 
			
		||||
 | 
			
		||||
    const segmentFiles = files.filter(f => f.startsWith(shouldStartWith) && f.endsWith('.ts'))
 | 
			
		||||
    await hlsPlaylistToFragmentedMP4(hlsDirectory, segmentFiles, mp4TmpPath)
 | 
			
		||||
    const segmentFiles = replayFiles.filter(f => f.startsWith(shouldStartWith) && f.endsWith('.ts'))
 | 
			
		||||
    await hlsPlaylistToFragmentedMP4(replayDirectory, segmentFiles, mp4TmpPath)
 | 
			
		||||
 | 
			
		||||
    if (!duration) {
 | 
			
		||||
      duration = await getDurationFromVideoFile(mp4TmpPath)
 | 
			
		||||
| 
						 | 
				
			
			@ -143,7 +160,8 @@ async function cleanupLiveFiles (hlsDirectory: string) {
 | 
			
		|||
      filename.endsWith('.m3u8') ||
 | 
			
		||||
      filename.endsWith('.mpd') ||
 | 
			
		||||
      filename.endsWith('.m4s') ||
 | 
			
		||||
      filename.endsWith('.tmp')
 | 
			
		||||
      filename.endsWith('.tmp') ||
 | 
			
		||||
      filename === VIDEO_LIVE.REPLAY_DIRECTORY
 | 
			
		||||
    ) {
 | 
			
		||||
      const p = join(hlsDirectory, filename)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,8 +1,8 @@
 | 
			
		|||
 | 
			
		||||
import * as chokidar from 'chokidar'
 | 
			
		||||
import { FfmpegCommand } from 'fluent-ffmpeg'
 | 
			
		||||
import { ensureDir, stat } from 'fs-extra'
 | 
			
		||||
import { basename } from 'path'
 | 
			
		||||
import { copy, ensureDir, stat } from 'fs-extra'
 | 
			
		||||
import { basename, join } from 'path'
 | 
			
		||||
import { isTestInstance } from '@server/helpers/core-utils'
 | 
			
		||||
import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
 | 
			
		||||
import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
 | 
			
		||||
| 
						 | 
				
			
			@ -25,6 +25,7 @@ import { getHLSDirectory } from './video-paths'
 | 
			
		|||
import { availableEncoders } from './video-transcoding-profiles'
 | 
			
		||||
 | 
			
		||||
import memoizee = require('memoizee')
 | 
			
		||||
import { mkdir } from 'fs'
 | 
			
		||||
const NodeRtmpServer = require('node-media-server/node_rtmp_server')
 | 
			
		||||
const context = require('node-media-server/node_core_ctx')
 | 
			
		||||
const nodeMediaServerLogger = require('node-media-server/node_core_logger')
 | 
			
		||||
| 
						 | 
				
			
			@ -261,8 +262,13 @@ class LiveManager {
 | 
			
		|||
    const outPath = getHLSDirectory(videoLive.Video)
 | 
			
		||||
    await ensureDir(outPath)
 | 
			
		||||
 | 
			
		||||
    const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY)
 | 
			
		||||
 | 
			
		||||
    if (videoLive.saveReplay === true) {
 | 
			
		||||
      await ensureDir(replayDirectory)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const videoUUID = videoLive.Video.uuid
 | 
			
		||||
    const deleteSegments = videoLive.saveReplay === false
 | 
			
		||||
 | 
			
		||||
    const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED
 | 
			
		||||
      ? await getLiveTranscodingCommand({
 | 
			
		||||
| 
						 | 
				
			
			@ -270,11 +276,10 @@ class LiveManager {
 | 
			
		|||
        outPath,
 | 
			
		||||
        resolutions: allResolutions,
 | 
			
		||||
        fps,
 | 
			
		||||
        deleteSegments,
 | 
			
		||||
        availableEncoders,
 | 
			
		||||
        profile: 'default'
 | 
			
		||||
      })
 | 
			
		||||
      : getLiveMuxingCommand(rtmpUrl, outPath, deleteSegments)
 | 
			
		||||
      : getLiveMuxingCommand(rtmpUrl, outPath)
 | 
			
		||||
 | 
			
		||||
    logger.info('Running live muxing/transcoding for %s.', videoUUID)
 | 
			
		||||
    this.transSessions.set(sessionId, ffmpegExec)
 | 
			
		||||
| 
						 | 
				
			
			@ -284,11 +289,18 @@ class LiveManager {
 | 
			
		|||
    const segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
 | 
			
		||||
    const playlistIdMatcher = /^([\d+])-/
 | 
			
		||||
 | 
			
		||||
    const processHashSegments = (segmentsToProcess: string[]) => {
 | 
			
		||||
    const processSegments = (segmentsToProcess: string[]) => {
 | 
			
		||||
      // Add sha hash of previous segments, because ffmpeg should have finished generating them
 | 
			
		||||
      for (const previousSegment of segmentsToProcess) {
 | 
			
		||||
        this.addSegmentSha(videoUUID, previousSegment)
 | 
			
		||||
          .catch(err => logger.error('Cannot add sha segment of video %s -> %s.', videoUUID, previousSegment, { err }))
 | 
			
		||||
 | 
			
		||||
        if (videoLive.saveReplay) {
 | 
			
		||||
          const segmentName = basename(previousSegment)
 | 
			
		||||
 | 
			
		||||
          copy(previousSegment, join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY, segmentName))
 | 
			
		||||
            .catch(err => logger.error('Cannot copy segment %s to repay directory.', previousSegment, { err }))
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -298,7 +310,7 @@ class LiveManager {
 | 
			
		|||
      const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
 | 
			
		||||
 | 
			
		||||
      const segmentsToProcess = segmentsToProcessPerPlaylist[playlistId] || []
 | 
			
		||||
      processHashSegments(segmentsToProcess)
 | 
			
		||||
      processSegments(segmentsToProcess)
 | 
			
		||||
 | 
			
		||||
      segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -369,7 +381,7 @@ class LiveManager {
 | 
			
		|||
        .then(() => {
 | 
			
		||||
          // Process remaining segments hash
 | 
			
		||||
          for (const key of Object.keys(segmentsToProcessPerPlaylist)) {
 | 
			
		||||
            processHashSegments(segmentsToProcessPerPlaylist[key])
 | 
			
		||||
            processSegments(segmentsToProcessPerPlaylist[key])
 | 
			
		||||
          }
 | 
			
		||||
        })
 | 
			
		||||
        .catch(err => logger.error('Cannot close watchers of %s or process remaining hash segments.', outPath, { err }))
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue