diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index d9d7a6e42..fb3cc8f66 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -32,7 +32,7 @@ class JobQueue { private constructor () {} - init () { + async init () { // Already initialized if (this.initialized === true) return this.initialized = true @@ -54,6 +54,8 @@ class JobQueue { }) this.jobQueue.watchStuckJobs(5000) + await this.reactiveStuckJobs() + for (const handlerName of Object.keys(handlers)) { this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { try { @@ -117,6 +119,31 @@ class JobQueue { }) } + private reactiveStuckJobs () { + const promises: Promise[] = [] + + this.jobQueue.active((err, ids) => { + if (err) throw err + + for (const id of ids) { + kue.Job.get(id, (err, job) => { + if (err) throw err + + const p = new Promise((res, rej) => { + job.inactive(err => { + if (err) return rej(err) + return res() + }) + }) + + promises.push(p) + }) + } + }) + + return Promise.all(promises) + } + static get Instance () { return this.instance || (this.instance = new this()) } diff --git a/server/tests/real-world/real-world.ts b/server/tests/real-world/real-world.ts index 7f67525ed..f10ca856d 100644 --- a/server/tests/real-world/real-world.ts +++ b/server/tests/real-world/real-world.ts @@ -347,7 +347,7 @@ function goodbye () { } async function isTherePendingRequests (servers: ServerInfo[]) { - const states: JobState[] = [ 'inactive', 'active' ] + const states: JobState[] = [ 'inactive', 'active', 'delayed' ] const tasks: Promise[] = [] let pendingRequests = false