Merge branch 'release/4.2.0' into develop

pull/5067/head
Chocobozzz 2022-06-17 14:17:06 +02:00
commit fba911e2c8
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
12 changed files with 154 additions and 69 deletions

View File

@ -25,6 +25,7 @@ export class JobsComponent extends RestTable implements OnInit {
'activitypub-follow',
'activitypub-http-broadcast',
'activitypub-http-broadcast-parallel',
'activitypub-http-fetcher',
'activitypub-http-unicast',
'activitypub-refresher',

View File

@ -0,0 +1,89 @@
import Bluebird from 'bluebird'
import { wait } from '@shared/core-utils'
import {
createSingleServer,
doubleFollow,
killallServers,
PeerTubeServer,
setAccessTokensToServers,
waitJobs
} from '@shared/server-commands'
let servers: PeerTubeServer[]
const viewers: { xForwardedFor: string }[] = []
let videoId: string
run()
.then(() => process.exit(0))
.catch(err => console.error(err))
.finally(() => killallServers(servers))
async function run () {
await prepare()
while (true) {
await runViewers()
}
}
async function prepare () {
console.log('Preparing servers...')
const config = {
log: {
level: 'info'
},
rates_limit: {
api: {
max: 5_000_000
}
},
views: {
videos: {
local_buffer_update_interval: '30 minutes',
ip_view_expiration: '1 hour'
}
}
}
servers = await Promise.all([
createSingleServer(1, config, { nodeArgs: [ '--inspect' ] }),
createSingleServer(2, config),
createSingleServer(3, config)
])
await setAccessTokensToServers(servers)
await doubleFollow(servers[0], servers[1])
await doubleFollow(servers[0], servers[2])
const { uuid } = await servers[0].videos.quickUpload({ name: 'video' })
videoId = uuid
await waitJobs(servers)
const THOUSAND_VIEWERS = 2
for (let i = 2; i < 252; i++) {
for (let j = 2; j < 6; j++) {
for (let k = 2; k < THOUSAND_VIEWERS + 2; k++) {
viewers.push({ xForwardedFor: `0.${k}.${j}.${i},127.0.0.1` })
}
}
}
console.log('Servers preparation finished.')
}
async function runViewers () {
console.log('Will run views of %d viewers.', viewers.length)
const before = new Date().getTime()
await Bluebird.map(viewers, viewer => {
return servers[0].views.simulateView({ id: videoId, xForwardedFor: viewer.xForwardedFor })
}, { concurrency: 100 })
console.log('Finished to run views in %d seconds.', (new Date().getTime() - before) / 1000)
await wait(5000)
}

View File

@ -139,6 +139,7 @@ const REMOTE_SCHEME = {
const JOB_ATTEMPTS: { [id in JobType]: number } = {
'activitypub-http-broadcast': 1,
'activitypub-http-broadcast-parallel': 1,
'activitypub-http-unicast': 1,
'activitypub-http-fetcher': 2,
'activitypub-follow': 5,
@ -159,6 +160,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
// Excluded keys are jobs that can be configured by admins
const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
'activitypub-http-broadcast': 1,
'activitypub-http-broadcast-parallel': 30,
'activitypub-http-unicast': 10,
'activitypub-http-fetcher': 3,
'activitypub-cleaner': 1,
@ -176,6 +178,7 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im
}
const JOB_TTL: { [id in JobType]: number } = {
'activitypub-http-broadcast': 60000 * 10, // 10 minutes
'activitypub-http-broadcast-parallel': 60000 * 10, // 10 minutes
'activitypub-http-unicast': 60000 * 10, // 10 minutes
'activitypub-http-fetcher': 1000 * 3600 * 10, // 10 hours
'activitypub-follow': 60000 * 10, // 10 minutes
@ -371,7 +374,7 @@ const VIEW_LIFETIME = {
VIEWER_STATS: 60000 * 60 // 1 hour
}
const MAX_LOCAL_VIEWER_WATCH_SECTIONS = 10
const MAX_LOCAL_VIEWER_WATCH_SECTIONS = 100
let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour

View File

@ -23,7 +23,7 @@ async function createOrUpdateLocalVideoViewer (watchAction: WatchActionObject, v
: null,
videoId: video.id
})
}, { transaction: t })
await LocalVideoViewerWatchSectionModel.bulkCreateSections({
localVideoViewerId: localVideoViewer.id,
@ -31,7 +31,9 @@ async function createOrUpdateLocalVideoViewer (watchAction: WatchActionObject, v
watchSections: watchAction.watchSections.map(s => ({
start: s.startTimestamp,
end: s.endTimestamp
}))
})),
transaction: t
})
}

View File

@ -26,7 +26,7 @@ async function sendView (options: {
return buildViewActivity({ url, byActor, video, audience, type })
}
return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View' })
return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View', parallelizable: true })
}
// ---------------------------------------------------------------------------

View File

@ -3,7 +3,7 @@ import { ACTIVITY_PUB } from '@server/initializers/constants'
import { ActorModel } from '@server/models/actor/actor'
import { VideoModel } from '@server/models/video/video'
import { VideoShareModel } from '@server/models/video/video-share'
import { MActorFollowersUrl, MActorLight, MActorUrl, MCommentOwner, MCommentOwnerVideo, MVideoId } from '@server/types/models'
import { MActorFollowersUrl, MActorUrl, MCommentOwner, MCommentOwnerVideo, MVideoId } from '@server/types/models'
import { ActivityAudience } from '@shared/models'
function getOriginVideoAudience (accountActor: MActorUrl, actorsInvolvedInVideo: MActorFollowersUrl[] = []): ActivityAudience {
@ -51,13 +51,13 @@ function getAudienceFromFollowersOf (actorsInvolvedInObject: MActorFollowersUrl[
}
async function getActorsInvolvedInVideo (video: MVideoId, t: Transaction) {
const actors: MActorLight[] = await VideoShareModel.loadActorsByShare(video.id, t)
const actors = await VideoShareModel.listActorIdsAndFollowerUrlsByShare(video.id, t)
const videoAll = video as VideoModel
const videoActor = videoAll.VideoChannel?.Account
? videoAll.VideoChannel.Account.Actor
: await ActorModel.loadFromAccountByVideoId(video.id, t)
: await ActorModel.loadAccountActorFollowerUrlByVideoId(video.id, t)
actors.push(videoActor)

View File

@ -15,17 +15,18 @@ async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAud
byActor: MActorLight
video: MVideoImmutable | MVideoAccountLight
contextType: ContextType
parallelizable?: boolean
transaction?: Transaction
}) {
const { byActor, video, transaction, contextType } = options
const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, transaction)
const { byActor, video, transaction, contextType, parallelizable } = options
// Send to origin
if (video.isOwned() === false) {
return sendVideoActivityToOrigin(activityBuilder, options)
}
const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, transaction)
// Send to followers
const audience = getAudienceFromFollowersOf(actorsInvolvedInVideo)
const activity = activityBuilder(audience)
@ -38,6 +39,7 @@ async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAud
toFollowersOf: actorsInvolvedInVideo,
transaction,
actorsException,
parallelizable,
contextType
})
}
@ -130,9 +132,10 @@ async function broadcastToFollowers (options: {
transaction: Transaction
contextType: ContextType
parallelizable?: boolean
actorsException?: MActorWithInboxes[]
}) {
const { data, byActor, toFollowersOf, transaction, contextType, actorsException = [] } = options
const { data, byActor, toFollowersOf, transaction, contextType, actorsException = [], parallelizable } = options
const uris = await computeFollowerUris(toFollowersOf, actorsException, transaction)
@ -141,6 +144,7 @@ async function broadcastToFollowers (options: {
uris,
data,
byActor,
parallelizable,
contextType
})
})
@ -173,8 +177,9 @@ function broadcastTo (options: {
data: any
byActor: MActorId
contextType: ContextType
parallelizable?: boolean // default to false
}) {
const { uris, data, byActor, contextType } = options
const { uris, data, byActor, contextType, parallelizable } = options
if (uris.length === 0) return undefined
@ -200,7 +205,13 @@ function broadcastTo (options: {
contextType
}
JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
JobQueue.Instance.createJob({
type: parallelizable
? 'activitypub-http-broadcast-parallel'
: 'activitypub-http-broadcast',
payload
})
}
for (const unicastUri of unicastUris) {

View File

@ -43,6 +43,7 @@ import { processVideosViewsStats } from './handlers/video-views-stats'
type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
{ type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
{ type: 'activitypub-http-cleaner', payload: {} } |
@ -68,6 +69,7 @@ export type CreateJobOptions = {
const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
'activitypub-http-broadcast': processActivityPubHttpBroadcast,
'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast,
'activitypub-http-unicast': processActivityPubHttpUnicast,
'activitypub-http-fetcher': processActivityPubHttpFetcher,
'activitypub-cleaner': processActivityPubCleaner,
@ -93,6 +95,7 @@ const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> }
const jobTypes: JobType[] = [
'activitypub-follow',
'activitypub-http-broadcast',
'activitypub-http-broadcast-parallel',
'activitypub-http-fetcher',
'activitypub-http-unicast',
'activitypub-cleaner',

View File

@ -1,4 +1,3 @@
import { isTestInstance } from '@server/helpers/core-utils'
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { VIEW_LIFETIME } from '@server/initializers/constants'

View File

@ -1,5 +1,5 @@
import { values } from 'lodash'
import { literal, Op, Transaction } from 'sequelize'
import { literal, Op, QueryTypes, Transaction } from 'sequelize'
import {
AllowNull,
BelongsTo,
@ -43,15 +43,18 @@ import {
MActorAccountChannelId,
MActorAPAccount,
MActorAPChannel,
MActorFollowersUrl,
MActorFormattable,
MActorFull,
MActorHost,
MActorId,
MActorServer,
MActorSummaryFormattable,
MActorUrl,
MActorWithInboxes
} from '../../types/models'
import { AccountModel } from '../account/account'
import { getServerActor } from '../application/application'
import { ServerModel } from '../server/server'
import { isOutdated, throwIfNotValid } from '../utils'
import { VideoModel } from '../video/video'
@ -304,7 +307,10 @@ export class ActorModel extends Model<Partial<AttributesOnly<ActorModel>>> {
})
VideoChannel: VideoChannelModel
static load (id: number): Promise<MActor> {
static async load (id: number): Promise<MActor> {
const actorServer = await getServerActor()
if (id === actorServer.id) return actorServer
return ActorModel.unscoped().findByPk(id)
}
@ -312,48 +318,21 @@ export class ActorModel extends Model<Partial<AttributesOnly<ActorModel>>> {
return ActorModel.scope(ScopeNames.FULL).findByPk(id)
}
static loadFromAccountByVideoId (videoId: number, transaction: Transaction): Promise<MActor> {
const query = {
include: [
{
attributes: [ 'id' ],
model: AccountModel.unscoped(),
required: true,
include: [
{
attributes: [ 'id' ],
model: VideoChannelModel.unscoped(),
required: true,
include: [
{
attributes: [ 'id' ],
model: VideoModel.unscoped(),
required: true,
where: {
id: videoId
}
}
]
}
]
}
],
static loadAccountActorFollowerUrlByVideoId (videoId: number, transaction: Transaction) {
const query = `SELECT "actor"."id" AS "id", "actor"."followersUrl" AS "followersUrl" ` +
`FROM "actor" ` +
`INNER JOIN "account" ON "actor"."id" = "account"."actorId" ` +
`INNER JOIN "videoChannel" ON "videoChannel"."accountId" = "account"."id" ` +
`INNER JOIN "video" ON "video"."channelId" = "videoChannel"."id" AND "video"."id" = :videoId`
const options = {
type: QueryTypes.SELECT as QueryTypes.SELECT,
replacements: { videoId },
plain: true as true,
transaction
}
return ActorModel.unscoped().findOne(query)
}
static isActorUrlExist (url: string) {
const query = {
raw: true,
where: {
url
}
}
return ActorModel.unscoped().findOne(query)
.then(a => !!a)
return ActorModel.sequelize.query<MActorId & MActorFollowersUrl>(query, options)
}
static listByFollowersUrls (followersUrls: string[], transaction?: Transaction): Promise<MActorFull[]> {

View File

@ -3,7 +3,7 @@ import { AllowNull, BelongsTo, Column, CreatedAt, DataType, ForeignKey, Is, Mode
import { AttributesOnly } from '@shared/typescript-utils'
import { isActivityPubUrlValid } from '../../helpers/custom-validators/activitypub/misc'
import { CONSTRAINTS_FIELDS } from '../../initializers/constants'
import { MActorDefault } from '../../types/models'
import { MActorDefault, MActorFollowersUrl, MActorId } from '../../types/models'
import { MVideoShareActor, MVideoShareFull } from '../../types/models/video'
import { ActorModel } from '../actor/actor'
import { buildLocalActorIdsIn, throwIfNotValid } from '../utils'
@ -107,22 +107,19 @@ export class VideoShareModel extends Model<Partial<AttributesOnly<VideoShareMode
})
}
static loadActorsByShare (videoId: number, t: Transaction): Promise<MActorDefault[]> {
const query = {
where: {
videoId
},
include: [
{
model: ActorModel,
required: true
}
],
static listActorIdsAndFollowerUrlsByShare (videoId: number, t: Transaction) {
const query = `SELECT "actor"."id" AS "id", "actor"."followersUrl" AS "followersUrl" ` +
`FROM "videoShare" ` +
`INNER JOIN "actor" ON "actor"."id" = "videoShare"."actorId" ` +
`WHERE "videoShare"."videoId" = :videoId`
const options = {
type: QueryTypes.SELECT as QueryTypes.SELECT,
replacements: { videoId },
transaction: t
}
return VideoShareModel.scope(ScopeNames.FULL).findAll(query)
.then((res: MVideoShareFull[]) => res.map(r => r.Actor))
return VideoShareModel.sequelize.query<MActorId & MActorFollowersUrl>(query, options)
}
static loadActorsWhoSharedVideosOf (actorOwnerId: number, t: Transaction): Promise<MActorDefault[]> {

View File

@ -9,6 +9,7 @@ export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed'
export type JobType =
| 'activitypub-http-unicast'
| 'activitypub-http-broadcast'
| 'activitypub-http-broadcast-parallel'
| 'activitypub-http-fetcher'
| 'activitypub-cleaner'
| 'activitypub-follow'