import { isTestOrDevInstance, isUsingViewersFederationV2 } from '@peertube/peertube-node-utils' import { exists } from '@server/helpers/custom-validators/misc.js' import { logger, loggerTagsFactory } from '@server/helpers/logger.js' import { VIEW_LIFETIME } from '@server/initializers/constants.js' import { sendView } from '@server/lib/activitypub/send/send-view.js' import { PeerTubeSocket } from '@server/lib/peertube-socket.js' import { getServerActor } from '@server/models/application/application.js' import { VideoModel } from '@server/models/video/video.js' import { MVideo, MVideoImmutable } from '@server/types/models/index.js' const lTags = loggerTagsFactory('views') export type ViewerScope = 'local' | 'remote' export type VideoScope = 'local' | 'remote' type Viewer = { expires: number id: string viewerScope: ViewerScope videoScope: VideoScope viewerCount: number lastFederation?: number } export class VideoViewerCounters { // expires is new Date().getTime() private readonly viewersPerVideo = new Map() private readonly idToViewer = new Map() private processingViewerCounters = false constructor () { setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER_COUNTER) } // --------------------------------------------------------------------------- async addLocalViewer (options: { video: MVideoImmutable sessionId: string }) { const { video, sessionId } = options logger.debug('Adding local viewer to video viewers counter %s.', video.uuid, { ...lTags(video.uuid) }) const viewerId = sessionId + '-' + video.uuid const viewer = this.idToViewer.get(viewerId) if (viewer) { viewer.expires = this.buildViewerExpireTime() await this.federateViewerIfNeeded(video, viewer) return false } const newViewer = this.addViewerToVideo({ viewerId, video, viewerScope: 'local', viewerCount: 1 }) await this.federateViewerIfNeeded(video, newViewer) return true } addRemoteViewerOnLocalVideo (options: { video: MVideo viewerId: string viewerExpires: Date }) { const { video, viewerExpires, viewerId } = options logger.debug('Adding remote viewer to local video %s.', video.uuid, { viewerId, viewerExpires, ...lTags(video.uuid) }) this.addViewerToVideo({ video, viewerExpires, viewerId, viewerScope: 'remote', viewerCount: 1 }) return true } addRemoteViewerOnRemoteVideo (options: { video: MVideo viewerId: string viewerExpires: Date viewerResultCounter?: number }) { const { video, viewerExpires, viewerId, viewerResultCounter } = options logger.debug( 'Adding remote viewer to remote video %s.', video.uuid, { viewerId, viewerResultCounter, viewerExpires, ...lTags(video.uuid) } ) this.addViewerToVideo({ video, viewerExpires, viewerId, viewerScope: 'remote', // The origin server sends a summary of all viewers, so we can replace our local copy replaceCurrentViewers: exists(viewerResultCounter), viewerCount: viewerResultCounter ?? 1 }) return true } // --------------------------------------------------------------------------- getTotalViewers (options: { viewerScope: ViewerScope videoScope: VideoScope }) { let total = 0 for (const viewers of this.viewersPerVideo.values()) { total += viewers.filter(v => v.viewerScope === options.viewerScope && v.videoScope === options.videoScope) .reduce((p, c) => p + c.viewerCount, 0) } return total } getTotalViewersOf (video: MVideoImmutable) { const viewers = this.viewersPerVideo.get(video.id) return viewers?.reduce((p, c) => p + c.viewerCount, 0) || 0 } buildViewerExpireTime () { return new Date().getTime() + VIEW_LIFETIME.VIEWER_COUNTER } // --------------------------------------------------------------------------- private addViewerToVideo (options: { video: MVideoImmutable viewerId: string viewerScope: ViewerScope viewerCount: number replaceCurrentViewers?: boolean viewerExpires?: Date }) { const { video, viewerExpires, viewerId, viewerScope, viewerCount, replaceCurrentViewers } = options let watchers = this.viewersPerVideo.get(video.id) if (!watchers || replaceCurrentViewers) { watchers = [] this.viewersPerVideo.set(video.id, watchers) } const expires = viewerExpires ? viewerExpires.getTime() : this.buildViewerExpireTime() const videoScope: VideoScope = video.remote ? 'remote' : 'local' const viewer = { id: viewerId, expires, videoScope, viewerScope, viewerCount } watchers.push(viewer) this.idToViewer.set(viewerId, viewer) this.notifyClients(video) return viewer } private async cleanViewerCounters () { if (this.processingViewerCounters) return this.processingViewerCounters = true if (!isTestOrDevInstance()) logger.info('Cleaning video viewers.', lTags()) try { for (const videoId of this.viewersPerVideo.keys()) { const notBefore = new Date().getTime() const viewers = this.viewersPerVideo.get(videoId) // Only keep not expired viewers const newViewers: Viewer[] = [] // Filter new viewers for (const viewer of viewers) { if (viewer.expires > notBefore) { newViewers.push(viewer) } else { this.idToViewer.delete(viewer.id) } } if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) else this.viewersPerVideo.set(videoId, newViewers) const video = await VideoModel.loadImmutableAttributes(videoId) if (video) { this.notifyClients(video) // Let total viewers expire on remote instances if there are no more viewers if (video.remote === false && newViewers.length !== 0) { await this.federateTotalViewers(video) } } } } catch (err) { logger.error('Error in video clean viewers scheduler.', { err, ...lTags() }) } this.processingViewerCounters = false } private notifyClients (video: MVideoImmutable) { const totalViewers = this.getTotalViewersOf(video) PeerTubeSocket.Instance.sendVideoViewsUpdate(video, totalViewers) logger.debug('Video viewers update for %s is %d.', video.url, totalViewers, lTags()) } private async federateViewerIfNeeded (video: MVideoImmutable, viewer: Viewer) { // Federate the viewer if it's been a "long" time we did not const now = new Date().getTime() const federationLimit = now - (VIEW_LIFETIME.VIEWER_COUNTER * 0.75) if (viewer.lastFederation && viewer.lastFederation > federationLimit) return if (video.remote === false && isUsingViewersFederationV2()) return await sendView({ byActor: await getServerActor(), video, viewersCount: 1, viewerIdentifier: viewer.id }) viewer.lastFederation = now } private async federateTotalViewers (video: MVideoImmutable) { if (!isUsingViewersFederationV2()) return await sendView({ byActor: await getServerActor(), video, viewersCount: this.getTotalViewersOf(video), viewerIdentifier: video.uuid }) } }