Migrate to bull

pull/803/head
Chocobozzz 2018-07-10 17:02:20 +02:00
parent 2cdf27bae6
commit 94831479f5
21 changed files with 221 additions and 163 deletions

View File

@ -9,7 +9,7 @@
</div> </div>
<p-table <p-table
[value]="jobs" [lazy]="true" [paginator]="true" [totalRecords]="totalRecords" [rows]="rowsPerPage" dataKey="id" [value]="jobs" [lazy]="true" [paginator]="true" [totalRecords]="totalRecords" [rows]="rowsPerPage" dataKey="uniqId"
[sortField]="sort.field" [sortOrder]="sort.order" (onLazyLoad)="loadLazy($event)" [sortField]="sort.field" [sortOrder]="sort.order" (onLazyLoad)="loadLazy($event)"
> >
<ng-template pTemplate="header"> <ng-template pTemplate="header">
@ -19,7 +19,8 @@
<th i18n style="width: 210px">Type</th> <th i18n style="width: 210px">Type</th>
<th i18n style="width: 130px">State</th> <th i18n style="width: 130px">State</th>
<th i18n style="width: 250px" pSortableColumn="createdAt">Created <p-sortIcon field="createdAt"></p-sortIcon></th> <th i18n style="width: 250px" pSortableColumn="createdAt">Created <p-sortIcon field="createdAt"></p-sortIcon></th>
<th i18n style="width: 250px">Updated</th> <th i18n style="width: 250px">Processed on</th>
<th i18n style="width: 250px">Finished on</th>
</tr> </tr>
</ng-template> </ng-template>
@ -34,18 +35,19 @@
<td>{{ job.type }}</td> <td>{{ job.type }}</td>
<td>{{ job.state }}</td> <td>{{ job.state }}</td>
<td>{{ job.createdAt }}</td> <td>{{ job.createdAt }}</td>
<td>{{ job.updatedAt }}</td> <td>{{ job.processedOn }}</td>
<td>{{ job.finishedOn }}</td>
</tr> </tr>
</ng-template> </ng-template>
<ng-template pTemplate="rowexpansion" let-job> <ng-template pTemplate="rowexpansion" let-job>
<tr> <tr>
<td colspan="6"> <td colspan="7">
<pre>{{ job.data }}</pre> <pre>{{ job.data }}</pre>
</td> </td>
</tr> </tr>
<tr class="job-error" *ngIf="job.error"> <tr class="job-error" *ngIf="job.error">
<td colspan="6"> <td colspan="7">
<pre>{{ job.error }}</pre> <pre>{{ job.error }}</pre>
</td> </td>
</tr> </tr>

View File

@ -17,8 +17,8 @@ import { I18n } from '@ngx-translate/i18n-polyfill'
export class JobsListComponent extends RestTable implements OnInit { export class JobsListComponent extends RestTable implements OnInit {
private static JOB_STATE_LOCAL_STORAGE_STATE = 'jobs-list-state' private static JOB_STATE_LOCAL_STORAGE_STATE = 'jobs-list-state'
jobState: JobState = 'inactive' jobState: JobState = 'waiting'
jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ] jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
jobs: Job[] = [] jobs: Job[] = []
totalRecords: number totalRecords: number
rowsPerPage = 10 rowsPerPage = 10

View File

@ -25,8 +25,11 @@ export class JobService {
return this.authHttp.get<ResultList<Job>>(JobService.BASE_JOB_URL + '/' + state, { params }) return this.authHttp.get<ResultList<Job>>(JobService.BASE_JOB_URL + '/' + state, { params })
.pipe( .pipe(
map(res => this.restExtractor.convertResultListDateToHuman(res, [ 'createdAt', 'updatedAt' ])), map(res => {
return this.restExtractor.convertResultListDateToHuman(res, [ 'createdAt', 'processedOn', 'finishedOn' ])
}),
map(res => this.restExtractor.applyToResultListData(res, this.prettyPrintData)), map(res => this.restExtractor.applyToResultListData(res, this.prettyPrintData)),
map(res => this.restExtractor.applyToResultListData(res, this.buildUniqId)),
catchError(err => this.restExtractor.handleError(err)) catchError(err => this.restExtractor.handleError(err))
) )
} }
@ -36,4 +39,8 @@ export class JobService {
return Object.assign(obj, { data }) return Object.assign(obj, { data })
} }
private buildUniqId (obj: Job) {
return Object.assign(obj, { uniqId: `${obj.id}-${obj.type}` })
}
} }

View File

@ -8,5 +8,5 @@ for i in $(seq 1 6); do
rm -f "./config/local-test.json" rm -f "./config/local-test.json"
rm -f "./config/local-test-$i.json" rm -f "./config/local-test-$i.json"
createdb -O peertube "peertube_test$i" createdb -O peertube "peertube_test$i"
redis-cli KEYS "q-localhost:900$i*" | grep -v empty | xargs --no-run-if-empty redis-cli DEL redis-cli KEYS "bull-localhost:900$i*" | grep -v empty | xargs --no-run-if-empty redis-cli DEL
done done

View File

@ -13,6 +13,7 @@ import {
} from '../../middlewares' } from '../../middlewares'
import { paginationValidator } from '../../middlewares/validators' import { paginationValidator } from '../../middlewares/validators'
import { listJobsValidator } from '../../middlewares/validators/jobs' import { listJobsValidator } from '../../middlewares/validators/jobs'
import { isArray } from '../../helpers/custom-validators/misc'
const jobsRouter = express.Router() const jobsRouter = express.Router()
@ -36,26 +37,30 @@ export {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) { async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) {
const sort = req.query.sort === 'createdAt' ? 'ASC' : 'DESC' const state: JobState = req.params.state
const asc = req.query.sort === 'createdAt'
const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort) const jobs = await JobQueue.Instance.listForApi(state, req.query.start, req.query.count, asc)
const total = await JobQueue.Instance.count(req.params.state) const total = await JobQueue.Instance.count(state)
const result: ResultList<any> = { const result: ResultList<any> = {
total, total,
data: jobs.map(j => formatJob(j.toJSON())) data: jobs.map(j => formatJob(j, state))
} }
return res.json(result) return res.json(result)
} }
function formatJob (job: any): Job { function formatJob (job: any, state: JobState): Job {
const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 ? job.stacktrace[0] : null
return { return {
id: job.id, id: job.id,
state: job.state as JobState, state: state,
type: job.type as JobType, type: job.queue.name as JobType,
data: job.data, data: job.data,
error: job.error, error,
createdAt: new Date(parseInt(job.created_at, 10)), createdAt: new Date(job.timestamp),
updatedAt: new Date(parseInt(job.updated_at, 10)) finishedOn: new Date(job.finishedOn),
processedOn: new Date(job.processedOn)
} }
} }

View File

@ -1,7 +1,7 @@
import { JobState } from '../../../shared/models' import { JobState } from '../../../shared/models'
import { exists } from './misc' import { exists } from './misc'
const jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ] const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
function isValidJobState (value: JobState) { function isValidJobState (value: JobState) {
return exists(value) && jobStates.indexOf(value) !== -1 return exists(value) && jobStates.indexOf(value) !== -1

View File

@ -14,7 +14,7 @@ let config: IConfig = require('config')
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
const LAST_MIGRATION_VERSION = 225 const LAST_MIGRATION_VERSION = 230
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -0,0 +1,63 @@
import * as Sequelize from 'sequelize'
import { createClient } from 'redis'
import { CONFIG } from '../constants'
import { JobQueue } from '../../lib/job-queue'
import { initDatabaseModels } from '../database'
async function up (utils: {
transaction: Sequelize.Transaction
queryInterface: Sequelize.QueryInterface
sequelize: Sequelize.Sequelize
}): Promise<any> {
await initDatabaseModels(false)
return new Promise((res, rej) => {
const client = createClient({
host: CONFIG.REDIS.HOSTNAME,
port: CONFIG.REDIS.PORT,
db: CONFIG.REDIS.DB
})
const jobsPrefix = 'q-' + CONFIG.WEBSERVER.HOST
client.sort(jobsPrefix + ':jobs:inactive', 'by', 'alpha', 'ASC', (err, jobStrings) => {
if (err) return rej(err)
const jobPromises = jobStrings
.map(s => s.split('|'))
.map(([ , jobId ]) => {
return new Promise((res, rej) => {
client.hgetall(jobsPrefix + ':job:' + jobId, (err, job) => {
if (err) return rej(err)
try {
const parsedData = JSON.parse(job.data)
return res({ type: job.type, payload: parsedData })
} catch (err) {
console.error('Cannot parse data %s.', job.data)
return res(null)
}
})
})
})
JobQueue.Instance.init()
.then(() => Promise.all(jobPromises))
.then((jobs: any) => {
const createJobPromises = jobs
.filter(job => job !== null)
.map(job => JobQueue.Instance.createJob(job))
return Promise.all(createJobPromises)
})
.then(() => res())
})
})
}
function down (options) {
throw new Error('Not implemented.')
}
export { up, down }

View File

@ -1,4 +1,4 @@
import * as kue from 'kue' import * as Bull from 'bull'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { getServerActor } from '../../../helpers/utils' import { getServerActor } from '../../../helpers/utils'
import { REMOTE_SCHEME, sequelizeTypescript, SERVER_ACTOR_NAME } from '../../../initializers' import { REMOTE_SCHEME, sequelizeTypescript, SERVER_ACTOR_NAME } from '../../../initializers'
@ -14,7 +14,7 @@ export type ActivitypubFollowPayload = {
host: string host: string
} }
async function processActivityPubFollow (job: kue.Job) { async function processActivityPubFollow (job: Bull.Job) {
const payload = job.data as ActivitypubFollowPayload const payload = job.data as ActivitypubFollowPayload
const host = payload.host const host = payload.host

View File

@ -1,4 +1,4 @@
import * as kue from 'kue' import * as Bull from 'bull'
import * as Bluebird from 'bluebird' import * as Bluebird from 'bluebird'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests' import { doRequest } from '../../../helpers/requests'
@ -12,7 +12,7 @@ export type ActivitypubHttpBroadcastPayload = {
body: any body: any
} }
async function processActivityPubHttpBroadcast (job: kue.Job) { async function processActivityPubHttpBroadcast (job: Bull.Job) {
logger.info('Processing ActivityPub broadcast in job %d.', job.id) logger.info('Processing ActivityPub broadcast in job %d.', job.id)
const payload = job.data as ActivitypubHttpBroadcastPayload const payload = job.data as ActivitypubHttpBroadcastPayload

View File

@ -1,4 +1,4 @@
import * as kue from 'kue' import * as Bull from 'bull'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { processActivities } from '../../activitypub/process' import { processActivities } from '../../activitypub/process'
import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
@ -9,7 +9,7 @@ export type ActivitypubHttpFetcherPayload = {
uris: string[] uris: string[]
} }
async function processActivityPubHttpFetcher (job: kue.Job) { async function processActivityPubHttpFetcher (job: Bull.Job) {
logger.info('Processing ActivityPub fetcher in job %d.', job.id) logger.info('Processing ActivityPub fetcher in job %d.', job.id)
const payload = job.data as ActivitypubHttpBroadcastPayload const payload = job.data as ActivitypubHttpBroadcastPayload

View File

@ -1,4 +1,4 @@
import * as kue from 'kue' import * as Bull from 'bull'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests' import { doRequest } from '../../../helpers/requests'
import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
@ -11,7 +11,7 @@ export type ActivitypubHttpUnicastPayload = {
body: any body: any
} }
async function processActivityPubHttpUnicast (job: kue.Job) { async function processActivityPubHttpUnicast (job: Bull.Job) {
logger.info('Processing ActivityPub unicast in job %d.', job.id) logger.info('Processing ActivityPub unicast in job %d.', job.id)
const payload = job.data as ActivitypubHttpUnicastPayload const payload = job.data as ActivitypubHttpUnicastPayload

View File

@ -1,4 +1,4 @@
import * as kue from 'kue' import * as Bull from 'bull'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { Emailer } from '../../emailer' import { Emailer } from '../../emailer'
@ -8,7 +8,7 @@ export type EmailPayload = {
text: string text: string
} }
async function processEmail (job: kue.Job) { async function processEmail (job: Bull.Job) {
const payload = job.data as EmailPayload const payload = job.data as EmailPayload
logger.info('Processing email in job %d.', job.id) logger.info('Processing email in job %d.', job.id)

View File

@ -1,4 +1,4 @@
import * as kue from 'kue' import * as Bull from 'bull'
import { VideoResolution, VideoState } from '../../../../shared' import { VideoResolution, VideoState } from '../../../../shared'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { computeResolutionsToTranscode } from '../../../helpers/utils' import { computeResolutionsToTranscode } from '../../../helpers/utils'
@ -7,6 +7,7 @@ import { JobQueue } from '../job-queue'
import { federateVideoIfNeeded } from '../../activitypub' import { federateVideoIfNeeded } from '../../activitypub'
import { retryTransactionWrapper } from '../../../helpers/database-utils' import { retryTransactionWrapper } from '../../../helpers/database-utils'
import { sequelizeTypescript } from '../../../initializers' import { sequelizeTypescript } from '../../../initializers'
import * as Bluebird from 'bluebird'
export type VideoFilePayload = { export type VideoFilePayload = {
videoUUID: string videoUUID: string
@ -20,7 +21,7 @@ export type VideoFileImportPayload = {
filePath: string filePath: string
} }
async function processVideoFileImport (job: kue.Job) { async function processVideoFileImport (job: Bull.Job) {
const payload = job.data as VideoFileImportPayload const payload = job.data as VideoFileImportPayload
logger.info('Processing video file import in job %d.', job.id) logger.info('Processing video file import in job %d.', job.id)
@ -37,7 +38,7 @@ async function processVideoFileImport (job: kue.Job) {
return video return video
} }
async function processVideoFile (job: kue.Job) { async function processVideoFile (job: Bull.Job) {
const payload = job.data as VideoFilePayload const payload = job.data as VideoFilePayload
logger.info('Processing video file in job %d.', job.id) logger.info('Processing video file in job %d.', job.id)
@ -109,7 +110,7 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole
) )
if (resolutionsEnabled.length !== 0) { if (resolutionsEnabled.length !== 0) {
const tasks: Promise<any>[] = [] const tasks: Bluebird<any>[] = []
for (const resolution of resolutionsEnabled) { for (const resolution of resolutionsEnabled) {
const dataInput = { const dataInput = {

View File

@ -1,13 +1,12 @@
import * as kue from 'kue' import * as Bull from 'bull'
import { JobState, JobType } from '../../../shared/models' import { JobState, JobType } from '../../../shared/models'
import { logger } from '../../helpers/logger' import { logger } from '../../helpers/logger'
import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers'
import { Redis } from '../redis'
import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
import { EmailPayload, processEmail } from './handlers/email' import { EmailPayload, processEmail } from './handlers/email'
import { processVideoFile, processVideoFileImport, VideoFilePayload, VideoFileImportPayload } from './handlers/video-file' import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file'
import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
type CreateJobArgument = type CreateJobArgument =
@ -19,7 +18,7 @@ type CreateJobArgument =
{ type: 'video-file', payload: VideoFilePayload } | { type: 'video-file', payload: VideoFilePayload } |
{ type: 'email', payload: EmailPayload } { type: 'email', payload: EmailPayload }
const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-unicast': processActivityPubHttpUnicast,
'activitypub-http-fetcher': processActivityPubHttpFetcher, 'activitypub-http-fetcher': processActivityPubHttpFetcher,
@ -29,18 +28,28 @@ const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
'email': processEmail 'email': processEmail
} }
const jobsWithTLL: JobType[] = [ const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = {
'activitypub-http-broadcast': true,
'activitypub-http-unicast': true,
'activitypub-http-fetcher': true,
'activitypub-follow': true
}
const jobTypes: JobType[] = [
'activitypub-follow',
'activitypub-http-broadcast', 'activitypub-http-broadcast',
'activitypub-http-unicast',
'activitypub-http-fetcher', 'activitypub-http-fetcher',
'activitypub-follow' 'activitypub-http-unicast',
'email',
'video-file',
'video-file-import'
] ]
class JobQueue { class JobQueue {
private static instance: JobQueue private static instance: JobQueue
private jobQueue: kue.Queue private queues: { [ id in JobType ]?: Bull.Queue } = {}
private initialized = false private initialized = false
private jobRedisPrefix: string private jobRedisPrefix: string
@ -51,9 +60,8 @@ class JobQueue {
if (this.initialized === true) return if (this.initialized === true) return
this.initialized = true this.initialized = true
this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST this.jobRedisPrefix = 'bull-' + CONFIG.WEBSERVER.HOST
const queueOptions = {
this.jobQueue = kue.createQueue({
prefix: this.jobRedisPrefix, prefix: this.jobRedisPrefix,
redis: { redis: {
host: CONFIG.REDIS.HOSTNAME, host: CONFIG.REDIS.HOSTNAME,
@ -61,120 +69,94 @@ class JobQueue {
auth: CONFIG.REDIS.AUTH, auth: CONFIG.REDIS.AUTH,
db: CONFIG.REDIS.DB db: CONFIG.REDIS.DB
} }
}) }
this.jobQueue.setMaxListeners(20)
this.jobQueue.on('error', err => {
logger.error('Error in job queue.', { err })
process.exit(-1)
})
this.jobQueue.watchStuckJobs(5000)
await this.reactiveStuckJobs()
for (const handlerName of Object.keys(handlers)) { for (const handlerName of Object.keys(handlers)) {
this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { const queue = new Bull(handlerName, queueOptions)
try { const handler = handlers[handlerName]
const res = await handlers[ handlerName ](job)
return done(null, res) queue.process(JOB_CONCURRENCY[handlerName], handler)
} catch (err) { .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err }))
logger.error('Cannot execute job %d.', job.id, { err })
return done(err) queue.on('error', err => {
} logger.error('Error in job queue %s.', handlerName, { err })
process.exit(-1)
}) })
this.queues[handlerName] = queue
} }
} }
createJob (obj: CreateJobArgument, priority = 'normal') { createJob (obj: CreateJobArgument) {
return new Promise((res, rej) => { const queue = this.queues[obj.type]
let job = this.jobQueue if (queue === undefined) {
.create(obj.type, obj.payload) logger.error('Unknown queue %s: cannot create job.', obj.type)
.priority(priority) return
.attempts(JOB_ATTEMPTS[obj.type]) }
.backoff({ delay: 60 * 1000, type: 'exponential' })
if (jobsWithTLL.indexOf(obj.type) !== -1) { const jobArgs: Bull.JobOptions = {
job = job.ttl(JOB_REQUEST_TTL) backoff: { delay: 60 * 1000, type: 'exponential' },
attempts: JOB_ATTEMPTS[obj.type]
}
if (jobsWithRequestTimeout[obj.type] === true) {
jobArgs.timeout = JOB_REQUEST_TTL
}
return queue.add(obj.payload, jobArgs)
}
async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise<Bull.Job[]> {
let results: Bull.Job[] = []
// TODO: optimize
for (const jobType of jobTypes) {
const queue = this.queues[ jobType ]
if (queue === undefined) {
logger.error('Unknown queue %s to list jobs.', jobType)
continue
} }
return job.save(err => { // FIXME: Bull queue typings does not have getJobs method
if (err) return rej(err) const jobs = await (queue as any).getJobs(state, 0, start + count, asc)
results = results.concat(jobs)
}
return res() results.sort((j1: any, j2: any) => {
}) if (j1.timestamp < j2.timestamp) return -1
else if (j1.timestamp === j2.timestamp) return 0
return 1
}) })
if (asc === false) results.reverse()
return results.slice(start, start + count)
} }
async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise<kue.Job[]> { async count (state: JobState): Promise<number> {
const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count) let total = 0
const jobPromises = jobStrings for (const type of jobTypes) {
.map(s => s.split('|')) const queue = this.queues[ type ]
.map(([ , jobId ]) => this.getJob(parseInt(jobId, 10))) if (queue === undefined) {
logger.error('Unknown queue %s to count jobs.', type)
continue
}
return Promise.all(jobPromises) const counts = await queue.getJobCounts()
}
count (state: JobState) { total += counts[ state ]
return new Promise<number>((res, rej) => { }
this.jobQueue[state + 'Count']((err, total) => {
if (err) return rej(err)
return res(total) return total
})
})
} }
removeOldJobs () { removeOldJobs () {
const now = new Date().getTime() for (const key of Object.keys(this.queues)) {
kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => { const queue = this.queues[key]
if (err) { queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
logger.error('Cannot get jobs when removing old jobs.', { err }) }
return
}
for (const job of jobs) {
if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
job.remove()
}
}
})
}
private reactiveStuckJobs () {
const promises: Promise<any>[] = []
this.jobQueue.active((err, ids) => {
if (err) throw err
for (const id of ids) {
kue.Job.get(id, (err, job) => {
if (err) throw err
const p = new Promise((res, rej) => {
job.inactive(err => {
if (err) return rej(err)
return res()
})
})
promises.push(p)
})
}
})
return Promise.all(promises)
}
private getJob (id: number) {
return new Promise<kue.Job>((res, rej) => {
kue.Job.get(id, (err, job) => {
if (err) return rej(err)
return res(job)
})
})
} }
static get Instance () { static get Instance () {

View File

@ -78,16 +78,6 @@ class Redis {
return this.setObject(this.buildCachedRouteKey(req), cached, lifetime) return this.setObject(this.buildCachedRouteKey(req), cached, lifetime)
} }
listJobs (jobsPrefix: string, state: string, mode: 'alpha', order: 'ASC' | 'DESC', offset: number, count: number) {
return new Promise<string[]>((res, rej) => {
this.client.sort(jobsPrefix + ':jobs:' + state, 'by', mode, order, 'LIMIT', offset.toString(), count.toString(), (err, values) => {
if (err) return rej(err)
return res(values)
})
})
}
generateResetPasswordKey (userId: number) { generateResetPasswordKey (userId: number) {
return 'reset-password-' + userId return 'reset-password-' + userId
} }

View File

@ -6,15 +6,21 @@ import { JobState } from '../../../../shared/models'
import { VideoPrivacy } from '../../../../shared/models/videos' import { VideoPrivacy } from '../../../../shared/models/videos'
import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model' import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model'
import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from '../../utils' import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from '../../utils'
import { import {
flushAndRunMultipleServers, flushTests, getVideosList, killallServers, ServerInfo, setAccessTokensToServers, uploadVideo, flushAndRunMultipleServers,
getVideosList,
killallServers,
ServerInfo,
setAccessTokensToServers,
uploadVideo,
wait wait
} from '../../utils/index' } from '../../utils/index'
import { follow, getFollowersListPaginationAndSort } from '../../utils/server/follows' import { follow, getFollowersListPaginationAndSort } from '../../utils/server/follows'
import { getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs' import { getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs'
import { import {
addVideoCommentReply, addVideoCommentThread, getVideoCommentThreads, addVideoCommentReply,
addVideoCommentThread,
getVideoCommentThreads,
getVideoThreadComments getVideoThreadComments
} from '../../utils/videos/video-comments' } from '../../utils/videos/video-comments'
@ -146,7 +152,7 @@ describe('Test handle downs', function () {
}) })
it('Should not have pending/processing jobs anymore', async function () { it('Should not have pending/processing jobs anymore', async function () {
const states: JobState[] = [ 'inactive', 'active' ] const states: JobState[] = [ 'waiting', 'active' ]
for (const state of states) { for (const state of states) {
const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt') const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt')

View File

@ -2,7 +2,7 @@
import * as chai from 'chai' import * as chai from 'chai'
import 'mocha' import 'mocha'
import { flushTests, killallServers, ServerInfo, setAccessTokensToServers, wait } from '../../utils/index' import { killallServers, ServerInfo, setAccessTokensToServers } from '../../utils/index'
import { doubleFollow } from '../../utils/server/follows' import { doubleFollow } from '../../utils/server/follows'
import { getJobsList, getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs' import { getJobsList, getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs'
import { flushAndRunMultipleServers } from '../../utils/server/servers' import { flushAndRunMultipleServers } from '../../utils/server/servers'
@ -35,22 +35,23 @@ describe('Test jobs', function () {
}) })
it('Should list jobs', async function () { it('Should list jobs', async function () {
const res = await getJobsList(servers[1].url, servers[1].accessToken, 'complete') const res = await getJobsList(servers[1].url, servers[1].accessToken, 'completed')
expect(res.body.total).to.be.above(2) expect(res.body.total).to.be.above(2)
expect(res.body.data).to.have.length.above(2) expect(res.body.data).to.have.length.above(2)
}) })
it('Should list jobs with sort and pagination', async function () { it('Should list jobs with sort and pagination', async function () {
const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'complete', 1, 1, 'createdAt') const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'completed', 1, 1, 'createdAt')
expect(res.body.total).to.be.above(2) expect(res.body.total).to.be.above(2)
expect(res.body.data).to.have.lengthOf(1) expect(res.body.data).to.have.lengthOf(1)
const job = res.body.data[0] const job = res.body.data[0]
expect(job.state).to.equal('complete') expect(job.state).to.equal('completed')
expect(job.type).to.equal('activitypub-http-unicast') expect(job.type).to.equal('activitypub-http-unicast')
expect(dateIsValid(job.createdAt)).to.be.true expect(dateIsValid(job.createdAt)).to.be.true
expect(dateIsValid(job.updatedAt)).to.be.true expect(dateIsValid(job.processedOn)).to.be.true
expect(dateIsValid(job.finishedOn)).to.be.true
}) })
after(async function () { after(async function () {

View File

@ -347,7 +347,7 @@ function goodbye () {
} }
async function isTherePendingRequests (servers: ServerInfo[]) { async function isTherePendingRequests (servers: ServerInfo[]) {
const states: JobState[] = [ 'inactive', 'active', 'delayed' ] const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
const tasks: Promise<any>[] = [] const tasks: Promise<any>[] = []
let pendingRequests = false let pendingRequests = false

View File

@ -33,7 +33,7 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
if (Array.isArray(serversArg) === false) servers = [ serversArg as ServerInfo ] if (Array.isArray(serversArg) === false) servers = [ serversArg as ServerInfo ]
else servers = serversArg as ServerInfo[] else servers = serversArg as ServerInfo[]
const states: JobState[] = [ 'inactive', 'active', 'delayed' ] const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
const tasks: Promise<any>[] = [] const tasks: Promise<any>[] = []
let pendingRequests: boolean let pendingRequests: boolean

View File

@ -1,4 +1,4 @@
export type JobState = 'active' | 'complete' | 'failed' | 'inactive' | 'delayed' export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed'
export type JobType = 'activitypub-http-unicast' | export type JobType = 'activitypub-http-unicast' |
'activitypub-http-broadcast' | 'activitypub-http-broadcast' |
@ -15,5 +15,6 @@ export interface Job {
data: any, data: any,
error: any, error: any,
createdAt: Date createdAt: Date
updatedAt: Date finishedOn: Date
processedOn: Date
} }