mirror of https://github.com/Chocobozzz/PeerTube
Try to speed up AP update transaction
parent
75e12406e2
commit
28dfb44b14
|
@ -58,7 +58,7 @@ function transactionRetryer <T> (func: (err: any, data: T) => any) {
|
||||||
|
|
||||||
errorFilter: err => {
|
errorFilter: err => {
|
||||||
const willRetry = (err.name === 'SequelizeDatabaseError')
|
const willRetry = (err.name === 'SequelizeDatabaseError')
|
||||||
logger.debug('Maybe retrying the transaction function.', { willRetry, err })
|
logger.debug('Maybe retrying the transaction function.', { willRetry, err, tags: [ 'sql', 'retry' ] })
|
||||||
return willRetry
|
return willRetry
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -68,6 +68,8 @@ function transactionRetryer <T> (func: (err: any, data: T) => any) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
function updateInstanceWithAnother <M, T extends U, U extends Model<M>> (instanceToUpdate: T, baseInstance: U) {
|
function updateInstanceWithAnother <M, T extends U, U extends Model<M>> (instanceToUpdate: T, baseInstance: U) {
|
||||||
const obj = baseInstance.toJSON()
|
const obj = baseInstance.toJSON()
|
||||||
|
|
||||||
|
@ -82,12 +84,6 @@ function resetSequelizeInstance (instance: Model<any>, savedFields: object) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function afterCommitIfTransaction (t: Transaction, fn: Function) {
|
|
||||||
if (t) return t.afterCommit(() => fn())
|
|
||||||
|
|
||||||
return fn()
|
|
||||||
}
|
|
||||||
|
|
||||||
function deleteNonExistingModels <T extends { hasSameUniqueKeysThan (other: T): boolean } & Pick<Model, 'destroy'>> (
|
function deleteNonExistingModels <T extends { hasSameUniqueKeysThan (other: T): boolean } & Pick<Model, 'destroy'>> (
|
||||||
fromDatabase: T[],
|
fromDatabase: T[],
|
||||||
newModels: T[],
|
newModels: T[],
|
||||||
|
@ -111,6 +107,18 @@ function setAsUpdated (table: string, id: number, transaction?: Transaction) {
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
function runInReadCommittedTransaction <T> (fn: (t: Transaction) => Promise<T>) {
|
||||||
|
return sequelizeTypescript.transaction(t => fn(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
function afterCommitIfTransaction (t: Transaction, fn: Function) {
|
||||||
|
if (t) return t.afterCommit(() => fn())
|
||||||
|
|
||||||
|
return fn()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
export {
|
export {
|
||||||
resetSequelizeInstance,
|
resetSequelizeInstance,
|
||||||
retryTransactionWrapper,
|
retryTransactionWrapper,
|
||||||
|
@ -118,5 +126,6 @@ export {
|
||||||
updateInstanceWithAnother,
|
updateInstanceWithAnother,
|
||||||
afterCommitIfTransaction,
|
afterCommitIfTransaction,
|
||||||
deleteNonExistingModels,
|
deleteNonExistingModels,
|
||||||
setAsUpdated
|
setAsUpdated,
|
||||||
|
runInReadCommittedTransaction
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,7 +56,7 @@ async function getOrCreateAPActor (
|
||||||
if (actor.Account) (actor as MActorAccountChannelIdActor).Account.Actor = actor
|
if (actor.Account) (actor as MActorAccountChannelIdActor).Account.Actor = actor
|
||||||
if (actor.VideoChannel) (actor as MActorAccountChannelIdActor).VideoChannel.Actor = actor
|
if (actor.VideoChannel) (actor as MActorAccountChannelIdActor).VideoChannel.Actor = actor
|
||||||
|
|
||||||
const { actor: actorRefreshed, refreshed } = await retryTransactionWrapper(refreshActorIfNeeded, actor, fetchType)
|
const { actor: actorRefreshed, refreshed } = await refreshActorIfNeeded(actor, fetchType)
|
||||||
if (!actorRefreshed) throw new Error('Actor ' + actor.url + ' does not exist anymore.')
|
if (!actorRefreshed) throw new Error('Actor ' + actor.url + ' does not exist anymore.')
|
||||||
|
|
||||||
await scheduleOutboxFetchIfNeeded(actor, created, refreshed, updateCollections)
|
await scheduleOutboxFetchIfNeeded(actor, created, refreshed, updateCollections)
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
import { resetSequelizeInstance } from '@server/helpers/database-utils'
|
import { resetSequelizeInstance, runInReadCommittedTransaction } from '@server/helpers/database-utils'
|
||||||
import { logger } from '@server/helpers/logger'
|
import { logger } from '@server/helpers/logger'
|
||||||
import { sequelizeTypescript } from '@server/initializers/database'
|
|
||||||
import { VideoChannelModel } from '@server/models/video/video-channel'
|
import { VideoChannelModel } from '@server/models/video/video-channel'
|
||||||
import { MAccount, MActor, MActorFull, MChannel } from '@server/types/models'
|
import { MAccount, MActor, MActorFull, MChannel } from '@server/types/models'
|
||||||
import { ActivityPubActor, ActorImageType } from '@shared/models'
|
import { ActivityPubActor, ActorImageType } from '@shared/models'
|
||||||
|
@ -32,22 +31,23 @@ export class APActorUpdater {
|
||||||
const bannerInfo = getImageInfoFromObject(this.actorObject, ActorImageType.BANNER)
|
const bannerInfo = getImageInfoFromObject(this.actorObject, ActorImageType.BANNER)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await sequelizeTypescript.transaction(async t => {
|
|
||||||
await this.updateActorInstance(this.actor, this.actorObject)
|
await this.updateActorInstance(this.actor, this.actorObject)
|
||||||
|
|
||||||
await updateActorImageInstance(this.actor, ActorImageType.AVATAR, avatarInfo, t)
|
|
||||||
await updateActorImageInstance(this.actor, ActorImageType.BANNER, bannerInfo, t)
|
|
||||||
|
|
||||||
await this.actor.save({ transaction: t })
|
|
||||||
|
|
||||||
this.accountOrChannel.name = this.actorObject.name || this.actorObject.preferredUsername
|
this.accountOrChannel.name = this.actorObject.name || this.actorObject.preferredUsername
|
||||||
this.accountOrChannel.description = this.actorObject.summary
|
this.accountOrChannel.description = this.actorObject.summary
|
||||||
|
|
||||||
if (this.accountOrChannel instanceof VideoChannelModel) this.accountOrChannel.support = this.actorObject.support
|
if (this.accountOrChannel instanceof VideoChannelModel) this.accountOrChannel.support = this.actorObject.support
|
||||||
|
|
||||||
|
await runInReadCommittedTransaction(async t => {
|
||||||
|
await this.actor.save({ transaction: t })
|
||||||
await this.accountOrChannel.save({ transaction: t })
|
await this.accountOrChannel.save({ transaction: t })
|
||||||
})
|
})
|
||||||
|
|
||||||
|
await runInReadCommittedTransaction(async t => {
|
||||||
|
await updateActorImageInstance(this.actor, ActorImageType.AVATAR, avatarInfo, t)
|
||||||
|
await updateActorImageInstance(this.actor, ActorImageType.BANNER, bannerInfo, t)
|
||||||
|
})
|
||||||
|
|
||||||
logger.info('Remote account %s updated', this.actorObject.url)
|
logger.info('Remote account %s updated', this.actorObject.url)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (this.actor !== undefined && this.actorFieldsSave !== undefined) {
|
if (this.actor !== undefined && this.actorFieldsSave !== undefined) {
|
||||||
|
|
|
@ -24,12 +24,11 @@ async function processCreateView (activity: ActivityView | ActivityCreate, byAct
|
||||||
? activity.object
|
? activity.object
|
||||||
: (activity.object as ViewObject).object
|
: (activity.object as ViewObject).object
|
||||||
|
|
||||||
const options = {
|
const { video } = await getOrCreateAPVideo({
|
||||||
videoObject,
|
videoObject,
|
||||||
fetchType: 'only-video' as 'only-video',
|
fetchType: 'only-video',
|
||||||
allowRefresh: false as false
|
allowRefresh: false
|
||||||
}
|
})
|
||||||
const { video } = await getOrCreateAPVideo(options)
|
|
||||||
|
|
||||||
if (!video.isLive) {
|
if (!video.isLive) {
|
||||||
await Redis.Instance.addVideoView(video.id)
|
await Redis.Instance.addVideoView(video.id)
|
||||||
|
|
|
@ -49,7 +49,7 @@ export abstract class APVideoAbstractBuilder {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
protected async setPreview (video: MVideoFullLight, t: Transaction) {
|
protected async setPreview (video: MVideoFullLight, t?: Transaction) {
|
||||||
// Don't fetch the preview that could be big, create a placeholder instead
|
// Don't fetch the preview that could be big, create a placeholder instead
|
||||||
const previewIcon = getPreviewFromIcons(this.videoObject)
|
const previewIcon = getPreviewFromIcons(this.videoObject)
|
||||||
if (!previewIcon) return
|
if (!previewIcon) return
|
||||||
|
|
|
@ -28,7 +28,7 @@ function getTrackerUrls (object: VideoObject, video: MVideoWithHost) {
|
||||||
async function setVideoTrackers (options: {
|
async function setVideoTrackers (options: {
|
||||||
video: MVideo
|
video: MVideo
|
||||||
trackers: string[]
|
trackers: string[]
|
||||||
transaction?: Transaction
|
transaction: Transaction
|
||||||
}) {
|
}) {
|
||||||
const { video, trackers, transaction } = options
|
const { video, trackers, transaction } = options
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
import { Transaction } from 'sequelize/types'
|
import { Transaction } from 'sequelize/types'
|
||||||
import { resetSequelizeInstance } from '@server/helpers/database-utils'
|
import { resetSequelizeInstance, runInReadCommittedTransaction } from '@server/helpers/database-utils'
|
||||||
import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
|
import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
|
||||||
import { sequelizeTypescript } from '@server/initializers/database'
|
|
||||||
import { Notifier } from '@server/lib/notifier'
|
import { Notifier } from '@server/lib/notifier'
|
||||||
import { PeerTubeSocket } from '@server/lib/peertube-socket'
|
import { PeerTubeSocket } from '@server/lib/peertube-socket'
|
||||||
import { autoBlacklistVideoIfNeeded } from '@server/lib/video-blacklist'
|
import { autoBlacklistVideoIfNeeded } from '@server/lib/video-blacklist'
|
||||||
|
@ -48,24 +47,26 @@ export class APVideoUpdater extends APVideoAbstractBuilder {
|
||||||
|
|
||||||
const thumbnailModel = await this.tryToGenerateThumbnail(this.video)
|
const thumbnailModel = await this.tryToGenerateThumbnail(this.video)
|
||||||
|
|
||||||
const videoUpdated = await sequelizeTypescript.transaction(async t => {
|
|
||||||
this.checkChannelUpdateOrThrow(channelActor)
|
this.checkChannelUpdateOrThrow(channelActor)
|
||||||
|
|
||||||
const videoUpdated = await this.updateVideo(channelActor.VideoChannel, t, overrideTo)
|
const videoUpdated = await this.updateVideo(channelActor.VideoChannel, undefined, overrideTo)
|
||||||
|
|
||||||
if (thumbnailModel) await videoUpdated.addAndSaveThumbnail(thumbnailModel, t)
|
if (thumbnailModel) await videoUpdated.addAndSaveThumbnail(thumbnailModel)
|
||||||
|
|
||||||
await this.setPreview(videoUpdated, t)
|
await runInReadCommittedTransaction(async t => {
|
||||||
await this.setWebTorrentFiles(videoUpdated, t)
|
await this.setWebTorrentFiles(videoUpdated, t)
|
||||||
await this.setStreamingPlaylists(videoUpdated, t)
|
await this.setStreamingPlaylists(videoUpdated, t)
|
||||||
await this.setTags(videoUpdated, t)
|
|
||||||
await this.setTrackers(videoUpdated, t)
|
|
||||||
await this.setCaptions(videoUpdated, t)
|
|
||||||
await this.setOrDeleteLive(videoUpdated, t)
|
|
||||||
|
|
||||||
return videoUpdated
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
runInReadCommittedTransaction(t => this.setTags(videoUpdated, t)),
|
||||||
|
runInReadCommittedTransaction(t => this.setTrackers(videoUpdated, t)),
|
||||||
|
this.setOrDeleteLive(videoUpdated),
|
||||||
|
this.setPreview(videoUpdated)
|
||||||
|
])
|
||||||
|
|
||||||
|
await runInReadCommittedTransaction(t => this.setCaptions(videoUpdated, t))
|
||||||
|
|
||||||
await autoBlacklistVideoIfNeeded({
|
await autoBlacklistVideoIfNeeded({
|
||||||
video: videoUpdated,
|
video: videoUpdated,
|
||||||
user: undefined,
|
user: undefined,
|
||||||
|
@ -103,7 +104,7 @@ export class APVideoUpdater extends APVideoAbstractBuilder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private updateVideo (channel: MChannelId, transaction: Transaction, overrideTo?: string[]) {
|
private updateVideo (channel: MChannelId, transaction?: Transaction, overrideTo?: string[]) {
|
||||||
const to = overrideTo || this.videoObject.to
|
const to = overrideTo || this.videoObject.to
|
||||||
const videoData = getVideoAttributesFromObject(channel, this.videoObject, to)
|
const videoData = getVideoAttributesFromObject(channel, this.videoObject, to)
|
||||||
this.video.name = videoData.name
|
this.video.name = videoData.name
|
||||||
|
@ -140,7 +141,9 @@ export class APVideoUpdater extends APVideoAbstractBuilder {
|
||||||
await this.insertOrReplaceCaptions(videoUpdated, t)
|
await this.insertOrReplaceCaptions(videoUpdated, t)
|
||||||
}
|
}
|
||||||
|
|
||||||
private async setOrDeleteLive (videoUpdated: MVideoFullLight, transaction: Transaction) {
|
private async setOrDeleteLive (videoUpdated: MVideoFullLight, transaction?: Transaction) {
|
||||||
|
if (!this.video.isLive) return
|
||||||
|
|
||||||
if (this.video.isLive) return this.insertOrReplaceLive(videoUpdated, transaction)
|
if (this.video.isLive) return this.insertOrReplaceLive(videoUpdated, transaction)
|
||||||
|
|
||||||
// Delete existing live if it exists
|
// Delete existing live if it exists
|
||||||
|
|
|
@ -1886,7 +1886,7 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> {
|
||||||
return Array.isArray(this.VideoFiles) === true && this.VideoFiles.length !== 0
|
return Array.isArray(this.VideoFiles) === true && this.VideoFiles.length !== 0
|
||||||
}
|
}
|
||||||
|
|
||||||
async addAndSaveThumbnail (thumbnail: MThumbnail, transaction: Transaction) {
|
async addAndSaveThumbnail (thumbnail: MThumbnail, transaction?: Transaction) {
|
||||||
thumbnail.videoId = this.id
|
thumbnail.videoId = this.id
|
||||||
|
|
||||||
const savedThumbnail = await thumbnail.save({ transaction })
|
const savedThumbnail = await thumbnail.save({ transaction })
|
||||||
|
|
Loading…
Reference in New Issue