mirror of https://github.com/Chocobozzz/PeerTube
Process slow followers in unicast job queue
parent
e81f6ccf98
commit
9db437c815
|
@ -134,9 +134,9 @@ const REMOTE_SCHEME = {
|
||||||
}
|
}
|
||||||
|
|
||||||
const JOB_ATTEMPTS: { [id in JobType]: number } = {
|
const JOB_ATTEMPTS: { [id in JobType]: number } = {
|
||||||
'activitypub-http-broadcast': 5,
|
'activitypub-http-broadcast': 1,
|
||||||
'activitypub-http-unicast': 5,
|
'activitypub-http-unicast': 1,
|
||||||
'activitypub-http-fetcher': 5,
|
'activitypub-http-fetcher': 2,
|
||||||
'activitypub-follow': 5,
|
'activitypub-follow': 5,
|
||||||
'activitypub-cleaner': 1,
|
'activitypub-cleaner': 1,
|
||||||
'video-file-import': 1,
|
'video-file-import': 1,
|
||||||
|
@ -153,7 +153,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
|
||||||
// Excluded keys are jobs that can be configured by admins
|
// Excluded keys are jobs that can be configured by admins
|
||||||
const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
|
const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
|
||||||
'activitypub-http-broadcast': 1,
|
'activitypub-http-broadcast': 1,
|
||||||
'activitypub-http-unicast': 5,
|
'activitypub-http-unicast': 10,
|
||||||
'activitypub-http-fetcher': 3,
|
'activitypub-http-fetcher': 3,
|
||||||
'activitypub-cleaner': 1,
|
'activitypub-cleaner': 1,
|
||||||
'activitypub-follow': 1,
|
'activitypub-follow': 1,
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import { Transaction } from 'sequelize'
|
import { Transaction } from 'sequelize'
|
||||||
|
import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
|
||||||
import { getServerActor } from '@server/models/application/application'
|
import { getServerActor } from '@server/models/application/application'
|
||||||
import { ContextType } from '@shared/models/activitypub/context'
|
import { ContextType } from '@shared/models/activitypub/context'
|
||||||
import { Activity, ActivityAudience } from '../../../../shared/models/activitypub'
|
import { Activity, ActivityAudience } from '../../../../shared/models/activitypub'
|
||||||
|
@ -119,16 +120,41 @@ async function broadcastToActors (
|
||||||
function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) {
|
function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) {
|
||||||
if (uris.length === 0) return undefined
|
if (uris.length === 0) return undefined
|
||||||
|
|
||||||
logger.debug('Creating broadcast job.', { uris })
|
const broadcastUris: string[] = []
|
||||||
|
const unicastUris: string[] = []
|
||||||
|
|
||||||
|
// Bad URIs could be slow to respond, prefer to process them in a dedicated queue
|
||||||
|
for (const uri of uris) {
|
||||||
|
if (ActorFollowHealthCache.Instance.isBadInbox(uri)) {
|
||||||
|
unicastUris.push(uri)
|
||||||
|
} else {
|
||||||
|
broadcastUris.push(uri)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug('Creating broadcast job.', { broadcastUris, unicastUris })
|
||||||
|
|
||||||
|
if (broadcastUris.length !== 0) {
|
||||||
const payload = {
|
const payload = {
|
||||||
uris,
|
uris: broadcastUris,
|
||||||
signatureActorId: byActor.id,
|
signatureActorId: byActor.id,
|
||||||
body: data,
|
body: data,
|
||||||
contextType
|
contextType
|
||||||
}
|
}
|
||||||
|
|
||||||
return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
|
JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const unicastUri of unicastUris) {
|
||||||
|
const payload = {
|
||||||
|
uri: unicastUri,
|
||||||
|
signatureActorId: byActor.id,
|
||||||
|
body: data,
|
||||||
|
contextType
|
||||||
|
}
|
||||||
|
|
||||||
|
JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) {
|
function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) {
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
import { logger, loggerTagsFactory } from '@server/helpers/logger'
|
import { logger, loggerTagsFactory } from '@server/helpers/logger'
|
||||||
import { PeerTubeRequestError } from '@server/helpers/requests'
|
import { PeerTubeRequestError } from '@server/helpers/requests'
|
||||||
import { ActorFollowScoreCache } from '@server/lib/files-cache'
|
|
||||||
import { VideoLoadByUrlType } from '@server/lib/model-loaders'
|
import { VideoLoadByUrlType } from '@server/lib/model-loaders'
|
||||||
import { VideoModel } from '@server/models/video/video'
|
import { VideoModel } from '@server/models/video/video'
|
||||||
import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models'
|
import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models'
|
||||||
import { HttpStatusCode } from '@shared/models'
|
import { HttpStatusCode } from '@shared/models'
|
||||||
|
import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
|
||||||
import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared'
|
import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared'
|
||||||
import { APVideoUpdater } from './updater'
|
import { APVideoUpdater } from './updater'
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ async function refreshVideoIfNeeded (options: {
|
||||||
|
|
||||||
await syncVideoExternalAttributes(video, videoObject, options.syncParam)
|
await syncVideoExternalAttributes(video, videoObject, options.syncParam)
|
||||||
|
|
||||||
ActorFollowScoreCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId)
|
ActorFollowHealthCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId)
|
||||||
|
|
||||||
return video
|
return video
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
@ -53,7 +53,7 @@ async function refreshVideoIfNeeded (options: {
|
||||||
|
|
||||||
logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() })
|
logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() })
|
||||||
|
|
||||||
ActorFollowScoreCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId)
|
ActorFollowHealthCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId)
|
||||||
|
|
||||||
// Don't refresh in loop
|
// Don't refresh in loop
|
||||||
await video.setAsRefreshed()
|
await video.setAsRefreshed()
|
||||||
|
|
|
@ -1,22 +1,28 @@
|
||||||
import { ACTOR_FOLLOW_SCORE } from '../../initializers/constants'
|
import { ACTOR_FOLLOW_SCORE } from '../initializers/constants'
|
||||||
import { logger } from '../../helpers/logger'
|
import { logger } from '../helpers/logger'
|
||||||
|
|
||||||
// Cache follows scores, instead of writing them too often in database
|
// Cache follows scores, instead of writing them too often in database
|
||||||
// Keep data in memory, we don't really need Redis here as we don't really care to loose some scores
|
// Keep data in memory, we don't really need Redis here as we don't really care to loose some scores
|
||||||
class ActorFollowScoreCache {
|
class ActorFollowHealthCache {
|
||||||
|
|
||||||
|
private static instance: ActorFollowHealthCache
|
||||||
|
|
||||||
private static instance: ActorFollowScoreCache
|
|
||||||
private pendingFollowsScore: { [ url: string ]: number } = {}
|
private pendingFollowsScore: { [ url: string ]: number } = {}
|
||||||
|
|
||||||
private pendingBadServer = new Set<number>()
|
private pendingBadServer = new Set<number>()
|
||||||
private pendingGoodServer = new Set<number>()
|
private pendingGoodServer = new Set<number>()
|
||||||
|
|
||||||
|
private badInboxes = new Set<string>()
|
||||||
|
|
||||||
private constructor () {}
|
private constructor () {}
|
||||||
|
|
||||||
static get Instance () {
|
static get Instance () {
|
||||||
return this.instance || (this.instance = new this())
|
return this.instance || (this.instance = new this())
|
||||||
}
|
}
|
||||||
|
|
||||||
updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) {
|
updateActorFollowsHealth (goodInboxes: string[], badInboxes: string[]) {
|
||||||
|
this.badInboxes.clear()
|
||||||
|
|
||||||
if (goodInboxes.length === 0 && badInboxes.length === 0) return
|
if (goodInboxes.length === 0 && badInboxes.length === 0) return
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
|
@ -34,9 +40,14 @@ class ActorFollowScoreCache {
|
||||||
if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0
|
if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0
|
||||||
|
|
||||||
this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY
|
this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY
|
||||||
|
this.badInboxes.add(badInbox)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
isBadInbox (inboxUrl: string) {
|
||||||
|
return this.badInboxes.has(inboxUrl)
|
||||||
|
}
|
||||||
|
|
||||||
addBadServerId (serverId: number) {
|
addBadServerId (serverId: number) {
|
||||||
this.pendingBadServer.add(serverId)
|
this.pendingBadServer.add(serverId)
|
||||||
}
|
}
|
||||||
|
@ -71,5 +82,5 @@ class ActorFollowScoreCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
export {
|
export {
|
||||||
ActorFollowScoreCache
|
ActorFollowHealthCache
|
||||||
}
|
}
|
|
@ -1,3 +1,3 @@
|
||||||
export * from './actor-follow-score-cache'
|
|
||||||
export * from './videos-preview-cache'
|
export * from './videos-preview-cache'
|
||||||
export * from './videos-caption-cache'
|
export * from './videos-caption-cache'
|
||||||
|
export * from './videos-torrent-cache'
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
import { map } from 'bluebird'
|
import { map } from 'bluebird'
|
||||||
import { Job } from 'bull'
|
import { Job } from 'bull'
|
||||||
|
import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
|
||||||
import { ActivitypubHttpBroadcastPayload } from '@shared/models'
|
import { ActivitypubHttpBroadcastPayload } from '@shared/models'
|
||||||
import { logger } from '../../../helpers/logger'
|
import { logger } from '../../../helpers/logger'
|
||||||
import { doRequest } from '../../../helpers/requests'
|
import { doRequest } from '../../../helpers/requests'
|
||||||
import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
|
import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
|
||||||
import { ActorFollowScoreCache } from '../../files-cache'
|
|
||||||
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
|
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
|
||||||
|
|
||||||
async function processActivityPubHttpBroadcast (job: Job) {
|
async function processActivityPubHttpBroadcast (job: Job) {
|
||||||
|
@ -25,13 +25,17 @@ async function processActivityPubHttpBroadcast (job: Job) {
|
||||||
const badUrls: string[] = []
|
const badUrls: string[] = []
|
||||||
const goodUrls: string[] = []
|
const goodUrls: string[] = []
|
||||||
|
|
||||||
await map(payload.uris, uri => {
|
await map(payload.uris, async uri => {
|
||||||
return doRequest(uri, options)
|
try {
|
||||||
.then(() => goodUrls.push(uri))
|
await doRequest(uri, options)
|
||||||
.catch(() => badUrls.push(uri))
|
goodUrls.push(uri)
|
||||||
|
} catch (err) {
|
||||||
|
logger.debug('HTTP broadcast to %s failed.', uri, { err })
|
||||||
|
badUrls.push(uri)
|
||||||
|
}
|
||||||
}, { concurrency: BROADCAST_CONCURRENCY })
|
}, { concurrency: BROADCAST_CONCURRENCY })
|
||||||
|
|
||||||
return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls)
|
return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
|
@ -2,7 +2,7 @@ import { Job } from 'bull'
|
||||||
import { ActivitypubHttpUnicastPayload } from '@shared/models'
|
import { ActivitypubHttpUnicastPayload } from '@shared/models'
|
||||||
import { logger } from '../../../helpers/logger'
|
import { logger } from '../../../helpers/logger'
|
||||||
import { doRequest } from '../../../helpers/requests'
|
import { doRequest } from '../../../helpers/requests'
|
||||||
import { ActorFollowScoreCache } from '../../files-cache'
|
import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
|
||||||
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
|
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
|
||||||
|
|
||||||
async function processActivityPubHttpUnicast (job: Job) {
|
async function processActivityPubHttpUnicast (job: Job) {
|
||||||
|
@ -23,9 +23,9 @@ async function processActivityPubHttpUnicast (job: Job) {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await doRequest(uri, options)
|
await doRequest(uri, options)
|
||||||
ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], [])
|
ActorFollowHealthCache.Instance.updateActorFollowsHealth([ uri ], [])
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ])
|
ActorFollowHealthCache.Instance.updateActorFollowsHealth([], [ uri ])
|
||||||
|
|
||||||
throw err
|
throw err
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@ import { isTestInstance } from '../../helpers/core-utils'
|
||||||
import { logger } from '../../helpers/logger'
|
import { logger } from '../../helpers/logger'
|
||||||
import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
|
import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
|
||||||
import { ActorFollowModel } from '../../models/actor/actor-follow'
|
import { ActorFollowModel } from '../../models/actor/actor-follow'
|
||||||
import { ActorFollowScoreCache } from '../files-cache'
|
import { ActorFollowHealthCache } from '../actor-follow-health-cache'
|
||||||
import { AbstractScheduler } from './abstract-scheduler'
|
import { AbstractScheduler } from './abstract-scheduler'
|
||||||
|
|
||||||
export class ActorFollowScheduler extends AbstractScheduler {
|
export class ActorFollowScheduler extends AbstractScheduler {
|
||||||
|
@ -22,13 +22,13 @@ export class ActorFollowScheduler extends AbstractScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
private async processPendingScores () {
|
private async processPendingScores () {
|
||||||
const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScore()
|
const pendingScores = ActorFollowHealthCache.Instance.getPendingFollowsScore()
|
||||||
const badServerIds = ActorFollowScoreCache.Instance.getBadFollowingServerIds()
|
const badServerIds = ActorFollowHealthCache.Instance.getBadFollowingServerIds()
|
||||||
const goodServerIds = ActorFollowScoreCache.Instance.getGoodFollowingServerIds()
|
const goodServerIds = ActorFollowHealthCache.Instance.getGoodFollowingServerIds()
|
||||||
|
|
||||||
ActorFollowScoreCache.Instance.clearPendingFollowsScore()
|
ActorFollowHealthCache.Instance.clearPendingFollowsScore()
|
||||||
ActorFollowScoreCache.Instance.clearBadFollowingServerIds()
|
ActorFollowHealthCache.Instance.clearBadFollowingServerIds()
|
||||||
ActorFollowScoreCache.Instance.clearGoodFollowingServerIds()
|
ActorFollowHealthCache.Instance.clearGoodFollowingServerIds()
|
||||||
|
|
||||||
for (const inbox of Object.keys(pendingScores)) {
|
for (const inbox of Object.keys(pendingScores)) {
|
||||||
await ActorFollowModel.updateScore(inbox, pendingScores[inbox])
|
await ActorFollowModel.updateScore(inbox, pendingScores[inbox])
|
||||||
|
|
|
@ -11,6 +11,7 @@ import './jobs'
|
||||||
import './logs'
|
import './logs'
|
||||||
import './reverse-proxy'
|
import './reverse-proxy'
|
||||||
import './services'
|
import './services'
|
||||||
|
import './slow-follows'
|
||||||
import './stats'
|
import './stats'
|
||||||
import './tracker'
|
import './tracker'
|
||||||
import './no-client'
|
import './no-client'
|
||||||
|
|
|
@ -0,0 +1,81 @@
|
||||||
|
/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
|
||||||
|
|
||||||
|
import 'mocha'
|
||||||
|
import * as chai from 'chai'
|
||||||
|
import { cleanupTests, createMultipleServers, doubleFollow, PeerTubeServer, setAccessTokensToServers, waitJobs } from '@shared/extra-utils'
|
||||||
|
import { Job } from '@shared/models'
|
||||||
|
|
||||||
|
const expect = chai.expect
|
||||||
|
|
||||||
|
describe('Test slow follows', function () {
|
||||||
|
let servers: PeerTubeServer[] = []
|
||||||
|
|
||||||
|
let afterFollows: Date
|
||||||
|
|
||||||
|
before(async function () {
|
||||||
|
this.timeout(60000)
|
||||||
|
|
||||||
|
servers = await createMultipleServers(3)
|
||||||
|
|
||||||
|
// Get the access tokens
|
||||||
|
await setAccessTokensToServers(servers)
|
||||||
|
|
||||||
|
await doubleFollow(servers[0], servers[1])
|
||||||
|
await doubleFollow(servers[0], servers[2])
|
||||||
|
|
||||||
|
afterFollows = new Date()
|
||||||
|
|
||||||
|
for (let i = 0; i < 5; i++) {
|
||||||
|
await servers[0].videos.quickUpload({ name: 'video ' + i })
|
||||||
|
}
|
||||||
|
|
||||||
|
await waitJobs(servers)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Should only have broadcast jobs', async function () {
|
||||||
|
const { data } = await servers[0].jobs.list({ jobType: 'activitypub-http-unicast', sort: '-createdAt' })
|
||||||
|
|
||||||
|
for (const job of data) {
|
||||||
|
expect(new Date(job.createdAt)).below(afterFollows)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Should process bad follower', async function () {
|
||||||
|
this.timeout(30000)
|
||||||
|
|
||||||
|
await servers[1].kill()
|
||||||
|
|
||||||
|
// Set server 2 as bad follower
|
||||||
|
await servers[0].videos.quickUpload({ name: 'video 6' })
|
||||||
|
await waitJobs(servers[0])
|
||||||
|
|
||||||
|
afterFollows = new Date()
|
||||||
|
const filter = (job: Job) => new Date(job.createdAt) > afterFollows
|
||||||
|
|
||||||
|
// Resend another broadcast job
|
||||||
|
await servers[0].videos.quickUpload({ name: 'video 7' })
|
||||||
|
await waitJobs(servers[0])
|
||||||
|
|
||||||
|
const resBroadcast = await servers[0].jobs.list({ jobType: 'activitypub-http-broadcast', sort: '-createdAt' })
|
||||||
|
const resUnicast = await servers[0].jobs.list({ jobType: 'activitypub-http-unicast', sort: '-createdAt' })
|
||||||
|
|
||||||
|
const broadcast = resBroadcast.data.filter(filter)
|
||||||
|
const unicast = resUnicast.data.filter(filter)
|
||||||
|
|
||||||
|
expect(unicast).to.have.lengthOf(2)
|
||||||
|
expect(broadcast).to.have.lengthOf(2)
|
||||||
|
|
||||||
|
for (const u of unicast) {
|
||||||
|
expect(u.data.uri).to.equal(servers[1].url + '/inbox')
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const b of broadcast) {
|
||||||
|
expect(b.data.uris).to.have.lengthOf(1)
|
||||||
|
expect(b.data.uris[0]).to.equal(servers[2].url + '/inbox')
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
after(async function () {
|
||||||
|
await cleanupTests(servers)
|
||||||
|
})
|
||||||
|
})
|
Loading…
Reference in New Issue