PeerTube/server/lib/activitypub/inbox-manager.ts

48 lines
1.4 KiB
TypeScript
Raw Normal View History

2022-08-02 14:41:44 +02:00
import PQueue from 'p-queue'
2020-12-15 13:34:58 +01:00
import { logger } from '@server/helpers/logger'
import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
import { MActorDefault, MActorSignature } from '@server/types/models'
import { Activity } from '@shared/models'
import { StatsManager } from '../stat-manager'
import { processActivities } from './process'
2020-12-15 13:34:58 +01:00
/**
 * Singleton that funnels incoming ActivityPub activities through a single
 * PQueue configured with `concurrency: 1`, and periodically reports the
 * number of queued + in-flight tasks to the StatsManager.
 */
class InboxManager {

  private static instance: InboxManager

  // concurrency: 1 => the queue runs at most one processing task at a time
  private readonly inboxQueue: PQueue = new PQueue({ concurrency: 1 })

  private constructor () {
    // Push the current backlog size to the stats manager on a fixed interval
    const reportWaiting = () => {
      const waiting = this.getActivityPubMessagesWaiting()

      StatsManager.Instance.updateInboxWaiting(waiting)
    }

    setInterval(reportWaiting, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS)
  }

  /**
   * Enqueue a batch of activities for processing.
   *
   * The call returns immediately; the activities are handed to
   * `processActivities` when the queue runs the task. A rejection of the
   * queued task is logged and not propagated to the caller.
   *
   * @param param.activities activities forwarded to `processActivities`
   * @param param.signatureActor optional actor forwarded as `signatureActor`
   * @param param.inboxActor optional actor forwarded as `inboxActor`
   */
  addInboxMessage (param: {
    activities: Activity[]
    signatureActor?: MActorSignature
    inboxActor?: MActorDefault
  }) {
    const task = () => {
      // Read the fields when the task actually runs, not when it is enqueued
      const { signatureActor, inboxActor, activities } = param

      return processActivities(activities, { signatureActor, inboxActor })
    }

    this.inboxQueue.add(task)
      .catch(err => {
        logger.error('Error with inbox queue.', { err })
      })
  }

  /**
   * @returns the sum of the queue's `size` and `pending` counters
   */
  getActivityPubMessagesWaiting () {
    const { size, pending } = this.inboxQueue

    return size + pending
  }

  /** Lazily created shared instance. */
  static get Instance () {
    if (!this.instance) {
      this.instance = new this()
    }

    return this.instance
  }
}
// ---------------------------------------------------------------------------
export {
InboxManager
}