diff --git a/server/controllers/activitypub/inbox.ts b/server/controllers/activitypub/inbox.ts index 67b2c0d66..14f301ab7 100644 --- a/server/controllers/activitypub/inbox.ts +++ b/server/controllers/activitypub/inbox.ts @@ -1,13 +1,11 @@ import * as express from 'express' +import { InboxManager } from '@server/lib/activitypub/inbox-manager' import { Activity, ActivityPubCollection, ActivityPubOrderedCollection, RootActivity } from '../../../shared' +import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes' import { isActivityValid } from '../../helpers/custom-validators/activitypub/activity' import { logger } from '../../helpers/logger' -import { processActivities } from '../../lib/activitypub/process/process' import { asyncMiddleware, checkSignature, localAccountValidator, localVideoChannelValidator, signatureValidator } from '../../middlewares' import { activityPubValidator } from '../../middlewares/validators/activitypub/activity' -import { queue } from 'async' -import { MActorDefault, MActorSignature } from '../../types/models' -import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes' const inboxRouter = express.Router() @@ -41,18 +39,6 @@ export { // --------------------------------------------------------------------------- -type QueueParam = { activities: Activity[], signatureActor?: MActorSignature, inboxActor?: MActorDefault } -const inboxQueue = queue((task, cb) => { - const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor } - - processActivities(task.activities, options) - .then(() => cb()) - .catch(err => { - logger.error('Error in process activities.', { err }) - cb() - }) -}) - function inboxController (req: express.Request, res: express.Response) { const rootActivity: RootActivity = req.body let activities: Activity[] @@ -74,10 +60,12 @@ function inboxController (req: express.Request, res: express.Response) { logger.info('Receiving inbox requests for %d activities by %s.', activities.length, res.locals.signature.actor.url) - inboxQueue.push({ + InboxManager.Instance.addInboxMessage({ activities, signatureActor: res.locals.signature.actor, - inboxActor: accountOrChannel ? accountOrChannel.Actor : undefined + inboxActor: accountOrChannel + ? accountOrChannel.Actor + : undefined }) return res.status(HttpStatusCode.NO_CONTENT_204).end() diff --git a/server/controllers/api/server/stats.ts b/server/controllers/api/server/stats.ts index f07301a04..3aea12450 100644 --- a/server/controllers/api/server/stats.ts +++ b/server/controllers/api/server/stats.ts @@ -1,16 +1,8 @@ import * as express from 'express' -import { ServerStats } from '../../../../shared/models/server/server-stats.model' -import { asyncMiddleware } from '../../../middlewares' -import { UserModel } from '../../../models/account/user' -import { ActorFollowModel } from '../../../models/activitypub/actor-follow' -import { VideoModel } from '../../../models/video/video' -import { VideoCommentModel } from '../../../models/video/video-comment' -import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' +import { StatsManager } from '@server/lib/stat-manager' import { ROUTE_CACHE_LIFETIME } from '../../../initializers/constants' +import { asyncMiddleware } from '../../../middlewares' import { cacheRoute } from '../../../middlewares/cache' -import { VideoFileModel } from '../../../models/video/video-file' -import { CONFIG } from '../../../initializers/config' -import { VideoRedundancyStrategyWithManual } from '@shared/models' const statsRouter = express.Router() @@ -19,48 +11,10 @@ statsRouter.get('/stats', asyncMiddleware(getStats) ) -async function getStats (req: express.Request, res: express.Response) { - const { totalLocalVideos, totalLocalVideoViews, totalVideos } = await VideoModel.getStats() - const { totalLocalVideoComments, totalVideoComments } = await VideoCommentModel.getStats() - const { totalUsers, totalDailyActiveUsers, totalWeeklyActiveUsers, totalMonthlyActiveUsers } = await UserModel.getStats() - const { totalInstanceFollowers, totalInstanceFollowing } = await ActorFollowModel.getStats() - const { totalLocalVideoFilesSize } = await VideoFileModel.getStats() +async function getStats (_req: express.Request, res: express.Response) { + const data = await StatsManager.Instance.getStats() - const strategies = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES - .map(r => ({ - strategy: r.strategy as VideoRedundancyStrategyWithManual, - size: r.size - })) - - strategies.push({ strategy: 'manual', size: null }) - - const videosRedundancyStats = await Promise.all( - strategies.map(r => { - return VideoRedundancyModel.getStats(r.strategy) - .then(stats => Object.assign(stats, { strategy: r.strategy, totalSize: r.size })) - }) - ) - - const data: ServerStats = { - totalLocalVideos, - totalLocalVideoViews, - totalLocalVideoFilesSize, - totalLocalVideoComments, - totalVideos, - totalVideoComments, - - totalUsers, - totalDailyActiveUsers, - totalWeeklyActiveUsers, - totalMonthlyActiveUsers, - - totalInstanceFollowers, - totalInstanceFollowing, - - videosRedundancy: videosRedundancyStats - } - - return res.json(data).end() + return res.json(data) } // --------------------------------------------------------------------------- diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 9e642af95..01860b575 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -194,7 +194,8 @@ const SCHEDULER_INTERVALS_MS = { checkPlugins: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL, autoFollowIndexInstances: 60000 * 60 * 24, // 1 day removeOldViews: 60000 * 60 * 24, // 1 day - removeOldHistory: 60000 * 60 * 24 // 1 day + removeOldHistory: 60000 * 60 * 24, // 1 day + updateInboxStats: 1000 * 60 * 5 // 5 minutes } // --------------------------------------------------------------------------- @@ -747,6 +748,7 @@ if (isTestInstance() === true) { SCHEDULER_INTERVALS_MS.removeOldViews = 5000 SCHEDULER_INTERVALS_MS.updateVideos = 5000 SCHEDULER_INTERVALS_MS.autoFollowIndexInstances = 5000 + SCHEDULER_INTERVALS_MS.updateInboxStats = 5000 REPEAT_JOBS['videos-views'] = { every: 5000 } REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts new file mode 100644 index 000000000..19e112f91 --- /dev/null +++ b/server/lib/activitypub/inbox-manager.ts @@ -0,0 +1,55 @@ +import { AsyncQueue, queue } from 'async' +import { logger } from '@server/helpers/logger' +import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' +import { MActorDefault, MActorSignature } from '@server/types/models' +import { Activity } from '@shared/models' +import { processActivities } from './process' +import { StatsManager } from '../stat-manager' + +type QueueParam = { + activities: Activity[] + signatureActor?: MActorSignature + inboxActor?: MActorDefault +} + +class InboxManager { + + private static instance: InboxManager + + private readonly inboxQueue: AsyncQueue + + private messagesProcessed = 0 + + private constructor () { + this.inboxQueue = queue((task, cb) => { + const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor } + + this.messagesProcessed++ + + processActivities(task.activities, options) + .then(() => cb()) + .catch(err => { + logger.error('Error in process activities.', { err }) + cb() + }) + }) + + setInterval(() => { + StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.inboxQueue.length()) + }, SCHEDULER_INTERVALS_MS.updateInboxStats) + } + + addInboxMessage (options: QueueParam) { + this.inboxQueue.push(options) + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} + +// --------------------------------------------------------------------------- + +export { + InboxManager +} diff --git a/server/lib/stat-manager.ts b/server/lib/stat-manager.ts new file mode 100644 index 000000000..f9d69b0dc --- /dev/null +++ b/server/lib/stat-manager.ts @@ -0,0 +1,94 @@ +import { CONFIG } from '@server/initializers/config' +import { UserModel } from '@server/models/account/user' +import { ActorFollowModel } from '@server/models/activitypub/actor-follow' +import { VideoRedundancyModel } from '@server/models/redundancy/video-redundancy' +import { VideoModel } from '@server/models/video/video' +import { VideoCommentModel } from '@server/models/video/video-comment' +import { VideoFileModel } from '@server/models/video/video-file' +import { ServerStats, VideoRedundancyStrategyWithManual } from '@shared/models' + +class StatsManager { + + private static instance: StatsManager + + private readonly instanceStartDate = new Date() + + private inboxMessagesProcessed = 0 + private inboxMessagesWaiting = 0 + + private constructor () {} + + updateInboxStats (inboxMessagesProcessed: number, inboxMessagesWaiting: number) { + this.inboxMessagesProcessed = inboxMessagesProcessed + this.inboxMessagesWaiting = inboxMessagesWaiting + } + + async getStats () { + const { totalLocalVideos, totalLocalVideoViews, totalVideos } = await VideoModel.getStats() + const { totalLocalVideoComments, totalVideoComments } = await VideoCommentModel.getStats() + const { totalUsers, totalDailyActiveUsers, totalWeeklyActiveUsers, totalMonthlyActiveUsers } = await UserModel.getStats() + const { totalInstanceFollowers, totalInstanceFollowing } = await ActorFollowModel.getStats() + const { totalLocalVideoFilesSize } = await VideoFileModel.getStats() + + const videosRedundancyStats = await this.buildRedundancyStats() + + const data: ServerStats = { + totalLocalVideos, + totalLocalVideoViews, + totalLocalVideoFilesSize, + totalLocalVideoComments, + totalVideos, + totalVideoComments, + + totalUsers, + totalDailyActiveUsers, + totalWeeklyActiveUsers, + totalMonthlyActiveUsers, + + totalInstanceFollowers, + totalInstanceFollowing, + + videosRedundancy: videosRedundancyStats, + + totalActivityPubMessagesProcessed: this.inboxMessagesProcessed, + activityPubMessagesProcessedPerSecond: this.buildActivityPubMessagesProcessedPerSecond(), + totalActivityPubMessagesWaiting: this.inboxMessagesWaiting + } + + return data + } + + private buildActivityPubMessagesProcessedPerSecond () { + const now = new Date() + const startedSeconds = (now.getTime() - this.instanceStartDate.getTime()) / 1000 + + return this.inboxMessagesProcessed / startedSeconds + } + + private buildRedundancyStats () { + const strategies = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES + .map(r => ({ + strategy: r.strategy as VideoRedundancyStrategyWithManual, + size: r.size + })) + + strategies.push({ strategy: 'manual', size: null }) + + return Promise.all( + strategies.map(r => { + return VideoRedundancyModel.getStats(r.strategy) + .then(stats => Object.assign(stats, { strategy: r.strategy, totalSize: r.size })) + }) + ) + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} + +// --------------------------------------------------------------------------- + +export { + StatsManager +} diff --git a/server/tests/api/server/stats.ts b/server/tests/api/server/stats.ts index 0e77712cf..9f785a80e 100644 --- a/server/tests/api/server/stats.ts +++ b/server/tests/api/server/stats.ts @@ -176,6 +176,32 @@ describe('Test stats (excluding redundancy)', function () { } }) + it('Should have the correct AP stats', async function () { + this.timeout(60000) + + for (let i = 0; i < 10; i++) { + await uploadVideo(servers[0].url, servers[0].accessToken, { name: 'video' }) + } + + const res1 = await getStats(servers[1].url) + const first = res1.body as ServerStats + + await waitJobs(servers) + + const res2 = await getStats(servers[1].url) + const second: ServerStats = res2.body + + expect(second.totalActivityPubMessagesWaiting).to.equal(0) + expect(second.totalActivityPubMessagesProcessed).to.be.greaterThan(first.totalActivityPubMessagesProcessed) + + await wait(5000) + + const res3 = await getStats(servers[1].url) + const third: ServerStats = res3.body + + expect(third.activityPubMessagesProcessedPerSecond).to.be.lessThan(second.activityPubMessagesProcessedPerSecond) + }) + after(async function () { await cleanupTests(servers) }) diff --git a/shared/models/server/server-stats.model.ts b/shared/models/server/server-stats.model.ts index 75d7dc554..d17c43945 100644 --- a/shared/models/server/server-stats.model.ts +++ b/shared/models/server/server-stats.model.ts @@ -18,6 +18,10 @@ export interface ServerStats { totalInstanceFollowing: number videosRedundancy: VideosRedundancyStats[] + + totalActivityPubMessagesProcessed: number + activityPubMessagesProcessedPerSecond: number + totalActivityPubMessagesWaiting: number } export interface VideosRedundancyStats {