mirror of https://github.com/Chocobozzz/PeerTube
prevent multiple post-process triggering of upload-resumable (#4175)
* prevent multiple post-process triggering of upload-resumable * switch from 409 to 503 for upload being processed * Improve resumable upload check Co-authored-by: Chocobozzz <me@florianbigard.com>pull/4507/head
parent
b2ad0090c1
commit
276250f0a3
|
@ -82,9 +82,10 @@ export class VideoUploadComponent extends VideoSend implements OnInit, OnDestroy
|
||||||
uploaderClass: UploaderXFormData,
|
uploaderClass: UploaderXFormData,
|
||||||
chunkSize,
|
chunkSize,
|
||||||
retryConfig: {
|
retryConfig: {
|
||||||
maxAttempts: 6,
|
maxAttempts: 30, // maximum attempts for 503 codes, otherwise set to 6, see below
|
||||||
shouldRetry: (code: number) => {
|
maxDelay: 120_000, // 2 min
|
||||||
return code < 400 || code >= 501
|
shouldRetry: (code: number, attempts: number) => {
|
||||||
|
return code === HttpStatusCode.SERVICE_UNAVAILABLE_503 || ((code < 400 || code > 500) && attempts < 6)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import { uuidToShort } from '@server/helpers/uuid'
|
||||||
import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
|
import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
|
||||||
import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
|
import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
|
||||||
import { generateWebTorrentVideoFilename } from '@server/lib/paths'
|
import { generateWebTorrentVideoFilename } from '@server/lib/paths'
|
||||||
|
import { Redis } from '@server/lib/redis'
|
||||||
import {
|
import {
|
||||||
addMoveToObjectStorageJob,
|
addMoveToObjectStorageJob,
|
||||||
addOptimizeOrMergeAudioJob,
|
addOptimizeOrMergeAudioJob,
|
||||||
|
@ -94,7 +95,7 @@ uploadRouter.delete('/upload-resumable',
|
||||||
uploadRouter.put('/upload-resumable',
|
uploadRouter.put('/upload-resumable',
|
||||||
openapiOperationDoc({ operationId: 'uploadResumable' }),
|
openapiOperationDoc({ operationId: 'uploadResumable' }),
|
||||||
authenticate,
|
authenticate,
|
||||||
uploadxMiddleware, // uploadx doesn't use call next() before the file upload completes
|
uploadxMiddleware, // uploadx doesn't next() before the file upload completes
|
||||||
asyncMiddleware(videosAddResumableValidator),
|
asyncMiddleware(videosAddResumableValidator),
|
||||||
asyncMiddleware(addVideoResumable)
|
asyncMiddleware(addVideoResumable)
|
||||||
)
|
)
|
||||||
|
@ -122,15 +123,20 @@ export async function addVideoLegacy (req: express.Request, res: express.Respons
|
||||||
const videoInfo: VideoCreate = req.body
|
const videoInfo: VideoCreate = req.body
|
||||||
const files = req.files
|
const files = req.files
|
||||||
|
|
||||||
return addVideo({ res, videoPhysicalFile, videoInfo, files })
|
const response = await addVideo({ res, videoPhysicalFile, videoInfo, files })
|
||||||
|
|
||||||
|
return res.json(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function addVideoResumable (_req: express.Request, res: express.Response) {
|
export async function addVideoResumable (req: express.Request, res: express.Response) {
|
||||||
const videoPhysicalFile = res.locals.videoFileResumable
|
const videoPhysicalFile = res.locals.videoFileResumable
|
||||||
const videoInfo = videoPhysicalFile.metadata
|
const videoInfo = videoPhysicalFile.metadata
|
||||||
const files = { previewfile: videoInfo.previewfile }
|
const files = { previewfile: videoInfo.previewfile }
|
||||||
|
|
||||||
return addVideo({ res, videoPhysicalFile, videoInfo, files })
|
const response = await addVideo({ res, videoPhysicalFile, videoInfo, files })
|
||||||
|
await Redis.Instance.setUploadSession(req.query.upload_id, response)
|
||||||
|
|
||||||
|
return res.json(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
async function addVideo (options: {
|
async function addVideo (options: {
|
||||||
|
@ -225,13 +231,13 @@ async function addVideo (options: {
|
||||||
|
|
||||||
Hooks.runAction('action:api.video.uploaded', { video: videoCreated })
|
Hooks.runAction('action:api.video.uploaded', { video: videoCreated })
|
||||||
|
|
||||||
return res.json({
|
return {
|
||||||
video: {
|
video: {
|
||||||
id: videoCreated.id,
|
id: videoCreated.id,
|
||||||
shortUUID: uuidToShort(videoCreated.uuid),
|
shortUUID: uuidToShort(videoCreated.uuid),
|
||||||
uuid: videoCreated.uuid
|
uuid: videoCreated.uuid
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) {
|
async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) {
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import { join } from 'path'
|
import { join } from 'path'
|
||||||
|
import { JobQueue } from '@server/lib/job-queue'
|
||||||
import { RESUMABLE_UPLOAD_DIRECTORY } from '../initializers/constants'
|
import { RESUMABLE_UPLOAD_DIRECTORY } from '../initializers/constants'
|
||||||
|
|
||||||
function getResumableUploadPath (filename?: string) {
|
function getResumableUploadPath (filename?: string) {
|
||||||
|
@ -7,8 +8,14 @@ function getResumableUploadPath (filename?: string) {
|
||||||
return RESUMABLE_UPLOAD_DIRECTORY
|
return RESUMABLE_UPLOAD_DIRECTORY
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function scheduleDeleteResumableUploadMetaFile (filepath: string) {
|
||||||
|
const payload = { filepath }
|
||||||
|
JobQueue.Instance.createJob({ type: 'delete-resumable-upload-meta-file', payload }, { delay: 900 * 1000 }) // executed in 15 min
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
export {
|
export {
|
||||||
getResumableUploadPath
|
getResumableUploadPath,
|
||||||
|
scheduleDeleteResumableUploadMetaFile
|
||||||
}
|
}
|
||||||
|
|
|
@ -665,6 +665,8 @@ const RESUMABLE_UPLOAD_DIRECTORY = join(CONFIG.STORAGE.TMP_DIR, 'resumable-uploa
|
||||||
const HLS_STREAMING_PLAYLIST_DIRECTORY = join(CONFIG.STORAGE.STREAMING_PLAYLISTS_DIR, 'hls')
|
const HLS_STREAMING_PLAYLIST_DIRECTORY = join(CONFIG.STORAGE.STREAMING_PLAYLISTS_DIR, 'hls')
|
||||||
const HLS_REDUNDANCY_DIRECTORY = join(CONFIG.STORAGE.REDUNDANCY_DIR, 'hls')
|
const HLS_REDUNDANCY_DIRECTORY = join(CONFIG.STORAGE.REDUNDANCY_DIR, 'hls')
|
||||||
|
|
||||||
|
const RESUMABLE_UPLOAD_SESSION_LIFETIME = SCHEDULER_INTERVALS_MS.REMOVE_DANGLING_RESUMABLE_UPLOADS
|
||||||
|
|
||||||
const VIDEO_LIVE = {
|
const VIDEO_LIVE = {
|
||||||
EXTENSION: '.ts',
|
EXTENSION: '.ts',
|
||||||
CLEANUP_DELAY: 1000 * 60 * 5, // 5 minutes
|
CLEANUP_DELAY: 1000 * 60 * 5, // 5 minutes
|
||||||
|
@ -838,6 +840,7 @@ export {
|
||||||
LAZY_STATIC_PATHS,
|
LAZY_STATIC_PATHS,
|
||||||
SEARCH_INDEX,
|
SEARCH_INDEX,
|
||||||
RESUMABLE_UPLOAD_DIRECTORY,
|
RESUMABLE_UPLOAD_DIRECTORY,
|
||||||
|
RESUMABLE_UPLOAD_SESSION_LIFETIME,
|
||||||
HLS_REDUNDANCY_DIRECTORY,
|
HLS_REDUNDANCY_DIRECTORY,
|
||||||
P2P_MEDIA_LOADER_PEER_VERSION,
|
P2P_MEDIA_LOADER_PEER_VERSION,
|
||||||
ACTOR_IMAGES_SIZE,
|
ACTOR_IMAGES_SIZE,
|
||||||
|
|
|
@ -8,6 +8,7 @@ import {
|
||||||
ActivitypubHttpFetcherPayload,
|
ActivitypubHttpFetcherPayload,
|
||||||
ActivitypubHttpUnicastPayload,
|
ActivitypubHttpUnicastPayload,
|
||||||
ActorKeysPayload,
|
ActorKeysPayload,
|
||||||
|
DeleteResumableUploadMetaFilePayload,
|
||||||
EmailPayload,
|
EmailPayload,
|
||||||
JobState,
|
JobState,
|
||||||
JobType,
|
JobType,
|
||||||
|
@ -52,6 +53,7 @@ type CreateJobArgument =
|
||||||
{ type: 'video-live-ending', payload: VideoLiveEndingPayload } |
|
{ type: 'video-live-ending', payload: VideoLiveEndingPayload } |
|
||||||
{ type: 'actor-keys', payload: ActorKeysPayload } |
|
{ type: 'actor-keys', payload: ActorKeysPayload } |
|
||||||
{ type: 'video-redundancy', payload: VideoRedundancyPayload } |
|
{ type: 'video-redundancy', payload: VideoRedundancyPayload } |
|
||||||
|
{ type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
|
||||||
{ type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
|
{ type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
|
||||||
|
|
||||||
export type CreateJobOptions = {
|
export type CreateJobOptions = {
|
||||||
|
|
|
@ -9,7 +9,8 @@ import {
|
||||||
USER_PASSWORD_CREATE_LIFETIME,
|
USER_PASSWORD_CREATE_LIFETIME,
|
||||||
VIEW_LIFETIME,
|
VIEW_LIFETIME,
|
||||||
WEBSERVER,
|
WEBSERVER,
|
||||||
TRACKER_RATE_LIMITS
|
TRACKER_RATE_LIMITS,
|
||||||
|
RESUMABLE_UPLOAD_SESSION_LIFETIME
|
||||||
} from '../initializers/constants'
|
} from '../initializers/constants'
|
||||||
import { CONFIG } from '../initializers/config'
|
import { CONFIG } from '../initializers/config'
|
||||||
|
|
||||||
|
@ -202,6 +203,30 @@ class Redis {
|
||||||
])
|
])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* ************ Resumable uploads final responses ************ */
|
||||||
|
|
||||||
|
setUploadSession (uploadId: string, response?: { video: { id: number, shortUUID: string, uuid: string } }) {
|
||||||
|
return this.setValue(
|
||||||
|
'resumable-upload-' + uploadId,
|
||||||
|
response
|
||||||
|
? JSON.stringify(response)
|
||||||
|
: '',
|
||||||
|
RESUMABLE_UPLOAD_SESSION_LIFETIME
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
doesUploadSessionExist (uploadId: string) {
|
||||||
|
return this.exists('resumable-upload-' + uploadId)
|
||||||
|
}
|
||||||
|
|
||||||
|
async getUploadSession (uploadId: string) {
|
||||||
|
const value = await this.getValue('resumable-upload-' + uploadId)
|
||||||
|
|
||||||
|
return value
|
||||||
|
? JSON.parse(value)
|
||||||
|
: ''
|
||||||
|
}
|
||||||
|
|
||||||
/* ************ Keys generation ************ */
|
/* ************ Keys generation ************ */
|
||||||
|
|
||||||
generateCachedRouteKey (req: express.Request) {
|
generateCachedRouteKey (req: express.Request) {
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
import express from 'express'
|
import express from 'express'
|
||||||
import { body, header, param, query, ValidationChain } from 'express-validator'
|
import { body, header, param, query, ValidationChain } from 'express-validator'
|
||||||
|
import { isTestInstance } from '@server/helpers/core-utils'
|
||||||
import { getResumableUploadPath } from '@server/helpers/upload'
|
import { getResumableUploadPath } from '@server/helpers/upload'
|
||||||
|
import { Redis } from '@server/lib/redis'
|
||||||
import { isAbleToUploadVideo } from '@server/lib/user'
|
import { isAbleToUploadVideo } from '@server/lib/user'
|
||||||
import { getServerActor } from '@server/models/application/application'
|
import { getServerActor } from '@server/models/application/application'
|
||||||
import { ExpressPromiseHandler } from '@server/types/express'
|
import { ExpressPromiseHandler } from '@server/types/express'
|
||||||
|
@ -105,12 +107,34 @@ const videosAddLegacyValidator = getCommonVideoEditAttributes().concat([
|
||||||
const videosAddResumableValidator = [
|
const videosAddResumableValidator = [
|
||||||
async (req: express.Request, res: express.Response, next: express.NextFunction) => {
|
async (req: express.Request, res: express.Response, next: express.NextFunction) => {
|
||||||
const user = res.locals.oauth.token.User
|
const user = res.locals.oauth.token.User
|
||||||
|
|
||||||
const body: express.CustomUploadXFile<express.UploadXFileMetadata> = req.body
|
const body: express.CustomUploadXFile<express.UploadXFileMetadata> = req.body
|
||||||
const file = { ...body, duration: undefined, path: getResumableUploadPath(body.id), filename: body.metadata.filename }
|
const file = { ...body, duration: undefined, path: getResumableUploadPath(body.id), filename: body.metadata.filename }
|
||||||
|
|
||||||
const cleanup = () => deleteFileAndCatch(file.path)
|
const cleanup = () => deleteFileAndCatch(file.path)
|
||||||
|
|
||||||
|
const uploadId = req.query.upload_id
|
||||||
|
const sessionExists = await Redis.Instance.doesUploadSessionExist(uploadId)
|
||||||
|
|
||||||
|
if (sessionExists) {
|
||||||
|
const sessionResponse = await Redis.Instance.getUploadSession(uploadId)
|
||||||
|
|
||||||
|
if (!sessionResponse) {
|
||||||
|
res.setHeader('Retry-After', 300) // ask to retry after 5 min, knowing the upload_id is kept for up to 15 min after completion
|
||||||
|
|
||||||
|
return res.fail({
|
||||||
|
status: HttpStatusCode.SERVICE_UNAVAILABLE_503,
|
||||||
|
message: 'The upload is already being processed'
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isTestInstance()) {
|
||||||
|
res.setHeader('x-resumable-upload-cached', 'true')
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.json(sessionResponse)
|
||||||
|
}
|
||||||
|
|
||||||
|
await Redis.Instance.setUploadSession(uploadId)
|
||||||
|
|
||||||
if (!await doesVideoChannelOfAccountExist(file.metadata.channelId, user, res)) return cleanup()
|
if (!await doesVideoChannelOfAccountExist(file.metadata.channelId, user, res)) return cleanup()
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -180,6 +180,21 @@ describe('Test resumable upload', function () {
|
||||||
await sendChunks({ pathUploadId: uploadId, expectedStatus, contentRangeBuilder, contentLength: size })
|
await sendChunks({ pathUploadId: uploadId, expectedStatus, contentRangeBuilder, contentLength: size })
|
||||||
await checkFileSize(uploadId, 0)
|
await checkFileSize(uploadId, 0)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('Should be able to accept 2 PUT requests', async function () {
|
||||||
|
const uploadId = await prepareUpload()
|
||||||
|
|
||||||
|
const result1 = await sendChunks({ pathUploadId: uploadId })
|
||||||
|
const result2 = await sendChunks({ pathUploadId: uploadId })
|
||||||
|
|
||||||
|
expect(result1.body.video.uuid).to.exist
|
||||||
|
expect(result1.body.video.uuid).to.equal(result2.body.video.uuid)
|
||||||
|
|
||||||
|
expect(result1.headers['x-resumable-upload-cached']).to.not.exist
|
||||||
|
expect(result2.headers['x-resumable-upload-cached']).to.equal('true')
|
||||||
|
|
||||||
|
await checkFileSize(uploadId, null)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
after(async function () {
|
after(async function () {
|
||||||
|
|
|
@ -138,6 +138,10 @@ export interface ActorKeysPayload {
|
||||||
actorId: number
|
actorId: number
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface DeleteResumableUploadMetaFilePayload {
|
||||||
|
filepath: string
|
||||||
|
}
|
||||||
|
|
||||||
export interface MoveObjectStoragePayload {
|
export interface MoveObjectStoragePayload {
|
||||||
videoUUID: string
|
videoUUID: string
|
||||||
isNewVideo: boolean
|
isNewVideo: boolean
|
||||||
|
|
|
@ -2081,6 +2081,13 @@ paths:
|
||||||
description: video unreadable
|
description: video unreadable
|
||||||
'429':
|
'429':
|
||||||
description: too many concurrent requests
|
description: too many concurrent requests
|
||||||
|
'503':
|
||||||
|
description: upload is already being processed
|
||||||
|
headers:
|
||||||
|
'Retry-After':
|
||||||
|
schema:
|
||||||
|
type: number
|
||||||
|
example: 300
|
||||||
delete:
|
delete:
|
||||||
summary: Cancel the resumable upload of a video, deleting any data uploaded so far
|
summary: Cancel the resumable upload of a video, deleting any data uploaded so far
|
||||||
description: Uses [a resumable protocol](https://github.com/kukhariev/node-uploadx/blob/master/proto.md) to cancel the upload of a video
|
description: Uses [a resumable protocol](https://github.com/kukhariev/node-uploadx/blob/master/proto.md) to cancel the upload of a video
|
||||||
|
|
Loading…
Reference in New Issue