PeerTube/server/lib/schedulers/videos-redundancy-scheduler.ts

359 lines
13 KiB
TypeScript
Raw Normal View History

import { move } from 'fs-extra'
2020-11-20 11:21:08 +01:00
import { join } from 'path'
import { getServerActor } from '@server/models/application/application'
2021-02-18 10:15:11 +01:00
import { TrackerModel } from '@server/models/server/tracker'
2020-11-20 11:21:08 +01:00
import { VideoModel } from '@server/models/video/video'
2019-08-15 11:53:26 +02:00
import {
2020-11-20 11:21:08 +01:00
MStreamingPlaylistFiles,
2019-08-15 11:53:26 +02:00
MVideoAccountLight,
MVideoFile,
MVideoFileVideo,
MVideoRedundancyFileVideo,
MVideoRedundancyStreamingPlaylistVideo,
MVideoRedundancyVideo,
MVideoWithAllFiles
2020-06-18 10:45:25 +02:00
} from '@server/types/models'
2020-11-20 11:21:08 +01:00
import { VideosRedundancyStrategy } from '../../../shared/models/redundancy'
import { logger } from '../../helpers/logger'
import { downloadWebTorrentVideo, generateMagnetUri } from '../../helpers/webtorrent'
import { CONFIG } from '../../initializers/config'
import { HLS_REDUNDANCY_DIRECTORY, REDUNDANCY, VIDEO_IMPORT_TIMEOUT } from '../../initializers/constants'
2020-11-20 11:21:08 +01:00
import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send'
import { getLocalVideoCacheFileActivityPubUrl, getLocalVideoCacheStreamingPlaylistActivityPubUrl } from '../activitypub/url'
2021-06-02 15:47:05 +02:00
import { getOrCreateAPVideo } from '../activitypub/videos'
2020-11-20 11:21:08 +01:00
import { downloadPlaylistSegments } from '../hls'
import { removeVideoRedundancy } from '../redundancy'
Add support for saving video files to object storage (#4290) * Add support for saving video files to object storage * Add support for custom url generation on s3 stored files Uses two config keys to support url generation that doesn't directly go to (compatible s3). Can be used to generate urls to any cache server or CDN. * Upload files to s3 concurrently and delete originals afterwards * Only publish after move to object storage is complete * Use base url instead of url template * Fix mistyped config field * Add rudenmentary way to download before transcode * Implement Chocobozzz suggestions https://github.com/Chocobozzz/PeerTube/pull/4290#issuecomment-891670478 The remarks in question: Try to use objectStorage prefix instead of s3 prefix for your function/variables/config names Prefer to use a tree for the config: s3.streaming_playlists_bucket -> object_storage.streaming_playlists.bucket Use uppercase for config: S3.STREAMING_PLAYLISTS_BUCKETINFO.bucket -> OBJECT_STORAGE.STREAMING_PLAYLISTS.BUCKET (maybe BUCKET_NAME instead of BUCKET) I suggest to rename moveJobsRunning to pendingMovingJobs (or better, create a dedicated videoJobInfo table with a pendingMove & videoId columns so we could also use this table to track pending transcoding jobs) https://github.com/Chocobozzz/PeerTube/pull/4290/files#diff-3e26d41ca4bda1de8e1747af70ca2af642abcc1e9e0bfb94239ff2165acfbde5R19 uses a string instead of an integer I think we should store the origin object storage URL in fileUrl, without base_url injection. 
Instead, inject the base_url at "runtime" so admins can easily change this configuration without running a script to update DB URLs * Import correct function * Support multipart upload * Remove import of node 15.0 module stream/promises * Extend maximum upload job length Using the same value as for redundancy downloading seems logical * Use dynamic part size for really large uploads Also adds very small part size for local testing * Fix decreasePendingMove query * Resolve various PR comments * Move to object storage after optimize * Make upload size configurable and increase default * Prune webtorrent files that are stored in object storage * Move files after transcoding jobs * Fix federation * Add video path manager * Support move to external storage job in client * Fix live object storage tests Co-authored-by: Chocobozzz <me@florianbigard.com>
2021-08-17 08:26:20 +02:00
import { generateHLSRedundancyUrl, generateWebTorrentRedundancyUrl } from '../video-urls'
2020-11-20 11:21:08 +01:00
import { AbstractScheduler } from './abstract-scheduler'
2019-01-29 08:37:25 +01:00
/**
 * Everything the scheduler needs to duplicate (or account for) one video.
 */
type CandidateToDuplicate = {
  // Strategy configuration driving this duplication.
  // createManualRedundancy() passes null here for manually requested redundancies.
  redundancy: VideosRedundancyStrategy

  // Video the files and playlists below belong to
  video: MVideoWithAllFiles

  // Video files to duplicate
  files: MVideoFile[]

  // Streaming playlists (and their files) to duplicate
  streamingPlaylists: MStreamingPlaylistFiles[]
}
2019-08-20 13:52:49 +02:00
/**
 * Type guard telling whether a redundancy entry targets a video file
 * (it has a truthy `VideoFile` association) rather than a streaming playlist.
 */
function isMVideoRedundancyFileVideo (
  o: MVideoRedundancyFileVideo | MVideoRedundancyStreamingPlaylistVideo
): o is MVideoRedundancyFileVideo {
  const { VideoFile: attachedFile } = o as MVideoRedundancyFileVideo

  return Boolean(attachedFile)
}
2018-09-11 16:27:07 +02:00
/**
 * Scheduler that, for each configured redundancy strategy, picks a video to duplicate,
 * makes room in the local cache if needed, downloads the video files/playlists and
 * announces the created cache files. After the strategies ran, it extends or removes our
 * expired local redundancies and removes expired remote ones.
 *
 * Accessed through the `Instance` singleton getter.
 */
export class VideosRedundancyScheduler extends AbstractScheduler {

  private static instance: VideosRedundancyScheduler

  protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL

  private constructor () {
    super()
  }

  /**
   * Duplicate all files and streaming playlists of a video outside of any configured strategy.
   * The created redundancies use the 'manual' strategy and have no expiration date.
   *
   * @param videoId id of the video to duplicate; a warning is logged and nothing happens if it cannot be loaded
   */
  async createManualRedundancy (videoId: number) {
    const videoToDuplicate = await VideoModel.loadWithFiles(videoId)

    if (!videoToDuplicate) {
      logger.warn('Video to manually duplicate %d does not exist anymore.', videoId)
      return
    }

    return this.createVideoRedundancies({
      video: videoToDuplicate,
      // No strategy: this is a manual duplication
      redundancy: null,
      files: videoToDuplicate.VideoFiles,
      streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists
    })
  }

  /**
   * Run one scheduler pass: try to duplicate one video per configured strategy,
   * then handle expired local and remote redundancies.
   * A failure in one strategy is logged and does not prevent the next ones from running.
   */
  protected async internalExecute () {
    for (const redundancyConfig of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) {
      logger.info('Running redundancy scheduler for strategy %s.', redundancyConfig.strategy)

      try {
        const videoToDuplicate = await this.findVideoToDuplicate(redundancyConfig)
        if (!videoToDuplicate) continue

        const candidateToDuplicate: CandidateToDuplicate = {
          video: videoToDuplicate,
          redundancy: redundancyConfig,
          files: videoToDuplicate.VideoFiles,
          streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists
        }

        // Try to free some space first, then check again whether the candidate fits
        await this.purgeCacheIfNeeded(candidateToDuplicate)

        if (await this.isTooHeavy(candidateToDuplicate)) {
          logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url)
          continue
        }

        logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, redundancyConfig.strategy)

        await this.createVideoRedundancies(candidateToDuplicate)
      } catch (err) {
        logger.error('Cannot run videos redundancy %s.', redundancyConfig.strategy, { err })
      }
    }

    await this.extendsLocalExpiration()

    await this.purgeRemoteExpired()
  }

  /** Lazily created singleton instance of the scheduler. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }

  /**
   * For each expired local redundancy: extend its expiration, or remove it when its strategy
   * is not configured anymore or the cache of this strategy is over its size limit.
   * Errors are logged per entry so one failure does not stop the loop.
   */
  private async extendsLocalExpiration () {
    const expired = await VideoRedundancyModel.listLocalExpired()

    for (const redundancyModel of expired) {
      try {
        const redundancyConfig = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)

        // Empty candidate: we only want to know whether the current cache usage already exceeds the limit
        const candidate: CandidateToDuplicate = {
          redundancy: redundancyConfig,
          video: null,
          files: [],
          streamingPlaylists: []
        }

        // If the administrator disabled the redundancy or decreased the cache size, remove this redundancy instead of extending it
        if (!redundancyConfig || await this.isTooHeavy(candidate)) {
          logger.info('Destroying redundancy %s because the cache size %s is too heavy.', redundancyModel.url, redundancyModel.strategy)

          await removeVideoRedundancy(redundancyModel)
        } else {
          await this.extendsRedundancy(redundancyModel)
        }
      } catch (err) {
        logger.error(
          'Cannot extend or remove expiration of %s video from our redundancy system.', this.buildEntryLogId(redundancyModel),
          { err }
        )
      }
    }
  }

  /**
   * Extend the expiration of a redundancy by the `minLifetime` of its strategy,
   * or remove the redundancy if its strategy is not in the configuration.
   *
   * NOTE(review): redundancies created by createManualRedundancy() are stored with the 'manual' strategy.
   * If 'manual' never appears in CONFIG.REDUNDANCY.VIDEOS.STRATEGIES, reaching this method with such
   * an entry removes it instead of keeping it - confirm this is intended.
   */
  private async extendsRedundancy (redundancyModel: MVideoRedundancyVideo) {
    const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)

    // Redundancy strategy disabled, remove our redundancy instead of extending expiration
    if (!redundancy) {
      await removeVideoRedundancy(redundancyModel)
      return
    }

    await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime)
  }

  /**
   * Remove every expired remote redundancy.
   * Errors are logged (with their cause) per entry so one failure does not stop the loop.
   */
  private async purgeRemoteExpired () {
    const expired = await VideoRedundancyModel.listRemoteExpired()

    for (const redundancyModel of expired) {
      try {
        await removeVideoRedundancy(redundancyModel)
      } catch (err) {
        // Attach the error, otherwise the reason of the failure is lost
        logger.error('Cannot remove redundancy %s from our redundancy system.', this.buildEntryLogId(redundancyModel), { err })
      }
    }
  }

  /**
   * Ask the redundancy model for a video to duplicate according to the strategy of `cache`.
   * Returns undefined for a strategy that is not handled here.
   */
  private findVideoToDuplicate (cache: VideosRedundancyStrategy) {
    if (cache.strategy === 'most-views') {
      return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
    }

    if (cache.strategy === 'trending') {
      return VideoRedundancyModel.findTrendingToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
    }

    if (cache.strategy === 'recently-added') {
      const minViews = cache.minViews

      return VideoRedundancyModel.findRecentlyAddedToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR, minViews)
    }
  }

  /**
   * Duplicate every file and streaming playlist of the candidate.
   * Elements we already duplicate are handed to extendsRedundancy() instead of being downloaded again.
   * Does nothing (besides logging) if the video cannot be loaded/refreshed anymore.
   */
  private async createVideoRedundancies (data: CandidateToDuplicate) {
    const video = await this.loadAndRefreshVideo(data.video.url)

    if (!video) {
      logger.info('Video %s we want to duplicate does not existing anymore, skipping.', data.video.url)

      return
    }

    for (const file of data.files) {
      const existingRedundancy = await VideoRedundancyModel.loadLocalByFileId(file.id)
      if (existingRedundancy) {
        await this.extendsRedundancy(existingRedundancy)

        continue
      }

      await this.createVideoFileRedundancy(data.redundancy, video, file)
    }

    for (const streamingPlaylist of data.streamingPlaylists) {
      const existingRedundancy = await VideoRedundancyModel.loadLocalByStreamingPlaylistId(streamingPlaylist.id)
      if (existingRedundancy) {
        await this.extendsRedundancy(existingRedundancy)

        continue
      }

      await this.createStreamingPlaylistRedundancy(data.redundancy, video, streamingPlaylist)
    }
  }

  /**
   * Download a video file through WebTorrent, move it in the redundancy directory,
   * create the redundancy entry and send the corresponding "create cache file" activity.
   *
   * @param redundancy strategy to use, or null for a manual redundancy ('manual' strategy, no expiration)
   * @param video video owning the file
   * @param fileArg file to duplicate; its `Video` property is set to `video` by this method
   */
  private async createVideoFileRedundancy (redundancy: VideosRedundancyStrategy | null, video: MVideoAccountLight, fileArg: MVideoFile) {
    let strategy = 'manual'
    let expiresOn: Date = null

    if (redundancy) {
      strategy = redundancy.strategy
      expiresOn = this.buildNewExpiration(redundancy.minLifetime)
    }

    const file = fileArg as MVideoFileVideo
    file.Video = video

    const serverActor = await getServerActor()

    logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, strategy)

    const trackerUrls = await TrackerModel.listUrlsByVideoId(video.id)
    const magnetUri = generateMagnetUri(video, file, trackerUrls)

    const tmpPath = await downloadWebTorrentVideo({ magnetUri }, VIDEO_IMPORT_TIMEOUT)

    const destPath = join(CONFIG.STORAGE.REDUNDANCY_DIR, file.filename)
    await move(tmpPath, destPath, { overwrite: true })

    const createdModel: MVideoRedundancyFileVideo = await VideoRedundancyModel.create({
      expiresOn,
      url: getLocalVideoCacheFileActivityPubUrl(file),
      fileUrl: generateWebTorrentRedundancyUrl(file),
      strategy,
      videoFileId: file.id,
      actorId: serverActor.id
    })

    createdModel.VideoFile = file

    await sendCreateCacheFile(serverActor, video, createdModel)

    logger.info('Duplicated %s - %d -> %s.', video.url, file.resolution, createdModel.url)
  }

  /**
   * Download the segments of a streaming playlist in the HLS redundancy directory of the video,
   * create the redundancy entry and send the corresponding "create cache file" activity.
   *
   * @param redundancy strategy to use, or null for a manual redundancy ('manual' strategy, no expiration)
   * @param video video owning the playlist
   * @param playlistArg playlist to duplicate; its `Video` property is set to `video` by this method
   */
  private async createStreamingPlaylistRedundancy (
    redundancy: VideosRedundancyStrategy | null,
    video: MVideoAccountLight,
    playlistArg: MStreamingPlaylistFiles
  ) {
    let strategy = 'manual'
    let expiresOn: Date = null

    if (redundancy) {
      strategy = redundancy.strategy
      expiresOn = this.buildNewExpiration(redundancy.minLifetime)
    }

    // Object.assign() mutates and returns playlistArg: both names reference the same object
    const playlist = Object.assign(playlistArg, { Video: video })
    const serverActor = await getServerActor()

    logger.info('Duplicating %s streaming playlist in videos redundancy with "%s" strategy.', video.url, strategy)

    const destDirectory = join(HLS_REDUNDANCY_DIRECTORY, video.uuid)
    const masterPlaylistUrl = playlist.getMasterPlaylistUrl(video)

    const maxSizeKB = this.getTotalFileSizes([], [ playlist ]) / 1000
    const toleranceKB = maxSizeKB + ((5 * maxSizeKB) / 100) // 5% more tolerance
    await downloadPlaylistSegments(masterPlaylistUrl, destDirectory, VIDEO_IMPORT_TIMEOUT, toleranceKB)

    const createdModel: MVideoRedundancyStreamingPlaylistVideo = await VideoRedundancyModel.create({
      expiresOn,
      url: getLocalVideoCacheStreamingPlaylistActivityPubUrl(video, playlist),
      fileUrl: generateHLSRedundancyUrl(video, playlistArg),
      strategy,
      videoStreamingPlaylistId: playlist.id,
      actorId: serverActor.id
    })

    createdModel.VideoStreamingPlaylist = playlist

    await sendCreateCacheFile(serverActor, video, createdModel)

    logger.info('Duplicated playlist %s -> %s.', masterPlaylistUrl, createdModel.url)
  }

  /**
   * Set a new expiration date on the redundancy, save it and send the "update cache file" activity.
   *
   * @param expiresAfterMs delay in milliseconds, from now, after which the redundancy expires
   */
  private async extendsExpirationOf (redundancy: MVideoRedundancyVideo, expiresAfterMs: number) {
    logger.info('Extending expiration of %s.', redundancy.url)

    const serverActor = await getServerActor()

    redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs)
    await redundancy.save()

    await sendUpdateCacheFile(serverActor, redundancy)
  }

  /**
   * While the candidate does not fit in the cache of its strategy, pick the oldest expired local
   * redundancy of this strategy and remove all our local redundancies of the same video.
   * Stops as soon as there is nothing left to delete, even if the candidate still does not fit.
   */
  private async purgeCacheIfNeeded (candidateToDuplicate: CandidateToDuplicate) {
    while (await this.isTooHeavy(candidateToDuplicate)) {
      const strategyConfig = candidateToDuplicate.redundancy
      const toDelete = await VideoRedundancyModel.loadOldestLocalExpired(strategyConfig.strategy, strategyConfig.minLifetime)
      if (!toDelete) return

      // The entry is attached either to a video file or to a streaming playlist
      const videoId = toDelete.VideoFile
        ? toDelete.VideoFile.videoId
        : toDelete.VideoStreamingPlaylist.videoId

      const videoRedundancies = await VideoRedundancyModel.listLocalByVideoId(videoId)

      for (const videoRedundancy of videoRedundancies) {
        await removeVideoRedundancy(videoRedundancy)
      }
    }
  }

  /**
   * Tell whether the size already used by the strategy of the candidate plus the size of
   * the candidate files exceeds the `size` limit of this strategy.
   * Requires `candidateToDuplicate.redundancy` to be set (not a manual candidate).
   */
  private async isTooHeavy (candidateToDuplicate: CandidateToDuplicate) {
    const maxSize = candidateToDuplicate.redundancy.size

    const { totalUsed } = await VideoRedundancyModel.getStats(candidateToDuplicate.redundancy.strategy)
    const totalWillDuplicate = totalUsed + this.getTotalFileSizes(candidateToDuplicate.files, candidateToDuplicate.streamingPlaylists)

    return totalWillDuplicate > maxSize
  }

  /** Build the date located `expiresAfterMs` milliseconds after now. */
  private buildNewExpiration (expiresAfterMs: number) {
    return new Date(Date.now() + expiresAfterMs)
  }

  /**
   * Build a human readable identifier of a redundancy entry for log messages:
   * "<video url>-<resolution>" for a video file, the master playlist URL for a streaming playlist.
   */
  private buildEntryLogId (object: MVideoRedundancyFileVideo | MVideoRedundancyStreamingPlaylistVideo) {
    if (isMVideoRedundancyFileVideo(object)) return `${object.VideoFile.Video.url}-${object.VideoFile.resolution}`

    return `${object.VideoStreamingPlaylist.getMasterPlaylistUrl(object.VideoStreamingPlaylist.Video)}`
  }

  /**
   * Sum the `size` of the given files and of every file of the given playlists.
   * Returns 0 when there is no file at all.
   */
  private getTotalFileSizes (files: MVideoFile[], playlists: MStreamingPlaylistFiles[]): number {
    // Sum in place (files first, then playlists in order) instead of building an intermediate array
    let total = 0

    for (const file of files) {
      total = total + file.size
    }

    for (const playlist of playlists) {
      for (const file of playlist.VideoFiles) {
        total = total + file.size
      }
    }

    return total
  }

  /**
   * Load the video from its URL with all its attributes, asking for a refresh.
   * Also used to check the video still exists.
   */
  private async loadAndRefreshVideo (videoUrl: string) {
    // We need more attributes and check if the video still exists
    const getVideoOptions = {
      videoObject: videoUrl,
      syncParam: { likes: false, dislikes: false, shares: false, comments: false, thumbnail: false, refreshVideo: true },
      fetchType: 'all' as 'all'
    }
    const { video } = await getOrCreateAPVideo(getVideoOptions)

    return video
  }
}