Add timeout and TTL to request jobs

pull/559/head
Chocobozzz 2018-05-09 09:08:22 +02:00
parent 7797350a0e
commit 71e3dfda4e
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
5 changed files with 33 additions and 14 deletions

View File

@ -78,9 +78,10 @@ const JOB_CONCURRENCY: { [ id in JobType ]: number } = {
'video-file': 1,
'email': 5
}
const BROADCAST_CONCURRENCY = 5 // How many requests in parallel we do in activitypub-http-broadcast job
// 2 days
const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2
const BROADCAST_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-http-broadcast job
const JOB_REQUEST_TIMEOUT = 3000 // 3 seconds
const JOB_REQUEST_TTL = 60000 * 10 // 10 minutes
const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2 // 2 days
// 1 hour
let SCHEDULER_INTERVAL = 60000 * 60
@ -466,6 +467,8 @@ export {
VIDEO_RATE_TYPES,
VIDEO_MIMETYPE_EXT,
VIDEO_TRANSCODING_FPS,
JOB_REQUEST_TIMEOUT,
JOB_REQUEST_TTL,
USER_PASSWORD_RESET_LIFETIME,
IMAGE_MIMETYPE_EXT,
SCHEDULER_INTERVAL,

View File

@ -4,7 +4,7 @@ import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests'
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
import { BROADCAST_CONCURRENCY } from '../../../initializers'
import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers'
export type ActivitypubHttpBroadcastPayload = {
uris: string[]
@ -24,7 +24,8 @@ async function processActivityPubHttpBroadcast (job: kue.Job) {
method: 'POST',
uri: '',
json: body,
httpSignature: httpSignatureOptions
httpSignature: httpSignatureOptions,
timeout: JOB_REQUEST_TIMEOUT
}
const badUrls: string[] = []

View File

@ -1,7 +1,7 @@
import * as kue from 'kue'
import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests'
import { ACTIVITY_PUB } from '../../../initializers'
import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../../initializers'
import { processActivities } from '../../activitypub/process'
import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
@ -18,7 +18,8 @@ async function processActivityPubHttpFetcher (job: kue.Job) {
method: 'GET',
uri: '',
json: true,
activityPub: true
activityPub: true,
timeout: JOB_REQUEST_TIMEOUT
}
for (const uri of payload.uris) {

View File

@ -3,6 +3,7 @@ import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests'
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
import { JOB_REQUEST_TIMEOUT } from '../../../initializers'
export type ActivitypubHttpUnicastPayload = {
uri: string
@ -23,7 +24,8 @@ async function processActivityPubHttpUnicast (job: kue.Job) {
method: 'POST',
uri,
json: body,
httpSignature: httpSignatureOptions
httpSignature: httpSignatureOptions,
timeout: JOB_REQUEST_TIMEOUT
}
try {

View File

@ -1,7 +1,7 @@
import * as kue from 'kue'
import { JobState, JobType } from '../../../shared/models'
import { logger } from '../../helpers/logger'
import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers'
import { Redis } from '../redis'
import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
@ -27,6 +27,13 @@ const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
'email': processEmail
}
const jobsWithTLL: JobType[] = [
'activitypub-http-broadcast',
'activitypub-http-unicast',
'activitypub-http-fetcher',
'activitypub-follow'
]
class JobQueue {
private static instance: JobQueue
@ -77,16 +84,21 @@ class JobQueue {
createJob (obj: CreateJobArgument, priority = 'normal') {
return new Promise((res, rej) => {
this.jobQueue
let job = this.jobQueue
.create(obj.type, obj.payload)
.priority(priority)
.attempts(JOB_ATTEMPTS[obj.type])
.backoff({ delay: 60 * 1000, type: 'exponential' })
.save(err => {
if (err) return rej(err)
return res()
})
if (jobsWithTLL.indexOf(obj.type) !== -1) {
job = job.ttl(JOB_REQUEST_TTL)
}
return job.save(err => {
if (err) return rej(err)
return res()
})
})
}