Cleanup invalid rates/comments/shares

pull/1745/head
Chocobozzz 2019-03-19 16:23:02 +01:00
parent d74d29ad9e
commit 2ba9287131
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
16 changed files with 117 additions and 49 deletions

View File

@ -17,11 +17,9 @@ import {
serversBlocklistSortValidator, serversBlocklistSortValidator,
unblockServerByAccountValidator unblockServerByAccountValidator
} from '../../../middlewares/validators' } from '../../../middlewares/validators'
import { AccountModel } from '../../../models/account/account'
import { AccountBlocklistModel } from '../../../models/account/account-blocklist' import { AccountBlocklistModel } from '../../../models/account/account-blocklist'
import { addAccountInBlocklist, addServerInBlocklist, removeAccountFromBlocklist, removeServerFromBlocklist } from '../../../lib/blocklist' import { addAccountInBlocklist, addServerInBlocklist, removeAccountFromBlocklist, removeServerFromBlocklist } from '../../../lib/blocklist'
import { ServerBlocklistModel } from '../../../models/server/server-blocklist' import { ServerBlocklistModel } from '../../../models/server/server-blocklist'
import { ServerModel } from '../../../models/server/server'
const myBlocklistRouter = express.Router() const myBlocklistRouter = express.Router()

View File

@ -2,7 +2,6 @@ import * as express from 'express'
import { UserWatchingVideo } from '../../../../shared' import { UserWatchingVideo } from '../../../../shared'
import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videoWatchingValidator } from '../../../middlewares' import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videoWatchingValidator } from '../../../middlewares'
import { UserVideoHistoryModel } from '../../../models/account/user-video-history' import { UserVideoHistoryModel } from '../../../models/account/user-video-history'
import { UserModel } from '../../../models/account/user'
const watchingRouter = express.Router() const watchingRouter = express.Router()

View File

@ -6,13 +6,12 @@ import * as flatten from 'flat'
import * as winston from 'winston' import * as winston from 'winston'
import { CONFIG } from '../initializers' import { CONFIG } from '../initializers'
import { jsonLoggerFormat, labelFormatter } from './logger' import { jsonLoggerFormat, labelFormatter } from './logger'
import { VideoDetails, User, VideoChannel, VideoAbuse, VideoImport } from '../../shared' import { User, VideoAbuse, VideoChannel, VideoDetails, VideoImport } from '../../shared'
import { VideoComment } from '../../shared/models/videos/video-comment.model' import { VideoComment } from '../../shared/models/videos/video-comment.model'
import { CustomConfig } from '../../shared/models/server/custom-config.model' import { CustomConfig } from '../../shared/models/server/custom-config.model'
import { UserModel } from '../models/account/user'
function getAuditIdFromRes (res: express.Response) { function getAuditIdFromRes (res: express.Response) {
return (res.locals.oauth.token.User as UserModel).username return res.locals.oauth.token.User.username
} }
enum AUDIT_TYPE { enum AUDIT_TYPE {

View File

@ -4,7 +4,10 @@ import { logger } from '../../helpers/logger'
import * as Bluebird from 'bluebird' import * as Bluebird from 'bluebird'
import { ActivityPubOrderedCollection } from '../../../shared/models/activitypub' import { ActivityPubOrderedCollection } from '../../../shared/models/activitypub'
async function crawlCollectionPage <T> (uri: string, handler: (items: T[]) => (Promise<any> | Bluebird<any>)) { type HandlerFunction<T> = (items: T[]) => (Promise<any> | Bluebird<any>)
type CleanerFunction = (startedDate: Date) => (Promise<any> | Bluebird<any>)
async function crawlCollectionPage <T> (uri: string, handler: HandlerFunction<T>, cleaner?: CleanerFunction) {
logger.info('Crawling ActivityPub data on %s.', uri) logger.info('Crawling ActivityPub data on %s.', uri)
const options = { const options = {
@ -15,6 +18,8 @@ async function crawlCollectionPage <T> (uri: string, handler: (items: T[]) => (P
timeout: JOB_REQUEST_TIMEOUT timeout: JOB_REQUEST_TIMEOUT
} }
const startDate = new Date()
const response = await doRequest<ActivityPubOrderedCollection<T>>(options) const response = await doRequest<ActivityPubOrderedCollection<T>>(options)
const firstBody = response.body const firstBody = response.body
@ -35,6 +40,8 @@ async function crawlCollectionPage <T> (uri: string, handler: (items: T[]) => (P
await handler(items) await handler(items)
} }
} }
if (cleaner) await cleaner(startDate)
} }
export { export {

View File

@ -54,12 +54,7 @@ async function addVideoShares (shareUrls: string[], instance: VideoModel) {
url: shareUrl url: shareUrl
} }
await VideoShareModel.findOrCreate({ await VideoShareModel.upsert(entry)
where: {
url: shareUrl
},
defaults: entry
})
} catch (err) { } catch (err) {
logger.warn('Cannot add share %s.', shareUrl, { err }) logger.warn('Cannot add share %s.', shareUrl, { err })
} }

View File

@ -34,8 +34,7 @@ async function videoCommentActivityObjectToDBAttributes (video: VideoModel, acto
accountId: actor.Account.id, accountId: actor.Account.id,
inReplyToCommentId, inReplyToCommentId,
originCommentId, originCommentId,
createdAt: new Date(comment.published), createdAt: new Date(comment.published)
updatedAt: new Date(comment.updated)
} }
} }
@ -74,12 +73,7 @@ async function addVideoComment (videoInstance: VideoModel, commentUrl: string) {
const entry = await videoCommentActivityObjectToDBAttributes(videoInstance, actor, body) const entry = await videoCommentActivityObjectToDBAttributes(videoInstance, actor, body)
if (!entry) return { created: false } if (!entry) return { created: false }
const [ comment, created ] = await VideoCommentModel.findOrCreate({ const [ comment, created ] = await VideoCommentModel.upsert<VideoCommentModel>(entry, { returning: true })
where: {
url: body.id
},
defaults: entry
})
comment.Account = actor.Account comment.Account = actor.Account
comment.Video = videoInstance comment.Video = videoInstance

View File

@ -38,19 +38,14 @@ async function createRates (ratesUrl: string[], video: VideoModel, rate: VideoRa
const actor = await getOrCreateActorAndServerAndModel(actorUrl) const actor = await getOrCreateActorAndServerAndModel(actorUrl)
const [ , created ] = await AccountVideoRateModel const entry = {
.findOrCreate({
where: {
videoId: video.id,
accountId: actor.Account.id
},
defaults: {
videoId: video.id, videoId: video.id,
accountId: actor.Account.id, accountId: actor.Account.id,
type: rate, type: rate,
url: body.id url: body.id
} }
})
const created = await AccountVideoRateModel.upsert(entry)
if (created) rateCounts += 1 if (created) rateCounts += 1
} catch (err) { } catch (err) {

View File

@ -40,6 +40,9 @@ import { Notifier } from '../notifier'
import { VideoStreamingPlaylistModel } from '../../models/video/video-streaming-playlist' import { VideoStreamingPlaylistModel } from '../../models/video/video-streaming-playlist'
import { VideoStreamingPlaylistType } from '../../../shared/models/videos/video-streaming-playlist.type' import { VideoStreamingPlaylistType } from '../../../shared/models/videos/video-streaming-playlist.type'
import { FilteredModelAttributes } from 'sequelize-typescript/lib/models/Model' import { FilteredModelAttributes } from 'sequelize-typescript/lib/models/Model'
import { AccountVideoRateModel } from '../../models/account/account-video-rate'
import { VideoShareModel } from '../../models/video/video-share'
import { VideoCommentModel } from '../../models/video/video-comment'
async function federateVideoIfNeeded (video: VideoModel, isNewVideo: boolean, transaction?: sequelize.Transaction) { async function federateVideoIfNeeded (video: VideoModel, isNewVideo: boolean, transaction?: sequelize.Transaction) {
// If the video is not private and published, we federate it // If the video is not private and published, we federate it
@ -134,31 +137,43 @@ async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: Vid
const jobPayloads: ActivitypubHttpFetcherPayload[] = [] const jobPayloads: ActivitypubHttpFetcherPayload[] = []
if (syncParam.likes === true) { if (syncParam.likes === true) {
await crawlCollectionPage<string>(fetchedVideo.likes, items => createRates(items, video, 'like')) const handler = items => createRates(items, video, 'like')
const cleaner = crawlStartDate => AccountVideoRateModel.cleanOldRatesOf(video.id, 'like' as 'like', crawlStartDate)
await crawlCollectionPage<string>(fetchedVideo.likes, handler, cleaner)
.catch(err => logger.error('Cannot add likes of video %s.', video.uuid, { err })) .catch(err => logger.error('Cannot add likes of video %s.', video.uuid, { err }))
} else { } else {
jobPayloads.push({ uri: fetchedVideo.likes, videoId: video.id, type: 'video-likes' as 'video-likes' }) jobPayloads.push({ uri: fetchedVideo.likes, videoId: video.id, type: 'video-likes' as 'video-likes' })
} }
if (syncParam.dislikes === true) { if (syncParam.dislikes === true) {
await crawlCollectionPage<string>(fetchedVideo.dislikes, items => createRates(items, video, 'dislike')) const handler = items => createRates(items, video, 'dislike')
const cleaner = crawlStartDate => AccountVideoRateModel.cleanOldRatesOf(video.id, 'dislike' as 'dislike', crawlStartDate)
await crawlCollectionPage<string>(fetchedVideo.dislikes, handler, cleaner)
.catch(err => logger.error('Cannot add dislikes of video %s.', video.uuid, { err })) .catch(err => logger.error('Cannot add dislikes of video %s.', video.uuid, { err }))
} else { } else {
jobPayloads.push({ uri: fetchedVideo.dislikes, videoId: video.id, type: 'video-dislikes' as 'video-dislikes' }) jobPayloads.push({ uri: fetchedVideo.dislikes, videoId: video.id, type: 'video-dislikes' as 'video-dislikes' })
} }
if (syncParam.shares === true) { if (syncParam.shares === true) {
await crawlCollectionPage<string>(fetchedVideo.shares, items => addVideoShares(items, video)) const handler = items => addVideoShares(items, video)
const cleaner = crawlStartDate => VideoShareModel.cleanOldSharesOf(video.id, crawlStartDate)
await crawlCollectionPage<string>(fetchedVideo.shares, handler, cleaner)
.catch(err => logger.error('Cannot add shares of video %s.', video.uuid, { err })) .catch(err => logger.error('Cannot add shares of video %s.', video.uuid, { err }))
} else { } else {
jobPayloads.push({ uri: fetchedVideo.shares, videoId: video.id, type: 'video-shares' as 'video-shares' }) jobPayloads.push({ uri: fetchedVideo.shares, videoId: video.id, type: 'video-shares' as 'video-shares' })
} }
if (syncParam.comments === true) { if (syncParam.comments === true) {
await crawlCollectionPage<string>(fetchedVideo.comments, items => addVideoComments(items, video)) const handler = items => addVideoComments(items, video)
const cleaner = crawlStartDate => VideoCommentModel.cleanOldCommentsOf(video.id, crawlStartDate)
await crawlCollectionPage<string>(fetchedVideo.comments, handler, cleaner)
.catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err })) .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err }))
} else { } else {
jobPayloads.push({ uri: fetchedVideo.shares, videoId: video.id, type: 'video-shares' as 'video-shares' }) jobPayloads.push({ uri: fetchedVideo.comments, videoId: video.id, type: 'video-comments' as 'video-comments' })
} }
await Bluebird.map(jobPayloads, payload => JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })) await Bluebird.map(jobPayloads, payload => JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }))

View File

@ -1,4 +1,5 @@
import * as Bull from 'bull' import * as Bull from 'bull'
import * as Bluebird from 'bluebird'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { processActivities } from '../../activitypub/process' import { processActivities } from '../../activitypub/process'
import { addVideoComments } from '../../activitypub/video-comments' import { addVideoComments } from '../../activitypub/video-comments'
@ -7,6 +8,9 @@ import { VideoModel } from '../../../models/video/video'
import { addVideoShares, createRates } from '../../activitypub' import { addVideoShares, createRates } from '../../activitypub'
import { createAccountPlaylists } from '../../activitypub/playlist' import { createAccountPlaylists } from '../../activitypub/playlist'
import { AccountModel } from '../../../models/account/account' import { AccountModel } from '../../../models/account/account'
import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
import { VideoShareModel } from '../../../models/video/video-share'
import { VideoCommentModel } from '../../../models/video/video-comment'
type FetchType = 'activity' | 'video-likes' | 'video-dislikes' | 'video-shares' | 'video-comments' | 'account-playlists' type FetchType = 'activity' | 'video-likes' | 'video-dislikes' | 'video-shares' | 'video-comments' | 'account-playlists'
@ -37,7 +41,14 @@ async function processActivityPubHttpFetcher (job: Bull.Job) {
'account-playlists': items => createAccountPlaylists(items, account) 'account-playlists': items => createAccountPlaylists(items, account)
} }
return crawlCollectionPage(payload.uri, fetcherType[payload.type]) const cleanerType: { [ id in FetchType ]?: (crawlStartDate: Date) => Bluebird<any> } = {
'video-likes': crawlStartDate => AccountVideoRateModel.cleanOldRatesOf(video.id, 'like' as 'like', crawlStartDate),
'video-dislikes': crawlStartDate => AccountVideoRateModel.cleanOldRatesOf(video.id, 'dislike' as 'dislike', crawlStartDate),
'video-shares': crawlStartDate => VideoShareModel.cleanOldSharesOf(video.id, crawlStartDate),
'video-comments': crawlStartDate => VideoCommentModel.cleanOldCommentsOf(video.id, crawlStartDate)
}
return crawlCollectionPage(payload.uri, fetcherType[payload.type], cleanerType[payload.type])
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -1,5 +1,5 @@
import { values } from 'lodash' import { values } from 'lodash'
import { Transaction } from 'sequelize' import { Transaction, Op } from 'sequelize'
import { AllowNull, BelongsTo, Column, CreatedAt, DataType, ForeignKey, Is, Model, Table, UpdatedAt } from 'sequelize-typescript' import { AllowNull, BelongsTo, Column, CreatedAt, DataType, ForeignKey, Is, Model, Table, UpdatedAt } from 'sequelize-typescript'
import { IFindOptions } from 'sequelize-typescript/lib/interfaces/IFindOptions' import { IFindOptions } from 'sequelize-typescript/lib/interfaces/IFindOptions'
import { VideoRateType } from '../../../shared/models/videos' import { VideoRateType } from '../../../shared/models/videos'
@ -158,4 +158,31 @@ export class AccountVideoRateModel extends Model<AccountVideoRateModel> {
return AccountVideoRateModel.findAndCountAll(query) return AccountVideoRateModel.findAndCountAll(query)
} }
static cleanOldRatesOf (videoId: number, type: VideoRateType, beforeUpdatedAt: Date) {
return AccountVideoRateModel.sequelize.transaction(async t => {
const query = {
where: {
updatedAt: {
[Op.lt]: beforeUpdatedAt
},
videoId,
type
},
transaction: t
}
const deleted = await AccountVideoRateModel.destroy(query)
const options = {
transaction: t,
where: {
id: videoId
}
}
if (type === 'like') await VideoModel.increment({ likes: -deleted }, options)
else if (type === 'dislike') await VideoModel.increment({ dislikes: -deleted }, options)
})
}
} }

View File

@ -1,4 +1,5 @@
import * as Sequelize from 'sequelize' import * as Sequelize from 'sequelize'
import { Op } from 'sequelize'
import { import {
AllowNull, AllowNull,
BeforeDestroy, BeforeDestroy,
@ -453,6 +454,19 @@ export class VideoCommentModel extends Model<VideoCommentModel> {
} }
} }
static cleanOldCommentsOf (videoId: number, beforeUpdatedAt: Date) {
const query = {
where: {
updatedAt: {
[Op.lt]: beforeUpdatedAt
},
videoId
}
}
return VideoCommentModel.destroy(query)
}
getCommentStaticPath () { getCommentStaticPath () {
return this.Video.getWatchStaticPath() + ';threadId=' + this.getThreadId() return this.Video.getWatchStaticPath() + ';threadId=' + this.getThreadId()
} }

View File

@ -1,4 +1,5 @@
import * as Sequelize from 'sequelize' import * as Sequelize from 'sequelize'
import { Op } from 'sequelize'
import * as Bluebird from 'bluebird' import * as Bluebird from 'bluebird'
import { AllowNull, BelongsTo, Column, CreatedAt, DataType, ForeignKey, Is, Model, Scopes, Table, UpdatedAt } from 'sequelize-typescript' import { AllowNull, BelongsTo, Column, CreatedAt, DataType, ForeignKey, Is, Model, Scopes, Table, UpdatedAt } from 'sequelize-typescript'
import { isActivityPubUrlValid } from '../../helpers/custom-validators/activitypub/misc' import { isActivityPubUrlValid } from '../../helpers/custom-validators/activitypub/misc'
@ -200,4 +201,17 @@ export class VideoShareModel extends Model<VideoShareModel> {
return VideoShareModel.findAndCountAll(query) return VideoShareModel.findAndCountAll(query)
} }
static cleanOldSharesOf (videoId: number, beforeUpdatedAt: Date) {
const query = {
where: {
updatedAt: {
[Op.lt]: beforeUpdatedAt
},
videoId
}
}
return VideoShareModel.destroy(query)
}
} }

View File

@ -1547,7 +1547,7 @@ export class VideoModel extends Model<VideoModel> {
attributes: query.attributes, attributes: query.attributes,
order: [ // Keep original order order: [ // Keep original order
Sequelize.literal( Sequelize.literal(
ids.map(id => `"VideoModel".id = ${id}`).join(', ') ids.map(id => `"VideoModel".id = ${id} DESC`).join(', ')
) )
] ]
} }

View File

@ -8,7 +8,7 @@ import {
generateUserAccessToken, generateUserAccessToken,
getVideo, getVideo,
getVideoPlaylist, getVideoPlaylist,
killallServers, killallServers, rateVideo,
reRunServer, reRunServer,
ServerInfo, ServerInfo,
setAccessTokensToServers, setAccessTokensToServers,

View File

@ -579,15 +579,15 @@ describe('Test multiple servers', function () {
this.timeout(20000) this.timeout(20000)
await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like') await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like')
await wait(200) await wait(500)
await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'dislike') await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'dislike')
await wait(200) await wait(500)
await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like') await rateVideo(servers[0].url, servers[0].accessToken, remoteVideosServer1[0], 'like')
await rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'like') await rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'like')
await wait(200) await wait(500)
await rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'dislike') await rateVideo(servers[2].url, servers[2].accessToken, localVideosServer3[1], 'dislike')
await rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[1], 'dislike') await rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[1], 'dislike')
await wait(200) await wait(500)
await rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[0], 'like') await rateVideo(servers[2].url, servers[2].accessToken, remoteVideosServer3[0], 'like')
await waitJobs(servers) await waitJobs(servers)