import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra'
import { min } from 'lodash'
import { dirname } from 'path'
import { Readable } from 'stream'
import {
  CompletedPart,
  CompleteMultipartUploadCommand,
  CreateMultipartUploadCommand,
  DeleteObjectCommand,
  GetObjectCommand,
  ListObjectsV2Command,
  PutObjectCommand,
  UploadPartCommand
} from '@aws-sdk/client-s3'
import { pipelinePromise } from '@server/helpers/core-utils'
import { isArray } from '@server/helpers/custom-validators/misc'
import { logger } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { getPrivateUrl } from '../urls'
import { getClient } from './client'
import { lTags } from './logger'

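// Target bucket and optional key prefix prepended to every object key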
type BucketInfo = {
  BUCKET_NAME: string
  PREFIX?: string
}

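// Store a local file in the bucket, using a multipart upload for large files, and return its private URL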
async function storeObject (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}): Promise<string> {
  const { inputPath, objectStorageKey, bucketInfo } = options

  logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

  const stats = await stat(inputPath)

  // If the file is bigger than the maximum allowed part size, do a multipart upload
  if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
    return multiPartUpload({ inputPath, objectStorageKey, bucketInfo })
  }

  const fileStream = createReadStream(inputPath)
  return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
}

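// Delete a single object from the bucket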
async function removeObject (filename: string, bucketInfo: BucketInfo) {
  const command = new DeleteObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(filename, bucketInfo)
  })

  return getClient().send(command)
}

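// Delete every object whose key starts with the given prefix, recursing while the listing is truncated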
async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
  const s3Client = getClient()

  const commandPrefix = bucketInfo.PREFIX + prefix
  const listCommand = new ListObjectsV2Command({
    Bucket: bucketInfo.BUCKET_NAME,
    Prefix: commandPrefix
  })

  const listedObjects = await s3Client.send(listCommand)

  // FIXME: use bulk delete when s3ninja supports this operation
  // const deleteParams = {
  //   Bucket: bucketInfo.BUCKET_NAME,
  //   Delete: { Objects: [] }
  // }

  if (isArray(listedObjects.Contents) !== true) {
    const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.`

    logger.error(message, { response: listedObjects, ...lTags() })
    throw new Error(message)
  }

  for (const object of listedObjects.Contents) {
    const command = new DeleteObjectCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: object.Key
    })

    await s3Client.send(command)

    // FIXME: use bulk delete when s3ninja supports this operation
    // deleteParams.Delete.Objects.push({ Key: object.Key })
  }

  // FIXME: use bulk delete when s3ninja supports this operation
  // const deleteCommand = new DeleteObjectsCommand(deleteParams)
  // await s3Client.send(deleteCommand)

  // Repeat if not all objects could be listed at once (limit of 1000?)
  if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
}

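// Download an object to a local destination path, creating parent directories as needed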
async function makeAvailable (options: {
  key: string
  destination: string
  bucketInfo: BucketInfo
}) {
  const { key, destination, bucketInfo } = options

  await ensureDir(dirname(options.destination))

  const command = new GetObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(key, bucketInfo)
  })
  const response = await getClient().send(command)

  const file = createWriteStream(destination)
  await pipelinePromise(response.Body as Readable, file)

  file.close()
}

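// Build the full object key by prepending the bucket prefix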
function buildKey (key: string, bucketInfo: BucketInfo) {
  return bucketInfo.PREFIX + key
}

// ---------------------------------------------------------------------------

export {
  BucketInfo,
  buildKey,
  storeObject,
  removeObject,
  removePrefix,
  makeAvailable
}

// ---------------------------------------------------------------------------

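// Upload a stream with a single PutObject call and return the private URL of the object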
async function objectStoragePut (options: {
  objectStorageKey: string
  content: ReadStream
  bucketInfo: BucketInfo
}) {
  const { objectStorageKey, content, bucketInfo } = options

  const command = new PutObjectCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: buildKey(objectStorageKey, bucketInfo),
    Body: content,
    ACL: 'public-read'
  })

  await getClient().send(command)

  return getPrivateUrl(bucketInfo, objectStorageKey)
}

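// Upload a large file in fixed-size parts, then complete the multipart upload once every part is sent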
async function multiPartUpload (options: {
  inputPath: string
  objectStorageKey: string
  bucketInfo: BucketInfo
}) {
  const { objectStorageKey, inputPath, bucketInfo } = options

  const key = buildKey(objectStorageKey, bucketInfo)
  const s3Client = getClient()

  const statResult = await stat(inputPath)

  const createMultipartCommand = new CreateMultipartUploadCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: key,
    ACL: 'public-read'
  })
  const createResponse = await s3Client.send(createMultipartCommand)

  const fd = await open(inputPath, 'r')
  let partNumber = 1
  const parts: CompletedPart[] = []
  const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART

  for (let start = 0; start < statResult.size; start += partSize) {
    logger.debug(
      'Uploading part %d of file to %s%s in bucket %s',
      partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
    )

    // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released
    // The S3 SDK needs to know the length of the HTTP body beforehand, but doesn't support
    // streams with `start` and `end` set, so it just tries to stat the file in `stream.path`.
    // That fails for us because we only want to send part of the file. We widen the stream
    // type so we can set `byteLength` here, which the SDK picks up because array buffers
    // have this field set
    const stream: ReadStream & { byteLength: number } =
      createReadStream(
        inputPath,
        { fd, autoClose: false, start, end: (start + partSize) - 1 }
      ) as ReadStream & { byteLength: number }

    // If fewer bytes than a full part remain, use the leftover count as the byteLength
    stream.byteLength = min([ statResult.size - start, partSize ])

    const uploadPartCommand = new UploadPartCommand({
      Bucket: bucketInfo.BUCKET_NAME,
      Key: key,
      UploadId: createResponse.UploadId,
      PartNumber: partNumber,
      Body: stream
    })
    const uploadResponse = await s3Client.send(uploadPartCommand)

    parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
    partNumber += 1
  }
  await close(fd)

  const completeUploadCommand = new CompleteMultipartUploadCommand({
    Bucket: bucketInfo.BUCKET_NAME,
    Key: key,
    UploadId: createResponse.UploadId,
    MultipartUpload: { Parts: parts }
  })
  await s3Client.send(completeUploadCommand)

  logger.debug(
    'Completed %s%s in bucket %s in %d parts',
    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags()
  )

  return getPrivateUrl(bucketInfo, objectStorageKey)
}