Live supports object storage

* Sync live files (segments, master playlist, resolution playlist,
   segment sha file) into object storage
 * Automatically delete them when the live ends
 * Segment sha file is now a file on disk, and not stored in memory
   anymore
pull/5338/head
Chocobozzz 2022-10-04 10:03:17 +02:00
parent 9c0cdc5047
commit cfd57d2ca0
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
21 changed files with 615 additions and 307 deletions

View File

@ -102,7 +102,6 @@ import {
wellKnownRouter,
lazyStaticRouter,
servicesRouter,
liveRouter,
pluginsRouter,
webfingerRouter,
trackerRouter,
@ -221,9 +220,6 @@ app.use(apiRoute, apiRouter)
// Services (oembed...)
app.use('/services', servicesRouter)
// Live streaming
app.use('/live', liveRouter)
// Plugins & themes
app.use('/', pluginsRouter)

View File

@ -6,7 +6,6 @@ export * from './feeds'
export * from './services'
export * from './static'
export * from './lazy-static'
export * from './live'
export * from './misc'
export * from './webfinger'
export * from './tracker'

View File

@ -1,32 +0,0 @@
import cors from 'cors'
import express from 'express'
import { mapToJSON } from '@server/helpers/core-utils'
import { LiveSegmentShaStore } from '@server/lib/live'
import { HttpStatusCode } from '@shared/models'
const liveRouter = express.Router()
liveRouter.use('/segments-sha256/:videoUUID',
cors(),
getSegmentsSha256
)
// ---------------------------------------------------------------------------
export {
liveRouter
}
// ---------------------------------------------------------------------------
function getSegmentsSha256 (req: express.Request, res: express.Response) {
const videoUUID = req.params.videoUUID
const result = LiveSegmentShaStore.Instance.getSegmentsSha256(videoUUID)
if (!result) {
return res.status(HttpStatusCode.NOT_FOUND_404).end()
}
return res.json(mapToJSON(result))
}

View File

@ -15,7 +15,7 @@ import { P2P_MEDIA_LOADER_PEER_VERSION, REQUEST_TIMEOUTS } from '../initializers
import { sequelizeTypescript } from '../initializers/database'
import { VideoFileModel } from '../models/video/video-file'
import { VideoStreamingPlaylistModel } from '../models/video/video-streaming-playlist'
import { storeHLSFile } from './object-storage'
import { storeHLSFileFromFilename } from './object-storage'
import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getHlsResolutionPlaylistFilename } from './paths'
import { VideoPathManager } from './video-path-manager'
@ -95,7 +95,7 @@ function updateMasterHLSPlaylist (video: MVideo, playlistArg: MStreamingPlaylist
await writeFile(masterPlaylistPath, masterPlaylists.join('\n') + '\n')
if (playlist.storage === VideoStorage.OBJECT_STORAGE) {
playlist.playlistUrl = await storeHLSFile(playlist, playlist.playlistFilename)
playlist.playlistUrl = await storeHLSFileFromFilename(playlist, playlist.playlistFilename)
await remove(masterPlaylistPath)
}
@ -146,7 +146,7 @@ function updateSha256VODSegments (video: MVideo, playlistArg: MStreamingPlaylist
await outputJSON(outputPath, json)
if (playlist.storage === VideoStorage.OBJECT_STORAGE) {
playlist.segmentsSha256Url = await storeHLSFile(playlist, playlist.segmentsSha256Filename)
playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlist, playlist.segmentsSha256Filename)
await remove(outputPath)
}

View File

@ -5,7 +5,7 @@ import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { updateTorrentMetadata } from '@server/helpers/webtorrent'
import { CONFIG } from '@server/initializers/config'
import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants'
import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage'
import { storeHLSFileFromFilename, storeWebTorrentFile } from '@server/lib/object-storage'
import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths'
import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state'
import { VideoModel } from '@server/models/video/video'
@ -88,10 +88,10 @@ async function moveHLSFiles (video: MVideoWithAllFiles) {
// Resolution playlist
const playlistFilename = getHlsResolutionPlaylistFilename(file.filename)
await storeHLSFile(playlistWithVideo, playlistFilename)
await storeHLSFileFromFilename(playlistWithVideo, playlistFilename)
// Resolution fragmented file
const fileUrl = await storeHLSFile(playlistWithVideo, file.filename)
const fileUrl = await storeHLSFileFromFilename(playlistWithVideo, file.filename)
const oldPath = join(getHLSDirectory(video), file.filename)
@ -113,9 +113,9 @@ async function doAfterLastJob (options: {
const playlistWithVideo = playlist.withVideo(video)
// Master playlist
playlist.playlistUrl = await storeHLSFile(playlistWithVideo, playlist.playlistFilename)
playlist.playlistUrl = await storeHLSFileFromFilename(playlistWithVideo, playlist.playlistFilename)
// Sha256 segments file
playlist.segmentsSha256Url = await storeHLSFile(playlistWithVideo, playlist.segmentsSha256Filename)
playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlistWithVideo, playlist.segmentsSha256Filename)
playlist.storage = VideoStorage.OBJECT_STORAGE

View File

@ -4,7 +4,7 @@ import { join } from 'path'
import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg'
import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
import { cleanupPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live'
import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live'
import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths'
import { generateVideoMiniature } from '@server/lib/thumbnail'
import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding'
@ -141,23 +141,22 @@ async function replaceLiveByReplay (options: {
}) {
const { video, liveSession, live, permanentLive, replayDirectory } = options
await cleanupTMPLiveFiles(video)
const videoWithFiles = await VideoModel.loadFull(video.id)
const hlsPlaylist = videoWithFiles.getHLSPlaylist()
await cleanupTMPLiveFiles(videoWithFiles, hlsPlaylist)
await live.destroy()
video.isLive = false
video.waitTranscoding = true
video.state = VideoState.TO_TRANSCODE
videoWithFiles.isLive = false
videoWithFiles.waitTranscoding = true
videoWithFiles.state = VideoState.TO_TRANSCODE
await video.save()
await videoWithFiles.save()
liveSession.replayVideoId = video.id
liveSession.replayVideoId = videoWithFiles.id
await liveSession.save()
// Remove old HLS playlist video files
const videoWithFiles = await VideoModel.loadFull(video.id)
const hlsPlaylist = videoWithFiles.getHLSPlaylist()
await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id)
// Reset playlist
@ -234,7 +233,7 @@ async function cleanupLiveAndFederate (options: {
if (streamingPlaylist) {
if (permanentLive) {
await cleanupPermanentLive(video, streamingPlaylist)
await cleanupAndDestroyPermanentLive(video, streamingPlaylist)
} else {
await cleanupUnsavedNormalLive(video, streamingPlaylist)
}

View File

@ -21,14 +21,14 @@ import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models'
import { pick, wait } from '@shared/core-utils'
import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models'
import { LiveVideoError, VideoState, VideoStorage, VideoStreamingPlaylistType } from '@shared/models'
import { federateVideoIfNeeded } from '../activitypub/videos'
import { JobQueue } from '../job-queue'
import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths'
import { PeerTubeSocket } from '../peertube-socket'
import { Hooks } from '../plugins/hooks'
import { LiveQuotaStore } from './live-quota-store'
import { cleanupPermanentLive } from './live-utils'
import { cleanupAndDestroyPermanentLive } from './live-utils'
import { MuxingSession } from './shared'
const NodeRtmpSession = require('node-media-server/src/node_rtmp_session')
@ -224,7 +224,7 @@ class LiveManager {
if (oldStreamingPlaylist) {
if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid)
await cleanupPermanentLive(video, oldStreamingPlaylist)
await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist)
}
this.videoSessions.set(video.id, sessionId)
@ -301,7 +301,7 @@ class LiveManager {
...pick(options, [ 'streamingPlaylist', 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ])
})
muxingSession.on('master-playlist-created', () => this.publishAndFederateLive(videoLive, localLTags))
muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags))
muxingSession.on('bad-socket-health', ({ videoId }) => {
logger.error(
@ -485,6 +485,10 @@ class LiveManager {
playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions)
playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED
? VideoStorage.OBJECT_STORAGE
: VideoStorage.FILE_SYSTEM
return playlist.save()
}

View File

@ -1,62 +1,73 @@
import { writeJson } from 'fs-extra'
import { basename } from 'path'
import { mapToJSON } from '@server/helpers/core-utils'
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { MStreamingPlaylistVideo } from '@server/types/models'
import { buildSha256Segment } from '../hls'
import { storeHLSFileFromPath } from '../object-storage'
const lTags = loggerTagsFactory('live')
class LiveSegmentShaStore {
private static instance: LiveSegmentShaStore
private readonly segmentsSha256 = new Map<string, string>()
private readonly segmentsSha256 = new Map<string, Map<string, string>>()
private readonly videoUUID: string
private readonly sha256Path: string
private readonly streamingPlaylist: MStreamingPlaylistVideo
private readonly sendToObjectStorage: boolean
private constructor () {
constructor (options: {
videoUUID: string
sha256Path: string
streamingPlaylist: MStreamingPlaylistVideo
sendToObjectStorage: boolean
}) {
this.videoUUID = options.videoUUID
this.sha256Path = options.sha256Path
this.streamingPlaylist = options.streamingPlaylist
this.sendToObjectStorage = options.sendToObjectStorage
}
getSegmentsSha256 (videoUUID: string) {
return this.segmentsSha256.get(videoUUID)
}
async addSegmentSha (videoUUID: string, segmentPath: string) {
const segmentName = basename(segmentPath)
logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID))
async addSegmentSha (segmentPath: string) {
logger.debug('Adding live sha segment %s.', segmentPath, lTags(this.videoUUID))
const shaResult = await buildSha256Segment(segmentPath)
if (!this.segmentsSha256.has(videoUUID)) {
this.segmentsSha256.set(videoUUID, new Map())
}
const segmentName = basename(segmentPath)
this.segmentsSha256.set(segmentName, shaResult)
const filesMap = this.segmentsSha256.get(videoUUID)
filesMap.set(segmentName, shaResult)
await this.writeToDisk()
}
removeSegmentSha (videoUUID: string, segmentPath: string) {
async removeSegmentSha (segmentPath: string) {
const segmentName = basename(segmentPath)
logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID))
logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID))
const filesMap = this.segmentsSha256.get(videoUUID)
if (!filesMap) {
logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID))
if (!this.segmentsSha256.has(segmentName)) {
logger.warn('Unknown segment in files map for video %s and segment %s.', this.videoUUID, segmentPath, lTags(this.videoUUID))
return
}
if (!filesMap.has(segmentName)) {
logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID))
return
this.segmentsSha256.delete(segmentName)
await this.writeToDisk()
}
private async writeToDisk () {
await writeJson(this.sha256Path, mapToJSON(this.segmentsSha256))
if (this.sendToObjectStorage) {
const url = await storeHLSFileFromPath(this.streamingPlaylist, this.sha256Path)
if (this.streamingPlaylist.segmentsSha256Url !== url) {
this.streamingPlaylist.segmentsSha256Url = url
await this.streamingPlaylist.save()
}
}
filesMap.delete(segmentName)
}
cleanupShaSegments (videoUUID: string) {
this.segmentsSha256.delete(videoUUID)
}
static get Instance () {
return this.instance || (this.instance = new this())
}
}
export {

View File

@ -1,9 +1,10 @@
import { pathExists, readdir, remove } from 'fs-extra'
import { basename, join } from 'path'
import { logger } from '@server/helpers/logger'
import { MStreamingPlaylist, MVideo } from '@server/types/models'
import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models'
import { VideoStorage } from '@shared/models'
import { listHLSFileKeysOf, removeHLSFileObjectStorage, removeHLSObjectStorage } from '../object-storage'
import { getLiveDirectory } from '../paths'
import { LiveSegmentShaStore } from './live-segment-sha-store'
function buildConcatenatedName (segmentOrPlaylistPath: string) {
const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/)
@ -11,8 +12,8 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) {
return 'concat-' + num[1] + '.ts'
}
async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
await cleanupTMPLiveFiles(video)
async function cleanupAndDestroyPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
await cleanupTMPLiveFiles(video, streamingPlaylist)
await streamingPlaylist.destroy()
}
@ -20,32 +21,51 @@ async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamin
async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
const hlsDirectory = getLiveDirectory(video)
// We uploaded files to object storage too, remove them
if (streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
await removeHLSObjectStorage(streamingPlaylist.withVideo(video))
}
await remove(hlsDirectory)
await streamingPlaylist.destroy()
LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid)
}
async function cleanupTMPLiveFiles (video: MVideo) {
const hlsDirectory = getLiveDirectory(video)
async function cleanupTMPLiveFiles (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
await cleanupTMPLiveFilesFromObjectStorage(streamingPlaylist.withVideo(video))
LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid)
await cleanupTMPLiveFilesFromFilesystem(video)
}
export {
cleanupAndDestroyPermanentLive,
cleanupUnsavedNormalLive,
cleanupTMPLiveFiles,
buildConcatenatedName
}
// ---------------------------------------------------------------------------
function isTMPLiveFile (name: string) {
return name.endsWith('.ts') ||
name.endsWith('.m3u8') ||
name.endsWith('.json') ||
name.endsWith('.mpd') ||
name.endsWith('.m4s') ||
name.endsWith('.tmp')
}
async function cleanupTMPLiveFilesFromFilesystem (video: MVideo) {
const hlsDirectory = getLiveDirectory(video)
if (!await pathExists(hlsDirectory)) return
logger.info('Cleanup TMP live files of %s.', hlsDirectory)
logger.info('Cleanup TMP live files from filesystem of %s.', hlsDirectory)
const files = await readdir(hlsDirectory)
for (const filename of files) {
if (
filename.endsWith('.ts') ||
filename.endsWith('.m3u8') ||
filename.endsWith('.mpd') ||
filename.endsWith('.m4s') ||
filename.endsWith('.tmp')
) {
if (isTMPLiveFile(filename)) {
const p = join(hlsDirectory, filename)
remove(p)
@ -54,9 +74,14 @@ async function cleanupTMPLiveFiles (video: MVideo) {
}
}
export {
cleanupPermanentLive,
cleanupUnsavedNormalLive,
cleanupTMPLiveFiles,
buildConcatenatedName
async function cleanupTMPLiveFilesFromObjectStorage (streamingPlaylist: MStreamingPlaylistVideo) {
if (streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
const keys = await listHLSFileKeysOf(streamingPlaylist)
for (const key of keys) {
if (isTMPLiveFile(key)) {
await removeHLSFileObjectStorage(streamingPlaylist, key)
}
}
}

View File

@ -9,8 +9,10 @@ import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers
import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
import { removeHLSFileObjectStorage, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
import { VideoFileModel } from '@server/models/video/video-file'
import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
import { VideoStorage } from '@shared/models'
import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths'
import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
import { isAbleToUploadVideo } from '../../user'
@ -21,7 +23,7 @@ import { buildConcatenatedName } from '../live-utils'
import memoizee = require('memoizee')
interface MuxingSessionEvents {
'master-playlist-created': (options: { videoId: number }) => void
'live-ready': (options: { videoId: number }) => void
'bad-socket-health': (options: { videoId: number }) => void
'duration-exceeded': (options: { videoId: number }) => void
@ -68,12 +70,18 @@ class MuxingSession extends EventEmitter {
private readonly outDirectory: string
private readonly replayDirectory: string
private readonly liveSegmentShaStore: LiveSegmentShaStore
private readonly lTags: LoggerTagsFn
private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
private tsWatcher: FSWatcher
private masterWatcher: FSWatcher
private m3u8Watcher: FSWatcher
private masterPlaylistCreated = false
private liveReady = false
private aborted = false
@ -123,6 +131,13 @@ class MuxingSession extends EventEmitter {
this.outDirectory = getLiveDirectory(this.videoLive.Video)
this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString())
this.liveSegmentShaStore = new LiveSegmentShaStore({
videoUUID: this.videoLive.Video.uuid,
sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename),
streamingPlaylist: this.streamingPlaylist,
sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
})
this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
}
@ -159,8 +174,9 @@ class MuxingSession extends EventEmitter {
logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
this.watchTSFiles()
this.watchMasterFile()
this.watchTSFiles()
this.watchM3U8File()
let ffmpegShellCommand: string
this.ffmpegCommand.on('start', cmdline => {
@ -219,7 +235,7 @@ class MuxingSession extends EventEmitter {
setTimeout(() => {
// Wait latest segments generation, and close watchers
Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ])
Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
.then(() => {
// Process remaining segments hash
for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
@ -240,14 +256,41 @@ class MuxingSession extends EventEmitter {
private watchMasterFile () {
this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename)
this.masterWatcher.on('add', () => {
this.emit('master-playlist-created', { videoId: this.videoId })
this.masterWatcher.on('add', async () => {
if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
try {
const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename)
this.streamingPlaylist.playlistUrl = url
await this.streamingPlaylist.save()
} catch (err) {
logger.error('Cannot upload live master file to object storage.', { err, ...this.lTags() })
}
}
this.masterPlaylistCreated = true
this.masterWatcher.close()
.catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() }))
})
}
private watchM3U8File () {
this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
const onChangeOrAdd = async (m3u8Path: string) => {
if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
try {
await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)
} catch (err) {
logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
}
}
this.m3u8Watcher.on('change', onChangeOrAdd)
}
private watchTSFiles () {
const startStreamDateTime = new Date().getTime()
@ -282,7 +325,21 @@ class MuxingSession extends EventEmitter {
}
}
const deleteHandler = (segmentPath: string) => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath)
const deleteHandler = async (segmentPath: string) => {
try {
await this.liveSegmentShaStore.removeSegmentSha(segmentPath)
} catch (err) {
logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() })
}
if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
try {
await removeHLSFileObjectStorage(this.streamingPlaylist, segmentPath)
} catch (err) {
logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() })
}
}
}
this.tsWatcher.on('add', p => addHandler(p))
this.tsWatcher.on('unlink', p => deleteHandler(p))
@ -315,6 +372,7 @@ class MuxingSession extends EventEmitter {
extname: '.ts',
infoHash: null,
fps: this.fps,
storage: this.streamingPlaylist.storage,
videoStreamingPlaylistId: this.streamingPlaylist.id
})
@ -343,18 +401,36 @@ class MuxingSession extends EventEmitter {
}
private processSegments (segmentPaths: string[]) {
mapSeries(segmentPaths, async previousSegment => {
// Add sha hash of previous segments, because ffmpeg should have finished generating them
await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment)
mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment))
.catch(err => {
if (this.aborted) return
if (this.saveReplay) {
await this.addSegmentToReplay(previousSegment)
logger.error('Cannot process segments', { err, ...this.lTags() })
})
}
private async processSegment (segmentPath: string) {
// Add sha hash of previous segments, because ffmpeg should have finished generating them
await this.liveSegmentShaStore.addSegmentSha(segmentPath)
if (this.saveReplay) {
await this.addSegmentToReplay(segmentPath)
}
if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
try {
await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
} catch (err) {
logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
}
}).catch(err => {
if (this.aborted) return
}
logger.error('Cannot process segments', { err, ...this.lTags() })
})
// Master playlist and segment JSON file are created, live is ready
if (this.masterPlaylistCreated && !this.liveReady) {
this.liveReady = true
this.emit('live-ready', { videoId: this.videoId })
}
}
private hasClientSocketInBadHealth (sessionId: string) {

View File

@ -22,6 +22,24 @@ type BucketInfo = {
PREFIX?: string
}
async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
const s3Client = getClient()
const commandPrefix = bucketInfo.PREFIX + prefix
const listCommand = new ListObjectsV2Command({
Bucket: bucketInfo.BUCKET_NAME,
Prefix: commandPrefix
})
const listedObjects = await s3Client.send(listCommand)
if (isArray(listedObjects.Contents) !== true) return []
return listedObjects.Contents.map(c => c.Key)
}
// ---------------------------------------------------------------------------
async function storeObject (options: {
inputPath: string
objectStorageKey: string
@ -36,6 +54,8 @@ async function storeObject (options: {
return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo })
}
// ---------------------------------------------------------------------------
async function removeObject (filename: string, bucketInfo: BucketInfo) {
const command = new DeleteObjectCommand({
Bucket: bucketInfo.BUCKET_NAME,
@ -89,6 +109,8 @@ async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
}
// ---------------------------------------------------------------------------
async function makeAvailable (options: {
key: string
destination: string
@ -122,7 +144,8 @@ export {
storeObject,
removeObject,
removePrefix,
makeAvailable
makeAvailable,
listKeysOfPrefix
}
// ---------------------------------------------------------------------------

View File

@ -1,19 +1,35 @@
import { join } from 'path'
import { basename, join } from 'path'
import { logger } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { MStreamingPlaylistVideo, MVideoFile } from '@server/types/models'
import { getHLSDirectory } from '../paths'
import { generateHLSObjectBaseStorageKey, generateHLSObjectStorageKey, generateWebTorrentObjectStorageKey } from './keys'
import { lTags, makeAvailable, removeObject, removePrefix, storeObject } from './shared'
import { listKeysOfPrefix, lTags, makeAvailable, removeObject, removePrefix, storeObject } from './shared'
function storeHLSFile (playlist: MStreamingPlaylistVideo, filename: string, path?: string) {
function listHLSFileKeysOf (playlist: MStreamingPlaylistVideo) {
return listKeysOfPrefix(generateHLSObjectBaseStorageKey(playlist), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
}
// ---------------------------------------------------------------------------
function storeHLSFileFromFilename (playlist: MStreamingPlaylistVideo, filename: string) {
return storeObject({
inputPath: path ?? join(getHLSDirectory(playlist.Video), filename),
inputPath: join(getHLSDirectory(playlist.Video), filename),
objectStorageKey: generateHLSObjectStorageKey(playlist, filename),
bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS
})
}
function storeHLSFileFromPath (playlist: MStreamingPlaylistVideo, path: string) {
return storeObject({
inputPath: path,
objectStorageKey: generateHLSObjectStorageKey(playlist, basename(path)),
bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS
})
}
// ---------------------------------------------------------------------------
function storeWebTorrentFile (filename: string) {
return storeObject({
inputPath: join(CONFIG.STORAGE.VIDEOS_DIR, filename),
@ -22,6 +38,8 @@ function storeWebTorrentFile (filename: string) {
})
}
// ---------------------------------------------------------------------------
function removeHLSObjectStorage (playlist: MStreamingPlaylistVideo) {
return removePrefix(generateHLSObjectBaseStorageKey(playlist), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
}
@ -30,10 +48,14 @@ function removeHLSFileObjectStorage (playlist: MStreamingPlaylistVideo, filename
return removeObject(generateHLSObjectStorageKey(playlist, filename), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
}
// ---------------------------------------------------------------------------
function removeWebTorrentObjectStorage (videoFile: MVideoFile) {
return removeObject(generateWebTorrentObjectStorageKey(videoFile.filename), CONFIG.OBJECT_STORAGE.VIDEOS)
}
// ---------------------------------------------------------------------------
async function makeHLSFileAvailable (playlist: MStreamingPlaylistVideo, filename: string, destination: string) {
const key = generateHLSObjectStorageKey(playlist, filename)
@ -62,9 +84,14 @@ async function makeWebTorrentFileAvailable (filename: string, destination: strin
return destination
}
// ---------------------------------------------------------------------------
export {
listHLSFileKeysOf,
storeWebTorrentFile,
storeHLSFile,
storeHLSFileFromFilename,
storeHLSFileFromPath,
removeHLSObjectStorage,
removeHLSFileObjectStorage,

View File

@ -245,21 +245,25 @@ export class VideoStreamingPlaylistModel extends Model<Partial<AttributesOnly<Vi
}
getMasterPlaylistUrl (video: MVideo) {
if (this.storage === VideoStorage.OBJECT_STORAGE) {
return getHLSPublicFileUrl(this.playlistUrl)
}
if (video.isOwned()) {
if (this.storage === VideoStorage.OBJECT_STORAGE) {
return getHLSPublicFileUrl(this.playlistUrl)
}
if (video.isOwned()) return WEBSERVER.URL + this.getMasterPlaylistStaticPath(video.uuid)
return WEBSERVER.URL + this.getMasterPlaylistStaticPath(video.uuid)
}
return this.playlistUrl
}
getSha256SegmentsUrl (video: MVideo) {
if (this.storage === VideoStorage.OBJECT_STORAGE) {
return getHLSPublicFileUrl(this.segmentsSha256Url)
}
if (video.isOwned()) {
if (this.storage === VideoStorage.OBJECT_STORAGE) {
return getHLSPublicFileUrl(this.segmentsSha256Url)
}
if (video.isOwned()) return WEBSERVER.URL + this.getSha256SegmentsStaticPath(video.uuid, video.isLive)
return WEBSERVER.URL + this.getSha256SegmentsStaticPath(video.uuid)
}
return this.segmentsSha256Url
}
@ -287,9 +291,7 @@ export class VideoStreamingPlaylistModel extends Model<Partial<AttributesOnly<Vi
return join(STATIC_PATHS.STREAMING_PLAYLISTS.HLS, videoUUID, this.playlistFilename)
}
private getSha256SegmentsStaticPath (videoUUID: string, isLive: boolean) {
if (isLive) return join('/live', 'segments-sha256', videoUUID)
private getSha256SegmentsStaticPath (videoUUID: string) {
return join(STATIC_PATHS.STREAMING_PLAYLISTS.HLS, videoUUID, this.segmentsSha256Filename)
}
}

View File

@ -59,7 +59,7 @@ describe('Fast restream in live', function () {
const video = await server.videos.get({ id: liveId })
expect(video.streamingPlaylists).to.have.lengthOf(1)
await server.live.getSegment({ videoUUID: liveId, segment: 0, playlistNumber: 0 })
await server.live.getSegmentFile({ videoUUID: liveId, segment: 0, playlistNumber: 0 })
await makeRawRequest(video.streamingPlaylists[0].playlistUrl, HttpStatusCode.OK_200)
await makeRawRequest(video.streamingPlaylists[0].segmentsSha256Url, HttpStatusCode.OK_200)

View File

@ -3,7 +3,7 @@
import { expect } from 'chai'
import { basename, join } from 'path'
import { ffprobePromise, getVideoStream } from '@server/helpers/ffmpeg'
import { checkLiveSegmentHash, checkResolutionsInMasterPlaylist, testImage } from '@server/tests/shared'
import { testImage, testVideoResolutions } from '@server/tests/shared'
import { getAllFiles, wait } from '@shared/core-utils'
import {
HttpStatusCode,
@ -372,46 +372,6 @@ describe('Test live', function () {
return uuid
}
async function testVideoResolutions (liveVideoId: string, resolutions: number[]) {
for (const server of servers) {
const { data } = await server.videos.list()
expect(data.find(v => v.uuid === liveVideoId)).to.exist
const video = await server.videos.get({ id: liveVideoId })
expect(video.streamingPlaylists).to.have.lengthOf(1)
const hlsPlaylist = video.streamingPlaylists.find(s => s.type === VideoStreamingPlaylistType.HLS)
expect(hlsPlaylist).to.exist
// Only finite files are displayed
expect(hlsPlaylist.files).to.have.lengthOf(0)
await checkResolutionsInMasterPlaylist({ server, playlistUrl: hlsPlaylist.playlistUrl, resolutions })
for (let i = 0; i < resolutions.length; i++) {
const segmentNum = 3
const segmentName = `${i}-00000${segmentNum}.ts`
await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, playlistNumber: i, segment: segmentNum })
const subPlaylist = await servers[0].streamingPlaylists.get({
url: `${servers[0].url}/static/streaming-playlists/hls/${video.uuid}/${i}.m3u8`
})
expect(subPlaylist).to.contain(segmentName)
const baseUrlAndPath = servers[0].url + '/static/streaming-playlists/hls'
await checkLiveSegmentHash({
server,
baseUrlSegment: baseUrlAndPath,
videoUUID: video.uuid,
segmentName,
hlsPlaylist
})
}
}
}
function updateConf (resolutions: number[]) {
return servers[0].config.updateCustomSubConfig({
newConfig: {
@ -449,7 +409,14 @@ describe('Test live', function () {
await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
await waitJobs(servers)
await testVideoResolutions(liveVideoId, [ 720 ])
await testVideoResolutions({
originServer: servers[0],
servers,
liveVideoId,
resolutions: [ 720 ],
objectStorage: false,
transcoded: true
})
await stopFfmpeg(ffmpegCommand)
})
@ -477,7 +444,14 @@ describe('Test live', function () {
await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
await waitJobs(servers)
await testVideoResolutions(liveVideoId, resolutions.concat([ 720 ]))
await testVideoResolutions({
originServer: servers[0],
servers,
liveVideoId,
resolutions: resolutions.concat([ 720 ]),
objectStorage: false,
transcoded: true
})
await stopFfmpeg(ffmpegCommand)
})
@ -522,7 +496,14 @@ describe('Test live', function () {
await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
await waitJobs(servers)
await testVideoResolutions(liveVideoId, resolutions)
await testVideoResolutions({
originServer: servers[0],
servers,
liveVideoId,
resolutions,
objectStorage: false,
transcoded: true
})
await stopFfmpeg(ffmpegCommand)
await commands[0].waitUntilEnded({ videoId: liveVideoId })
@ -611,7 +592,14 @@ describe('Test live', function () {
await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
await waitJobs(servers)
await testVideoResolutions(liveVideoId, resolutions)
await testVideoResolutions({
originServer: servers[0],
servers,
liveVideoId,
resolutions,
objectStorage: false,
transcoded: true
})
await stopFfmpeg(ffmpegCommand)
await commands[0].waitUntilEnded({ videoId: liveVideoId })
@ -640,7 +628,14 @@ describe('Test live', function () {
await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
await waitJobs(servers)
await testVideoResolutions(liveVideoId, [ 720 ])
await testVideoResolutions({
originServer: servers[0],
servers,
liveVideoId,
resolutions: [ 720 ],
objectStorage: false,
transcoded: true
})
await stopFfmpeg(ffmpegCommand)
await commands[0].waitUntilEnded({ videoId: liveVideoId })

View File

@ -1,9 +1,9 @@
/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
import { expect } from 'chai'
import { expectStartWith } from '@server/tests/shared'
import { expectStartWith, testVideoResolutions } from '@server/tests/shared'
import { areObjectStorageTestsDisabled } from '@shared/core-utils'
import { HttpStatusCode, LiveVideoCreate, VideoFile, VideoPrivacy } from '@shared/models'
import { HttpStatusCode, LiveVideoCreate, VideoPrivacy } from '@shared/models'
import {
createMultipleServers,
doubleFollow,
@ -35,41 +35,43 @@ async function createLive (server: PeerTubeServer, permanent: boolean) {
return uuid
}
async function checkFiles (files: VideoFile[]) {
for (const file of files) {
expectStartWith(file.fileUrl, ObjectStorageCommand.getPlaylistBaseUrl())
async function checkFilesExist (servers: PeerTubeServer[], videoUUID: string, numberOfFiles: number) {
for (const server of servers) {
const video = await server.videos.get({ id: videoUUID })
await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200)
expect(video.files).to.have.lengthOf(0)
expect(video.streamingPlaylists).to.have.lengthOf(1)
const files = video.streamingPlaylists[0].files
expect(files).to.have.lengthOf(numberOfFiles)
for (const file of files) {
expectStartWith(file.fileUrl, ObjectStorageCommand.getPlaylistBaseUrl())
await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200)
}
}
}
async function getFiles (server: PeerTubeServer, videoUUID: string) {
const video = await server.videos.get({ id: videoUUID })
async function checkFilesCleanup (server: PeerTubeServer, videoUUID: string, resolutions: number[]) {
const resolutionFiles = resolutions.map((_value, i) => `${i}.m3u8`)
expect(video.files).to.have.lengthOf(0)
expect(video.streamingPlaylists).to.have.lengthOf(1)
return video.streamingPlaylists[0].files
}
async function streamAndEnd (servers: PeerTubeServer[], liveUUID: string) {
const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveUUID })
await waitUntilLivePublishedOnAllServers(servers, liveUUID)
const videoLiveDetails = await servers[0].videos.get({ id: liveUUID })
const liveDetails = await servers[0].live.get({ videoId: liveUUID })
await stopFfmpeg(ffmpegCommand)
if (liveDetails.permanentLive) {
await waitUntilLiveWaitingOnAllServers(servers, liveUUID)
} else {
await waitUntilLiveReplacedByReplayOnAllServers(servers, liveUUID)
for (const playlistName of [ 'master.m3u8' ].concat(resolutionFiles)) {
await server.live.getPlaylistFile({
videoUUID,
playlistName,
expectedStatus: HttpStatusCode.NOT_FOUND_404,
objectStorage: true
})
}
await waitJobs(servers)
return { videoLiveDetails, liveDetails }
await server.live.getSegmentFile({
videoUUID,
playlistNumber: 0,
segment: 0,
objectStorage: true,
expectedStatus: HttpStatusCode.NOT_FOUND_404
})
}
describe('Object storage for lives', function () {
@ -100,57 +102,124 @@ describe('Object storage for lives', function () {
videoUUID = await createLive(servers[0], false)
})
it('Should create a live and save the replay on object storage', async function () {
it('Should create a live and publish it on object storage', async function () {
this.timeout(220000)
await streamAndEnd(servers, videoUUID)
const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUID })
await waitUntilLivePublishedOnAllServers(servers, videoUUID)
for (const server of servers) {
const files = await getFiles(server, videoUUID)
expect(files).to.have.lengthOf(1)
await testVideoResolutions({
originServer: servers[0],
servers,
liveVideoId: videoUUID,
resolutions: [ 720 ],
transcoded: false,
objectStorage: true
})
await checkFiles(files)
}
await stopFfmpeg(ffmpegCommand)
})
it('Should have saved the replay on object storage', async function () {
this.timeout(220000)
await waitUntilLiveReplacedByReplayOnAllServers(servers, videoUUID)
await waitJobs(servers)
await checkFilesExist(servers, videoUUID, 1)
})
it('Should have cleaned up live files from object storage', async function () {
await checkFilesCleanup(servers[0], videoUUID, [ 720 ])
})
})
describe('With live transcoding', async function () {
let videoUUIDPermanent: string
let videoUUIDNonPermanent: string
const resolutions = [ 720, 480, 360, 240, 144 ]
before(async function () {
await servers[0].config.enableLive({ transcoding: true })
videoUUIDPermanent = await createLive(servers[0], true)
videoUUIDNonPermanent = await createLive(servers[0], false)
})
it('Should create a live and save the replay on object storage', async function () {
this.timeout(240000)
describe('Normal replay', function () {
let videoUUIDNonPermanent: string
await streamAndEnd(servers, videoUUIDNonPermanent)
before(async function () {
videoUUIDNonPermanent = await createLive(servers[0], false)
})
for (const server of servers) {
const files = await getFiles(server, videoUUIDNonPermanent)
expect(files).to.have.lengthOf(5)
it('Should create a live and publish it on object storage', async function () {
this.timeout(240000)
await checkFiles(files)
}
const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUIDNonPermanent })
await waitUntilLivePublishedOnAllServers(servers, videoUUIDNonPermanent)
await testVideoResolutions({
originServer: servers[0],
servers,
liveVideoId: videoUUIDNonPermanent,
resolutions,
transcoded: true,
objectStorage: true
})
await stopFfmpeg(ffmpegCommand)
})
it('Should have saved the replay on object storage', async function () {
this.timeout(220000)
await waitUntilLiveReplacedByReplayOnAllServers(servers, videoUUIDNonPermanent)
await waitJobs(servers)
await checkFilesExist(servers, videoUUIDNonPermanent, 5)
})
it('Should have cleaned up live files from object storage', async function () {
await checkFilesCleanup(servers[0], videoUUIDNonPermanent, resolutions)
})
})
it('Should create a live and save the replay of permanent live on object storage', async function () {
this.timeout(240000)
describe('Permanent replay', function () {
let videoUUIDPermanent: string
const { videoLiveDetails } = await streamAndEnd(servers, videoUUIDPermanent)
before(async function () {
videoUUIDPermanent = await createLive(servers[0], true)
})
const replay = await findExternalSavedVideo(servers[0], videoLiveDetails)
it('Should create a live and publish it on object storage', async function () {
this.timeout(240000)
for (const server of servers) {
const files = await getFiles(server, replay.uuid)
expect(files).to.have.lengthOf(5)
const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUIDPermanent })
await waitUntilLivePublishedOnAllServers(servers, videoUUIDPermanent)
await checkFiles(files)
}
await testVideoResolutions({
originServer: servers[0],
servers,
liveVideoId: videoUUIDPermanent,
resolutions,
transcoded: true,
objectStorage: true
})
await stopFfmpeg(ffmpegCommand)
})
it('Should have saved the replay on object storage', async function () {
this.timeout(220000)
await waitUntilLiveWaitingOnAllServers(servers, videoUUIDPermanent)
await waitJobs(servers)
const videoLiveDetails = await servers[0].videos.get({ id: videoUUIDPermanent })
const replay = await findExternalSavedVideo(servers[0], videoLiveDetails)
await checkFilesExist(servers, replay.uuid, 5)
})
it('Should have cleaned up live files from object storage', async function () {
await checkFilesCleanup(servers[0], videoUUIDPermanent, resolutions)
})
})
})

View File

@ -3,39 +3,92 @@
import { expect } from 'chai'
import { pathExists, readdir } from 'fs-extra'
import { join } from 'path'
import { LiveVideo } from '@shared/models'
import { PeerTubeServer } from '@shared/server-commands'
import { wait } from '@shared/core-utils'
import { LiveVideo, VideoStreamingPlaylistType } from '@shared/models'
import { ObjectStorageCommand, PeerTubeServer } from '@shared/server-commands'
import { checkLiveSegmentHash, checkResolutionsInMasterPlaylist } from './streaming-playlists'
async function checkLiveCleanup (server: PeerTubeServer, videoUUID: string, savedResolutions: number[] = []) {
let live: LiveVideo
try {
live = await server.live.get({ videoId: videoUUID })
} catch {}
const basePath = server.servers.buildDirectory('streaming-playlists')
const hlsPath = join(basePath, 'hls', videoUUID)
if (savedResolutions.length === 0) {
if (live?.permanentLive) {
expect(await pathExists(hlsPath)).to.be.true
const hlsFiles = await readdir(hlsPath)
expect(hlsFiles).to.have.lengthOf(1) // Only replays directory
const replayDir = join(hlsPath, 'replay')
expect(await pathExists(replayDir)).to.be.true
const replayFiles = await readdir(join(hlsPath, 'replay'))
expect(replayFiles).to.have.lengthOf(0)
} else {
expect(await pathExists(hlsPath)).to.be.false
}
return
return checkUnsavedLiveCleanup(server, videoUUID, hlsPath)
}
return checkSavedLiveCleanup(hlsPath, savedResolutions)
}
// ---------------------------------------------------------------------------
async function testVideoResolutions (options: {
originServer: PeerTubeServer
servers: PeerTubeServer[]
liveVideoId: string
resolutions: number[]
transcoded: boolean
objectStorage: boolean
}) {
const { originServer, servers, liveVideoId, resolutions, transcoded, objectStorage } = options
for (const server of servers) {
const { data } = await server.videos.list()
expect(data.find(v => v.uuid === liveVideoId)).to.exist
const video = await server.videos.get({ id: liveVideoId })
expect(video.streamingPlaylists).to.have.lengthOf(1)
const hlsPlaylist = video.streamingPlaylists.find(s => s.type === VideoStreamingPlaylistType.HLS)
expect(hlsPlaylist).to.exist
expect(hlsPlaylist.files).to.have.lengthOf(0) // Only fragmented mp4 files are displayed
await checkResolutionsInMasterPlaylist({ server, playlistUrl: hlsPlaylist.playlistUrl, resolutions, transcoded })
if (objectStorage) {
expect(hlsPlaylist.playlistUrl).to.contain(ObjectStorageCommand.getPlaylistBaseUrl())
}
for (let i = 0; i < resolutions.length; i++) {
const segmentNum = 3
const segmentName = `${i}-00000${segmentNum}.ts`
await originServer.live.waitUntilSegmentGeneration({ videoUUID: video.uuid, playlistNumber: i, segment: segmentNum })
const baseUrl = objectStorage
? ObjectStorageCommand.getPlaylistBaseUrl() + 'hls'
: originServer.url + '/static/streaming-playlists/hls'
if (objectStorage) {
// Playlist file upload
await wait(500)
expect(hlsPlaylist.segmentsSha256Url).to.contain(ObjectStorageCommand.getPlaylistBaseUrl())
}
const subPlaylist = await originServer.streamingPlaylists.get({ url: `${baseUrl}/${video.uuid}/${i}.m3u8` })
expect(subPlaylist).to.contain(segmentName)
await checkLiveSegmentHash({
server,
baseUrlSegment: baseUrl,
videoUUID: video.uuid,
segmentName,
hlsPlaylist
})
}
}
}
// ---------------------------------------------------------------------------
export {
checkLiveCleanup,
testVideoResolutions
}
// ---------------------------------------------------------------------------
async function checkSavedLiveCleanup (hlsPath: string, savedResolutions: number[] = []) {
const files = await readdir(hlsPath)
// fragmented file and playlist per resolution + master playlist + segments sha256 json file
@ -56,6 +109,27 @@ async function checkLiveCleanup (server: PeerTubeServer, videoUUID: string, save
expect(shaFile).to.exist
}
export {
checkLiveCleanup
async function checkUnsavedLiveCleanup (server: PeerTubeServer, videoUUID: string, hlsPath: string) {
let live: LiveVideo
try {
live = await server.live.get({ videoId: videoUUID })
} catch {}
if (live?.permanentLive) {
expect(await pathExists(hlsPath)).to.be.true
const hlsFiles = await readdir(hlsPath)
expect(hlsFiles).to.have.lengthOf(1) // Only replays directory
const replayDir = join(hlsPath, 'replay')
expect(await pathExists(replayDir)).to.be.true
const replayFiles = await readdir(join(hlsPath, 'replay'))
expect(replayFiles).to.have.lengthOf(0)
return
}
expect(await pathExists(hlsPath)).to.be.false
}

View File

@ -26,7 +26,7 @@ async function checkSegmentHash (options: {
const offset = parseInt(matches[2], 10)
const range = `${offset}-${offset + length - 1}`
const segmentBody = await command.getSegment({
const segmentBody = await command.getFragmentedSegment({
url: `${baseUrlSegment}/${videoName}`,
expectedStatus: HttpStatusCode.PARTIAL_CONTENT_206,
range: `bytes=${range}`
@ -46,7 +46,7 @@ async function checkLiveSegmentHash (options: {
const { server, baseUrlSegment, videoUUID, segmentName, hlsPlaylist } = options
const command = server.streamingPlaylists
const segmentBody = await command.getSegment({ url: `${baseUrlSegment}/${videoUUID}/${segmentName}` })
const segmentBody = await command.getFragmentedSegment({ url: `${baseUrlSegment}/${videoUUID}/${segmentName}` })
const shaBody = await command.getSegmentSha256({ url: hlsPlaylist.segmentsSha256Url })
expect(sha256(segmentBody)).to.equal(shaBody[segmentName])
@ -56,15 +56,16 @@ async function checkResolutionsInMasterPlaylist (options: {
server: PeerTubeServer
playlistUrl: string
resolutions: number[]
transcoded?: boolean // default true
}) {
const { server, playlistUrl, resolutions } = options
const { server, playlistUrl, resolutions, transcoded = true } = options
const masterPlaylist = await server.streamingPlaylists.get({ url: playlistUrl })
for (const resolution of resolutions) {
const reg = new RegExp(
'#EXT-X-STREAM-INF:BANDWIDTH=\\d+,RESOLUTION=\\d+x' + resolution + ',(FRAME-RATE=\\d+,)?CODECS="avc1.64001f,mp4a.40.2"'
)
const reg = transcoded
? new RegExp('#EXT-X-STREAM-INF:BANDWIDTH=\\d+,RESOLUTION=\\d+x' + resolution + ',(FRAME-RATE=\\d+,)?CODECS="avc1.64001f,mp4a.40.2"')
: new RegExp('#EXT-X-STREAM-INF:BANDWIDTH=\\d+,RESOLUTION=\\d+x' + resolution + '')
expect(masterPlaylist).to.match(reg)
}

View File

@ -15,6 +15,7 @@ import {
VideoState
} from '@shared/models'
import { unwrapBody } from '../requests'
import { ObjectStorageCommand } from '../server'
import { AbstractCommand, OverrideCommandOptions } from '../shared'
import { sendRTMPStream, testFfmpegStreamError } from './live'
@ -34,6 +35,8 @@ export class LiveCommand extends AbstractCommand {
})
}
// ---------------------------------------------------------------------------
listSessions (options: OverrideCommandOptions & {
videoId: number | string
}) {
@ -70,6 +73,8 @@ export class LiveCommand extends AbstractCommand {
})
}
// ---------------------------------------------------------------------------
update (options: OverrideCommandOptions & {
videoId: number | string
fields: LiveVideoUpdate
@ -110,6 +115,8 @@ export class LiveCommand extends AbstractCommand {
return body.video
}
// ---------------------------------------------------------------------------
async sendRTMPStreamInVideo (options: OverrideCommandOptions & {
videoId: number | string
fixtureName?: string
@ -130,6 +137,8 @@ export class LiveCommand extends AbstractCommand {
return testFfmpegStreamError(command, options.shouldHaveError)
}
// ---------------------------------------------------------------------------
waitUntilPublished (options: OverrideCommandOptions & {
videoId: number | string
}) {
@ -163,25 +172,6 @@ export class LiveCommand extends AbstractCommand {
return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, totalSessions * 2, false)
}
getSegment (options: OverrideCommandOptions & {
videoUUID: string
playlistNumber: number
segment: number
}) {
const { playlistNumber, segment, videoUUID } = options
const segmentName = `${playlistNumber}-00000${segment}.ts`
const url = `${this.server.url}/static/streaming-playlists/hls/${videoUUID}/${segmentName}`
return this.getRawRequest({
...options,
url,
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.OK_200
})
}
async waitUntilReplacedByReplay (options: OverrideCommandOptions & {
videoId: number | string
}) {
@ -194,6 +184,56 @@ export class LiveCommand extends AbstractCommand {
} while (video.isLive === true || video.state.id !== VideoState.PUBLISHED)
}
// ---------------------------------------------------------------------------
getSegmentFile (options: OverrideCommandOptions & {
videoUUID: string
playlistNumber: number
segment: number
objectStorage?: boolean // default false
}) {
const { playlistNumber, segment, videoUUID, objectStorage = false } = options
const segmentName = `${playlistNumber}-00000${segment}.ts`
const baseUrl = objectStorage
? ObjectStorageCommand.getPlaylistBaseUrl()
: `${this.server.url}/static/streaming-playlists/hls`
const url = `${baseUrl}/${videoUUID}/${segmentName}`
return this.getRawRequest({
...options,
url,
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.OK_200
})
}
getPlaylistFile (options: OverrideCommandOptions & {
videoUUID: string
playlistName: string
objectStorage?: boolean // default false
}) {
const { playlistName, videoUUID, objectStorage = false } = options
const baseUrl = objectStorage
? ObjectStorageCommand.getPlaylistBaseUrl()
: `${this.server.url}/static/streaming-playlists/hls`
const url = `${baseUrl}/${videoUUID}/${playlistName}`
return this.getRawRequest({
...options,
url,
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.OK_200
})
}
// ---------------------------------------------------------------------------
async countPlaylists (options: OverrideCommandOptions & {
videoUUID: string
}) {

View File

@ -16,7 +16,7 @@ export class StreamingPlaylistsCommand extends AbstractCommand {
}))
}
getSegment (options: OverrideCommandOptions & {
getFragmentedSegment (options: OverrideCommandOptions & {
url: string
range?: string
}) {

View File

@ -145,7 +145,6 @@ info:
| `/api/*` |
| `/download/*` |
| `/lazy-static/*` |
| `/live/segments-sha256/*` |
| `/.well-known/webfinger` |
In addition, all routes serving ActivityPub are CORS-enabled for all origins.