mirror of https://github.com/Chocobozzz/PeerTube
Add recently added redundancy strategy
parent
780daa7e91
commit
3f6b6a565d
|
@ -77,6 +77,10 @@ redundancy:
|
|||
# -
|
||||
# size: '10GB'
|
||||
# strategy: 'trending' # Cache trending videos
|
||||
# -
|
||||
# size: '10GB'
|
||||
# strategy: 'recently-added' # Cache recently added videos
|
||||
# minViews: 10 # Having at least x views
|
||||
|
||||
cache:
|
||||
previews:
|
||||
|
|
|
@ -78,6 +78,10 @@ redundancy:
|
|||
# -
|
||||
# size: '10GB'
|
||||
# strategy: 'trending' # Cache trending videos
|
||||
# -
|
||||
# size: '10GB'
|
||||
# strategy: 'recently-added' # Cache recently added videos
|
||||
# minViews: 10 # Having at least x views
|
||||
|
||||
###############################################################################
|
||||
#
|
||||
|
|
|
@ -29,6 +29,10 @@ redundancy:
|
|||
-
|
||||
size: '100KB'
|
||||
strategy: 'trending'
|
||||
-
|
||||
size: '100KB'
|
||||
strategy: 'recently-added'
|
||||
minViews: 10
|
||||
|
||||
cache:
|
||||
previews:
|
||||
|
|
|
@ -171,5 +171,3 @@ function setRemoteVideoTruncatedContent (video: any) {
|
|||
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ import { parse } from 'url'
|
|||
import { CONFIG } from './constants'
|
||||
import { logger } from '../helpers/logger'
|
||||
import { getServerActor } from '../helpers/utils'
|
||||
import { VideosRedundancy } from '../../shared/models/redundancy'
|
||||
import { RecentlyAddedStrategy, VideosRedundancy } from '../../shared/models/redundancy'
|
||||
import { isArray } from '../helpers/custom-validators/misc'
|
||||
import { uniq } from 'lodash'
|
||||
|
||||
|
@ -34,24 +34,31 @@ async function checkActivityPubUrls () {
|
|||
function checkConfig () {
|
||||
const defaultNSFWPolicy = config.get<string>('instance.default_nsfw_policy')
|
||||
|
||||
// NSFW policy
|
||||
if ([ 'do_not_list', 'blur', 'display' ].indexOf(defaultNSFWPolicy) === -1) {
|
||||
return 'NSFW policy setting should be "do_not_list" or "blur" or "display" instead of ' + defaultNSFWPolicy
|
||||
}
|
||||
|
||||
// Redundancies
|
||||
const redundancyVideos = config.get<VideosRedundancy[]>('redundancy.videos')
|
||||
if (isArray(redundancyVideos)) {
|
||||
for (const r of redundancyVideos) {
|
||||
if ([ 'most-views', 'trending' ].indexOf(r.strategy) === -1) {
|
||||
if ([ 'most-views', 'trending', 'recently-added' ].indexOf(r.strategy) === -1) {
|
||||
return 'Redundancy video entries should have "most-views" strategy instead of ' + r.strategy
|
||||
}
|
||||
}
|
||||
|
||||
const filtered = uniq(redundancyVideos.map(r => r.strategy))
|
||||
if (filtered.length !== redundancyVideos.length) {
|
||||
return 'Redundancy video entries should have uniq strategies'
|
||||
return 'Redundancy video entries should have unique strategies'
|
||||
}
|
||||
}
|
||||
|
||||
const recentlyAddedStrategy = redundancyVideos.find(r => r.strategy === 'recently-added') as RecentlyAddedStrategy
|
||||
if (recentlyAddedStrategy && isNaN(recentlyAddedStrategy.minViews)) {
|
||||
return 'Min views in recently added strategy is not a number'
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import { IConfig } from 'config'
|
||||
import { dirname, join } from 'path'
|
||||
import { JobType, VideoRateType, VideoRedundancyStrategy, VideoState, VideosRedundancy } from '../../shared/models'
|
||||
import { JobType, VideoRateType, VideoState, VideosRedundancy } from '../../shared/models'
|
||||
import { ActivityPubActorType } from '../../shared/models/activitypub'
|
||||
import { FollowState } from '../../shared/models/actors'
|
||||
import { VideoAbuseState, VideoImportState, VideoPrivacy } from '../../shared/models/videos'
|
||||
|
@ -741,15 +741,10 @@ function updateWebserverConfig () {
|
|||
CONFIG.WEBSERVER.HOST = sanitizeHost(CONFIG.WEBSERVER.HOSTNAME + ':' + CONFIG.WEBSERVER.PORT, REMOTE_SCHEME.HTTP)
|
||||
}
|
||||
|
||||
function buildVideosRedundancy (objs: { strategy: VideoRedundancyStrategy, size: string }[]): VideosRedundancy[] {
|
||||
function buildVideosRedundancy (objs: VideosRedundancy[]): VideosRedundancy[] {
|
||||
if (!objs) return []
|
||||
|
||||
return objs.map(obj => {
|
||||
return {
|
||||
strategy: obj.strategy,
|
||||
size: bytes.parse(obj.size)
|
||||
}
|
||||
})
|
||||
return objs.map(obj => Object.assign(obj, { size: bytes.parse(obj.size) }))
|
||||
}
|
||||
|
||||
function buildLanguages () {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import { AbstractScheduler } from './abstract-scheduler'
|
||||
import { CONFIG, JOB_TTL, REDUNDANCY, SCHEDULER_INTERVALS_MS } from '../../initializers'
|
||||
import { logger } from '../../helpers/logger'
|
||||
import { VideoRedundancyStrategy } from '../../../shared/models/redundancy'
|
||||
import { RecentlyAddedStrategy, VideoRedundancyStrategy, VideosRedundancy } from '../../../shared/models/redundancy'
|
||||
import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
|
||||
import { VideoFileModel } from '../../models/video/video-file'
|
||||
import { sortBy } from 'lodash'
|
||||
|
@ -32,16 +32,14 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
|
|||
this.executing = true
|
||||
|
||||
for (const obj of CONFIG.REDUNDANCY.VIDEOS) {
|
||||
|
||||
try {
|
||||
const videoToDuplicate = await this.findVideoToDuplicate(obj.strategy)
|
||||
const videoToDuplicate = await this.findVideoToDuplicate(obj)
|
||||
if (!videoToDuplicate) continue
|
||||
|
||||
const videoFiles = videoToDuplicate.VideoFiles
|
||||
videoFiles.forEach(f => f.Video = videoToDuplicate)
|
||||
|
||||
const videosRedundancy = await VideoRedundancyModel.getVideoFiles(obj.strategy)
|
||||
if (this.isTooHeavy(videosRedundancy, videoFiles, obj.size)) {
|
||||
if (await this.isTooHeavy(obj.strategy, videoFiles, obj.size)) {
|
||||
if (!isTestInstance()) logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url)
|
||||
continue
|
||||
}
|
||||
|
@ -73,10 +71,19 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
|
|||
return this.instance || (this.instance = new this())
|
||||
}
|
||||
|
||||
private findVideoToDuplicate (strategy: VideoRedundancyStrategy) {
|
||||
if (strategy === 'most-views') return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
|
||||
private findVideoToDuplicate (cache: VideosRedundancy) {
|
||||
if (cache.strategy === 'most-views') {
|
||||
return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
|
||||
}
|
||||
|
||||
if (strategy === 'trending') return VideoRedundancyModel.findTrendingToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
|
||||
if (cache.strategy === 'trending') {
|
||||
return VideoRedundancyModel.findTrendingToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
|
||||
}
|
||||
|
||||
if (cache.strategy === 'recently-added') {
|
||||
const minViews = (cache as RecentlyAddedStrategy).minViews
|
||||
return VideoRedundancyModel.findRecentlyAddedToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR, minViews)
|
||||
}
|
||||
}
|
||||
|
||||
private async createVideoRedundancy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[]) {
|
||||
|
@ -122,27 +129,10 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
// Unused, but could be useful in the future, with a custom strategy
|
||||
private async purgeVideosIfNeeded (videosRedundancy: VideoRedundancyModel[], filesToDuplicate: VideoFileModel[], maxSize: number) {
|
||||
const sortedVideosRedundancy = sortBy(videosRedundancy, 'createdAt')
|
||||
|
||||
while (this.isTooHeavy(sortedVideosRedundancy, filesToDuplicate, maxSize)) {
|
||||
const toDelete = sortedVideosRedundancy.shift()
|
||||
|
||||
const videoFile = toDelete.VideoFile
|
||||
logger.info('Purging video %s (resolution %d) from our redundancy system.', videoFile.Video.url, videoFile.resolution)
|
||||
|
||||
await removeVideoRedundancy(toDelete, undefined)
|
||||
}
|
||||
|
||||
return sortedVideosRedundancy
|
||||
}
|
||||
|
||||
private isTooHeavy (videosRedundancy: VideoRedundancyModel[], filesToDuplicate: VideoFileModel[], maxSizeArg: number) {
|
||||
private async isTooHeavy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[], maxSizeArg: number) {
|
||||
const maxSize = maxSizeArg - this.getTotalFileSizes(filesToDuplicate)
|
||||
|
||||
const redundancyReducer = (previous: number, current: VideoRedundancyModel) => previous + current.VideoFile.size
|
||||
const totalDuplicated = videosRedundancy.reduce(redundancyReducer, 0)
|
||||
const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(strategy)
|
||||
|
||||
return totalDuplicated > maxSize
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import { VideoChannelModel } from '../video/video-channel'
|
|||
import { ServerModel } from '../server/server'
|
||||
import { sample } from 'lodash'
|
||||
import { isTestInstance } from '../../helpers/core-utils'
|
||||
import * as Bluebird from 'bluebird'
|
||||
|
||||
export enum ScopeNames {
|
||||
WITH_VIDEO = 'WITH_VIDEO'
|
||||
|
@ -144,7 +145,8 @@ export class VideoRedundancyModel extends Model<VideoRedundancyModel> {
|
|||
return VideoRedundancyModel.findOne(query)
|
||||
}
|
||||
|
||||
static getVideoSample (rows: { id: number }[]) {
|
||||
static async getVideoSample (p: Bluebird<VideoModel[]>) {
|
||||
const rows = await p
|
||||
const ids = rows.map(r => r.id)
|
||||
const id = sample(ids)
|
||||
|
||||
|
@ -164,9 +166,7 @@ export class VideoRedundancyModel extends Model<VideoRedundancyModel> {
|
|||
]
|
||||
}
|
||||
|
||||
const rows = await VideoModel.unscoped().findAll(query)
|
||||
|
||||
return VideoRedundancyModel.getVideoSample(rows as { id: number }[])
|
||||
return VideoRedundancyModel.getVideoSample(VideoModel.unscoped().findAll(query))
|
||||
}
|
||||
|
||||
static async findTrendingToDuplicate (randomizedFactor: number) {
|
||||
|
@ -186,24 +186,49 @@ export class VideoRedundancyModel extends Model<VideoRedundancyModel> {
|
|||
]
|
||||
}
|
||||
|
||||
const rows = await VideoModel.unscoped().findAll(query)
|
||||
|
||||
return VideoRedundancyModel.getVideoSample(rows as { id: number }[])
|
||||
return VideoRedundancyModel.getVideoSample(VideoModel.unscoped().findAll(query))
|
||||
}
|
||||
|
||||
static async getVideoFiles (strategy: VideoRedundancyStrategy) {
|
||||
const actor = await getServerActor()
|
||||
|
||||
const queryVideoFiles = {
|
||||
logging: !isTestInstance(),
|
||||
static async findRecentlyAddedToDuplicate (randomizedFactor: number, minViews: number) {
|
||||
// On VideoModel!
|
||||
const query = {
|
||||
attributes: [ 'id', 'publishedAt' ],
|
||||
// logging: !isTestInstance(),
|
||||
limit: randomizedFactor,
|
||||
order: getVideoSort('-publishedAt'),
|
||||
where: {
|
||||
actorId: actor.id,
|
||||
strategy
|
||||
}
|
||||
views: {
|
||||
[ Sequelize.Op.gte ]: minViews
|
||||
}
|
||||
},
|
||||
include: [
|
||||
await VideoRedundancyModel.buildVideoFileForDuplication(),
|
||||
VideoRedundancyModel.buildServerRedundancyInclude()
|
||||
]
|
||||
}
|
||||
|
||||
return VideoRedundancyModel.scope(ScopeNames.WITH_VIDEO)
|
||||
.findAll(queryVideoFiles)
|
||||
return VideoRedundancyModel.getVideoSample(VideoModel.unscoped().findAll(query))
|
||||
}
|
||||
|
||||
static async getTotalDuplicated (strategy: VideoRedundancyStrategy) {
|
||||
const actor = await getServerActor()
|
||||
|
||||
const options = {
|
||||
logging: !isTestInstance(),
|
||||
include: [
|
||||
{
|
||||
attributes: [],
|
||||
model: VideoRedundancyModel,
|
||||
required: true,
|
||||
where: {
|
||||
actorId: actor.id,
|
||||
strategy
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
return VideoFileModel.sum('size', options)
|
||||
}
|
||||
|
||||
static listAllExpired () {
|
||||
|
|
|
@ -14,7 +14,7 @@ import {
|
|||
setAccessTokensToServers,
|
||||
uploadVideo,
|
||||
wait,
|
||||
root, viewVideo
|
||||
root, viewVideo, immutableAssign
|
||||
} from '../../utils'
|
||||
import { waitJobs } from '../../utils/server/jobs'
|
||||
import * as magnetUtil from 'magnet-uri'
|
||||
|
@ -39,14 +39,14 @@ function checkMagnetWebseeds (file: { magnetUri: string, resolution: { id: numbe
|
|||
}
|
||||
}
|
||||
|
||||
async function runServers (strategy: VideoRedundancyStrategy) {
|
||||
async function runServers (strategy: VideoRedundancyStrategy, additionalParams: any = {}) {
|
||||
const config = {
|
||||
redundancy: {
|
||||
videos: [
|
||||
{
|
||||
immutableAssign({
|
||||
strategy: strategy,
|
||||
size: '100KB'
|
||||
}
|
||||
}, additionalParams)
|
||||
]
|
||||
}
|
||||
}
|
||||
|
@ -153,11 +153,11 @@ describe('Test videos redundancy', function () {
|
|||
return check1WebSeed()
|
||||
})
|
||||
|
||||
it('Should enable redundancy on server 1', async function () {
|
||||
it('Should enable redundancy on server 1', function () {
|
||||
return enableRedundancy()
|
||||
})
|
||||
|
||||
it('Should have 2 webseed on the first video', async function () {
|
||||
it('Should have 2 webseed on the first video', function () {
|
||||
this.timeout(40000)
|
||||
|
||||
return check2Webseeds()
|
||||
|
@ -180,11 +180,58 @@ describe('Test videos redundancy', function () {
|
|||
return check1WebSeed()
|
||||
})
|
||||
|
||||
it('Should enable redundancy on server 1', async function () {
|
||||
it('Should enable redundancy on server 1', function () {
|
||||
return enableRedundancy()
|
||||
})
|
||||
|
||||
it('Should have 2 webseed on the first video', async function () {
|
||||
it('Should have 2 webseed on the first video', function () {
|
||||
this.timeout(40000)
|
||||
|
||||
return check2Webseeds()
|
||||
})
|
||||
|
||||
after(function () {
|
||||
return cleanServers()
|
||||
})
|
||||
})
|
||||
|
||||
describe('With recently added strategy', function () {
|
||||
|
||||
before(function () {
|
||||
this.timeout(120000)
|
||||
|
||||
return runServers('recently-added', { minViews: 3 })
|
||||
})
|
||||
|
||||
it('Should have 1 webseed on the first video', function () {
|
||||
return check1WebSeed()
|
||||
})
|
||||
|
||||
it('Should enable redundancy on server 1', function () {
|
||||
return enableRedundancy()
|
||||
})
|
||||
|
||||
it('Should still have 1 webseed on the first video', async function () {
|
||||
this.timeout(40000)
|
||||
|
||||
await waitJobs(servers)
|
||||
await wait(15000)
|
||||
await waitJobs(servers)
|
||||
|
||||
return check1WebSeed()
|
||||
})
|
||||
|
||||
it('Should view 2 times the first video', async function () {
|
||||
this.timeout(40000)
|
||||
|
||||
await viewVideo(servers[ 0 ].url, video1Server2UUID)
|
||||
await viewVideo(servers[ 2 ].url, video1Server2UUID)
|
||||
|
||||
await wait(10000)
|
||||
await waitJobs(servers)
|
||||
})
|
||||
|
||||
it('Should have 2 webseed on the first video', function () {
|
||||
this.timeout(40000)
|
||||
|
||||
return check2Webseeds()
|
||||
|
|
|
@ -1,6 +1,19 @@
|
|||
export type VideoRedundancyStrategy = 'most-views' | 'trending'
|
||||
export type VideoRedundancyStrategy = 'most-views' | 'trending' | 'recently-added'
|
||||
|
||||
export interface VideosRedundancy {
|
||||
strategy: VideoRedundancyStrategy
|
||||
export type MostViewsRedundancyStrategy = {
|
||||
strategy: 'most-views'
|
||||
size: number
|
||||
}
|
||||
|
||||
export type TrendingRedundancyStrategy = {
|
||||
strategy: 'trending'
|
||||
size: number
|
||||
}
|
||||
|
||||
export type RecentlyAddedStrategy = {
|
||||
strategy: 'recently-added'
|
||||
size: number
|
||||
minViews: number
|
||||
}
|
||||
|
||||
export type VideosRedundancy = MostViewsRedundancyStrategy | TrendingRedundancyStrategy | RecentlyAddedStrategy
|
||||
|
|
Loading…
Reference in New Issue