mirror of https://github.com/Chocobozzz/PeerTube
				
				
				
			
		
			
				
	
	
		
			57 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			TypeScript
		
	
	
			
		
		
	
	
			57 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			TypeScript
		
	
	
import { queue, QueueObject } from 'async'
 | 
						|
import { logger } from '@server/helpers/logger'
 | 
						|
import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
 | 
						|
import { MActorDefault, MActorSignature } from '@server/types/models'
 | 
						|
import { Activity } from '@shared/models'
 | 
						|
import { StatsManager } from '../stat-manager'
 | 
						|
import { processActivities } from './process'
 | 
						|
 | 
						|
type QueueParam = {
 | 
						|
  activities: Activity[]
 | 
						|
  signatureActor?: MActorSignature
 | 
						|
  inboxActor?: MActorDefault
 | 
						|
}
 | 
						|
 | 
						|
class InboxManager {
 | 
						|
 | 
						|
  private static instance: InboxManager
 | 
						|
 | 
						|
  private readonly inboxQueue: QueueObject<QueueParam>
 | 
						|
 | 
						|
  private constructor () {
 | 
						|
    this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
 | 
						|
      const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
 | 
						|
 | 
						|
      processActivities(task.activities, options)
 | 
						|
        .then(() => cb())
 | 
						|
        .catch(err => {
 | 
						|
          logger.error('Error in process activities.', { err })
 | 
						|
          cb()
 | 
						|
        })
 | 
						|
    })
 | 
						|
 | 
						|
    setInterval(() => {
 | 
						|
      StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
 | 
						|
    }, SCHEDULER_INTERVALS_MS.updateInboxStats)
 | 
						|
  }
 | 
						|
 | 
						|
  addInboxMessage (options: QueueParam) {
 | 
						|
    this.inboxQueue.push(options)
 | 
						|
      .catch(err => logger.error('Cannot add options in inbox queue.', { options, err }))
 | 
						|
  }
 | 
						|
 | 
						|
  getActivityPubMessagesWaiting () {
 | 
						|
    return this.inboxQueue.length() + this.inboxQueue.running()
 | 
						|
  }
 | 
						|
 | 
						|
  static get Instance () {
 | 
						|
    return this.instance || (this.instance = new this())
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
// ---------------------------------------------------------------------------
 | 
						|
 | 
						|
export {
 | 
						|
  InboxManager
 | 
						|
}
 |