fix: validate s3 response (#5231)

* refactor:  remove `objectStoragePut`

this is already implemented in `lib-storage`

* fix:  validate s3 response

* fix:  enable built-in retries

* chore: add `leavePartsOnError` comment

* refactor: decrease partSize to speed up retries

* refactor: rethrow s3 errors

* refactor: reduce max_upload_part  default to 100MB

* refactor: validate response

* chore: add link to explanation
pull/5250/head
q_h 2022-09-08 09:54:12 +03:00 committed by GitHub
parent e9fc9e03c1
commit 23c0b67d7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 37 deletions

View File

@ -152,7 +152,7 @@ object_storage:
secret_access_key: '' secret_access_key: ''
# Maximum amount to upload in one request to object storage # Maximum amount to upload in one request to object storage
max_upload_part: 2GB max_upload_part: 100MB
streaming_playlists: streaming_playlists:
bucket_name: 'streaming-playlists' bucket_name: 'streaming-playlists'

View File

@ -150,7 +150,7 @@ object_storage:
secret_access_key: '' secret_access_key: ''
# Maximum amount to upload in one request to object storage # Maximum amount to upload in one request to object storage
max_upload_part: 2GB max_upload_part: 100MB
streaming_playlists: streaming_playlists:
bucket_name: 'streaming-playlists' bucket_name: 'streaming-playlists'

View File

@ -1,11 +1,11 @@
import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra' import { createReadStream, createWriteStream, ensureDir, ReadStream } from 'fs-extra'
import { dirname } from 'path' import { dirname } from 'path'
import { Readable } from 'stream' import { Readable } from 'stream'
import { import {
CompleteMultipartUploadCommandOutput,
DeleteObjectCommand, DeleteObjectCommand,
GetObjectCommand, GetObjectCommand,
ListObjectsV2Command, ListObjectsV2Command,
PutObjectCommand,
PutObjectCommandInput PutObjectCommandInput
} from '@aws-sdk/client-s3' } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage' import { Upload } from '@aws-sdk/lib-storage'
@ -31,14 +31,9 @@ async function storeObject (options: {
logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
const stats = await stat(inputPath)
const fileStream = createReadStream(inputPath) const fileStream = createReadStream(inputPath)
if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) { return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo })
return multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo })
}
return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
} }
async function removeObject (filename: string, bucketInfo: BucketInfo) { async function removeObject (filename: string, bucketInfo: BucketInfo) {
@ -132,31 +127,7 @@ export {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
async function objectStoragePut (options: { async function uploadToStorage (options: {
objectStorageKey: string
content: ReadStream
bucketInfo: BucketInfo
}) {
const { objectStorageKey, content, bucketInfo } = options
const input: PutObjectCommandInput = {
Bucket: bucketInfo.BUCKET_NAME,
Key: buildKey(objectStorageKey, bucketInfo),
Body: content
}
if (CONFIG.OBJECT_STORAGE.UPLOAD_ACL) {
input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
}
const command = new PutObjectCommand(input)
await getClient().send(command)
return getPrivateUrl(bucketInfo, objectStorageKey)
}
async function multiPartUpload (options: {
content: ReadStream content: ReadStream
objectStorageKey: string objectStorageKey: string
bucketInfo: BucketInfo bucketInfo: BucketInfo
@ -177,11 +148,23 @@ async function multiPartUpload (options: {
client: getClient(), client: getClient(),
queueSize: 4, queueSize: 4,
partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART, partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
leavePartsOnError: false,
// `leavePartsOnError` must be set to `true` to avoid silently dropping failed parts
// More detailed explanation:
// https://github.com/aws/aws-sdk-js-v3/blob/v3.164.0/lib/lib-storage/src/Upload.ts#L274
// https://github.com/aws/aws-sdk-js-v3/issues/2311#issuecomment-939413928
leavePartsOnError: true,
params: input params: input
}) })
await parallelUploads3.done() const response = (await parallelUploads3.done()) as CompleteMultipartUploadCommandOutput
// Check is needed even if the HTTP status code is 200 OK
// For more information, see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
if (!response.Bucket) {
const message = `Error uploading ${objectStorageKey} to bucket ${bucketInfo.BUCKET_NAME}`
logger.error(message, { response, ...lTags() })
throw new Error(message)
}
logger.debug( logger.debug(
'Completed %s%s in bucket %s', 'Completed %s%s in bucket %s',