import * as kue from 'kue'
import { JobState, JobType } from '../../../shared/models'
import { logger } from '../../helpers/logger'
import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers'
import { Redis } from '../redis'
import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
import { EmailPayload, processEmail } from './handlers/email'
import { processVideoFile, processVideoFileImport, VideoFilePayload, VideoFileImportPayload } from './handlers/video-file'
import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'

/** Discriminated union pairing every job type with the payload its handler receives. */
type CreateJobArgument =
  { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
  { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
  { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
  { type: 'video-file-import', payload: VideoFileImportPayload } |
  { type: 'video-file', payload: VideoFilePayload } |
  { type: 'email', payload: EmailPayload }

/** Processing function registered with the queue for each job type. */
const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any> } = {
  'activitypub-http-broadcast': processActivityPubHttpBroadcast,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'activitypub-follow': processActivityPubFollow,
  'video-file-import': processVideoFileImport,
  'video-file': processVideoFile,
  'email': processEmail
}

/** Job types that get a TTL of JOB_REQUEST_TTL applied when they are created. */
const jobsWithTLL: JobType[] = [
  'activitypub-http-broadcast',
  'activitypub-http-unicast',
  'activitypub-http-fetcher',
  'activitypub-follow'
]

/**
 * Singleton wrapper around a kue queue: registers the handlers above, creates
 * jobs, and exposes listing/counting/cleanup helpers.
 * Obtain the shared instance through `JobQueue.Instance`.
 */
class JobQueue {

  private static instance: JobQueue

  private jobQueue: kue.Queue
  private initialized = false
  private jobRedisPrefix: string

  private constructor () {}

  /**
   * Creates the underlying kue queue, puts jobs that are still marked active
   * back to inactive, and registers one processor per handler.
   * Calling it again after the first call is a no-op.
   */
  async init () {
    // Already initialized
    if (this.initialized === true) return
    this.initialized = true

    this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST

    this.jobQueue = kue.createQueue({
      prefix: this.jobRedisPrefix,
      redis: {
        host: CONFIG.REDIS.HOSTNAME,
        port: CONFIG.REDIS.PORT,
        auth: CONFIG.REDIS.AUTH,
        db: CONFIG.REDIS.DB
      }
    })

    this.jobQueue.setMaxListeners(20)

    // A queue-level error is treated as fatal: log it and stop the process
    this.jobQueue.on('error', err => {
      logger.error('Error in job queue.', { err })
      process.exit(-1)
    })
    this.jobQueue.watchStuckJobs(5000)

    // Must complete before processors are registered, so the re-queued jobs are picked up
    await this.reactiveStuckJobs()

    for (const handlerName of Object.keys(handlers)) {
      this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
        try {
          const res = await handlers[ handlerName ](job)
          return done(null, res)
        } catch (err) {
          logger.error('Cannot execute job %d.', job.id, { err })
          return done(err)
        }
      })
    }
  }

  /**
   * Creates and saves a job of the given type.
   *
   * @param obj job type and its payload
   * @param priority kue priority name, 'normal' by default
   * @returns a promise resolved once the job is saved, rejected with the save error otherwise
   */
  createJob (obj: CreateJobArgument, priority = 'normal') {
    return new Promise<void>((res, rej) => {
      let job = this.jobQueue
        .create(obj.type, obj.payload)
        .priority(priority)
        .attempts(JOB_ATTEMPTS[obj.type])
        .backoff({ delay: 60 * 1000, type: 'exponential' })

      if (jobsWithTLL.indexOf(obj.type) !== -1) {
        job = job.ttl(JOB_REQUEST_TTL)
      }

      return job.save(err => {
        if (err) return rej(err)

        return res()
      })
    })
  }

  /**
   * Lists jobs in the given state, using the Redis helper to get the sorted
   * job entries and then loading each job through kue.
   *
   * @param state job state to list
   * @param start offset forwarded to Redis.Instance.listJobs
   * @param count number of entries forwarded to Redis.Instance.listJobs
   * @param sort sort direction forwarded to Redis.Instance.listJobs
   */
  async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise<kue.Job[]> {
    const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count)

    // Each entry is split on '|' and the second field is parsed as the job id
    const jobPromises = jobStrings
      .map(s => s.split('|'))
      .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10)))

    return Promise.all(jobPromises)
  }

  /**
   * Counts the jobs in the given state by calling the queue's
   * `<state>Count` method.
   */
  count (state: JobState) {
    return new Promise<number>((res, rej) => {
      this.jobQueue[state + 'Count']((err, total) => {
        if (err) return rej(err)

        return res(total)
      })
    })
  }

  /**
   * Removes completed jobs created more than JOB_COMPLETED_LIFETIME ago.
   * Fire-and-forget: a failure to list the jobs is only logged.
   */
  removeOldJobs () {
    const now = new Date().getTime()
    kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
      if (err) {
        logger.error('Cannot get jobs when removing old jobs.', { err })
        return
      }

      for (const job of jobs) {
        if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
          job.remove()
        }
      }
    })
  }

  /**
   * Puts every job currently listed as active back to inactive.
   *
   * The returned promise settles only after the active ids have been fetched
   * and every job has been switched; any error along the way rejects it.
   * (Collecting promises inside the callbacks and returning Promise.all
   * synchronously would resolve before any job had been touched.)
   */
  private async reactiveStuckJobs () {
    const ids = await new Promise<number[]>((res, rej) => {
      this.jobQueue.active((err, ids) => {
        if (err) return rej(err)

        return res(ids)
      })
    })

    return Promise.all(ids.map(id => this.reactivateJob(id)))
  }

  /** Loads one job by id and marks it inactive; rejects if either step fails. */
  private reactivateJob (id: number) {
    return new Promise<void>((res, rej) => {
      kue.Job.get(id, (err, job) => {
        if (err) return rej(err)

        job.inactive(inactiveErr => {
          if (inactiveErr) return rej(inactiveErr)

          return res()
        })
      })
    })
  }

  /** Promise wrapper around kue.Job.get. */
  private getJob (id: number) {
    return new Promise<kue.Job>((res, rej) => {
      kue.Job.get(id, (err, job) => {
        if (err) return rej(err)

        return res(job)
      })
    })
  }

  /** Lazily created shared instance. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}

// ---------------------------------------------------------------------------

export {
  JobQueue
}