diff --git a/client/src/app/+admin/system/jobs/jobs.component.ts b/client/src/app/+admin/system/jobs/jobs.component.ts index b12d7f80a..2cf1bff7a 100644 --- a/client/src/app/+admin/system/jobs/jobs.component.ts +++ b/client/src/app/+admin/system/jobs/jobs.component.ts @@ -36,7 +36,7 @@ export class JobsComponent extends RestTable implements OnInit { 'video-live-ending', 'video-redundancy', 'video-transcoding', - 'videos-views', + 'videos-views-stats', 'move-to-object-storage' ] diff --git a/client/src/app/+videos/+video-watch/video-watch.component.ts b/client/src/app/+videos/+video-watch/video-watch.component.ts index 5ca9d5fa9..fd61bcbf0 100644 --- a/client/src/app/+videos/+video-watch/video-watch.component.ts +++ b/client/src/app/+videos/+video-watch/video-watch.component.ts @@ -658,7 +658,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy { return this.peertubeSocket.getLiveVideosObservable() .subscribe(({ type, payload }) => { if (type === 'state-change') return this.handleLiveStateChange(payload.state) - if (type === 'views-change') return this.handleLiveViewsChange(payload.views) + if (type === 'views-change') return this.handleLiveViewsChange(payload.viewers) }) } @@ -677,7 +677,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy { this.loadVideo(videoUUID) } - private handleLiveViewsChange (newViews: number) { + private handleLiveViewsChange (newViewers: number) { if (!this.video) { console.error('Cannot update video live views because video is no defined.') return @@ -685,7 +685,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy { console.log('Updating live views.') - this.video.views = newViews + this.video.viewers = newViewers } private initHotkeys () { diff --git a/client/src/app/shared/shared-main/video/video.model.ts b/client/src/app/shared/shared-main/video/video.model.ts index b11316471..472a8c810 100644 --- a/client/src/app/shared/shared-main/video/video.model.ts +++ b/client/src/app/shared/shared-main/video/video.model.ts @@ -57,6 +57,9 @@ export class Video implements VideoServerModel { url: string views: number + // If live + viewers?: number + likes: number dislikes: number nsfw: boolean @@ -150,6 +153,7 @@ export class Video implements VideoServerModel { this.url = hash.url this.views = hash.views + this.viewers = hash.viewers this.likes = hash.likes this.dislikes = hash.dislikes diff --git a/client/src/app/shared/shared-video/video-views-counter.component.html b/client/src/app/shared/shared-video/video-views-counter.component.html index a6679f74d..b19c8b137 100644 --- a/client/src/app/shared/shared-video/video-views-counter.component.html +++ b/client/src/app/shared/shared-video/video-views-counter.component.html @@ -4,6 +4,6 @@ - {video.views, plural, =1 {1 viewer} other {{{ video.views | myNumberFormatter }} viewers}} + {video.viewers, plural, =1 {1 viewer} other {{{ video.viewers | myNumberFormatter }} viewers}} diff --git a/config/default.yaml b/config/default.yaml index c30c29a6b..ee7acb437 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -232,6 +232,11 @@ views: remote: max_age: '30 days' + # PeerTube buffers local video views before updating and federating the video + local_buffer_update_interval: '30 minutes' + + ip_view_expiration: '1 hour' + plugins: # The website PeerTube will ask for available PeerTube plugins and themes # This is an unmoderated plugin index, so only install plugins/themes you trust diff --git a/config/production.yaml.example b/config/production.yaml.example index 4dc5c281d..0175c7a12 100644 --- a/config/production.yaml.example +++ b/config/production.yaml.example @@ -230,6 +230,11 @@ views: remote: max_age: '30 days' + # PeerTube buffers local video views before updating and federating the video + local_buffer_update_interval: '30 minutes' + + ip_view_expiration: '1 hour' + plugins: # The website PeerTube will ask for available PeerTube plugins and themes # This is an unmoderated plugin index, so only install plugins/themes you trust diff --git a/config/test.yaml b/config/test.yaml index e9731d863..2e7f982d3 100644 --- a/config/test.yaml +++ b/config/test.yaml @@ -160,3 +160,6 @@ views: videos: remote: max_age: -1 + + local_buffer_update_interval: '5 seconds' + ip_view_expiration: '1 second' diff --git a/server.ts b/server.ts index b8c1d1251..6a7dad0cd 100644 --- a/server.ts +++ b/server.ts @@ -117,6 +117,7 @@ import { VideosRedundancyScheduler } from './server/lib/schedulers/videos-redund import { RemoveOldHistoryScheduler } from './server/lib/schedulers/remove-old-history-scheduler' import { AutoFollowIndexInstances } from './server/lib/schedulers/auto-follow-index-instances' import { RemoveDanglingResumableUploadsScheduler } from './server/lib/schedulers/remove-dangling-resumable-uploads-scheduler' +import { VideoViewsBufferScheduler } from './server/lib/schedulers/video-views-buffer-scheduler' import { isHTTPSignatureDigestValid } from './server/helpers/peertube-crypto' import { PeerTubeSocket } from './server/lib/peertube-socket' import { updateStreamingPlaylistsInfohashesIfNeeded } from './server/lib/hls' @@ -128,6 +129,7 @@ import { LiveManager } from './server/lib/live' import { HttpStatusCode } from './shared/models/http/http-error-codes' import { VideosTorrentCache } from '@server/lib/files-cache/videos-torrent-cache' import { ServerConfigManager } from '@server/lib/server-config-manager' +import { VideoViews } from '@server/lib/video-views' // ----------- Command line ----------- @@ -296,11 +298,11 @@ async function startApplication () { PeerTubeVersionCheckScheduler.Instance.enable() AutoFollowIndexInstances.Instance.enable() RemoveDanglingResumableUploadsScheduler.Instance.enable() + VideoViewsBufferScheduler.Instance.enable() - // Redis initialization Redis.Instance.init() - PeerTubeSocket.Instance.init(server) + VideoViews.Instance.init() updateStreamingPlaylistsInfohashesIfNeeded() .catch(err => logger.error('Cannot update streaming playlist infohashes.', { err })) diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index 821161c64..72b382595 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts @@ -2,7 +2,7 @@ import express from 'express' import toInt from 'validator/lib/toInt' import { pickCommonVideoQuery } from '@server/helpers/query' import { doJSONRequest } from '@server/helpers/requests' -import { LiveManager } from '@server/lib/live' +import { VideoViews } from '@server/lib/video-views' import { openapiOperationDoc } from '@server/middlewares/doc' import { getServerActor } from '@server/models/application/application' import { guessAdditionalAttributesFromQuery } from '@server/models/video/formatter/video-format-utils' @@ -17,7 +17,6 @@ import { sequelizeTypescript } from '../../../initializers/database' import { sendView } from '../../../lib/activitypub/send/send-view' import { JobQueue } from '../../../lib/job-queue' import { Hooks } from '../../../lib/plugins/hooks' -import { Redis } from '../../../lib/redis' import { asyncMiddleware, asyncRetryTransactionMiddleware, @@ -107,7 +106,7 @@ videosRouter.get('/:id', ) videosRouter.post('/:id/views', openapiOperationDoc({ operationId: 'addView' }), - asyncMiddleware(videosCustomGetValidator('only-immutable-attributes')), + asyncMiddleware(videosCustomGetValidator('only-video')), asyncMiddleware(viewVideo) ) @@ -153,45 +152,18 @@ function getVideo (_req: express.Request, res: express.Response) { } async function viewVideo (req: express.Request, res: express.Response) { - const immutableVideoAttrs = res.locals.onlyImmutableVideo + const video = res.locals.onlyVideo const ip = req.ip - const exists = await Redis.Instance.doesVideoIPViewExist(ip, immutableVideoAttrs.uuid) - if (exists) { - logger.debug('View for ip %s and video %s already exists.', ip, immutableVideoAttrs.uuid) - return res.status(HttpStatusCode.NO_CONTENT_204).end() - } + const success = await VideoViews.Instance.processView({ video, ip }) - const video = await VideoModel.load(immutableVideoAttrs.id) - - const promises: Promise[] = [ - Redis.Instance.setIPVideoView(ip, video.uuid, video.isLive) - ] - - let federateView = true - - // Increment our live manager - if (video.isLive && video.isOwned()) { - LiveManager.Instance.addViewTo(video.id) - - // Views of our local live will be sent by our live manager - federateView = false - } - - // Increment our video views cache counter - if (!video.isLive) { - promises.push(Redis.Instance.addVideoView(video.id)) - } - - if (federateView) { + if (success) { const serverActor = await getServerActor() - promises.push(sendView(serverActor, video, undefined)) + await sendView(serverActor, video, undefined) + + Hooks.runAction('action:api.video.viewed', { video: video, ip }) } - await Promise.all(promises) - - Hooks.runAction('action:api.video.viewed', { video, ip }) - return res.status(HttpStatusCode.NO_CONTENT_204).end() } diff --git a/server/initializers/checker-before-init.ts b/server/initializers/checker-before-init.ts index 1015c5e45..51c396548 100644 --- a/server/initializers/checker-before-init.ts +++ b/server/initializers/checker-before-init.ts @@ -38,7 +38,7 @@ function checkMissedConfig () { 'services.twitter.username', 'services.twitter.whitelisted', 'followers.instance.enabled', 'followers.instance.manual_approval', 'tracker.enabled', 'tracker.private', 'tracker.reject_too_many_announces', - 'history.videos.max_age', 'views.videos.remote.max_age', + 'history.videos.max_age', 'views.videos.remote.max_age', 'views.videos.local_buffer_update_interval', 'views.videos.ip_view_expiration', 'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max', 'theme.default', 'remote_redundancy.videos.accept_from', diff --git a/server/initializers/config.ts b/server/initializers/config.ts index 1288768d8..dadda2a77 100644 --- a/server/initializers/config.ts +++ b/server/initializers/config.ts @@ -182,7 +182,9 @@ const CONFIG = { VIDEOS: { REMOTE: { MAX_AGE: parseDurationToMs(config.get('views.videos.remote.max_age')) - } + }, + LOCAL_BUFFER_UPDATE_INTERVAL: parseDurationToMs(config.get('views.videos.local_buffer_update_interval')), + IP_VIEW_EXPIRATION: parseDurationToMs(config.get('views.videos.ip_view_expiration')) } }, PLUGINS: { diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 845576667..b65741bbd 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -148,7 +148,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { 'video-import': 1, 'email': 5, 'actor-keys': 3, - 'videos-views': 1, + 'videos-views-stats': 1, 'activitypub-refresher': 1, 'video-redundancy': 1, 'video-live-ending': 1, @@ -164,7 +164,7 @@ const JOB_CONCURRENCY: { [id in Exclude) { +async function processViewActivity (options: APProcessorOptions) { const { activity, byActor } = options + return processCreateView(activity, byActor) } @@ -19,10 +19,8 @@ export { // --------------------------------------------------------------------------- -async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { - const videoObject = activity.type === 'View' - ? activity.object - : (activity.object as ViewObject).object +async function processCreateView (activity: ActivityView, byActor: MActorSignature) { + const videoObject = activity.object const { video } = await getOrCreateAPVideo({ videoObject, @@ -30,17 +28,13 @@ async function processCreateView (activity: ActivityView | ActivityCreate, byAct allowRefresh: false }) - if (!video.isLive) { - await Redis.Instance.addVideoView(video.id) - } + const viewerExpires = activity.expires + ? new Date(activity.expires) + : undefined + + await VideoViews.Instance.processView({ video, ip: null, viewerExpires }) if (video.isOwned()) { - // Our live manager will increment the counter and send the view to followers - if (video.isLive) { - LiveManager.Instance.addViewTo(video.id) - return - } - // Forward the view but don't resend the activity to the sender const exceptions = [ byActor ] await forwardVideoRelatedActivity(activity, undefined, exceptions, video) diff --git a/server/lib/activitypub/send/send-view.ts b/server/lib/activitypub/send/send-view.ts index 153e94295..b12583e26 100644 --- a/server/lib/activitypub/send/send-view.ts +++ b/server/lib/activitypub/send/send-view.ts @@ -1,4 +1,5 @@ import { Transaction } from 'sequelize' +import { VideoViews } from '@server/lib/video-views' import { MActorAudience, MVideoImmutable, MVideoUrl } from '@server/types/models' import { ActivityAudience, ActivityView } from '../../../../shared/models/activitypub' import { logger } from '../../../helpers/logger' @@ -27,7 +28,8 @@ function buildViewActivity (url: string, byActor: MActorAudience, video: MVideoU id: url, type: 'View' as 'View', actor: byActor.url, - object: video.url + object: video.url, + expires: new Date(VideoViews.Instance.buildViewerExpireTime()).toISOString() }, audience ) diff --git a/server/lib/activitypub/videos/updater.ts b/server/lib/activitypub/videos/updater.ts index 157569414..f786bb196 100644 --- a/server/lib/activitypub/videos/updater.ts +++ b/server/lib/activitypub/videos/updater.ts @@ -81,7 +81,6 @@ export class APVideoUpdater extends APVideoAbstractBuilder { if (videoUpdated.isLive) { PeerTubeSocket.Instance.sendVideoLiveNewState(videoUpdated) - PeerTubeSocket.Instance.sendVideoViewsUpdate(videoUpdated) } logger.info('Remote video with uuid %s updated', this.videoObject.uuid, this.lTags()) diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views-stats.ts similarity index 52% rename from server/lib/job-queue/handlers/video-views.ts rename to server/lib/job-queue/handlers/video-views-stats.ts index 86d0a271f..caf5f6962 100644 --- a/server/lib/job-queue/handlers/video-views.ts +++ b/server/lib/job-queue/handlers/video-views-stats.ts @@ -1,11 +1,10 @@ -import { Redis } from '../../redis' +import { isTestInstance } from '../../../helpers/core-utils' import { logger } from '../../../helpers/logger' import { VideoModel } from '../../../models/video/video' import { VideoViewModel } from '../../../models/video/video-view' -import { isTestInstance } from '../../../helpers/core-utils' -import { federateVideoIfNeeded } from '../../activitypub/videos' +import { Redis } from '../../redis' -async function processVideosViews () { +async function processVideosViewsStats () { const lastHour = new Date() // In test mode, we run this function multiple times per hour, so we don't want the values of the previous hour @@ -15,23 +14,23 @@ async function processVideosViews () { const startDate = lastHour.setMinutes(0, 0, 0) const endDate = lastHour.setMinutes(59, 59, 999) - const videoIds = await Redis.Instance.getVideosIdViewed(hour) + const videoIds = await Redis.Instance.listVideosViewedForStats(hour) if (videoIds.length === 0) return - logger.info('Processing videos views in job for hour %d.', hour) + logger.info('Processing videos views stats in job for hour %d.', hour) for (const videoId of videoIds) { try { - const views = await Redis.Instance.getVideoViews(videoId, hour) - await Redis.Instance.deleteVideoViews(videoId, hour) + const views = await Redis.Instance.getVideoViewsStats(videoId, hour) + await Redis.Instance.deleteVideoViewsStats(videoId, hour) if (views) { - logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) + logger.debug('Adding %d views to video %d stats in hour %d.', views, videoId, hour) try { - const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) + const video = await VideoModel.load(videoId) if (!video) { - logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId) + logger.debug('Video %d does not exist anymore, skipping videos view stats.', videoId) continue } @@ -41,21 +40,12 @@ async function processVideosViews () { views, videoId }) - - if (video.isOwned()) { - // If this is a remote video, the origin instance will send us an update - await VideoModel.incrementViews(videoId, views) - - // Send video update - video.views += views - await federateVideoIfNeeded(video, false) - } } catch (err) { - logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) + logger.error('Cannot create video views stats for video %d in hour %d.', videoId, hour, { err }) } } } catch (err) { - logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) + logger.error('Cannot update video views stats of video %d in hour %d.', videoId, hour, { err }) } } } @@ -63,5 +53,5 @@ async function processVideosViews () { // --------------------------------------------------------------------------- export { - processVideosViews + processVideosViewsStats } diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0eab720d9..4c1597b33 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -36,7 +36,7 @@ import { processVideoFileImport } from './handlers/video-file-import' import { processVideoImport } from './handlers/video-import' import { processVideoLiveEnding } from './handlers/video-live-ending' import { processVideoTranscoding } from './handlers/video-transcoding' -import { processVideosViews } from './handlers/video-views' +import { processVideosViewsStats } from './handlers/video-views-stats' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -49,7 +49,7 @@ type CreateJobArgument = { type: 'email', payload: EmailPayload } | { type: 'video-import', payload: VideoImportPayload } | { type: 'activitypub-refresher', payload: RefreshPayload } | - { type: 'videos-views', payload: {} } | + { type: 'videos-views-stats', payload: {} } | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | { type: 'actor-keys', payload: ActorKeysPayload } | { type: 'video-redundancy', payload: VideoRedundancyPayload } | @@ -71,7 +71,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise } = { 'video-transcoding': processVideoTranscoding, 'email': processEmail, 'video-import': processVideoImport, - 'videos-views': processVideosViews, + 'videos-views-stats': processVideosViewsStats, 'activitypub-refresher': refreshAPObject, 'video-live-ending': processVideoLiveEnding, 'actor-keys': processActorKeys, @@ -89,7 +89,7 @@ const jobTypes: JobType[] = [ 'video-transcoding', 'video-file-import', 'video-import', - 'videos-views', + 'videos-views-stats', 'activitypub-refresher', 'video-redundancy', 'actor-keys', @@ -247,8 +247,8 @@ class JobQueue { } private addRepeatableJobs () { - this.queues['videos-views'].add({}, { - repeat: REPEAT_JOBS['videos-views'] + this.queues['videos-views-stats'].add({}, { + repeat: REPEAT_JOBS['videos-views-stats'] }).catch(err => logger.error('Cannot add repeatable job.', { err })) if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 1b7b9dd4d..2562edb75 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -2,7 +2,6 @@ import { readFile } from 'fs-extra' import { createServer, Server } from 'net' import { createServer as createServerTLS, Server as ServerTLS } from 'tls' -import { isTestInstance } from '@server/helpers/core-utils' import { computeResolutionsToTranscode, ffprobePromise, @@ -12,7 +11,7 @@ import { } from '@server/helpers/ffprobe-utils' import { logger, loggerTagsFactory } from '@server/helpers/logger' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' -import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants' +import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' import { UserModel } from '@server/models/user/user' import { VideoModel } from '@server/models/video/video' import { VideoLiveModel } from '@server/models/video/video-live' @@ -53,8 +52,6 @@ class LiveManager { private readonly muxingSessions = new Map() private readonly videoSessions = new Map() - // Values are Date().getTime() - private readonly watchersPerVideo = new Map() private rtmpServer: Server private rtmpsServer: ServerTLS @@ -99,8 +96,6 @@ class LiveManager { // Cleanup broken lives, that were terminated by a server restart for example this.handleBrokenLives() .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) - - setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) } async run () { @@ -184,19 +179,6 @@ class LiveManager { this.abortSession(sessionId) } - addViewTo (videoId: number) { - if (this.videoSessions.has(videoId) === false) return - - let watchers = this.watchersPerVideo.get(videoId) - - if (!watchers) { - watchers = [] - this.watchersPerVideo.set(videoId, watchers) - } - - watchers.push(new Date().getTime()) - } - private getContext () { return context } @@ -377,7 +359,6 @@ class LiveManager { } private onMuxingFFmpegEnd (videoId: number) { - this.watchersPerVideo.delete(videoId) this.videoSessions.delete(videoId) } @@ -411,34 +392,6 @@ class LiveManager { } } - private async updateLiveViews () { - if (!this.isRunning()) return - - if (!isTestInstance()) logger.info('Updating live video views.', lTags()) - - for (const videoId of this.watchersPerVideo.keys()) { - const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE - - const watchers = this.watchersPerVideo.get(videoId) - - const numWatchers = watchers.length - - const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) - video.views = numWatchers - await video.save() - - await federateVideoIfNeeded(video, false) - - PeerTubeSocket.Instance.sendVideoViewsUpdate(video) - - // Only keep not expired watchers - const newWatchers = watchers.filter(w => w > notBefore) - this.watchersPerVideo.set(videoId, newWatchers) - - logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags()) - } - } - private async handleBrokenLives () { const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() diff --git a/server/lib/peertube-socket.ts b/server/lib/peertube-socket.ts index 901435dea..0398ca61d 100644 --- a/server/lib/peertube-socket.ts +++ b/server/lib/peertube-socket.ts @@ -1,7 +1,7 @@ import { Server as HTTPServer } from 'http' import { Namespace, Server as SocketServer, Socket } from 'socket.io' import { isIdValid } from '@server/helpers/custom-validators/misc' -import { MVideo } from '@server/types/models' +import { MVideo, MVideoImmutable } from '@server/types/models' import { UserNotificationModelForApi } from '@server/types/models/user' import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' import { logger } from '../helpers/logger' @@ -78,11 +78,11 @@ class PeerTubeSocket { .emit(type, data) } - sendVideoViewsUpdate (video: MVideo) { - const data: LiveVideoEventPayload = { views: video.views } + sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) { + const data: LiveVideoEventPayload = { viewers: numViewers, views: numViewers } const type: LiveVideoEventType = 'views-change' - logger.debug('Sending video live views update notification of %s.', video.url, { views: video.views }) + logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers }) this.liveVideosNamespace .in(video.id) diff --git a/server/lib/redis.ts b/server/lib/redis.ts index 46617b07e..76b7868e8 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts @@ -13,6 +13,7 @@ import { RESUMABLE_UPLOAD_SESSION_LIFETIME } from '../initializers/constants' import { CONFIG } from '../initializers/config' +import { exists } from '@server/helpers/custom-validators/misc' type CachedRoute = { body: string @@ -119,16 +120,20 @@ class Redis { /* ************ Views per IP ************ */ - setIPVideoView (ip: string, videoUUID: string, isLive: boolean) { - const lifetime = isLive - ? VIEW_LIFETIME.LIVE - : VIEW_LIFETIME.VIDEO + setIPVideoView (ip: string, videoUUID: string) { + return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW) + } - return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime) + setIPVideoViewer (ip: string, videoUUID: string) { + return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER) } async doesVideoIPViewExist (ip: string, videoUUID: string) { - return this.exists(this.generateViewKey(ip, videoUUID)) + return this.exists(this.generateIPViewKey(ip, videoUUID)) + } + + async doesVideoIPViewerExist (ip: string, videoUUID: string) { + return this.exists(this.generateIPViewerKey(ip, videoUUID)) } /* ************ Tracker IP block ************ */ @@ -160,46 +165,85 @@ class Redis { return this.setObject(this.generateCachedRouteKey(req), cached, lifetime) } - /* ************ Video views ************ */ + /* ************ Video views stats ************ */ - addVideoView (videoId: number) { - const keyIncr = this.generateVideoViewKey(videoId) - const keySet = this.generateVideosViewKey() + addVideoViewStats (videoId: number) { + const { videoKey, setKey } = this.generateVideoViewStatsKeys({ videoId }) return Promise.all([ - this.addToSet(keySet, videoId.toString()), - this.increment(keyIncr) + this.addToSet(setKey, videoId.toString()), + this.increment(videoKey) ]) } - async getVideoViews (videoId: number, hour: number) { - const key = this.generateVideoViewKey(videoId, hour) + async getVideoViewsStats (videoId: number, hour: number) { + const { videoKey } = this.generateVideoViewStatsKeys({ videoId, hour }) - const valueString = await this.getValue(key) + const valueString = await this.getValue(videoKey) const valueInt = parseInt(valueString, 10) if (isNaN(valueInt)) { - logger.error('Cannot get videos views of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) + logger.error('Cannot get videos views stats of video %d in hour %d: views number is NaN (%s).', videoId, hour, valueString) return undefined } return valueInt } - async getVideosIdViewed (hour: number) { - const key = this.generateVideosViewKey(hour) + async listVideosViewedForStats (hour: number) { + const { setKey } = this.generateVideoViewStatsKeys({ hour }) - const stringIds = await this.getSet(key) + const stringIds = await this.getSet(setKey) return stringIds.map(s => parseInt(s, 10)) } - deleteVideoViews (videoId: number, hour: number) { - const keySet = this.generateVideosViewKey(hour) - const keyIncr = this.generateVideoViewKey(videoId, hour) + deleteVideoViewsStats (videoId: number, hour: number) { + const { setKey, videoKey } = this.generateVideoViewStatsKeys({ videoId, hour }) return Promise.all([ - this.deleteFromSet(keySet, videoId.toString()), - this.deleteKey(keyIncr) + this.deleteFromSet(setKey, videoId.toString()), + this.deleteKey(videoKey) + ]) + } + + /* ************ Local video views buffer ************ */ + + addLocalVideoView (videoId: number) { + const { videoKey, setKey } = this.generateLocalVideoViewsKeys(videoId) + + return Promise.all([ + this.addToSet(setKey, videoId.toString()), + this.increment(videoKey) + ]) + } + + async getLocalVideoViews (videoId: number) { + const { videoKey } = this.generateLocalVideoViewsKeys(videoId) + + const valueString = await this.getValue(videoKey) + const valueInt = parseInt(valueString, 10) + + if (isNaN(valueInt)) { + logger.error('Cannot get videos views of video %d: views number is NaN (%s).', videoId, valueString) + return undefined + } + + return valueInt + } + + async listLocalVideosViewed () { + const { setKey } = this.generateLocalVideoViewsKeys() + + const stringIds = await this.getSet(setKey) + return stringIds.map(s => parseInt(s, 10)) + } + + deleteLocalVideoViews (videoId: number) { + const { setKey, videoKey } = this.generateLocalVideoViewsKeys(videoId) + + return Promise.all([ + this.deleteFromSet(setKey, videoId.toString()), + this.deleteKey(videoKey) ]) } @@ -233,16 +277,16 @@ class Redis { return req.method + '-' + req.originalUrl } - private generateVideosViewKey (hour?: number) { - if (!hour) hour = new Date().getHours() - - return `videos-view-h${hour}` + private generateLocalVideoViewsKeys (videoId?: Number) { + return { setKey: `local-video-views-buffer`, videoKey: `local-video-views-buffer-${videoId}` } } - private generateVideoViewKey (videoId: number, hour?: number) { - if (hour === undefined || hour === null) hour = new Date().getHours() + private generateVideoViewStatsKeys (options: { videoId?: number, hour?: number }) { + const hour = exists(options.hour) + ? options.hour + : new Date().getHours() - return `video-view-${videoId}-h${hour}` + return { setKey: `videos-view-h${hour}`, videoKey: `video-view-${options.videoId}-h${hour}` } } private generateResetPasswordKey (userId: number) { @@ -253,10 +297,14 @@ class Redis { return 'verify-email-' + userId } - private generateViewKey (ip: string, videoUUID: string) { + private generateIPViewKey (ip: string, videoUUID: string) { return `views-${videoUUID}-${ip}` } + private generateIPViewerKey (ip: string, videoUUID: string) { + return `viewer-${videoUUID}-${ip}` + } + private generateTrackerBlockIPKey (ip: string) { return `tracker-block-ip-${ip}` } diff --git a/server/lib/schedulers/video-views-buffer-scheduler.ts b/server/lib/schedulers/video-views-buffer-scheduler.ts new file mode 100644 index 000000000..c0e72c461 --- /dev/null +++ b/server/lib/schedulers/video-views-buffer-scheduler.ts @@ -0,0 +1,52 @@ +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { VideoModel } from '@server/models/video/video' +import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants' +import { federateVideoIfNeeded } from '../activitypub/videos' +import { Redis } from '../redis' +import { AbstractScheduler } from './abstract-scheduler' + +const lTags = loggerTagsFactory('views') + +export class VideoViewsBufferScheduler extends AbstractScheduler { + + private static instance: AbstractScheduler + + protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.VIDEO_VIEWS_BUFFER_UPDATE + + private constructor () { + super() + } + + protected async internalExecute () { + const videoIds = await Redis.Instance.listLocalVideosViewed() + if (videoIds.length === 0) return + + logger.info('Processing local video views buffer.', { videoIds, ...lTags() }) + + for (const videoId of videoIds) { + try { + const views = await Redis.Instance.getLocalVideoViews(videoId) + await Redis.Instance.deleteLocalVideoViews(videoId) + + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) + if (!video) { + logger.debug('Video %d does not exist anymore, skipping videos view addition.', videoId, lTags()) + continue + } + + // If this is a remote video, the origin instance will send us an update + await VideoModel.incrementViews(videoId, views) + + // Send video update + video.views += views + await federateVideoIfNeeded(video, false) + } catch (err) { + logger.error('Cannot process local video views buffer of video %d.', videoId, { err, ...lTags() }) + } + } + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} diff --git a/server/lib/video-views.ts b/server/lib/video-views.ts new file mode 100644 index 000000000..220b509c2 --- /dev/null +++ b/server/lib/video-views.ts @@ -0,0 +1,130 @@ +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { VIEW_LIFETIME } from '@server/initializers/constants' +import { VideoModel } from '@server/models/video/video' +import { MVideo } from '@server/types/models' +import { PeerTubeSocket } from './peertube-socket' +import { Redis } from './redis' + +const lTags = loggerTagsFactory('views') + +export class VideoViews { + + // Values are Date().getTime() + private readonly viewersPerVideo = new Map() + + private static instance: VideoViews + + private constructor () { + } + + init () { + setInterval(() => this.cleanViewers(), VIEW_LIFETIME.VIEWER) + } + + async processView (options: { + video: MVideo + ip: string | null + viewerExpires?: Date + }) { + const { video, ip, viewerExpires } = options + + logger.debug('Processing view for %s and ip %s.', video.url, ip, lTags()) + + let success = await this.addView(video, ip) + + if (video.isLive) { + const successViewer = await this.addViewer(video, ip, viewerExpires) + success ||= successViewer + } + + return success + } + + getViewers (video: MVideo) { + const viewers = this.viewersPerVideo.get(video.id) + if (!viewers) return 0 + + return viewers.length + } + + buildViewerExpireTime () { + return new Date().getTime() + VIEW_LIFETIME.VIEWER + } + + private async addView (video: MVideo, ip: string | null) { + const promises: Promise[] = [] + + if (ip !== null) { + const viewExists = await Redis.Instance.doesVideoIPViewExist(ip, video.uuid) + if (viewExists) return false + + promises.push(Redis.Instance.setIPVideoView(ip, video.uuid)) + } + + if (video.isOwned()) { + promises.push(Redis.Instance.addLocalVideoView(video.id)) + } + + promises.push(Redis.Instance.addVideoViewStats(video.id)) + + await Promise.all(promises) + + return true + } + + private async addViewer (video: MVideo, ip: string | null, viewerExpires?: Date) { + if (ip !== null) { + const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid) + if (viewExists) return false + + await Redis.Instance.setIPVideoViewer(ip, video.uuid) + } + + let watchers = this.viewersPerVideo.get(video.id) + + if (!watchers) { + watchers = [] + this.viewersPerVideo.set(video.id, watchers) + } + + const expiration = viewerExpires + ? viewerExpires.getTime() + : this.buildViewerExpireTime() + + watchers.push(expiration) + await this.notifyClients(video.id, watchers.length) + + return true + } + + private async cleanViewers () { + logger.info('Cleaning video viewers.', lTags()) + + for (const videoId of this.viewersPerVideo.keys()) { + const notBefore = new Date().getTime() + + const viewers = this.viewersPerVideo.get(videoId) + + // Only keep not expired viewers + const newViewers = viewers.filter(w => w > notBefore) + + if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) + else this.viewersPerVideo.set(videoId, newViewers) + + await this.notifyClients(videoId, newViewers.length) + } + } + + private async notifyClients (videoId: string | number, viewersLength: number) { + const video = await VideoModel.loadImmutableAttributes(videoId) + if (!video) return + + PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) + + logger.debug('Live video views update for %s is %d.', video.url, viewersLength, lTags()) + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} diff --git a/server/models/video/formatter/video-format-utils.ts b/server/models/video/formatter/video-format-utils.ts index ba49e41ae..461e296df 100644 --- a/server/models/video/formatter/video-format-utils.ts +++ b/server/models/video/formatter/video-format-utils.ts @@ -1,6 +1,7 @@ import { uuidToShort } from '@server/helpers/uuid' import { generateMagnetUri } from '@server/helpers/webtorrent' import { getLocalVideoFileMetadataUrl } from '@server/lib/video-urls' +import { VideoViews } from '@server/lib/video-views' import { VideosCommonQueryAfterSanitize } from '@shared/models' import { VideoFile } from '@shared/models/videos/video-file.model' import { ActivityTagObject, ActivityUrlObject, VideoObject } from '../../../../shared/models/activitypub/objects' @@ -121,6 +122,10 @@ function videoModelToFormattedJSON (video: MVideoFormattable, options: VideoForm pluginData: (video as any).pluginData } + if (video.isLive) { + videoObject.viewers = VideoViews.Instance.getViewers(video) + } + const add = options.additionalAttributes if (add?.state === true) { videoObject.state = { diff --git a/server/tests/api/live/live-views.ts b/server/tests/api/live/live-views.ts index 5e3a79c64..9186af8e7 100644 --- a/server/tests/api/live/live-views.ts +++ b/server/tests/api/live/live-views.ts @@ -19,7 +19,7 @@ import { const expect = chai.expect -describe('Test live', function () { +describe('Live views', function () { let servers: PeerTubeServer[] = [] before(async function () { @@ -47,79 +47,86 @@ describe('Test live', function () { await doubleFollow(servers[0], servers[1]) }) - describe('Live views', function () { - let liveVideoId: string - let command: FfmpegCommand + let liveVideoId: string + let command: FfmpegCommand - async function countViews (expected: number) { - for (const server of servers) { - const video = await server.videos.get({ id: liveVideoId }) - expect(video.views).to.equal(expected) - } + async function countViewers (expectedViewers: number) { + for (const server of servers) { + const video = await server.videos.get({ id: liveVideoId }) + expect(video.viewers).to.equal(expectedViewers) + } + } + + async function countViews (expectedViews: number) { + for (const server of servers) { + const video = await server.videos.get({ id: liveVideoId }) + expect(video.views).to.equal(expectedViews) + } + } + + before(async function () { + this.timeout(30000) + + const liveAttributes = { + name: 'live video', + channelId: servers[0].store.channel.id, + privacy: VideoPrivacy.PUBLIC } - before(async function () { - this.timeout(30000) + const live = await servers[0].live.create({ fields: liveAttributes }) + liveVideoId = live.uuid - const liveAttributes = { - name: 'live video', - channelId: servers[0].store.channel.id, - privacy: VideoPrivacy.PUBLIC - } + command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId }) + await waitUntilLivePublishedOnAllServers(servers, liveVideoId) + await waitJobs(servers) + }) - const live = await servers[0].live.create({ fields: liveAttributes }) - liveVideoId = live.uuid + it('Should display no views and viewers for a live', async function () { + await countViews(0) + await countViewers(0) + }) - command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoId }) - await waitUntilLivePublishedOnAllServers(servers, liveVideoId) - await waitJobs(servers) - }) + it('Should view a live twice and display 1 view/viewer', async function () { + this.timeout(30000) - it('Should display no views for a live', async function () { - await countViews(0) - }) + await servers[0].videos.view({ id: liveVideoId }) + await servers[0].videos.view({ id: liveVideoId }) - it('Should view a live twice and display 1 view', async function () { - this.timeout(30000) + await waitJobs(servers) + await countViewers(1) - await servers[0].videos.view({ id: liveVideoId }) - await servers[0].videos.view({ id: liveVideoId }) + await wait(7000) + await countViews(1) + }) - await wait(7000) + it('Should wait and display 0 viewers while still have 1 view', async function () { + this.timeout(30000) - await waitJobs(servers) + await wait(12000) + await waitJobs(servers) - await countViews(1) - }) + await countViews(1) + await countViewers(0) + }) - it('Should wait and display 0 views', async function () { - this.timeout(30000) + it('Should view a live on a remote and on local and display 2 viewers and 3 views', async function () { + this.timeout(30000) - await wait(12000) - await waitJobs(servers) + await servers[0].videos.view({ id: liveVideoId }) + await servers[1].videos.view({ id: liveVideoId }) + await servers[1].videos.view({ id: liveVideoId }) + await waitJobs(servers) - await countViews(0) - }) + await countViewers(2) - it('Should view a live on a remote and on local and display 2 views', async function () { - this.timeout(30000) + await wait(7000) + await waitJobs(servers) - await servers[0].videos.view({ id: liveVideoId }) - await servers[1].videos.view({ id: liveVideoId }) - await servers[1].videos.view({ id: liveVideoId }) - - await wait(7000) - await waitJobs(servers) - - await countViews(2) - }) - - after(async function () { - await stopFfmpeg(command) - }) + await countViews(3) }) after(async function () { + await stopFfmpeg(command) await cleanupTests(servers) }) }) diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts index 8c4e01226..5d946f5e8 100644 --- a/server/tests/api/server/jobs.ts +++ b/server/tests/api/server/jobs.ts @@ -56,7 +56,7 @@ describe('Test jobs', function () { let job = body.data[0] // Skip repeat jobs - if (job.type === 'videos-views') job = body.data[1] + if (job.type === 'videos-views-stats') job = body.data[1] expect(job.state).to.equal('completed') expect(job.type.startsWith('activitypub-')).to.be.true diff --git a/shared/extra-utils/server/jobs.ts b/shared/extra-utils/server/jobs.ts index 79b8c3183..afaaa5cd6 100644 --- a/shared/extra-utils/server/jobs.ts +++ b/shared/extra-utils/server/jobs.ts @@ -1,5 +1,5 @@ -import { JobState } from '../../models' +import { JobState, JobType } from '../../models' import { wait } from '../miscs' import { PeerTubeServer } from './server' @@ -16,7 +16,7 @@ async function waitJobs (serversArg: PeerTubeServer[] | PeerTubeServer, skipDela const states: JobState[] = [ 'waiting', 'active' ] if (!skipDelayed) states.push('delayed') - const repeatableJobs = [ 'videos-views', 'activitypub-cleaner' ] + const repeatableJobs: JobType[] = [ 'videos-views-stats', 'activitypub-cleaner' ] let pendingRequests: boolean function tasksBuilder () { diff --git a/shared/models/activitypub/activity.ts b/shared/models/activitypub/activity.ts index 548d8858e..d6284e283 100644 --- a/shared/models/activitypub/activity.ts +++ b/shared/models/activitypub/activity.ts @@ -6,7 +6,6 @@ import { DislikeObject } from './objects/dislike-object' import { APObject } from './objects/object.model' import { PlaylistObject } from './objects/playlist-object' import { VideoCommentObject } from './objects/video-comment-object' -import { ViewObject } from './objects/view-object' export type Activity = ActivityCreate | @@ -53,7 +52,7 @@ export interface BaseActivity { export interface ActivityCreate extends BaseActivity { type: 'Create' - object: VideoObject | AbuseObject | ViewObject | DislikeObject | VideoCommentObject | CacheFileObject | PlaylistObject + object: VideoObject | AbuseObject | DislikeObject | VideoCommentObject | CacheFileObject | PlaylistObject } export interface ActivityUpdate extends BaseActivity { @@ -100,6 +99,7 @@ export interface ActivityView extends BaseActivity { type: 'View' actor: string object: APObject + expires: string } export interface ActivityDislike extends BaseActivity { diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts index 12e0fcf85..6da2753b3 100644 --- a/shared/models/server/job.model.ts +++ b/shared/models/server/job.model.ts @@ -14,7 +14,7 @@ export type JobType = | 'video-transcoding' | 'email' | 'video-import' - | 'videos-views' + | 'videos-views-stats' | 'activitypub-refresher' | 'video-redundancy' | 'video-live-ending' diff --git a/shared/models/videos/live/live-video-event-payload.model.ts b/shared/models/videos/live/live-video-event-payload.model.ts index 6cd7540e8..1a9ac512c 100644 --- a/shared/models/videos/live/live-video-event-payload.model.ts +++ b/shared/models/videos/live/live-video-event-payload.model.ts @@ -2,5 +2,9 @@ import { VideoState } from '../video-state.enum' export interface LiveVideoEventPayload { state?: VideoState + + // FIXME: deprecated in 4.0 in favour of viewers views?: number + + viewers?: number } diff --git a/shared/models/videos/video.model.ts b/shared/models/videos/video.model.ts index 26cb595e7..8d223cded 100644 --- a/shared/models/videos/video.model.ts +++ b/shared/models/videos/video.model.ts @@ -39,6 +39,9 @@ export interface Video { url: string views: number + // If live + viewers?: number + likes: number dislikes: number nsfw: boolean diff --git a/support/doc/api/openapi.yaml b/support/doc/api/openapi.yaml index 0f72b08d2..13757152c 100644 --- a/support/doc/api/openapi.yaml +++ b/support/doc/api/openapi.yaml @@ -4892,7 +4892,7 @@ components: - video-transcoding - video-file-import - video-import - - videos-views + - videos-views-stats - activitypub-refresher - video-redundancy - video-live-ending @@ -5397,6 +5397,9 @@ components: - $ref: '#/components/schemas/Video' - type: object properties: + viewers: + type: integer + description: If the video is a live, you have the amount of current viewers descriptionPath: type: string example: /api/v1/videos/9c9de5e8-0a1e-484a-b099-e80766180a6d/description @@ -6300,7 +6303,7 @@ components: - video-transcoding - email - video-import - - videos-views + - videos-views-stats - activitypub-refresher - video-redundancy data: