fix: s3fs support was broken.

pull/818/head
Raphaël Vinot 2023-10-23 15:59:14 +02:00
parent 6079dfdd37
commit 1c5c178d20
1 changed file with 22 additions and 12 deletions

@@ -5,6 +5,7 @@ import gzip
 import logging
 import logging.config
 import os
+import random
 import shutil
 from collections import defaultdict
@@ -47,12 +48,12 @@ class Archiver(AbstractManager):
         s3fs_config = get_config('generic', 's3fs')
         if s3fs_config.get('archive_on_s3fs'):
             self.archive_on_s3fs = True
-            self.s3fs_client = s3fs.S3FileSystem(key=s3fs_config['key'],
-                                                 secret=s3fs_config['secret'],
-                                                 endpoint_url=s3fs_config['endpoint_url'],
+            self.s3fs_client = s3fs.S3FileSystem(key=s3fs_config['config']['key'],
+                                                 secret=s3fs_config['config']['secret'],
+                                                 endpoint_url=s3fs_config['config']['endpoint_url'],
                                                  config_kwargs={'connect_timeout': 10,
                                                                 'read_timeout': 900})
-            self.s3fs_bucket = s3fs_config['bucket_name']
+            self.s3fs_bucket = s3fs_config['config']['bucket_name']
             self.s3fs_client.clear_multipart_uploads(self.s3fs_bucket)

     def _to_run_forever(self):
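
For context: the hunk above reads every connection setting from a nested 'config' key, so the 's3fs' section of the generic config is now expected to look roughly like the sketch below (key names are taken from the diff; all values are placeholders):

s3fs_config = {
    'archive_on_s3fs': True,  # still read from the top level of the section
    'config': {
        'key': '<access key id>',                  # placeholder credential
        'secret': '<secret access key>',           # placeholder credential
        'endpoint_url': 'https://s3.example.com',  # placeholder endpoint
        'bucket_name': 'lookyloo-archive',         # placeholder bucket name
    },
}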
@@ -76,18 +77,22 @@
     def _update_index(self, root_dir: Path, *, s3fs: bool=False) -> None:
         current_index: Dict[str, str] = {}
         if s3fs:
-            self.s3fs_client.invalidate_cache(str(root_dir))
-            all_s3fs_captures = self.s3fs_client.ls(str(root_dir), detail=False, refresh=True)
+            self.logger.info(f'Updating index for {root_dir} (s3fs)')
+            self.s3fs_client.invalidate_cache(self.s3fs_bucket)
+            # On s3fs, the path is bucket_name/year/month
+            # root_dir is /full/local/path/to/archived_captures/year/month
+            s3fs_dir = '/'.join([self.s3fs_bucket, root_dir.parent.name, root_dir.name])
+            all_s3fs_captures = self.s3fs_client.ls(s3fs_dir, detail=False, refresh=True)
             if not all_s3fs_captures:
-                self.s3fs_client.rmdir(str(root_dir))
+                self.logger.warning(f'{root_dir} is empty on s3fs ({s3fs_dir}).')
                 return
         else:
+            self.logger.debug(f'Updating index for {root_dir}')
             if not any(os.scandir(root_dir)):
                 # the directory is empty, we can safely remove it
                 root_dir.rmdir()
                 return
-        self.logger.debug(f'Updating index for {root_dir}')
         index_file = root_dir / 'index'
         if index_file.exists():
             # Skip index if the directory has been archived.
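
The two comments in the hunk describe the local-to-bucket path mapping; here is a minimal standalone sketch of that computation, using a hypothetical local path and bucket name:

from pathlib import Path

root_dir = Path('/opt/lookyloo/archived_captures/2023/10')  # hypothetical local path
bucket = 'lookyloo-archive'                                 # hypothetical bucket name

# Same computation as in the diff: bucket_name/year/month
s3fs_dir = '/'.join([bucket, root_dir.parent.name, root_dir.name])
assert s3fs_dir == 'lookyloo-archive/2023/10'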
@@ -108,7 +113,7 @@
             new_captures = {existing_capture.rsplit('/', 1)[-1] for existing_capture in all_s3fs_captures
                             if existing_capture.rsplit('/', 1)[-1]
                             and (existing_capture.rsplit('/', 1)[-1] not in curent_index_dirs)
-                            and self.s3fs_client.is_dir(str(existing_capture))}
+                            and self.s3fs_client.isdir(existing_capture)}
         else:
             with os.scandir(root_dir) as it:
                 new_captures = {existing_capture.name for existing_capture in it
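
The one-line change above is an API fix: fsspec-backed filesystems such as s3fs expose the directory test as isdir(), and the pathlib-style is_dir() is not part of that API as far as I can tell, so the old call would fail; the str() cast is also dropped because ls() already returns plain string paths. A hedged sketch of the corrected call (bucket and prefix are made up):

import s3fs

fs = s3fs.S3FileSystem(anon=True)          # hypothetical anonymous client
fs.isdir('some-bucket/2023/10/<capture>')  # True if the prefix exists as a "directory"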
@@ -116,8 +121,11 @@
                                 and existing_capture.is_dir()}
         if not new_captures:
+            if s3fs:
+                self.logger.info(f'No new captures in {root_dir} (s3fs directory)')
+            else:
+                self.logger.debug(f'No new captures in {root_dir}')
             # No new captures, quitting
-            self.logger.debug(f'No new captures in {root_dir}.')
             return
         self.logger.info(f'{len(new_captures)} new captures in {root_dir}.')
@@ -198,7 +206,9 @@
             if self.shutdown_requested():
                 self.logger.warning('Shutdown requested, breaking.')
                 break
-            self._update_index(directory_to_index, s3fs=self.archive_on_s3fs)
+            # Updating the indexes can take a while, so only run this call once in N calls.
+            if random.randrange(20) == 0:
+                self._update_index(directory_to_index, s3fs=self.archive_on_s3fs)
         self.logger.info('Archived indexes updated')

     def _archive(self):
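
The random.randrange(20) == 0 guard makes each pass run the index update with probability 1/20, so on average one call in twenty does the expensive work. A quick standalone check of that behaviour (N=20 as in the diff):

import random

N = 20
trials = 100_000
hits = sum(random.randrange(N) == 0 for _ in range(trials))
print(f'{hits} updates in {trials} passes (expected around {trials // N})')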
@@ -283,7 +293,7 @@
                 finally:
                     (dest_dir / capture_path.name / 'lock').unlink(missing_ok=True)
             # we archived some captures, update relevant index
-            self._update_index(dest_dir)
+            self._update_index(dest_dir, s3fs=self.archive_on_s3fs)
             if not archiving_done:
                 break
         else:
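
Why the keyword matters: _update_index() is declared with s3fs: bool = False (see the start of that method earlier in the diff), so the old bare call always took the local-filesystem branch even when the instance archives to s3fs. A hypothetical reduction of the call-site change, not the real class:

class ArchiverSketch:
    """Hypothetical reduction of the call-site fix."""
    archive_on_s3fs = True  # mirrors the configured flag

    def _update_index(self, root_dir, *, s3fs=False):
        print(f'updating {root_dir} via {"s3fs" if s3fs else "local filesystem"}')

    def archive(self, dest_dir):
        # old: self._update_index(dest_dir)  -> always hit the local branch
        self._update_index(dest_dir, s3fs=self.archive_on_s3fs)

ArchiverSketch().archive('archived_captures/2023/10')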