mirror of https://github.com/Chocobozzz/PeerTube
Add AP stats
parent
48586fe070
commit
99afa081bc
|
@ -1,13 +1,11 @@
|
|||
import * as express from 'express'
|
||||
import { InboxManager } from '@server/lib/activitypub/inbox-manager'
|
||||
import { Activity, ActivityPubCollection, ActivityPubOrderedCollection, RootActivity } from '../../../shared'
|
||||
import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes'
|
||||
import { isActivityValid } from '../../helpers/custom-validators/activitypub/activity'
|
||||
import { logger } from '../../helpers/logger'
|
||||
import { processActivities } from '../../lib/activitypub/process/process'
|
||||
import { asyncMiddleware, checkSignature, localAccountValidator, localVideoChannelValidator, signatureValidator } from '../../middlewares'
|
||||
import { activityPubValidator } from '../../middlewares/validators/activitypub/activity'
|
||||
import { queue } from 'async'
|
||||
import { MActorDefault, MActorSignature } from '../../types/models'
|
||||
import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes'
|
||||
|
||||
const inboxRouter = express.Router()
|
||||
|
||||
|
@ -41,18 +39,6 @@ export {
|
|||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type QueueParam = { activities: Activity[], signatureActor?: MActorSignature, inboxActor?: MActorDefault }
|
||||
const inboxQueue = queue<QueueParam, Error>((task, cb) => {
|
||||
const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
|
||||
|
||||
processActivities(task.activities, options)
|
||||
.then(() => cb())
|
||||
.catch(err => {
|
||||
logger.error('Error in process activities.', { err })
|
||||
cb()
|
||||
})
|
||||
})
|
||||
|
||||
function inboxController (req: express.Request, res: express.Response) {
|
||||
const rootActivity: RootActivity = req.body
|
||||
let activities: Activity[]
|
||||
|
@ -74,10 +60,12 @@ function inboxController (req: express.Request, res: express.Response) {
|
|||
|
||||
logger.info('Receiving inbox requests for %d activities by %s.', activities.length, res.locals.signature.actor.url)
|
||||
|
||||
inboxQueue.push({
|
||||
InboxManager.Instance.addInboxMessage({
|
||||
activities,
|
||||
signatureActor: res.locals.signature.actor,
|
||||
inboxActor: accountOrChannel ? accountOrChannel.Actor : undefined
|
||||
inboxActor: accountOrChannel
|
||||
? accountOrChannel.Actor
|
||||
: undefined
|
||||
})
|
||||
|
||||
return res.status(HttpStatusCode.NO_CONTENT_204).end()
|
||||
|
|
|
@ -1,16 +1,8 @@
|
|||
import * as express from 'express'
|
||||
import { ServerStats } from '../../../../shared/models/server/server-stats.model'
|
||||
import { asyncMiddleware } from '../../../middlewares'
|
||||
import { UserModel } from '../../../models/account/user'
|
||||
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
|
||||
import { VideoModel } from '../../../models/video/video'
|
||||
import { VideoCommentModel } from '../../../models/video/video-comment'
|
||||
import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy'
|
||||
import { StatsManager } from '@server/lib/stat-manager'
|
||||
import { ROUTE_CACHE_LIFETIME } from '../../../initializers/constants'
|
||||
import { asyncMiddleware } from '../../../middlewares'
|
||||
import { cacheRoute } from '../../../middlewares/cache'
|
||||
import { VideoFileModel } from '../../../models/video/video-file'
|
||||
import { CONFIG } from '../../../initializers/config'
|
||||
import { VideoRedundancyStrategyWithManual } from '@shared/models'
|
||||
|
||||
const statsRouter = express.Router()
|
||||
|
||||
|
@ -19,48 +11,10 @@ statsRouter.get('/stats',
|
|||
asyncMiddleware(getStats)
|
||||
)
|
||||
|
||||
async function getStats (req: express.Request, res: express.Response) {
|
||||
const { totalLocalVideos, totalLocalVideoViews, totalVideos } = await VideoModel.getStats()
|
||||
const { totalLocalVideoComments, totalVideoComments } = await VideoCommentModel.getStats()
|
||||
const { totalUsers, totalDailyActiveUsers, totalWeeklyActiveUsers, totalMonthlyActiveUsers } = await UserModel.getStats()
|
||||
const { totalInstanceFollowers, totalInstanceFollowing } = await ActorFollowModel.getStats()
|
||||
const { totalLocalVideoFilesSize } = await VideoFileModel.getStats()
|
||||
async function getStats (_req: express.Request, res: express.Response) {
|
||||
const data = await StatsManager.Instance.getStats()
|
||||
|
||||
const strategies = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES
|
||||
.map(r => ({
|
||||
strategy: r.strategy as VideoRedundancyStrategyWithManual,
|
||||
size: r.size
|
||||
}))
|
||||
|
||||
strategies.push({ strategy: 'manual', size: null })
|
||||
|
||||
const videosRedundancyStats = await Promise.all(
|
||||
strategies.map(r => {
|
||||
return VideoRedundancyModel.getStats(r.strategy)
|
||||
.then(stats => Object.assign(stats, { strategy: r.strategy, totalSize: r.size }))
|
||||
})
|
||||
)
|
||||
|
||||
const data: ServerStats = {
|
||||
totalLocalVideos,
|
||||
totalLocalVideoViews,
|
||||
totalLocalVideoFilesSize,
|
||||
totalLocalVideoComments,
|
||||
totalVideos,
|
||||
totalVideoComments,
|
||||
|
||||
totalUsers,
|
||||
totalDailyActiveUsers,
|
||||
totalWeeklyActiveUsers,
|
||||
totalMonthlyActiveUsers,
|
||||
|
||||
totalInstanceFollowers,
|
||||
totalInstanceFollowing,
|
||||
|
||||
videosRedundancy: videosRedundancyStats
|
||||
}
|
||||
|
||||
return res.json(data).end()
|
||||
return res.json(data)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
|
@ -194,7 +194,8 @@ const SCHEDULER_INTERVALS_MS = {
|
|||
checkPlugins: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL,
|
||||
autoFollowIndexInstances: 60000 * 60 * 24, // 1 day
|
||||
removeOldViews: 60000 * 60 * 24, // 1 day
|
||||
removeOldHistory: 60000 * 60 * 24 // 1 day
|
||||
removeOldHistory: 60000 * 60 * 24, // 1 day
|
||||
updateInboxStats: 1000 * 60 * 5 // 5 minutes
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
@ -747,6 +748,7 @@ if (isTestInstance() === true) {
|
|||
SCHEDULER_INTERVALS_MS.removeOldViews = 5000
|
||||
SCHEDULER_INTERVALS_MS.updateVideos = 5000
|
||||
SCHEDULER_INTERVALS_MS.autoFollowIndexInstances = 5000
|
||||
SCHEDULER_INTERVALS_MS.updateInboxStats = 5000
|
||||
REPEAT_JOBS['videos-views'] = { every: 5000 }
|
||||
|
||||
REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
import { AsyncQueue, queue } from 'async'
|
||||
import { logger } from '@server/helpers/logger'
|
||||
import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
|
||||
import { MActorDefault, MActorSignature } from '@server/types/models'
|
||||
import { Activity } from '@shared/models'
|
||||
import { processActivities } from './process'
|
||||
import { StatsManager } from '../stat-manager'
|
||||
|
||||
type QueueParam = {
|
||||
activities: Activity[]
|
||||
signatureActor?: MActorSignature
|
||||
inboxActor?: MActorDefault
|
||||
}
|
||||
|
||||
class InboxManager {
|
||||
|
||||
private static instance: InboxManager
|
||||
|
||||
private readonly inboxQueue: AsyncQueue<QueueParam>
|
||||
|
||||
private messagesProcessed = 0
|
||||
|
||||
private constructor () {
|
||||
this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
|
||||
const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
|
||||
|
||||
this.messagesProcessed++
|
||||
|
||||
processActivities(task.activities, options)
|
||||
.then(() => cb())
|
||||
.catch(err => {
|
||||
logger.error('Error in process activities.', { err })
|
||||
cb()
|
||||
})
|
||||
})
|
||||
|
||||
setInterval(() => {
|
||||
StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.inboxQueue.length())
|
||||
}, SCHEDULER_INTERVALS_MS.updateInboxStats)
|
||||
}
|
||||
|
||||
addInboxMessage (options: QueueParam) {
|
||||
this.inboxQueue.push(options)
|
||||
}
|
||||
|
||||
static get Instance () {
|
||||
return this.instance || (this.instance = new this())
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export {
|
||||
InboxManager
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
import { CONFIG } from '@server/initializers/config'
|
||||
import { UserModel } from '@server/models/account/user'
|
||||
import { ActorFollowModel } from '@server/models/activitypub/actor-follow'
|
||||
import { VideoRedundancyModel } from '@server/models/redundancy/video-redundancy'
|
||||
import { VideoModel } from '@server/models/video/video'
|
||||
import { VideoCommentModel } from '@server/models/video/video-comment'
|
||||
import { VideoFileModel } from '@server/models/video/video-file'
|
||||
import { ServerStats, VideoRedundancyStrategyWithManual } from '@shared/models'
|
||||
|
||||
class StatsManager {
|
||||
|
||||
private static instance: StatsManager
|
||||
|
||||
private readonly instanceStartDate = new Date()
|
||||
|
||||
private inboxMessagesProcessed = 0
|
||||
private inboxMessagesWaiting = 0
|
||||
|
||||
private constructor () {}
|
||||
|
||||
updateInboxStats (inboxMessagesProcessed: number, inboxMessagesWaiting: number) {
|
||||
this.inboxMessagesProcessed = inboxMessagesProcessed
|
||||
this.inboxMessagesWaiting = inboxMessagesWaiting
|
||||
}
|
||||
|
||||
async getStats () {
|
||||
const { totalLocalVideos, totalLocalVideoViews, totalVideos } = await VideoModel.getStats()
|
||||
const { totalLocalVideoComments, totalVideoComments } = await VideoCommentModel.getStats()
|
||||
const { totalUsers, totalDailyActiveUsers, totalWeeklyActiveUsers, totalMonthlyActiveUsers } = await UserModel.getStats()
|
||||
const { totalInstanceFollowers, totalInstanceFollowing } = await ActorFollowModel.getStats()
|
||||
const { totalLocalVideoFilesSize } = await VideoFileModel.getStats()
|
||||
|
||||
const videosRedundancyStats = await this.buildRedundancyStats()
|
||||
|
||||
const data: ServerStats = {
|
||||
totalLocalVideos,
|
||||
totalLocalVideoViews,
|
||||
totalLocalVideoFilesSize,
|
||||
totalLocalVideoComments,
|
||||
totalVideos,
|
||||
totalVideoComments,
|
||||
|
||||
totalUsers,
|
||||
totalDailyActiveUsers,
|
||||
totalWeeklyActiveUsers,
|
||||
totalMonthlyActiveUsers,
|
||||
|
||||
totalInstanceFollowers,
|
||||
totalInstanceFollowing,
|
||||
|
||||
videosRedundancy: videosRedundancyStats,
|
||||
|
||||
totalActivityPubMessagesProcessed: this.inboxMessagesProcessed,
|
||||
activityPubMessagesProcessedPerSecond: this.buildActivityPubMessagesProcessedPerSecond(),
|
||||
totalActivityPubMessagesWaiting: this.inboxMessagesWaiting
|
||||
}
|
||||
|
||||
return data
|
||||
}
|
||||
|
||||
private buildActivityPubMessagesProcessedPerSecond () {
|
||||
const now = new Date()
|
||||
const startedSeconds = (now.getTime() - this.instanceStartDate.getTime()) / 1000
|
||||
|
||||
return this.inboxMessagesProcessed / startedSeconds
|
||||
}
|
||||
|
||||
private buildRedundancyStats () {
|
||||
const strategies = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES
|
||||
.map(r => ({
|
||||
strategy: r.strategy as VideoRedundancyStrategyWithManual,
|
||||
size: r.size
|
||||
}))
|
||||
|
||||
strategies.push({ strategy: 'manual', size: null })
|
||||
|
||||
return Promise.all(
|
||||
strategies.map(r => {
|
||||
return VideoRedundancyModel.getStats(r.strategy)
|
||||
.then(stats => Object.assign(stats, { strategy: r.strategy, totalSize: r.size }))
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
static get Instance () {
|
||||
return this.instance || (this.instance = new this())
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export {
|
||||
StatsManager
|
||||
}
|
|
@ -176,6 +176,32 @@ describe('Test stats (excluding redundancy)', function () {
|
|||
}
|
||||
})
|
||||
|
||||
it('Should have the correct AP stats', async function () {
|
||||
this.timeout(60000)
|
||||
|
||||
for (let i = 0; i < 10; i++) {
|
||||
await uploadVideo(servers[0].url, servers[0].accessToken, { name: 'video' })
|
||||
}
|
||||
|
||||
const res1 = await getStats(servers[1].url)
|
||||
const first = res1.body as ServerStats
|
||||
|
||||
await waitJobs(servers)
|
||||
|
||||
const res2 = await getStats(servers[1].url)
|
||||
const second: ServerStats = res2.body
|
||||
|
||||
expect(second.totalActivityPubMessagesWaiting).to.equal(0)
|
||||
expect(second.totalActivityPubMessagesProcessed).to.be.greaterThan(first.totalActivityPubMessagesProcessed)
|
||||
|
||||
await wait(5000)
|
||||
|
||||
const res3 = await getStats(servers[1].url)
|
||||
const third: ServerStats = res3.body
|
||||
|
||||
expect(third.activityPubMessagesProcessedPerSecond).to.be.lessThan(second.activityPubMessagesProcessedPerSecond)
|
||||
})
|
||||
|
||||
after(async function () {
|
||||
await cleanupTests(servers)
|
||||
})
|
||||
|
|
|
@ -18,6 +18,10 @@ export interface ServerStats {
|
|||
totalInstanceFollowing: number
|
||||
|
||||
videosRedundancy: VideosRedundancyStats[]
|
||||
|
||||
totalActivityPubMessagesProcessed: number
|
||||
activityPubMessagesProcessedPerSecond: number
|
||||
totalActivityPubMessagesWaiting: number
|
||||
}
|
||||
|
||||
export interface VideosRedundancyStats {
|
||||
|
|
Loading…
Reference in New Issue