mirror of https://github.com/Chocobozzz/PeerTube
Add more AP stats to stats endpoint
It will help to understand if the federation correctly works or notpull/3800/head
parent
cb2e36618c
commit
543442a3be
|
@ -1,10 +1,10 @@
|
||||||
import { AsyncQueue, queue } from 'async'
|
import { queue, QueueObject } from 'async'
|
||||||
import { logger } from '@server/helpers/logger'
|
import { logger } from '@server/helpers/logger'
|
||||||
import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
|
import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
|
||||||
import { MActorDefault, MActorSignature } from '@server/types/models'
|
import { MActorDefault, MActorSignature } from '@server/types/models'
|
||||||
import { Activity } from '@shared/models'
|
import { Activity } from '@shared/models'
|
||||||
import { processActivities } from './process'
|
|
||||||
import { StatsManager } from '../stat-manager'
|
import { StatsManager } from '../stat-manager'
|
||||||
|
import { processActivities } from './process'
|
||||||
|
|
||||||
type QueueParam = {
|
type QueueParam = {
|
||||||
activities: Activity[]
|
activities: Activity[]
|
||||||
|
@ -16,16 +16,12 @@ class InboxManager {
|
||||||
|
|
||||||
private static instance: InboxManager
|
private static instance: InboxManager
|
||||||
|
|
||||||
private readonly inboxQueue: AsyncQueue<QueueParam>
|
private readonly inboxQueue: QueueObject<QueueParam>
|
||||||
|
|
||||||
private messagesProcessed = 0
|
|
||||||
|
|
||||||
private constructor () {
|
private constructor () {
|
||||||
this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
|
this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
|
||||||
const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
|
const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
|
||||||
|
|
||||||
this.messagesProcessed++
|
|
||||||
|
|
||||||
processActivities(task.activities, options)
|
processActivities(task.activities, options)
|
||||||
.then(() => cb())
|
.then(() => cb())
|
||||||
.catch(err => {
|
.catch(err => {
|
||||||
|
@ -35,7 +31,7 @@ class InboxManager {
|
||||||
})
|
})
|
||||||
|
|
||||||
setInterval(() => {
|
setInterval(() => {
|
||||||
StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.getActivityPubMessagesWaiting())
|
StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
|
||||||
}, SCHEDULER_INTERVALS_MS.updateInboxStats)
|
}, SCHEDULER_INTERVALS_MS.updateInboxStats)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@ import { processFlagActivity } from './process-flag'
|
||||||
import { processViewActivity } from './process-view'
|
import { processViewActivity } from './process-view'
|
||||||
import { APProcessorOptions } from '../../../types/activitypub-processor.model'
|
import { APProcessorOptions } from '../../../types/activitypub-processor.model'
|
||||||
import { MActorDefault, MActorSignature } from '../../../types/models'
|
import { MActorDefault, MActorSignature } from '../../../types/models'
|
||||||
|
import { StatsManager } from '@server/lib/stat-manager'
|
||||||
|
|
||||||
const processActivity: { [ P in ActivityType ]: (options: APProcessorOptions<Activity>) => Promise<any> } = {
|
const processActivity: { [ P in ActivityType ]: (options: APProcessorOptions<Activity>) => Promise<any> } = {
|
||||||
Create: processCreateActivity,
|
Create: processCreateActivity,
|
||||||
|
@ -75,8 +76,12 @@ async function processActivities (
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await activityProcessor({ activity, byActor, inboxActor, fromFetch })
|
await activityProcessor({ activity, byActor, inboxActor, fromFetch })
|
||||||
|
|
||||||
|
StatsManager.Instance.addInboxProcessedSuccess(activity.type)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.warn('Cannot process activity %s.', activity.type, { err })
|
logger.warn('Cannot process activity %s.', activity.type, { err })
|
||||||
|
|
||||||
|
StatsManager.Instance.addInboxProcessedError(activity.type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ import { VideoRedundancyModel } from '@server/models/redundancy/video-redundancy
|
||||||
import { VideoModel } from '@server/models/video/video'
|
import { VideoModel } from '@server/models/video/video'
|
||||||
import { VideoCommentModel } from '@server/models/video/video-comment'
|
import { VideoCommentModel } from '@server/models/video/video-comment'
|
||||||
import { VideoFileModel } from '@server/models/video/video-file'
|
import { VideoFileModel } from '@server/models/video/video-file'
|
||||||
import { ServerStats, VideoRedundancyStrategyWithManual } from '@shared/models'
|
import { ActivityType, ServerStats, VideoRedundancyStrategyWithManual } from '@shared/models'
|
||||||
|
|
||||||
class StatsManager {
|
class StatsManager {
|
||||||
|
|
||||||
|
@ -13,14 +13,31 @@ class StatsManager {
|
||||||
|
|
||||||
private readonly instanceStartDate = new Date()
|
private readonly instanceStartDate = new Date()
|
||||||
|
|
||||||
private inboxMessagesProcessed = 0
|
private inboxMessages = {
|
||||||
private inboxMessagesWaiting = 0
|
processed: 0,
|
||||||
|
errors: 0,
|
||||||
|
successes: 0,
|
||||||
|
waiting: 0,
|
||||||
|
errorsPerType: this.buildAPPerType(),
|
||||||
|
successesPerType: this.buildAPPerType()
|
||||||
|
}
|
||||||
|
|
||||||
private constructor () {}
|
private constructor () {}
|
||||||
|
|
||||||
updateInboxStats (inboxMessagesProcessed: number, inboxMessagesWaiting: number) {
|
updateInboxWaiting (inboxMessagesWaiting: number) {
|
||||||
this.inboxMessagesProcessed = inboxMessagesProcessed
|
this.inboxMessages.waiting = inboxMessagesWaiting
|
||||||
this.inboxMessagesWaiting = inboxMessagesWaiting
|
}
|
||||||
|
|
||||||
|
addInboxProcessedSuccess (type: ActivityType) {
|
||||||
|
this.inboxMessages.processed++
|
||||||
|
this.inboxMessages.successes++
|
||||||
|
this.inboxMessages.successesPerType[type]++
|
||||||
|
}
|
||||||
|
|
||||||
|
addInboxProcessedError (type: ActivityType) {
|
||||||
|
this.inboxMessages.processed++
|
||||||
|
this.inboxMessages.errors++
|
||||||
|
this.inboxMessages.errorsPerType[type]++
|
||||||
}
|
}
|
||||||
|
|
||||||
async getStats () {
|
async getStats () {
|
||||||
|
@ -50,9 +67,7 @@ class StatsManager {
|
||||||
|
|
||||||
videosRedundancy: videosRedundancyStats,
|
videosRedundancy: videosRedundancyStats,
|
||||||
|
|
||||||
totalActivityPubMessagesProcessed: this.inboxMessagesProcessed,
|
...this.buildAPStats()
|
||||||
activityPubMessagesProcessedPerSecond: this.buildActivityPubMessagesProcessedPerSecond(),
|
|
||||||
totalActivityPubMessagesWaiting: this.inboxMessagesWaiting
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
@ -62,7 +77,7 @@ class StatsManager {
|
||||||
const now = new Date()
|
const now = new Date()
|
||||||
const startedSeconds = (now.getTime() - this.instanceStartDate.getTime()) / 1000
|
const startedSeconds = (now.getTime() - this.instanceStartDate.getTime()) / 1000
|
||||||
|
|
||||||
return this.inboxMessagesProcessed / startedSeconds
|
return this.inboxMessages.processed / startedSeconds
|
||||||
}
|
}
|
||||||
|
|
||||||
private buildRedundancyStats () {
|
private buildRedundancyStats () {
|
||||||
|
@ -82,6 +97,63 @@ class StatsManager {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private buildAPPerType () {
|
||||||
|
return {
|
||||||
|
Create: 0,
|
||||||
|
Update: 0,
|
||||||
|
Delete: 0,
|
||||||
|
Follow: 0,
|
||||||
|
Accept: 0,
|
||||||
|
Reject: 0,
|
||||||
|
Announce: 0,
|
||||||
|
Undo: 0,
|
||||||
|
Like: 0,
|
||||||
|
Dislike: 0,
|
||||||
|
Flag: 0,
|
||||||
|
View: 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private buildAPStats () {
|
||||||
|
return {
|
||||||
|
totalActivityPubMessagesProcessed: this.inboxMessages.processed,
|
||||||
|
|
||||||
|
totalActivityPubMessagesSuccesses: this.inboxMessages.successes,
|
||||||
|
|
||||||
|
// Dirty, but simpler and with type checking
|
||||||
|
totalActivityPubCreateMessagesSuccesses: this.inboxMessages.successesPerType.Create,
|
||||||
|
totalActivityPubUpdateMessagesSuccesses: this.inboxMessages.successesPerType.Update,
|
||||||
|
totalActivityPubDeleteMessagesSuccesses: this.inboxMessages.successesPerType.Delete,
|
||||||
|
totalActivityPubFollowMessagesSuccesses: this.inboxMessages.successesPerType.Follow,
|
||||||
|
totalActivityPubAcceptMessagesSuccesses: this.inboxMessages.successesPerType.Accept,
|
||||||
|
totalActivityPubRejectMessagesSuccesses: this.inboxMessages.successesPerType.Reject,
|
||||||
|
totalActivityPubAnnounceMessagesSuccesses: this.inboxMessages.successesPerType.Announce,
|
||||||
|
totalActivityPubUndoMessagesSuccesses: this.inboxMessages.successesPerType.Undo,
|
||||||
|
totalActivityPubLikeMessagesSuccesses: this.inboxMessages.successesPerType.Like,
|
||||||
|
totalActivityPubDislikeMessagesSuccesses: this.inboxMessages.successesPerType.Dislike,
|
||||||
|
totalActivityPubFlagMessagesSuccesses: this.inboxMessages.successesPerType.Flag,
|
||||||
|
totalActivityPubViewMessagesSuccesses: this.inboxMessages.successesPerType.View,
|
||||||
|
|
||||||
|
totalActivityPubCreateMessagesErrors: this.inboxMessages.errorsPerType.Create,
|
||||||
|
totalActivityPubUpdateMessagesErrors: this.inboxMessages.errorsPerType.Update,
|
||||||
|
totalActivityPubDeleteMessagesErrors: this.inboxMessages.errorsPerType.Delete,
|
||||||
|
totalActivityPubFollowMessagesErrors: this.inboxMessages.errorsPerType.Follow,
|
||||||
|
totalActivityPubAcceptMessagesErrors: this.inboxMessages.errorsPerType.Accept,
|
||||||
|
totalActivityPubRejectMessagesErrors: this.inboxMessages.errorsPerType.Reject,
|
||||||
|
totalActivityPubAnnounceMessagesErrors: this.inboxMessages.errorsPerType.Announce,
|
||||||
|
totalActivityPubUndoMessagesErrors: this.inboxMessages.errorsPerType.Undo,
|
||||||
|
totalActivityPubLikeMessagesErrors: this.inboxMessages.errorsPerType.Like,
|
||||||
|
totalActivityPubDislikeMessagesErrors: this.inboxMessages.errorsPerType.Dislike,
|
||||||
|
totalActivityPubFlagMessagesErrors: this.inboxMessages.errorsPerType.Flag,
|
||||||
|
totalActivityPubViewMessagesErrors: this.inboxMessages.errorsPerType.View,
|
||||||
|
|
||||||
|
totalActivityPubMessagesErrors: this.inboxMessages.errors,
|
||||||
|
|
||||||
|
activityPubMessagesProcessedPerSecond: this.buildActivityPubMessagesProcessedPerSecond(),
|
||||||
|
totalActivityPubMessagesWaiting: this.inboxMessages.waiting
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static get Instance () {
|
static get Instance () {
|
||||||
return this.instance || (this.instance = new this())
|
return this.instance || (this.instance = new this())
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import { waitJobs } from '../../../../shared/extra-utils/server/jobs'
|
||||||
import { getStats } from '../../../../shared/extra-utils/server/stats'
|
import { getStats } from '../../../../shared/extra-utils/server/stats'
|
||||||
import { addVideoCommentThread } from '../../../../shared/extra-utils/videos/video-comments'
|
import { addVideoCommentThread } from '../../../../shared/extra-utils/videos/video-comments'
|
||||||
import { ServerStats } from '../../../../shared/models/server/server-stats.model'
|
import { ServerStats } from '../../../../shared/models/server/server-stats.model'
|
||||||
|
import { ActivityType } from '@shared/models'
|
||||||
|
|
||||||
const expect = chai.expect
|
const expect = chai.expect
|
||||||
|
|
||||||
|
@ -201,6 +202,22 @@ describe('Test stats (excluding redundancy)', function () {
|
||||||
const second: ServerStats = res2.body
|
const second: ServerStats = res2.body
|
||||||
|
|
||||||
expect(second.totalActivityPubMessagesProcessed).to.be.greaterThan(first.totalActivityPubMessagesProcessed)
|
expect(second.totalActivityPubMessagesProcessed).to.be.greaterThan(first.totalActivityPubMessagesProcessed)
|
||||||
|
const apTypes: ActivityType[] = [
|
||||||
|
'Create', 'Update', 'Delete', 'Follow', 'Accept', 'Announce', 'Undo', 'Like', 'Reject', 'View', 'Dislike', 'Flag'
|
||||||
|
]
|
||||||
|
|
||||||
|
const processed = apTypes.reduce(
|
||||||
|
(previous, type) => previous + second['totalActivityPub' + type + 'MessagesSuccesses'],
|
||||||
|
0
|
||||||
|
)
|
||||||
|
expect(second.totalActivityPubMessagesProcessed).to.equal(processed)
|
||||||
|
expect(second.totalActivityPubMessagesSuccesses).to.equal(processed)
|
||||||
|
|
||||||
|
expect(second.totalActivityPubMessagesErrors).to.equal(0)
|
||||||
|
|
||||||
|
for (const apType of apTypes) {
|
||||||
|
expect(second['totalActivityPub' + apType + 'MessagesErrors']).to.equal(0)
|
||||||
|
}
|
||||||
|
|
||||||
await wait(6000)
|
await wait(6000)
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
import { VideoRedundancyStrategyWithManual } from '../redundancy'
|
import { VideoRedundancyStrategyWithManual } from '../redundancy'
|
||||||
|
|
||||||
export interface ServerStats {
|
export interface ServerStats {
|
||||||
totalUsers: number
|
totalUsers: number
|
||||||
totalDailyActiveUsers: number
|
totalDailyActiveUsers: number
|
||||||
|
@ -20,6 +19,35 @@ export interface ServerStats {
|
||||||
videosRedundancy: VideosRedundancyStats[]
|
videosRedundancy: VideosRedundancyStats[]
|
||||||
|
|
||||||
totalActivityPubMessagesProcessed: number
|
totalActivityPubMessagesProcessed: number
|
||||||
|
totalActivityPubMessagesSuccesses: number
|
||||||
|
totalActivityPubMessagesErrors: number
|
||||||
|
|
||||||
|
totalActivityPubCreateMessagesSuccesses: number
|
||||||
|
totalActivityPubUpdateMessagesSuccesses: number
|
||||||
|
totalActivityPubDeleteMessagesSuccesses: number
|
||||||
|
totalActivityPubFollowMessagesSuccesses: number
|
||||||
|
totalActivityPubAcceptMessagesSuccesses: number
|
||||||
|
totalActivityPubRejectMessagesSuccesses: number
|
||||||
|
totalActivityPubAnnounceMessagesSuccesses: number
|
||||||
|
totalActivityPubUndoMessagesSuccesses: number
|
||||||
|
totalActivityPubLikeMessagesSuccesses: number
|
||||||
|
totalActivityPubDislikeMessagesSuccesses: number
|
||||||
|
totalActivityPubFlagMessagesSuccesses: number
|
||||||
|
totalActivityPubViewMessagesSuccesses: number
|
||||||
|
|
||||||
|
totalActivityPubCreateMessagesErrors: number
|
||||||
|
totalActivityPubUpdateMessagesErrors: number
|
||||||
|
totalActivityPubDeleteMessagesErrors: number
|
||||||
|
totalActivityPubFollowMessagesErrors: number
|
||||||
|
totalActivityPubAcceptMessagesErrors: number
|
||||||
|
totalActivityPubRejectMessagesErrors: number
|
||||||
|
totalActivityPubAnnounceMessagesErrors: number
|
||||||
|
totalActivityPubUndoMessagesErrors: number
|
||||||
|
totalActivityPubLikeMessagesErrors: number
|
||||||
|
totalActivityPubDislikeMessagesErrors: number
|
||||||
|
totalActivityPubFlagMessagesErrors: number
|
||||||
|
totalActivityPubViewMessagesErrors: number
|
||||||
|
|
||||||
activityPubMessagesProcessedPerSecond: number
|
activityPubMessagesProcessedPerSecond: number
|
||||||
totalActivityPubMessagesWaiting: number
|
totalActivityPubMessagesWaiting: number
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue