import { Server as HTTPServer } from 'http' import { Namespace, Server as SocketServer, Socket } from 'socket.io' import { isIdValid } from '@server/helpers/custom-validators/misc.js' import { Debounce } from '@server/helpers/debounce.js' import { MVideo, MVideoImmutable } from '@server/types/models/index.js' import { MRunner } from '@server/types/models/runners/index.js' import { UserNotificationModelForApi } from '@server/types/models/user/index.js' import { LiveVideoEventPayload, LiveVideoEventType } from '@peertube/peertube-models' import { logger } from '../helpers/logger.js' import { authenticateRunnerSocket, authenticateSocket } from '../middlewares/index.js' class PeerTubeSocket { private static instance: PeerTubeSocket private userNotificationSockets: { [ userId: number ]: Socket[] } = {} private liveVideosNamespace: Namespace private readonly runnerSockets = new Set() private constructor () {} init (server: HTTPServer) { const io = new SocketServer(server) io.of('/user-notifications') .use(authenticateSocket) .on('connection', socket => { const userId = socket.handshake.auth.user.id logger.debug('User %d connected to the notification system.', userId) if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = [] this.userNotificationSockets[userId].push(socket) socket.on('disconnect', () => { logger.debug('User %d disconnected from SocketIO notifications.', userId) this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket) }) }) this.liveVideosNamespace = io.of('/live-videos') .on('connection', socket => { socket.on('subscribe', ({ videoId }) => { if (!isIdValid(videoId)) return /* eslint-disable @typescript-eslint/no-floating-promises */ socket.join(videoId) }) socket.on('unsubscribe', ({ videoId }) => { if (!isIdValid(videoId)) return /* eslint-disable @typescript-eslint/no-floating-promises */ socket.leave(videoId) }) }) io.of('/runners') .use(authenticateRunnerSocket) .on('connection', socket => { const runner: MRunner = socket.handshake.auth.runner logger.debug(`New runner "${runner.name}" connected to the notification system.`) this.runnerSockets.add(socket) socket.on('disconnect', () => { logger.debug(`Runner "${runner.name}" disconnected from the notification system.`) this.runnerSockets.delete(socket) }) }) } sendNotification (userId: number, notification: UserNotificationModelForApi) { const sockets = this.userNotificationSockets[userId] if (!sockets) return logger.debug('Sending user notification to user %d.', userId) const notificationMessage = notification.toFormattedJSON() for (const socket of sockets) { socket.emit('new-notification', notificationMessage) } } sendVideoLiveNewState (video: MVideo) { const data: LiveVideoEventPayload = { state: video.state } const type: LiveVideoEventType = 'state-change' logger.debug('Sending video live new state notification of %s.', video.url, { state: video.state }) this.liveVideosNamespace .in(video.id) .emit(type, data) } sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) { const data: LiveVideoEventPayload = { viewers: numViewers } const type: LiveVideoEventType = 'views-change' logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers }) this.liveVideosNamespace .in(video.id) .emit(type, data) } @Debounce({ timeoutMS: 1000 }) sendAvailableJobsPingToRunners () { logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`) for (const runners of this.runnerSockets) { runners.emit('available-jobs') } } static get Instance () { return this.instance || (this.instance = new this()) } } // --------------------------------------------------------------------------- export { PeerTubeSocket }