Add job queue hooks

pull/5170/head
Chocobozzz 2022-08-02 15:29:00 +02:00
parent 7a9e420a02
commit 22df69fdec
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
12 changed files with 151 additions and 26 deletions

View File

@ -112,8 +112,14 @@ class PluginsManager {
for (const hook of this.hooks[hookName]) { for (const hook of this.hooks[hookName]) {
logger.info(`Running hook ${hookName} of plugin ${hook.plugin.name}`) logger.info(`Running hook ${hookName} of plugin ${hook.plugin.name}`)
result = await internalRunHook(hook.handler, hookType, result, params, err => { result = await internalRunHook({
handler: hook.handler,
hookType,
result,
params,
onError: err => {
logger.error(`Cannot run hook ${hookName} of script ${hook.clientScript.script} of plugin ${hook.plugin.name}`, err) logger.error(`Cannot run hook ${hookName} of script ${hook.clientScript.script} of plugin ${hook.plugin.name}`, err)
}
}) })
} }

View File

@ -24,6 +24,7 @@ import {
} from '../../../shared/models' } from '../../../shared/models'
import { logger } from '../../helpers/logger' import { logger } from '../../helpers/logger'
import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
import { Hooks } from '../plugins/hooks'
import { processActivityPubCleaner } from './handlers/activitypub-cleaner' import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
import { processActivityPubFollow } from './handlers/activitypub-follow' import { processActivityPubFollow } from './handlers/activitypub-follow'
import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
@ -157,8 +158,11 @@ class JobQueue {
const handler = handlers[handlerName] const handler = handlers[handlerName]
queue.process(this.getJobConcurrency(handlerName), handler) queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job<any>) => {
.catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })
return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
}).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err }))
queue.on('failed', (job, err) => { queue.on('failed', (job, err) => {
const logLevel = silentFailure.has(handlerName) const logLevel = silentFailure.has(handlerName)

View File

@ -8,8 +8,8 @@ type RawFunction <U, T> = (params: U) => T
// Helpers to run hooks // Helpers to run hooks
const Hooks = { const Hooks = {
wrapObject: <T, U extends ServerFilterHookName>(result: T, hookName: U) => { wrapObject: <T, U extends ServerFilterHookName>(result: T, hookName: U, context?: any) => {
return PluginManager.Instance.runHook(hookName, result) return PluginManager.Instance.runHook(hookName, result, context)
}, },
wrapPromiseFun: async <U, T, V extends ServerFilterHookName>(fun: PromiseFunction<U, T>, params: U, hookName: V) => { wrapPromiseFun: async <U, T, V extends ServerFilterHookName>(fun: PromiseFunction<U, T>, params: U, hookName: V) => {

View File

@ -220,6 +220,10 @@ function buildPluginRelatedHelpers (plugin: MPlugin, npmName: string) {
function buildUserHelpers () { function buildUserHelpers () {
return { return {
loadById: (id: number) => {
return UserModel.loadByIdFull(id)
},
getAuthUser: (res: express.Response) => { getAuthUser: (res: express.Response) => {
const user = res.locals.oauth?.token?.User const user = res.locals.oauth?.token?.User
if (!user) return undefined if (!user) return undefined

View File

@ -215,8 +215,12 @@ export class PluginManager implements ServerHook {
for (const hook of this.hooks[hookName]) { for (const hook of this.hooks[hookName]) {
logger.debug('Running hook %s of plugin %s.', hookName, hook.npmName) logger.debug('Running hook %s of plugin %s.', hookName, hook.npmName)
result = await internalRunHook(hook.handler, hookType, result, params, err => { result = await internalRunHook({
logger.error('Cannot run hook %s of plugin %s.', hookName, hook.pluginName, { err }) handler: hook.handler,
hookType,
result,
params,
onError: err => { logger.error('Cannot run hook %s of plugin %s.', hookName, hook.pluginName, { err }) }
}) })
} }

View File

@ -88,6 +88,15 @@ async function register ({
return res.json({ routerRoute }) return res.json({ routerRoute })
}) })
router.get('/user/:id', async (req, res) => {
const user = await peertubeHelpers.user.loadById(req.params.id)
if (!user) return res.status(404).end()
return res.json({
username: user.username
})
})
router.get('/user', async (req, res) => { router.get('/user', async (req, res) => {
const user = await peertubeHelpers.user.getAuthUser(res) const user = await peertubeHelpers.user.getAuthUser(res)
if (!user) return res.sendStatus(404) if (!user) return res.sendStatus(404)
@ -97,6 +106,7 @@ async function register ({
const isUser = user.role === 2 const isUser = user.role === 2
return res.json({ return res.json({
id: user.id,
username: user.username, username: user.username,
displayName: user.Account.name, displayName: user.Account.name,
isAdmin, isAdmin,

View File

@ -253,6 +253,27 @@ async function register ({ registerHook, registerSetting, settingsManager, stora
} }
}) })
registerHook({
target: 'filter:job-queue.process.params',
handler: (object, context) => {
peertubeHelpers.logger.debug('TOTO.', { object, context })
if (context.type !== 'video-studio-edition') return object
object.data.tasks = [
{
name: 'cut',
options: {
start: 0,
end: 1
}
}
]
return object
}
})
// Upload/import/live attributes // Upload/import/live attributes
for (const target of [ for (const target of [
'filter:api.video.upload.video-attribute.result', 'filter:api.video.upload.video-attribute.result',
@ -284,7 +305,10 @@ async function register ({ registerHook, registerSetting, settingsManager, stora
'filter:api.search.video-playlists.index.list.result', 'filter:api.search.video-playlists.index.list.result',
'filter:api.overviews.videos.list.params', 'filter:api.overviews.videos.list.params',
'filter:api.overviews.videos.list.result' 'filter:api.overviews.videos.list.result',
'filter:job-queue.process.params',
'filter:job-queue.process.result'
] ]
for (const h of filterHooks) { for (const h of filterHooks) {

View File

@ -632,6 +632,51 @@ describe('Test plugin filter hooks', function () {
}) })
describe('Job queue filters', function () {
let videoUUID: string
before(async function () {
this.timeout(120_000)
const { uuid } = await servers[0].videos.quickUpload({ name: 'studio' })
const video = await servers[0].videos.get({ id: uuid })
expect(video.duration).at.least(2)
videoUUID = video.uuid
await waitJobs(servers)
await servers[0].config.enableStudio()
})
it('Should run filter:job-queue.process.params', async function () {
this.timeout(120_000)
await servers[0].videoStudio.createEditionTasks({
videoId: videoUUID,
tasks: [
{
name: 'add-intro',
options: {
file: 'video_very_short_240p.mp4'
}
}
]
})
await waitJobs(servers)
await servers[0].servers.waitUntilLog('Run hook filter:job-queue.process.params', 1, false)
const video = await servers[0].videos.get({ id: videoUUID })
expect(video.duration).at.most(2)
})
it('Should run filter:job-queue.process.result', async function () {
await servers[0].servers.waitUntilLog('Run hook filter:job-queue.process.result', 1, false)
})
})
after(async function () { after(async function () {
await cleanupTests(servers) await cleanupTests(servers)
}) })

View File

@ -110,6 +110,7 @@ describe('Test plugin helpers', function () {
}) })
describe('User', function () { describe('User', function () {
let rootId: number
it('Should not get a user if not authenticated', async function () { it('Should not get a user if not authenticated', async function () {
await makeGetRequest({ await makeGetRequest({
@ -132,6 +133,28 @@ describe('Test plugin helpers', function () {
expect(res.body.isAdmin).to.be.true expect(res.body.isAdmin).to.be.true
expect(res.body.isModerator).to.be.false expect(res.body.isModerator).to.be.false
expect(res.body.isUser).to.be.false expect(res.body.isUser).to.be.false
rootId = res.body.id
})
it('Should load a user by id', async function () {
{
const res = await makeGetRequest({
url: servers[0].url,
path: '/plugins/test-four/router/user/' + rootId,
expectedStatus: HttpStatusCode.OK_200
})
expect(res.body.username).to.equal('root')
}
{
await makeGetRequest({
url: servers[0].url,
path: '/plugins/test-four/router/user/42',
expectedStatus: HttpStatusCode.NOT_FOUND_404
})
}
}) })
}) })

View File

@ -14,10 +14,9 @@ import {
RegisterServerSettingOptions, RegisterServerSettingOptions,
ServerConfig, ServerConfig,
ThumbnailType, ThumbnailType,
UserRole,
VideoBlacklistCreate VideoBlacklistCreate
} from '@shared/models' } from '@shared/models'
import { MVideoThumbnail } from '../models' import { MUserDefault, MVideoThumbnail } from '../models'
import { import {
RegisterServerAuthExternalOptions, RegisterServerAuthExternalOptions,
RegisterServerAuthExternalResult, RegisterServerAuthExternalResult,
@ -100,16 +99,10 @@ export type PeerTubeHelpers = {
user: { user: {
// PeerTube >= 3.2 // PeerTube >= 3.2
getAuthUser: (response: Response) => Promise<{ getAuthUser: (response: Response) => Promise<MUserDefault>
id?: string
username: string // PeerTube >= 4.3
email: string loadById: (id: number) => Promise<MUserDefault>
blocked: boolean
role: UserRole
Account: {
name: string
}
} | undefined>
} }
} }

View File

@ -8,15 +8,24 @@ function getHookType (hookName: string) {
return HookType.STATIC return HookType.STATIC
} }
async function internalRunHook <T> (handler: Function, hookType: HookType, result: T, params: any, onError: (err: Error) => void) { async function internalRunHook <T> (options: {
handler: Function
hookType: HookType
result: T
params: any
onError: (err: Error) => void
}) {
const { handler, hookType, result, params, onError } = options
try { try {
if (hookType === HookType.FILTER) { if (hookType === HookType.FILTER) {
const p = handler(result, params) const p = handler(result, params)
if (isPromise(p)) result = await p const newResult = isPromise(p)
else result = p ? await p
: p
return result return newResult
} }
// Action/static hooks do not have result value // Action/static hooks do not have result value

View File

@ -90,7 +90,10 @@ export const serverFilterHookObject = {
// Filter result to check if the embed is allowed for a particular request // Filter result to check if the embed is allowed for a particular request
'filter:html.embed.video.allowed.result': true, 'filter:html.embed.video.allowed.result': true,
'filter:html.embed.video-playlist.allowed.result': true 'filter:html.embed.video-playlist.allowed.result': true,
'filter:job-queue.process.params': true,
'filter:job-queue.process.result': true
} }
export type ServerFilterHookName = keyof typeof serverFilterHookObject export type ServerFilterHookName = keyof typeof serverFilterHookObject