From e5565833f62b97f62ea75eba5b479963ae78b873 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 24 Sep 2018 13:07:33 +0200 Subject: [PATCH] Improve redundancy: add 'min_lifetime' configuration --- config/default.yaml | 8 +- config/production.yaml.example | 8 +- config/test.yaml | 7 +- server.ts | 12 +- server/helpers/ffmpeg-utils.ts | 2 +- server/initializers/checker-after-init.ts | 115 +++++++ .../{checker.ts => checker-before-init.ts} | 110 +------ server/initializers/constants.ts | 11 +- server/initializers/index.ts | 1 - server/initializers/installer.ts | 2 +- server/lib/activitypub/audience.ts | 7 +- server/lib/activitypub/cache-file.ts | 22 +- .../lib/activitypub/process/process-create.ts | 8 +- .../lib/activitypub/process/process-undo.ts | 2 +- .../lib/activitypub/process/process-update.ts | 28 +- server/lib/activitypub/send/send-update.ts | 4 +- server/lib/activitypub/videos.ts | 50 +-- server/lib/cache/index.ts | 1 + server/lib/redundancy.ts | 3 +- .../schedulers/videos-redundancy-scheduler.ts | 88 ++++-- server/models/activitypub/actor.ts | 31 ++ server/models/redundancy/video-redundancy.ts | 64 +++- server/models/video/video-file.ts | 6 + server/tests/api/server/redundancy.ts | 296 ++++++++++++------ server/tests/utils/server/servers.ts | 4 +- .../redundancy/videos-redundancy.model.ts | 3 + support/doc/redundancy.md | 46 +++ 27 files changed, 644 insertions(+), 295 deletions(-) create mode 100644 server/initializers/checker-after-init.ts rename server/initializers/{checker.ts => checker-before-init.ts} (51%) create mode 100644 support/doc/redundancy.md diff --git a/config/default.yaml b/config/default.yaml index fa1fb628a..0d7d948c2 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -75,14 +75,20 @@ redundancy: strategies: # - # size: '10GB' +# # Minimum time the video must remain in the cache. Only accept values > 10 hours (to not overload remote instances) +# min_lifetime: '48 hours' # strategy: 'most-views' # Cache videos that have the most views # - # size: '10GB' +# # Minimum time the video must remain in the cache. Only accept values > 10 hours (to not overload remote instances) +# min_lifetime: '48 hours' # strategy: 'trending' # Cache trending videos # - # size: '10GB' +# # Minimum time the video must remain in the cache. Only accept values > 10 hours (to not overload remote instances) +# min_lifetime: '48 hours' # strategy: 'recently-added' # Cache recently added videos -# minViews: 10 # Having at least x views +# min_views: 10 # Having at least x views cache: previews: diff --git a/config/production.yaml.example b/config/production.yaml.example index 4d8752206..f9da8e0dd 100644 --- a/config/production.yaml.example +++ b/config/production.yaml.example @@ -76,14 +76,20 @@ redundancy: strategies: # - # size: '10GB' +# # Minimum time the video must remain in the cache. Only accept values > 10 hours (to not overload remote instances) +# min_lifetime: '48 hours' # strategy: 'most-views' # Cache videos that have the most views # - # size: '10GB' +# # Minimum time the video must remain in the cache. Only accept values > 10 hours (to not overload remote instances) +# min_lifetime: '48 hours' # strategy: 'trending' # Cache trending videos # - # size: '10GB' +# # Minimum time the video must remain in the cache. Only accept values > 10 hours (to not overload remote instances) +# min_lifetime: '48 hours' # strategy: 'recently-added' # Cache recently added videos -# minViews: 10 # Having at least x views +# min_views: 10 # Having at least x views ############################################################################### # diff --git a/config/test.yaml b/config/test.yaml index ad94b00cd..04c999966 100644 --- a/config/test.yaml +++ b/config/test.yaml @@ -23,18 +23,21 @@ log: redundancy: videos: - check_interval: '5 seconds' + check_interval: '10 minutes' strategies: - size: '10MB' + min_lifetime: '10 minutes' strategy: 'most-views' - size: '10MB' + min_lifetime: '10 minutes' strategy: 'trending' - size: '10MB' + min_lifetime: '10 minutes' strategy: 'recently-added' - minViews: 1 + min_views: 1 cache: previews: diff --git a/server.ts b/server.ts index 8bc5e5f32..59fb820b4 100644 --- a/server.ts +++ b/server.ts @@ -1,6 +1,4 @@ // FIXME: https://github.com/nodejs/node/pull/16853 -import { VideosCaptionCache } from './server/lib/cache/videos-caption-cache' - require('tls').DEFAULT_ECDH_CURVE = 'auto' import { isTestInstance } from './server/helpers/core-utils' @@ -17,7 +15,7 @@ import * as cors from 'cors' import * as cookieParser from 'cookie-parser' import * as helmet from 'helmet' import * as useragent from 'useragent' -import * as anonymise from 'ip-anonymize' +import * as anonymize from 'ip-anonymize' process.title = 'peertube' @@ -25,7 +23,7 @@ process.title = 'peertube' const app = express() // ----------- Core checker ----------- -import { checkMissedConfig, checkFFmpeg, checkConfig, checkActivityPubUrls } from './server/initializers/checker' +import { checkMissedConfig, checkFFmpeg } from './server/initializers/checker-before-init' // Do not use barrels because we don't want to load all modules here (we need to initialize database first) import { logger } from './server/helpers/logger' @@ -43,6 +41,8 @@ checkFFmpeg(CONFIG) process.exit(-1) }) +import { checkConfig, checkActivityPubUrls } from './server/initializers/checker-after-init' + const errorMessage = checkConfig() if (errorMessage !== null) { throw new Error(errorMessage) @@ -76,7 +76,7 @@ migrate() import { installApplication } from './server/initializers' import { Emailer } from './server/lib/emailer' import { JobQueue } from './server/lib/job-queue' -import { VideosPreviewCache } from './server/lib/cache' +import { VideosPreviewCache, VideosCaptionCache } from './server/lib/cache' import { activityPubRouter, apiRouter, @@ -111,7 +111,7 @@ if (isTestInstance()) { // For the logger morgan.token('remote-addr', req => { return (req.get('DNT') === '1') ? - anonymise(req.ip || (req.connection && req.connection.remoteAddress) || undefined, + anonymize(req.ip || (req.connection && req.connection.remoteAddress) || undefined, 16, // bitmask for IPv4 16 // bitmask for IPv6 ) : diff --git a/server/helpers/ffmpeg-utils.ts b/server/helpers/ffmpeg-utils.ts index 7c45f3632..22bc25476 100644 --- a/server/helpers/ffmpeg-utils.ts +++ b/server/helpers/ffmpeg-utils.ts @@ -4,7 +4,7 @@ import { VideoResolution } from '../../shared/models/videos' import { CONFIG, FFMPEG_NICE, VIDEO_TRANSCODING_FPS } from '../initializers' import { processImage } from './image-utils' import { logger } from './logger' -import { checkFFmpegEncoders } from '../initializers/checker' +import { checkFFmpegEncoders } from '../initializers/checker-before-init' import { remove } from 'fs-extra' function computeResolutionsToTranscode (videoFileHeight: number) { diff --git a/server/initializers/checker-after-init.ts b/server/initializers/checker-after-init.ts new file mode 100644 index 000000000..588526235 --- /dev/null +++ b/server/initializers/checker-after-init.ts @@ -0,0 +1,115 @@ +import * as config from 'config' +import { promisify0, isProdInstance, parseDuration, isTestInstance } from '../helpers/core-utils' +import { UserModel } from '../models/account/user' +import { ApplicationModel } from '../models/application/application' +import { OAuthClientModel } from '../models/oauth/oauth-client' +import { parse } from 'url' +import { CONFIG } from './constants' +import { logger } from '../helpers/logger' +import { getServerActor } from '../helpers/utils' +import { RecentlyAddedStrategy, VideosRedundancy } from '../../shared/models/redundancy' +import { isArray } from '../helpers/custom-validators/misc' +import { uniq } from 'lodash' + +async function checkActivityPubUrls () { + const actor = await getServerActor() + + const parsed = parse(actor.url) + if (CONFIG.WEBSERVER.HOST !== parsed.host) { + const NODE_ENV = config.util.getEnv('NODE_ENV') + const NODE_CONFIG_DIR = config.util.getEnv('NODE_CONFIG_DIR') + + logger.warn( + 'It seems PeerTube was started (and created some data) with another domain name. ' + + 'This means you will not be able to federate! ' + + 'Please use %s %s npm run update-host to fix this.', + NODE_CONFIG_DIR ? `NODE_CONFIG_DIR=${NODE_CONFIG_DIR}` : '', + NODE_ENV ? `NODE_ENV=${NODE_ENV}` : '' + ) + } +} + +// Some checks on configuration files +// Return an error message, or null if everything is okay +function checkConfig () { + const defaultNSFWPolicy = CONFIG.INSTANCE.DEFAULT_NSFW_POLICY + + // NSFW policy + { + const available = [ 'do_not_list', 'blur', 'display' ] + if (available.indexOf(defaultNSFWPolicy) === -1) { + return 'NSFW policy setting should be ' + available.join(' or ') + ' instead of ' + defaultNSFWPolicy + } + } + + // Redundancies + const redundancyVideos = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES + if (isArray(redundancyVideos)) { + const available = [ 'most-views', 'trending', 'recently-added' ] + for (const r of redundancyVideos) { + if (available.indexOf(r.strategy) === -1) { + return 'Videos redundancy should have ' + available.join(' or ') + ' strategy instead of ' + r.strategy + } + + // Lifetime should not be < 10 hours + if (!isTestInstance() && r.minLifetime < 1000 * 3600 * 10) { + return 'Video redundancy minimum lifetime should be >= 10 hours for strategy ' + r.strategy + } + } + + const filtered = uniq(redundancyVideos.map(r => r.strategy)) + if (filtered.length !== redundancyVideos.length) { + return 'Redundancy video entries should have unique strategies' + } + + const recentlyAddedStrategy = redundancyVideos.find(r => r.strategy === 'recently-added') as RecentlyAddedStrategy + if (recentlyAddedStrategy && isNaN(recentlyAddedStrategy.minViews)) { + return 'Min views in recently added strategy is not a number' + } + } + + if (isProdInstance()) { + const configStorage = config.get('storage') + for (const key of Object.keys(configStorage)) { + if (configStorage[key].startsWith('storage/')) { + logger.warn( + 'Directory of %s should not be in the production directory of PeerTube. Please check your production configuration file.', + key + ) + } + } + } + + return null +} + +// We get db by param to not import it in this file (import orders) +async function clientsExist () { + const totalClients = await OAuthClientModel.countTotal() + + return totalClients !== 0 +} + +// We get db by param to not import it in this file (import orders) +async function usersExist () { + const totalUsers = await UserModel.countTotal() + + return totalUsers !== 0 +} + +// We get db by param to not import it in this file (import orders) +async function applicationExist () { + const totalApplication = await ApplicationModel.countTotal() + + return totalApplication !== 0 +} + +// --------------------------------------------------------------------------- + +export { + checkConfig, + clientsExist, + usersExist, + applicationExist, + checkActivityPubUrls +} diff --git a/server/initializers/checker.ts b/server/initializers/checker-before-init.ts similarity index 51% rename from server/initializers/checker.ts rename to server/initializers/checker-before-init.ts index 5b068caa1..4f46d406a 100644 --- a/server/initializers/checker.ts +++ b/server/initializers/checker-before-init.ts @@ -1,78 +1,8 @@ import * as config from 'config' -import { promisify0, isProdInstance } from '../helpers/core-utils' -import { UserModel } from '../models/account/user' -import { ApplicationModel } from '../models/application/application' -import { OAuthClientModel } from '../models/oauth/oauth-client' -import { parse } from 'url' -import { CONFIG } from './constants' -import { logger } from '../helpers/logger' -import { getServerActor } from '../helpers/utils' -import { RecentlyAddedStrategy, VideosRedundancy } from '../../shared/models/redundancy' +import { promisify0 } from '../helpers/core-utils' import { isArray } from '../helpers/custom-validators/misc' -import { uniq } from 'lodash' -async function checkActivityPubUrls () { - const actor = await getServerActor() - - const parsed = parse(actor.url) - if (CONFIG.WEBSERVER.HOST !== parsed.host) { - const NODE_ENV = config.util.getEnv('NODE_ENV') - const NODE_CONFIG_DIR = config.util.getEnv('NODE_CONFIG_DIR') - - logger.warn( - 'It seems PeerTube was started (and created some data) with another domain name. ' + - 'This means you will not be able to federate! ' + - 'Please use %s %s npm run update-host to fix this.', - NODE_CONFIG_DIR ? `NODE_CONFIG_DIR=${NODE_CONFIG_DIR}` : '', - NODE_ENV ? `NODE_ENV=${NODE_ENV}` : '' - ) - } -} - -// Some checks on configuration files -// Return an error message, or null if everything is okay -function checkConfig () { - const defaultNSFWPolicy = config.get('instance.default_nsfw_policy') - - // NSFW policy - if ([ 'do_not_list', 'blur', 'display' ].indexOf(defaultNSFWPolicy) === -1) { - return 'NSFW policy setting should be "do_not_list" or "blur" or "display" instead of ' + defaultNSFWPolicy - } - - // Redundancies - const redundancyVideos = config.get('redundancy.videos.strategies') - if (isArray(redundancyVideos)) { - for (const r of redundancyVideos) { - if ([ 'most-views', 'trending', 'recently-added' ].indexOf(r.strategy) === -1) { - return 'Redundancy video entries should have "most-views" strategy instead of ' + r.strategy - } - } - - const filtered = uniq(redundancyVideos.map(r => r.strategy)) - if (filtered.length !== redundancyVideos.length) { - return 'Redundancy video entries should have unique strategies' - } - - const recentlyAddedStrategy = redundancyVideos.find(r => r.strategy === 'recently-added') as RecentlyAddedStrategy - if (recentlyAddedStrategy && isNaN(recentlyAddedStrategy.minViews)) { - return 'Min views in recently added strategy is not a number' - } - } - - if (isProdInstance()) { - const configStorage = config.get('storage') - for (const key of Object.keys(configStorage)) { - if (configStorage[key].startsWith('storage/')) { - logger.warn( - 'Directory of %s should not be in the production directory of PeerTube. Please check your production configuration file.', - key - ) - } - } - } - - return null -} +// ONLY USE CORE MODULES IN THIS FILE! // Check the config files function checkMissedConfig () { @@ -109,6 +39,14 @@ function checkMissedConfig () { } } + const redundancyVideos = config.get('redundancy.videos.strategies') + if (isArray(redundancyVideos)) { + for (const r of redundancyVideos) { + if (!r.size) miss.push('redundancy.videos.strategies.size') + if (!r.min_lifetime) miss.push('redundancy.videos.strategies.min_lifetime') + } + } + const missingAlternatives = requiredAlternatives.filter( set => !set.find(alternative => !alternative.find(key => !config.has(key))) ) @@ -163,36 +101,10 @@ async function checkFFmpegEncoders (): Promise> { } } -// We get db by param to not import it in this file (import orders) -async function clientsExist () { - const totalClients = await OAuthClientModel.countTotal() - - return totalClients !== 0 -} - -// We get db by param to not import it in this file (import orders) -async function usersExist () { - const totalUsers = await UserModel.countTotal() - - return totalUsers !== 0 -} - -// We get db by param to not import it in this file (import orders) -async function applicationExist () { - const totalApplication = await ApplicationModel.countTotal() - - return totalApplication !== 0 -} - // --------------------------------------------------------------------------- export { - checkConfig, checkFFmpeg, checkFFmpegEncoders, - checkMissedConfig, - clientsExist, - usersExist, - applicationExist, - checkActivityPubUrls + checkMissedConfig } diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 03424ffb8..947cbda28 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -601,7 +601,6 @@ const MEMOIZE_TTL = { const REDUNDANCY = { VIDEOS: { - EXPIRES_AFTER_MS: 48 * 3600 * 1000, // 2 days RANDOMIZED_FACTOR: 5 } } @@ -750,10 +749,16 @@ function updateWebserverConfig () { CONFIG.WEBSERVER.HOST = sanitizeHost(CONFIG.WEBSERVER.HOSTNAME + ':' + CONFIG.WEBSERVER.PORT, REMOTE_SCHEME.HTTP) } -function buildVideosRedundancy (objs: VideosRedundancy[]): VideosRedundancy[] { +function buildVideosRedundancy (objs: any[]): VideosRedundancy[] { if (!objs) return [] - return objs.map(obj => Object.assign(obj, { size: bytes.parse(obj.size) })) + return objs.map(obj => { + return Object.assign(obj, { + minLifetime: parseDuration(obj.min_lifetime), + size: bytes.parse(obj.size), + minViews: obj.min_views + }) + }) } function buildLanguages () { diff --git a/server/initializers/index.ts b/server/initializers/index.ts index 332702774..fe9190a9c 100644 --- a/server/initializers/index.ts +++ b/server/initializers/index.ts @@ -1,6 +1,5 @@ // Constants first, database in second! export * from './constants' export * from './database' -export * from './checker' export * from './installer' export * from './migrator' diff --git a/server/initializers/installer.ts b/server/initializers/installer.ts index 818bb04a2..c952ad46c 100644 --- a/server/initializers/installer.ts +++ b/server/initializers/installer.ts @@ -5,7 +5,7 @@ import { createApplicationActor, createUserAccountAndChannel } from '../lib/user import { UserModel } from '../models/account/user' import { ApplicationModel } from '../models/application/application' import { OAuthClientModel } from '../models/oauth/oauth-client' -import { applicationExist, clientsExist, usersExist } from './checker' +import { applicationExist, clientsExist, usersExist } from './checker-after-init' import { CACHE, CONFIG, LAST_MIGRATION_VERSION } from './constants' import { sequelizeTypescript } from './database' import { remove, ensureDir } from 'fs-extra' diff --git a/server/lib/activitypub/audience.ts b/server/lib/activitypub/audience.ts index a86428461..10277eca7 100644 --- a/server/lib/activitypub/audience.ts +++ b/server/lib/activitypub/audience.ts @@ -50,7 +50,12 @@ function getAudienceFromFollowersOf (actorsInvolvedInObject: ActorModel[]): Acti async function getActorsInvolvedInVideo (video: VideoModel, t: Transaction) { const actors = await VideoShareModel.loadActorsByShare(video.id, t) - actors.push(video.VideoChannel.Account.Actor) + + const videoActor = video.VideoChannel && video.VideoChannel.Account + ? video.VideoChannel.Account.Actor + : await ActorModel.loadAccountActorByVideoId(video.id, t) + + actors.push(videoActor) return actors } diff --git a/server/lib/activitypub/cache-file.ts b/server/lib/activitypub/cache-file.ts index 87f8a4162..5286d8e6d 100644 --- a/server/lib/activitypub/cache-file.ts +++ b/server/lib/activitypub/cache-file.ts @@ -1,7 +1,7 @@ import { CacheFileObject } from '../../../shared/index' import { VideoModel } from '../../models/video/video' -import { sequelizeTypescript } from '../../initializers' import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' +import { Transaction } from 'sequelize' function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) { const url = cacheFileObject.url @@ -22,25 +22,29 @@ function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject } } -function createCacheFile (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) { - return sequelizeTypescript.transaction(async t => { - const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor) +function createCacheFile (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }, t: Transaction) { + const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor) - return VideoRedundancyModel.create(attributes, { transaction: t }) - }) + return VideoRedundancyModel.create(attributes, { transaction: t }) } -function updateCacheFile (cacheFileObject: CacheFileObject, redundancyModel: VideoRedundancyModel, byActor: { id?: number }) { +function updateCacheFile ( + cacheFileObject: CacheFileObject, + redundancyModel: VideoRedundancyModel, + video: VideoModel, + byActor: { id?: number }, + t: Transaction +) { if (redundancyModel.actorId !== byActor.id) { throw new Error('Cannot update redundancy ' + redundancyModel.url + ' of another actor.') } - const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, redundancyModel.VideoFile.Video, byActor) + const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor) redundancyModel.set('expires', attributes.expiresOn) redundancyModel.set('fileUrl', attributes.fileUrl) - return redundancyModel.save() + return redundancyModel.save({ transaction: t }) } export { diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts index cff8dcfc6..ceb5413ca 100644 --- a/server/lib/activitypub/process/process-create.ts +++ b/server/lib/activitypub/process/process-create.ts @@ -95,7 +95,7 @@ async function processCreateView (byActor: ActorModel, activity: ActivityCreate) if (video.isOwned()) { // Don't resend the activity to the sender const exceptions = [ byActor ] - await forwardActivity(activity, undefined, exceptions) + await forwardVideoRelatedActivity(activity, undefined, exceptions, video) } } @@ -104,12 +104,14 @@ async function processCacheFile (byActor: ActorModel, activity: ActivityCreate) const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFile.object }) - await createCacheFile(cacheFile, video, byActor) + await sequelizeTypescript.transaction(async t => { + return createCacheFile(cacheFile, video, byActor, t) + }) if (video.isOwned()) { // Don't resend the activity to the sender const exceptions = [ byActor ] - await forwardActivity(activity, undefined, exceptions) + await forwardVideoRelatedActivity(activity, undefined, exceptions, video) } } diff --git a/server/lib/activitypub/process/process-undo.ts b/server/lib/activitypub/process/process-undo.ts index 73ca0a17c..ff019cd8c 100644 --- a/server/lib/activitypub/process/process-undo.ts +++ b/server/lib/activitypub/process/process-undo.ts @@ -100,7 +100,7 @@ async function processUndoCacheFile (byActor: ActorModel, activity: ActivityUndo return sequelizeTypescript.transaction(async t => { const cacheFile = await VideoRedundancyModel.loadByUrl(cacheFileObject.id) - if (!cacheFile) throw new Error('Unknown video cache ' + cacheFile.url) + if (!cacheFile) throw new Error('Unknown video cache ' + cacheFileObject.id) if (cacheFile.actorId !== byActor.id) throw new Error('Cannot delete redundancy ' + cacheFile.url + ' of another actor.') diff --git a/server/lib/activitypub/process/process-update.ts b/server/lib/activitypub/process/process-update.ts index ed3489ebf..e092a6729 100644 --- a/server/lib/activitypub/process/process-update.ts +++ b/server/lib/activitypub/process/process-update.ts @@ -12,6 +12,7 @@ import { sanitizeAndCheckVideoTorrentObject } from '../../../helpers/custom-vali import { isCacheFileObjectValid } from '../../../helpers/custom-validators/activitypub/cache-file' import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' import { createCacheFile, updateCacheFile } from '../cache-file' +import { forwardVideoRelatedActivity } from '../send/utils' async function processUpdateActivity (activity: ActivityUpdate, byActor: ActorModel) { const objectType = activity.object.type @@ -68,18 +69,29 @@ async function processUpdateVideo (actor: ActorModel, activity: ActivityUpdate) async function processUpdateCacheFile (byActor: ActorModel, activity: ActivityUpdate) { const cacheFileObject = activity.object as CacheFileObject - if (!isCacheFileObjectValid(cacheFileObject) === false) { - logger.debug('Cahe file object sent by update is not valid.', { cacheFileObject }) + if (!isCacheFileObjectValid(cacheFileObject)) { + logger.debug('Cache file object sent by update is not valid.', { cacheFileObject }) return undefined } - const redundancyModel = await VideoRedundancyModel.loadByUrl(cacheFileObject.id) - if (!redundancyModel) { - const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFileObject.id }) - return createCacheFile(cacheFileObject, video, byActor) - } + const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFileObject.object }) - return updateCacheFile(cacheFileObject, redundancyModel, byActor) + await sequelizeTypescript.transaction(async t => { + const redundancyModel = await VideoRedundancyModel.loadByUrl(cacheFileObject.id, t) + + if (!redundancyModel) { + await createCacheFile(cacheFileObject, video, byActor, t) + } else { + await updateCacheFile(cacheFileObject, redundancyModel, video, byActor, t) + } + }) + + if (video.isOwned()) { + // Don't resend the activity to the sender + const exceptions = [ byActor ] + + await forwardVideoRelatedActivity(activity, undefined, exceptions, video) + } } async function processUpdateActor (actor: ActorModel, activity: ActivityUpdate) { diff --git a/server/lib/activitypub/send/send-update.ts b/server/lib/activitypub/send/send-update.ts index ec46789b7..a68f03edf 100644 --- a/server/lib/activitypub/send/send-update.ts +++ b/server/lib/activitypub/send/send-update.ts @@ -7,8 +7,8 @@ import { VideoModel } from '../../../models/video/video' import { VideoChannelModel } from '../../../models/video/video-channel' import { VideoShareModel } from '../../../models/video/video-share' import { getUpdateActivityPubUrl } from '../url' -import { broadcastToFollowers, sendVideoRelatedActivity, unicastTo } from './utils' -import { audiencify, getActorsInvolvedInVideo, getAudience, getAudienceFromFollowersOf } from '../audience' +import { broadcastToFollowers, sendVideoRelatedActivity } from './utils' +import { audiencify, getActorsInvolvedInVideo, getAudience } from '../audience' import { logger } from '../../../helpers/logger' import { VideoCaptionModel } from '../../../models/video/video-caption' import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts index 48c0e0a5c..db72ef23c 100644 --- a/server/lib/activitypub/videos.ts +++ b/server/lib/activitypub/videos.ts @@ -176,7 +176,7 @@ async function getOrCreateVideoAndAccountAndChannel (options: { syncParam, refreshViews } - const p = retryTransactionWrapper(refreshVideoIfNeeded, refreshOptions) + const p = refreshVideoIfNeeded(refreshOptions) if (syncParam.refreshVideo === true) videoFromDatabase = await p return { video: videoFromDatabase } @@ -245,29 +245,37 @@ async function updateVideoFromAP (options: { generateThumbnailFromUrl(options.video, options.videoObject.icon) .catch(err => logger.warn('Cannot generate thumbnail of %s.', options.videoObject.id, { err })) - // Remove old video files - const videoFileDestroyTasks: Bluebird[] = [] - for (const videoFile of options.video.VideoFiles) { - videoFileDestroyTasks.push(videoFile.destroy(sequelizeOptions)) + { + const videoFileAttributes = videoFileActivityUrlToDBAttributes(options.video, options.videoObject) + const newVideoFiles = videoFileAttributes.map(a => new VideoFileModel(a)) + + // Remove video files that do not exist anymore + const destroyTasks = options.video.VideoFiles + .filter(f => !newVideoFiles.find(newFile => newFile.hasSameUniqueKeysThan(f))) + .map(f => f.destroy(sequelizeOptions)) + await Promise.all(destroyTasks) + + // Update or add other one + const upsertTasks = videoFileAttributes.map(a => VideoFileModel.upsert(a, sequelizeOptions)) + await Promise.all(upsertTasks) } - await Promise.all(videoFileDestroyTasks) - const videoFileAttributes = videoFileActivityUrlToDBAttributes(options.video, options.videoObject) - const tasks = videoFileAttributes.map(f => VideoFileModel.create(f, sequelizeOptions)) - await Promise.all(tasks) + { + // Update Tags + const tags = options.videoObject.tag.map(tag => tag.name) + const tagInstances = await TagModel.findOrCreateTags(tags, t) + await options.video.$set('Tags', tagInstances, sequelizeOptions) + } - // Update Tags - const tags = options.videoObject.tag.map(tag => tag.name) - const tagInstances = await TagModel.findOrCreateTags(tags, t) - await options.video.$set('Tags', tagInstances, sequelizeOptions) + { + // Update captions + await VideoCaptionModel.deleteAllCaptionsOfRemoteVideo(options.video.id, t) - // Update captions - await VideoCaptionModel.deleteAllCaptionsOfRemoteVideo(options.video.id, t) - - const videoCaptionsPromises = options.videoObject.subtitleLanguage.map(c => { - return VideoCaptionModel.insertOrReplaceLanguage(options.video.id, c.identifier, t) - }) - await Promise.all(videoCaptionsPromises) + const videoCaptionsPromises = options.videoObject.subtitleLanguage.map(c => { + return VideoCaptionModel.insertOrReplaceLanguage(options.video.id, c.identifier, t) + }) + await Promise.all(videoCaptionsPromises) + } }) logger.info('Remote video with uuid %s updated', options.videoObject.uuid) @@ -382,7 +390,7 @@ async function refreshVideoIfNeeded (options: { channel: channelActor.VideoChannel, updateViews: options.refreshViews } - await updateVideoFromAP(updateOptions) + await retryTransactionWrapper(updateVideoFromAP, updateOptions) await syncVideoExternalAttributes(video, videoObject, options.syncParam) } catch (err) { logger.warn('Cannot refresh video.', { err }) diff --git a/server/lib/cache/index.ts b/server/lib/cache/index.ts index 7bf63790a..54eb983fa 100644 --- a/server/lib/cache/index.ts +++ b/server/lib/cache/index.ts @@ -1 +1,2 @@ export * from './videos-preview-cache' +export * from './videos-caption-cache' diff --git a/server/lib/redundancy.ts b/server/lib/redundancy.ts index 78221cc3d..16b122658 100644 --- a/server/lib/redundancy.ts +++ b/server/lib/redundancy.ts @@ -6,7 +6,8 @@ import { getServerActor } from '../helpers/utils' async function removeVideoRedundancy (videoRedundancy: VideoRedundancyModel, t?: Transaction) { const serverActor = await getServerActor() - await sendUndoCacheFile(serverActor, videoRedundancy, t) + // Local cache, send undo to remote instances + if (videoRedundancy.actorId === serverActor.id) await sendUndoCacheFile(serverActor, videoRedundancy, t) await videoRedundancy.destroy({ transaction: t }) } diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts index 998d2295a..97df3e4f5 100644 --- a/server/lib/schedulers/videos-redundancy-scheduler.ts +++ b/server/lib/schedulers/videos-redundancy-scheduler.ts @@ -1,7 +1,7 @@ import { AbstractScheduler } from './abstract-scheduler' -import { CONFIG, JOB_TTL, REDUNDANCY, SCHEDULER_INTERVALS_MS } from '../../initializers' +import { CONFIG, JOB_TTL, REDUNDANCY } from '../../initializers' import { logger } from '../../helpers/logger' -import { VideoRedundancyStrategy, VideosRedundancy } from '../../../shared/models/redundancy' +import { VideosRedundancy } from '../../../shared/models/redundancy' import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' import { VideoFileModel } from '../../models/video/video-file' import { downloadWebTorrentVideo } from '../../helpers/webtorrent' @@ -12,6 +12,7 @@ import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send' import { VideoModel } from '../../models/video/video' import { getVideoCacheFileActivityPubUrl } from '../activitypub/url' import { isTestInstance } from '../../helpers/core-utils' +import { removeVideoRedundancy } from '../redundancy' export class VideosRedundancyScheduler extends AbstractScheduler { @@ -30,7 +31,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler { this.executing = true for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { - if (!isTestInstance()) logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) + logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) try { const videoToDuplicate = await this.findVideoToDuplicate(obj) @@ -39,20 +40,24 @@ export class VideosRedundancyScheduler extends AbstractScheduler { const videoFiles = videoToDuplicate.VideoFiles videoFiles.forEach(f => f.Video = videoToDuplicate) - if (await this.isTooHeavy(obj.strategy, videoFiles, obj.size)) { - if (!isTestInstance()) logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url) + await this.purgeCacheIfNeeded(obj, videoFiles) + + if (await this.isTooHeavy(obj, videoFiles)) { + logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url) continue } logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, obj.strategy) - await this.createVideoRedundancy(obj.strategy, videoFiles) + await this.createVideoRedundancy(obj, videoFiles) } catch (err) { logger.error('Cannot run videos redundancy %s.', obj.strategy, { err }) } } - await this.removeExpired() + await this.extendsLocalExpiration() + + await this.purgeRemoteExpired() this.executing = false } @@ -61,16 +66,27 @@ export class VideosRedundancyScheduler extends AbstractScheduler { return this.instance || (this.instance = new this()) } - private async removeExpired () { - const expired = await VideoRedundancyModel.listAllExpired() - - for (const m of expired) { - logger.info('Removing expired video %s from our redundancy system.', this.buildEntryLogId(m)) + private async extendsLocalExpiration () { + const expired = await VideoRedundancyModel.listLocalExpired() + for (const redundancyModel of expired) { try { - await m.destroy() + const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy) + await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime) } catch (err) { - logger.error('Cannot remove %s video from our redundancy system.', this.buildEntryLogId(m)) + logger.error('Cannot extend expiration of %s video from our redundancy system.', this.buildEntryLogId(redundancyModel)) + } + } + } + + private async purgeRemoteExpired () { + const expired = await VideoRedundancyModel.listRemoteExpired() + + for (const redundancyModel of expired) { + try { + await removeVideoRedundancy(redundancyModel) + } catch (err) { + logger.error('Cannot remove redundancy %s from our redundancy system.', this.buildEntryLogId(redundancyModel)) } } } @@ -90,18 +106,14 @@ export class VideosRedundancyScheduler extends AbstractScheduler { } } - private async createVideoRedundancy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[]) { + private async createVideoRedundancy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) { const serverActor = await getServerActor() for (const file of filesToDuplicate) { const existing = await VideoRedundancyModel.loadByFileId(file.id) if (existing) { - logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', file.Video.url, file.resolution, strategy) + await this.extendsExpirationOf(existing, redundancy.minLifetime) - existing.expiresOn = this.buildNewExpiration() - await existing.save() - - await sendUpdateCacheFile(serverActor, existing) continue } @@ -109,7 +121,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler { const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(file.Video.id) if (!video) continue - logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, strategy) + logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, redundancy.strategy) const { baseUrlHttp, baseUrlWs } = video.getBaseUrls() const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs) @@ -120,10 +132,10 @@ export class VideosRedundancyScheduler extends AbstractScheduler { await rename(tmpPath, destPath) const createdModel = await VideoRedundancyModel.create({ - expiresOn: new Date(Date.now() + REDUNDANCY.VIDEOS.EXPIRES_AFTER_MS), + expiresOn: this.buildNewExpiration(redundancy.minLifetime), url: getVideoCacheFileActivityPubUrl(file), fileUrl: video.getVideoFileUrl(file, CONFIG.WEBSERVER.URL), - strategy, + strategy: redundancy.strategy, videoFileId: file.id, actorId: serverActor.id }) @@ -133,16 +145,36 @@ export class VideosRedundancyScheduler extends AbstractScheduler { } } - private async isTooHeavy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[], maxSizeArg: number) { - const maxSize = maxSizeArg - this.getTotalFileSizes(filesToDuplicate) + private async extendsExpirationOf (redundancy: VideoRedundancyModel, expiresAfterMs: number) { + logger.info('Extending expiration of %s.', redundancy.url) - const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(strategy) + const serverActor = await getServerActor() + + redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs) + await redundancy.save() + + await sendUpdateCacheFile(serverActor, redundancy) + } + + private async purgeCacheIfNeeded (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) { + while (this.isTooHeavy(redundancy, filesToDuplicate)) { + const toDelete = await VideoRedundancyModel.loadOldestLocalThatAlreadyExpired(redundancy.strategy, redundancy.minLifetime) + if (!toDelete) return + + await removeVideoRedundancy(toDelete) + } + } + + private async isTooHeavy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) { + const maxSize = redundancy.size - this.getTotalFileSizes(filesToDuplicate) + + const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(redundancy.strategy) return totalDuplicated > maxSize } - private buildNewExpiration () { - return new Date(Date.now() + REDUNDANCY.VIDEOS.EXPIRES_AFTER_MS) + private buildNewExpiration (expiresAfterMs: number) { + return new Date(Date.now() + expiresAfterMs) } private buildEntryLogId (object: VideoRedundancyModel) { diff --git a/server/models/activitypub/actor.ts b/server/models/activitypub/actor.ts index f8bb59323..12b83916e 100644 --- a/server/models/activitypub/actor.ts +++ b/server/models/activitypub/actor.ts @@ -37,6 +37,7 @@ import { ServerModel } from '../server/server' import { throwIfNotValid } from '../utils' import { VideoChannelModel } from '../video/video-channel' import { ActorFollowModel } from './actor-follow' +import { VideoModel } from '../video/video' enum ScopeNames { FULL = 'FULL' @@ -266,6 +267,36 @@ export class ActorModel extends Model { return ActorModel.unscoped().findById(id) } + static loadAccountActorByVideoId (videoId: number, transaction: Sequelize.Transaction) { + const query = { + include: [ + { + attributes: [ 'id' ], + model: AccountModel.unscoped(), + required: true, + include: [ + { + attributes: [ 'id' ], + model: VideoChannelModel.unscoped(), + required: true, + include: { + attributes: [ 'id' ], + model: VideoModel.unscoped(), + required: true, + where: { + id: videoId + } + } + } + ] + } + ], + transaction + } + + return ActorModel.unscoped().findOne(query as any) // FIXME: typings + } + static isActorUrlExist (url: string) { const query = { raw: true, diff --git a/server/models/redundancy/video-redundancy.ts b/server/models/redundancy/video-redundancy.ts index fb07287a8..970d2fe06 100644 --- a/server/models/redundancy/video-redundancy.ts +++ b/server/models/redundancy/video-redundancy.ts @@ -9,7 +9,6 @@ import { Is, Model, Scopes, - Sequelize, Table, UpdatedAt } from 'sequelize-typescript' @@ -28,6 +27,7 @@ import { ServerModel } from '../server/server' import { sample } from 'lodash' import { isTestInstance } from '../../helpers/core-utils' import * as Bluebird from 'bluebird' +import * as Sequelize from 'sequelize' export enum ScopeNames { WITH_VIDEO = 'WITH_VIDEO' @@ -116,11 +116,11 @@ export class VideoRedundancyModel extends Model { Actor: ActorModel @AfterDestroy - static removeFilesAndSendDelete (instance: VideoRedundancyModel) { + static removeFile (instance: VideoRedundancyModel) { // Not us if (!instance.strategy) return - logger.info('Removing video file %s-.', instance.VideoFile.Video.uuid, instance.VideoFile.resolution) + logger.info('Removing duplicated video file %s-%s.', instance.VideoFile.Video.uuid, instance.VideoFile.resolution) return instance.VideoFile.Video.removeFile(instance.VideoFile) } @@ -135,11 +135,12 @@ export class VideoRedundancyModel extends Model { return VideoRedundancyModel.scope(ScopeNames.WITH_VIDEO).findOne(query) } - static loadByUrl (url: string) { + static loadByUrl (url: string, transaction?: Sequelize.Transaction) { const query = { where: { url - } + }, + transaction } return VideoRedundancyModel.findOne(query) @@ -157,7 +158,6 @@ export class VideoRedundancyModel extends Model { // On VideoModel! const query = { attributes: [ 'id', 'views' ], - logging: !isTestInstance(), limit: randomizedFactor, order: getVideoSort('-views'), include: [ @@ -174,7 +174,6 @@ export class VideoRedundancyModel extends Model { const query = { attributes: [ 'id', 'views' ], subQuery: false, - logging: !isTestInstance(), group: 'VideoModel.id', limit: randomizedFactor, order: getVideoSort('-trending'), @@ -193,7 +192,6 @@ export class VideoRedundancyModel extends Model { // On VideoModel! const query = { attributes: [ 'id', 'publishedAt' ], - logging: !isTestInstance(), limit: randomizedFactor, order: getVideoSort('-publishedAt'), where: { @@ -210,11 +208,29 @@ export class VideoRedundancyModel extends Model { return VideoRedundancyModel.getVideoSample(VideoModel.unscoped().findAll(query)) } + static async loadOldestLocalThatAlreadyExpired (strategy: VideoRedundancyStrategy, expiresAfterMs: number) { + const expiredDate = new Date() + expiredDate.setMilliseconds(expiredDate.getMilliseconds() - expiresAfterMs) + + const actor = await getServerActor() + + const query = { + where: { + actorId: actor.id, + strategy, + createdAt: { + [ Sequelize.Op.lt ]: expiredDate + } + } + } + + return VideoRedundancyModel.scope([ ScopeNames.WITH_VIDEO ]).findOne(query) + } + static async getTotalDuplicated (strategy: VideoRedundancyStrategy) { const actor = await getServerActor() const options = { - logging: !isTestInstance(), include: [ { attributes: [], @@ -228,21 +244,39 @@ export class VideoRedundancyModel extends Model { ] } - return VideoFileModel.sum('size', options) + return VideoFileModel.sum('size', options as any) // FIXME: typings } - static listAllExpired () { + static async listLocalExpired () { + const actor = await getServerActor() + const query = { - logging: !isTestInstance(), where: { + actorId: actor.id, expiresOn: { [ Sequelize.Op.lt ]: new Date() } } } - return VideoRedundancyModel.scope(ScopeNames.WITH_VIDEO) - .findAll(query) + return VideoRedundancyModel.scope([ ScopeNames.WITH_VIDEO ]).findAll(query) + } + + static async listRemoteExpired () { + const actor = await getServerActor() + + const query = { + where: { + actorId: { + [Sequelize.Op.ne]: actor.id + }, + expiresOn: { + [ Sequelize.Op.lt ]: new Date() + } + } + } + + return VideoRedundancyModel.scope([ ScopeNames.WITH_VIDEO ]).findAll(query) } static async getStats (strategy: VideoRedundancyStrategy) { @@ -299,7 +333,7 @@ export class VideoRedundancyModel extends Model { const notIn = Sequelize.literal( '(' + - `SELECT "videoFileId" FROM "videoRedundancy" WHERE "actorId" = ${actor.id} AND "expiresOn" >= NOW()` + + `SELECT "videoFileId" FROM "videoRedundancy" WHERE "actorId" = ${actor.id}` + ')' ) diff --git a/server/models/video/video-file.ts b/server/models/video/video-file.ts index 0907ea569..0887a3738 100644 --- a/server/models/video/video-file.ts +++ b/server/models/video/video-file.ts @@ -106,4 +106,10 @@ export class VideoFileModel extends Model { return results.length === 1 }) } + + hasSameUniqueKeysThan (other: VideoFileModel) { + return this.fps === other.fps && + this.resolution === other.resolution && + this.videoId === other.videoId + } } diff --git a/server/tests/api/server/redundancy.ts b/server/tests/api/server/redundancy.ts index 6ce4b9dd1..a773e3de4 100644 --- a/server/tests/api/server/redundancy.ts +++ b/server/tests/api/server/redundancy.ts @@ -31,14 +31,13 @@ const expect = chai.expect let servers: ServerInfo[] = [] let video1Server2UUID: string -let video2Server2UUID: string -function checkMagnetWebseeds (file: { magnetUri: string, resolution: { id: number } }, baseWebseeds: string[]) { +function checkMagnetWebseeds (file: { magnetUri: string, resolution: { id: number } }, baseWebseeds: string[], server: ServerInfo) { const parsed = magnetUtil.decode(file.magnetUri) for (const ws of baseWebseeds) { const found = parsed.urlList.find(url => url === `${ws}-${file.resolution.id}.mp4`) - expect(found, `Webseed ${ws} not found in ${file.magnetUri}`).to.not.be.undefined + expect(found, `Webseed ${ws} not found in ${file.magnetUri} on server ${server.url}`).to.not.be.undefined } } @@ -49,6 +48,7 @@ async function runServers (strategy: VideoRedundancyStrategy, additionalParams: check_interval: '5 seconds', strategies: [ immutableAssign({ + min_lifetime: '1 hour', strategy: strategy, size: '100KB' }, additionalParams) @@ -68,11 +68,6 @@ async function runServers (strategy: VideoRedundancyStrategy, additionalParams: await viewVideo(servers[ 1 ].url, video1Server2UUID) } - { - const res = await uploadVideo(servers[ 1 ].url, servers[ 1 ].accessToken, { name: 'video 2 server 2' }) - video2Server2UUID = res.body.video.uuid - } - await waitJobs(servers) // Server 1 and server 2 follow each other @@ -85,36 +80,82 @@ async function runServers (strategy: VideoRedundancyStrategy, additionalParams: await waitJobs(servers) } -async function check1WebSeed (strategy: VideoRedundancyStrategy) { +async function check1WebSeed (strategy: VideoRedundancyStrategy, videoUUID?: string) { + if (!videoUUID) videoUUID = video1Server2UUID + const webseeds = [ - 'http://localhost:9002/static/webseed/' + video1Server2UUID + 'http://localhost:9002/static/webseed/' + videoUUID ] for (const server of servers) { { - const res = await getVideo(server.url, video1Server2UUID) + const res = await getVideo(server.url, videoUUID) const video: VideoDetails = res.body - video.files.forEach(f => checkMagnetWebseeds(f, webseeds)) - } - - { - const res = await getStats(server.url) - const data: ServerStats = res.body - - expect(data.videosRedundancy).to.have.lengthOf(1) - - const stat = data.videosRedundancy[0] - expect(stat.strategy).to.equal(strategy) - expect(stat.totalSize).to.equal(102400) - expect(stat.totalUsed).to.equal(0) - expect(stat.totalVideoFiles).to.equal(0) - expect(stat.totalVideos).to.equal(0) + for (const f of video.files) { + checkMagnetWebseeds(f, webseeds, server) + } } } } -async function enableRedundancy () { +async function checkStatsWith2Webseed (strategy: VideoRedundancyStrategy) { + const res = await getStats(servers[0].url) + const data: ServerStats = res.body + + expect(data.videosRedundancy).to.have.lengthOf(1) + const stat = data.videosRedundancy[0] + + expect(stat.strategy).to.equal(strategy) + expect(stat.totalSize).to.equal(102400) + expect(stat.totalUsed).to.be.at.least(1).and.below(102401) + expect(stat.totalVideoFiles).to.equal(4) + expect(stat.totalVideos).to.equal(1) +} + +async function checkStatsWith1Webseed (strategy: VideoRedundancyStrategy) { + const res = await getStats(servers[0].url) + const data: ServerStats = res.body + + expect(data.videosRedundancy).to.have.lengthOf(1) + + const stat = data.videosRedundancy[0] + expect(stat.strategy).to.equal(strategy) + expect(stat.totalSize).to.equal(102400) + expect(stat.totalUsed).to.equal(0) + expect(stat.totalVideoFiles).to.equal(0) + expect(stat.totalVideos).to.equal(0) +} + +async function check2Webseeds (strategy: VideoRedundancyStrategy, videoUUID?: string) { + if (!videoUUID) videoUUID = video1Server2UUID + + const webseeds = [ + 'http://localhost:9001/static/webseed/' + videoUUID, + 'http://localhost:9002/static/webseed/' + videoUUID + ] + + for (const server of servers) { + { + const res = await getVideo(server.url, videoUUID) + + const video: VideoDetails = res.body + + for (const file of video.files) { + checkMagnetWebseeds(file, webseeds, server) + } + } + } + + const files = await readdir(join(root(), 'test1', 'videos')) + expect(files).to.have.lengthOf(4) + + for (const resolution of [ 240, 360, 480, 720 ]) { + expect(files.find(f => f === `${videoUUID}-${resolution}.mp4`)).to.not.be.undefined + } +} + +async function enableRedundancyOnServer1 () { await updateRedundancy(servers[ 0 ].url, servers[ 0 ].accessToken, servers[ 1 ].host, true) const res = await getFollowingListPaginationAndSort(servers[ 0 ].url, 0, 5, '-createdAt') @@ -129,50 +170,6 @@ async function enableRedundancy () { expect(server2.following.hostRedundancyAllowed).to.be.true } -async function check2Webseeds (strategy: VideoRedundancyStrategy) { - await waitJobs(servers) - await wait(15000) - await waitJobs(servers) - - const webseeds = [ - 'http://localhost:9001/static/webseed/' + video1Server2UUID, - 'http://localhost:9002/static/webseed/' + video1Server2UUID - ] - - for (const server of servers) { - { - const res = await getVideo(server.url, video1Server2UUID) - - const video: VideoDetails = res.body - - for (const file of video.files) { - checkMagnetWebseeds(file, webseeds) - } - } - } - - const files = await readdir(join(root(), 'test1', 'videos')) - expect(files).to.have.lengthOf(4) - - for (const resolution of [ 240, 360, 480, 720 ]) { - expect(files.find(f => f === `${video1Server2UUID}-${resolution}.mp4`)).to.not.be.undefined - } - - { - const res = await getStats(servers[0].url) - const data: ServerStats = res.body - - expect(data.videosRedundancy).to.have.lengthOf(1) - const stat = data.videosRedundancy[0] - - expect(stat.strategy).to.equal(strategy) - expect(stat.totalSize).to.equal(102400) - expect(stat.totalUsed).to.be.at.least(1).and.below(102401) - expect(stat.totalVideoFiles).to.equal(4) - expect(stat.totalVideos).to.equal(1) - } -} - async function cleanServers () { killallServers(servers) } @@ -188,18 +185,24 @@ describe('Test videos redundancy', function () { return runServers(strategy) }) - it('Should have 1 webseed on the first video', function () { - return check1WebSeed(strategy) + it('Should have 1 webseed on the first video', async function () { + await check1WebSeed(strategy) + await checkStatsWith1Webseed(strategy) }) it('Should enable redundancy on server 1', function () { - return enableRedundancy() + return enableRedundancyOnServer1() }) - it('Should have 2 webseed on the first video', function () { + it('Should have 2 webseed on the first video', async function () { this.timeout(40000) - return check2Webseeds(strategy) + await waitJobs(servers) + await wait(15000) + await waitJobs(servers) + + await check2Webseeds(strategy) + await checkStatsWith2Webseed(strategy) }) after(function () { @@ -216,18 +219,24 @@ describe('Test videos redundancy', function () { return runServers(strategy) }) - it('Should have 1 webseed on the first video', function () { - return check1WebSeed(strategy) + it('Should have 1 webseed on the first video', async function () { + await check1WebSeed(strategy) + await checkStatsWith1Webseed(strategy) }) it('Should enable redundancy on server 1', function () { - return enableRedundancy() + return enableRedundancyOnServer1() }) - it('Should have 2 webseed on the first video', function () { + it('Should have 2 webseed on the first video', async function () { this.timeout(40000) - return check2Webseeds(strategy) + await waitJobs(servers) + await wait(15000) + await waitJobs(servers) + + await check2Webseeds(strategy) + await checkStatsWith2Webseed(strategy) }) after(function () { @@ -241,15 +250,16 @@ describe('Test videos redundancy', function () { before(function () { this.timeout(120000) - return runServers(strategy, { minViews: 3 }) + return runServers(strategy, { min_views: 3 }) }) - it('Should have 1 webseed on the first video', function () { - return check1WebSeed(strategy) + it('Should have 1 webseed on the first video', async function () { + await check1WebSeed(strategy) + await checkStatsWith1Webseed(strategy) }) it('Should enable redundancy on server 1', function () { - return enableRedundancy() + return enableRedundancyOnServer1() }) it('Should still have 1 webseed on the first video', async function () { @@ -259,10 +269,11 @@ describe('Test videos redundancy', function () { await wait(15000) await waitJobs(servers) - return check1WebSeed(strategy) + await check1WebSeed(strategy) + await checkStatsWith1Webseed(strategy) }) - it('Should view 2 times the first video', async function () { + it('Should view 2 times the first video to have > min_views config', async function () { this.timeout(40000) await viewVideo(servers[ 0 ].url, video1Server2UUID) @@ -272,10 +283,117 @@ describe('Test videos redundancy', function () { await waitJobs(servers) }) - it('Should have 2 webseed on the first video', function () { + it('Should have 2 webseed on the first video', async function () { this.timeout(40000) - return check2Webseeds(strategy) + await waitJobs(servers) + await wait(15000) + await waitJobs(servers) + + await check2Webseeds(strategy) + await checkStatsWith2Webseed(strategy) + }) + + after(function () { + return cleanServers() + }) + }) + + describe('Test expiration', function () { + const strategy = 'recently-added' + + async function checkContains (servers: ServerInfo[], str: string) { + for (const server of servers) { + const res = await getVideo(server.url, video1Server2UUID) + const video: VideoDetails = res.body + + for (const f of video.files) { + expect(f.magnetUri).to.contain(str) + } + } + } + + async function checkNotContains (servers: ServerInfo[], str: string) { + for (const server of servers) { + const res = await getVideo(server.url, video1Server2UUID) + const video: VideoDetails = res.body + + for (const f of video.files) { + expect(f.magnetUri).to.not.contain(str) + } + } + } + + before(async function () { + this.timeout(120000) + + await runServers(strategy, { min_lifetime: '7 seconds', min_views: 0 }) + + await enableRedundancyOnServer1() + }) + + it('Should still have 2 webseeds after 10 seconds', async function () { + this.timeout(40000) + + await wait(10000) + + try { + await checkContains(servers, 'http%3A%2F%2Flocalhost%3A9001') + } catch { + // Maybe a server deleted a redundancy in the scheduler + await wait(2000) + + await checkContains(servers, 'http%3A%2F%2Flocalhost%3A9001') + } + }) + + it('Should stop server 1 and expire video redundancy', async function () { + this.timeout(40000) + + killallServers([ servers[0] ]) + + await wait(10000) + + await checkNotContains([ servers[1], servers[2] ], 'http%3A%2F%2Flocalhost%3A9001') + }) + + after(function () { + return killallServers([ servers[1], servers[2] ]) + }) + }) + + describe('Test file replacement', function () { + let video2Server2UUID: string + const strategy = 'recently-added' + + before(async function () { + this.timeout(120000) + + await runServers(strategy, { min_lifetime: '7 seconds', min_views: 0 }) + + await enableRedundancyOnServer1() + + await waitJobs(servers) + await wait(5000) + await waitJobs(servers) + + await check2Webseeds(strategy) + await checkStatsWith2Webseed(strategy) + + const res = await uploadVideo(servers[ 1 ].url, servers[ 1 ].accessToken, { name: 'video 2 server 2' }) + video2Server2UUID = res.body.video.uuid + }) + + it('Should cache video 2 webseed on the first video', async function () { + this.timeout(40000) + this.retries(3) + + await waitJobs(servers) + + await wait(7000) + + await check1WebSeed(strategy, video1Server2UUID) + await check2Webseeds(strategy, video2Server2UUID) }) after(function () { diff --git a/server/tests/utils/server/servers.ts b/server/tests/utils/server/servers.ts index 26ab4e1bb..fbfc83ca1 100644 --- a/server/tests/utils/server/servers.ts +++ b/server/tests/utils/server/servers.ts @@ -144,8 +144,8 @@ function runServer (serverNumber: number, configOverride?: Object) { }) } -async function reRunServer (server: ServerInfo) { - const newServer = await runServer(server.serverNumber) +async function reRunServer (server: ServerInfo, configOverride?: any) { + const newServer = await runServer(server.serverNumber, configOverride) server.app = newServer.app return server diff --git a/shared/models/redundancy/videos-redundancy.model.ts b/shared/models/redundancy/videos-redundancy.model.ts index 436394c1e..a8c2743c1 100644 --- a/shared/models/redundancy/videos-redundancy.model.ts +++ b/shared/models/redundancy/videos-redundancy.model.ts @@ -3,17 +3,20 @@ export type VideoRedundancyStrategy = 'most-views' | 'trending' | 'recently-adde export type MostViewsRedundancyStrategy = { strategy: 'most-views' size: number + minLifetime: number } export type TrendingRedundancyStrategy = { strategy: 'trending' size: number + minLifetime: number } export type RecentlyAddedStrategy = { strategy: 'recently-added' size: number minViews: number + minLifetime: number } export type VideosRedundancy = MostViewsRedundancyStrategy | TrendingRedundancyStrategy | RecentlyAddedStrategy diff --git a/support/doc/redundancy.md b/support/doc/redundancy.md new file mode 100644 index 000000000..62c6365a9 --- /dev/null +++ b/support/doc/redundancy.md @@ -0,0 +1,46 @@ +# Redundancy + +A PeerTube instance can cache other PeerTube videos to improve bandwidth of popular videos or small instances. + +## How it works + +The instance administrator can choose between multiple redundancy strategies (cache trending videos or recently uploaded videos etc), set their maximum size and the minimum duplication lifetime. +Then, they choose the instances they want to cache in `Manage follows -> Following` admin table. + +Videos are kept in the cache for at least `min_lifetime`, and then evicted when the cache is full. + +When PeerTube chooses a video to duplicate, it imports all the resolution files (to avoid consistency issues) using their magnet URI and put them in the `storage.videos` directory. +Then it sends a `Create -> CacheFile` ActivityPub message to other federated instances. This new instance is injected as [WebSeed](https://github.com/Chocobozzz/PeerTube/blob/develop/FAQ.md#what-is-webseed) in the magnet URI by instances that received this ActivityPub message. + +## Stats + +See the `/api/v1/server/stats` endpoint. For example: + +``` +{ + ... + "videosRedundancy": [ + { + "totalUsed": 0, + "totalVideos": 0, + "totalVideoFiles": 0, + "strategy": "most-views", + "totalSize": 104857600 + }, + { + "totalUsed": 0, + "totalVideos": 0, + "totalVideoFiles": 0, + "strategy": "trending", + "totalSize": 104857600 + }, + { + "totalUsed": 0, + "totalVideos": 0, + "totalVideoFiles": 0, + "strategy": "recently-added", + "totalSize": 104857600 + } + ] +} +``` \ No newline at end of file