diff --git a/scripts/create-import-video-file-job.ts b/scripts/create-import-video-file-job.ts index cf974f240..9cb387d2e 100644 --- a/scripts/create-import-video-file-job.ts +++ b/scripts/create-import-video-file-job.ts @@ -44,7 +44,7 @@ async function run () { filePath: resolve(options.import) } - JobQueue.Instance.init(true) + JobQueue.Instance.init() await JobQueue.Instance.createJob({ type: 'video-file-import', payload: dataInput }) console.log('Import job for video %s created.', video.uuid) } diff --git a/scripts/create-move-video-storage-job.ts b/scripts/create-move-video-storage-job.ts index 0f0d4ee35..13ba3c0b7 100644 --- a/scripts/create-move-video-storage-job.ts +++ b/scripts/create-move-video-storage-job.ts @@ -37,7 +37,7 @@ run() async function run () { await initDatabaseModels(true) - JobQueue.Instance.init(true) + JobQueue.Instance.init() let ids: number[] = [] diff --git a/scripts/create-transcoding-job.ts b/scripts/create-transcoding-job.ts index aa97b0ba7..ffdf55ae4 100755 --- a/scripts/create-transcoding-job.ts +++ b/scripts/create-transcoding-job.ts @@ -97,7 +97,7 @@ async function run () { } } - JobQueue.Instance.init(true) + JobQueue.Instance.init() video.state = VideoState.TO_TRANSCODE await video.save() diff --git a/scripts/migrations/peertube-4.0.ts b/scripts/migrations/peertube-4.0.ts index 9e5ca60d4..b0891c2e6 100644 --- a/scripts/migrations/peertube-4.0.ts +++ b/scripts/migrations/peertube-4.0.ts @@ -21,7 +21,7 @@ async function run () { await initDatabaseModels(true) - JobQueue.Instance.init(true) + JobQueue.Instance.init() const ids = await VideoModel.listLocalIds() diff --git a/scripts/migrations/peertube-4.2.ts b/scripts/migrations/peertube-4.2.ts index 6a9007265..513c629ef 100644 --- a/scripts/migrations/peertube-4.2.ts +++ b/scripts/migrations/peertube-4.2.ts @@ -27,7 +27,7 @@ async function run () { console.log('Generate avatar miniatures from existing avatars.') await initDatabaseModels(true) - JobQueue.Instance.init(true) + JobQueue.Instance.init() const accounts: AccountModel[] = await AccountModel.findAll({ include: [ diff --git a/server.ts b/server.ts index 63a08f471..887814d4e 100644 --- a/server.ts +++ b/server.ts @@ -351,6 +351,12 @@ async function startApplication () { ApplicationModel.updateNodeVersions() .catch(err => logger.error('Cannot update node versions.', { err })) + JobQueue.Instance.start() + .catch(err => { + logger.error('Cannot start job queue.', { err }) + process.exit(-1) + }) + logger.info('HTTP server listening on %s:%d', hostname, port) logger.info('Web server: %s', WEBSERVER.URL) diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index e54d12acd..655be6568 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -168,7 +168,7 @@ class JobQueue { private constructor () { } - init (produceOnly = false) { + init () { // Already initialized if (this.initialized === true) return this.initialized = true @@ -176,10 +176,10 @@ class JobQueue { this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST for (const handlerName of (Object.keys(handlers) as JobType[])) { - this.buildWorker(handlerName, produceOnly) + this.buildWorker(handlerName) this.buildQueue(handlerName) - this.buildQueueScheduler(handlerName, produceOnly) - this.buildQueueEvent(handlerName, produceOnly) + this.buildQueueScheduler(handlerName) + this.buildQueueEvent(handlerName) } this.flowProducer = new FlowProducer({ @@ -191,9 +191,9 @@ class JobQueue { this.addRepeatableJobs() } - private buildWorker (handlerName: JobType, produceOnly: boolean) { + private buildWorker (handlerName: JobType) { const workerOptions: WorkerOptions = { - autorun: !produceOnly, + autorun: false, concurrency: this.getJobConcurrency(handlerName), prefix: this.jobRedisPrefix, connection: this.getRedisConnection() @@ -246,9 +246,9 @@ class JobQueue { this.queues[handlerName] = queue } - private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { + private buildQueueScheduler (handlerName: JobType) { const queueSchedulerOptions: QueueSchedulerOptions = { - autorun: !produceOnly, + autorun: false, connection: this.getRedisConnection(), prefix: this.jobRedisPrefix, maxStalledCount: 10 @@ -260,9 +260,9 @@ class JobQueue { this.queueSchedulers[handlerName] = queueScheduler } - private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { + private buildQueueEvent (handlerName: JobType) { const queueEventsOptions: QueueEventsOptions = { - autorun: !produceOnly, + autorun: false, connection: this.getRedisConnection(), prefix: this.jobRedisPrefix } @@ -304,6 +304,23 @@ class JobQueue { return Promise.all(promises) } + start () { + const promises = Object.keys(this.workers) + .map(handlerName => { + const worker: Worker = this.workers[handlerName] + const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] + const queueEvent: QueueEvents = this.queueEvents[handlerName] + + return Promise.all([ + worker.run(), + queueScheduler.run(), + queueEvent.run() + ]) + }) + + return Promise.all(promises) + } + async pause () { for (const handlerName of Object.keys(this.workers)) { const worker: Worker = this.workers[handlerName]