Avoid aborting completing jobs

pull/5817/head
Chocobozzz 2023-05-19 10:10:41 +02:00
parent 2617295569
commit 472170b4f9
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
4 changed files with 18 additions and 8 deletions

View File

@ -68,6 +68,14 @@ function transactionRetryer <T> (func: (err: any, data: T) => any) {
}) })
} }
function saveInTransactionWithRetries <T extends Pick<Model, 'save'>> (model: T) {
return retryTransactionWrapper(() => {
return sequelizeTypescript.transaction(async transaction => {
await model.save({ transaction })
})
})
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
function resetSequelizeInstance <T> (instance: Model<T>) { function resetSequelizeInstance <T> (instance: Model<T>) {
@ -105,6 +113,7 @@ export {
resetSequelizeInstance, resetSequelizeInstance,
retryTransactionWrapper, retryTransactionWrapper,
transactionRetryer, transactionRetryer,
saveInTransactionWithRetries,
afterCommitIfTransaction, afterCommitIfTransaction,
filterNonExistingModels, filterNonExistingModels,
deleteAllModels, deleteAllModels,

View File

@ -577,6 +577,7 @@ const VIDEO_PLAYLIST_TYPES: { [ id in VideoPlaylistType ]: string } = {
const RUNNER_JOB_STATES: { [ id in RunnerJobState ]: string } = { const RUNNER_JOB_STATES: { [ id in RunnerJobState ]: string } = {
[RunnerJobState.PROCESSING]: 'Processing', [RunnerJobState.PROCESSING]: 'Processing',
[RunnerJobState.COMPLETED]: 'Completed', [RunnerJobState.COMPLETED]: 'Completed',
[RunnerJobState.COMPLETING]: 'Completing',
[RunnerJobState.PENDING]: 'Pending', [RunnerJobState.PENDING]: 'Pending',
[RunnerJobState.ERRORED]: 'Errored', [RunnerJobState.ERRORED]: 'Errored',
[RunnerJobState.WAITING_FOR_PARENT_JOB]: 'Waiting for parent job to finish', [RunnerJobState.WAITING_FOR_PARENT_JOB]: 'Waiting for parent job to finish',

View File

@ -1,5 +1,5 @@
import { throttle } from 'lodash' import { throttle } from 'lodash'
import { retryTransactionWrapper } from '@server/helpers/database-utils' import { retryTransactionWrapper, saveInTransactionWithRetries } from '@server/helpers/database-utils'
import { logger, loggerTagsFactory } from '@server/helpers/logger' import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { RUNNER_JOBS } from '@server/initializers/constants' import { RUNNER_JOBS } from '@server/initializers/constants'
import { sequelizeTypescript } from '@server/initializers/database' import { sequelizeTypescript } from '@server/initializers/database'
@ -12,10 +12,10 @@ import {
RunnerJobLiveRTMPHLSTranscodingPayload, RunnerJobLiveRTMPHLSTranscodingPayload,
RunnerJobLiveRTMPHLSTranscodingPrivatePayload, RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
RunnerJobState, RunnerJobState,
RunnerJobStudioTranscodingPayload,
RunnerJobSuccessPayload, RunnerJobSuccessPayload,
RunnerJobType, RunnerJobType,
RunnerJobUpdatePayload, RunnerJobUpdatePayload,
RunnerJobStudioTranscodingPayload,
RunnerJobVideoStudioTranscodingPrivatePayload, RunnerJobVideoStudioTranscodingPrivatePayload,
RunnerJobVODAudioMergeTranscodingPayload, RunnerJobVODAudioMergeTranscodingPayload,
RunnerJobVODAudioMergeTranscodingPrivatePayload, RunnerJobVODAudioMergeTranscodingPrivatePayload,
@ -139,6 +139,9 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S
}) { }) {
const { runnerJob } = options const { runnerJob } = options
runnerJob.state = RunnerJobState.COMPLETING
await saveInTransactionWithRetries(runnerJob)
try { try {
await this.specificComplete(options) await this.specificComplete(options)
@ -153,11 +156,7 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S
runnerJob.progress = null runnerJob.progress = null
runnerJob.finishedAt = new Date() runnerJob.finishedAt = new Date()
await retryTransactionWrapper(() => { await saveInTransactionWithRetries(runnerJob)
return sequelizeTypescript.transaction(async transaction => {
await runnerJob.save({ transaction })
})
})
const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob) const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob)

View File

@ -6,5 +6,6 @@ export enum RunnerJobState {
WAITING_FOR_PARENT_JOB = 5, WAITING_FOR_PARENT_JOB = 5,
CANCELLED = 6, CANCELLED = 6,
PARENT_ERRORED = 7, PARENT_ERRORED = 7,
PARENT_CANCELLED = 8 PARENT_CANCELLED = 8,
COMPLETING = 9
} }