new: Use S3FS in archiving script instead, remove python 3.12 support

s3fs_python3.12
Raphaël Vinot 2023-10-23 13:35:29 +02:00
parent bf9ff87dac
commit 353015096e
4 changed files with 50 additions and 33 deletions

View File

@@ -13,7 +13,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
+        python-version: ["3.8", "3.9", "3.10", "3.11"]
     steps:
     - uses: actions/checkout@v4

View File

@@ -13,7 +13,7 @@ jobs:
     strategy:
       fail-fast: false
      matrix:
-        python-version: ["3.10", "3.11", "3.12"]
+        python-version: ["3.10", "3.11"]
     steps:
     - uses: actions/checkout@v4

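Both CI matrices above drop "3.12", matching the tightened interpreter constraint in pyproject.toml at the end of this commit (">=3.8,<3.13" becomes ">=3.8,<3.12"). The commit title ties this to the s3fs switch, so the s3fs dependency chain presumably did not yet run on Python 3.12 at the time; the diff itself does not state the reason. A minimal sketch, not part of the commit, of a runtime guard mirroring the new bound:

    # Hypothetical runtime guard mirroring the ">=3.8,<3.12" bound from pyproject.toml.
    import sys

    if not ((3, 8) <= sys.version_info[:2] < (3, 12)):
        raise RuntimeError(f'Python {sys.version.split()[0]} is unsupported here; need >=3.8,<3.12')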
View File

@@ -14,6 +14,7 @@ from pathlib import Path
 from typing import Dict, List, Optional

 from redis import Redis
+import s3fs  # type: ignore

 from lookyloo.default import AbstractManager, get_config, get_homedir, get_socket_path, try_make_file
 from lookyloo.helpers import get_captures_dir, is_locked
@@ -46,6 +47,13 @@ class Archiver(AbstractManager):
         s3fs_config = get_config('generic', 's3fs')
         if s3fs_config.get('archive_on_s3fs'):
             self.archive_on_s3fs = True
+            self.s3fs_client = s3fs.S3FileSystem(key=s3fs_config['key'],
+                                                 secret=s3fs_config['secret'],
+                                                 endpoint_url=s3fs_config['endpoint_url'],
+                                                 config_kwargs={'connect_timeout': 10,
+                                                                'read_timeout': 900})
+            self.s3fs_bucket = s3fs_config['bucket_name']
+            self.s3fs_client.clear_multipart_uploads(self.s3fs_bucket)

     def _to_run_forever(self):
         archiving_done = False
@@ -65,12 +73,19 @@ class Archiver(AbstractManager):
             # This call takes a very long time on MinIO
             self._update_all_capture_indexes()

-    def _update_index(self, root_dir: Path) -> None:
+    def _update_index(self, root_dir: Path, *, s3fs: bool=False) -> None:
         current_index: Dict[str, str] = {}

-        if not any(os.scandir(root_dir)):
-            # the directory is empty, we can safely remove it
-            root_dir.rmdir()
-            return
+        if s3fs:
+            self.s3fs_client.invalidate_cache(str(root_dir))
+            all_s3fs_captures = self.s3fs_client.ls(str(root_dir), detail=False, refresh=True)
+            if not all_s3fs_captures:
+                self.s3fs_client.rmdir(str(root_dir))
+                return
+        else:
+            if not any(os.scandir(root_dir)):
+                # the directory is empty, we can safely remove it
+                root_dir.rmdir()
+                return

         self.logger.debug(f'Updating index for {root_dir}')
         index_file = root_dir / 'index'
@@ -89,9 +104,16 @@ class Archiver(AbstractManager):
         curent_index_dirs = set(current_index.values())

-        with os.scandir(root_dir) as it:
-            new_captures = {existing_capture.name for existing_capture in it
-                            if (existing_capture.name not in curent_index_dirs) and existing_capture.is_dir()}
+        if s3fs:
+            new_captures = {existing_capture.rsplit('/', 1)[-1] for existing_capture in all_s3fs_captures
+                            if existing_capture.rsplit('/', 1)[-1]
+                            and (existing_capture.rsplit('/', 1)[-1] not in curent_index_dirs)
+                            and self.s3fs_client.is_dir(str(existing_capture))}
+        else:
+            with os.scandir(root_dir) as it:
+                new_captures = {existing_capture.name for existing_capture in it
+                                if (existing_capture.name not in curent_index_dirs)
+                                and existing_capture.is_dir()}

         if not new_captures:
             # No new captures, quitting
@@ -102,9 +124,6 @@ class Archiver(AbstractManager):
         for capture_dir_name in new_captures:
             capture_dir = root_dir / capture_dir_name
-            if not capture_dir.is_dir():
-                self.logger.warning(f'{capture_dir} is not a directory')
-                continue
             if not next(capture_dir.iterdir(), None):
                 self.logger.warning(f'{capture_dir} is empty, removing.')
                 capture_dir.rmdir()
@@ -116,20 +135,21 @@ class Archiver(AbstractManager):
                 continue
             with uuid_file.open() as _f:
                 uuid = _f.read().strip()
-                try:
-                    if not uuid:
-                        self.logger.warning(f'{uuid_file} is empty')
-                        shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
-                        continue
-                    if uuid in current_index:
-                        self.logger.warning(f'Duplicate UUID ({uuid}) in {current_index[uuid]} and {uuid_file.parent.name}')
-                        shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
-                        continue
-                except OSError as e:
-                    self.logger.warning(f'Error when discarding capture {capture_dir}: {e}')
-                    continue
-                current_index[uuid] = uuid_file.parent.name
+
+            try:
+                if not uuid:
+                    self.logger.warning(f'{uuid_file} is empty')
+                    shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
+                    continue
+                if uuid in current_index:
+                    self.logger.warning(f'Duplicate UUID ({uuid}) in {current_index[uuid]} and {uuid_file.parent.name}')
+                    shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
+                    continue
+            except OSError as e:
+                self.logger.warning(f'Error when discarding capture {capture_dir}: {e}')
+                continue
+            current_index[uuid] = uuid_file.parent.name

         if not current_index:
             # The directory has been archived. It is probably safe to unlink, but
@@ -162,8 +182,8 @@ class Archiver(AbstractManager):
         # Recent captures
         self.logger.info('Update recent indexes')
         # NOTE: the call below will check the existence of every path ending with `uuid`,
-        #       it is extremely inneficient as we have many hundred of thusands of them
-        #       and we only care about the rood directory (ex: 2023/06)
+        #       it is extremely ineficient as we have many hundred of thusands of them
+        #       and we only care about the root directory (ex: 2023/06)
         # directories_to_index = {capture_dir.parent.parent
         #                         for capture_dir in get_captures_dir().glob('*/*/*/uuid')}
         for directory_to_index in self._make_dirs_list(get_captures_dir()):
@@ -172,16 +192,13 @@ class Archiver(AbstractManager):
                 break
             self._update_index(directory_to_index)
         self.logger.info('Recent indexes updated')

-        if self.archive_on_s3fs:
-            self.logger.info('Not updating indexes as they are on a s3fs-fuse mount.')
-            return
-
         # Archived captures
         self.logger.info('Update archives indexes')
         for directory_to_index in self._make_dirs_list(self.archived_captures_dir):
             if self.shutdown_requested():
                 self.logger.warning('Shutdown requested, breaking.')
                 break
-            self._update_index(directory_to_index)
+            self._update_index(directory_to_index, s3fs=self.archive_on_s3fs)
         self.logger.info('Archived indexes updated')

     def _archive(self):
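The archiver changes above are the core of the commit: when archives live on an S3-compatible store, _update_index lists each month directory through the s3fs client, with cache invalidation and a forced refresh, instead of os.scandir, and removes the directory remotely when it is empty. A self-contained sketch of that listing flow, with placeholder credentials and a hypothetical <bucket>/<year>/<month> layout (the real values come from the 's3fs' section of the generic config):

    import s3fs

    # Placeholder credentials/endpoint; Archiver reads these from its config.
    client = s3fs.S3FileSystem(key='ACCESS_KEY', secret='SECRET_KEY',
                               endpoint_url='https://minio.example.org',
                               config_kwargs={'connect_timeout': 10, 'read_timeout': 900})

    root_dir = 'archive-bucket/2023/06'  # hypothetical <bucket>/<year>/<month> prefix

    client.invalidate_cache(root_dir)  # drop any stale listing for this prefix
    all_captures = client.ls(root_dir, detail=False, refresh=True)  # fresh listing
    if not all_captures:
        client.rmdir(root_dir)  # empty month directory, safe to remove
    else:
        # Keep only non-empty basenames that are directories, as the diff does.
        capture_names = {path.rsplit('/', 1)[-1] for path in all_captures
                         if path.rsplit('/', 1)[-1] and client.is_dir(path)}
        print(sorted(capture_names))

The 900-second read_timeout in the client configuration mirrors the note in the diff that these operations can take a very long time on MinIO.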
View File

@@ -35,7 +35,7 @@ start_website = "bin.start_website:main"

 [tool.poetry.dependencies]
-python = ">=3.8,<3.13"
+python = ">=3.8,<3.12"
 requests = "^2.31.0"
 flask = "^2.3.3"
 gunicorn = "^21.2.0"
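
For reference, Archiver.__init__ above reads its settings with get_config('generic', 's3fs'). A hypothetical Python rendering of that settings block (key names taken from the diff; values are placeholders, and the actual configuration file is not part of this commit):

    # Hypothetical shape of get_config('generic', 's3fs'); key names from the diff.
    s3fs_config = {
        'archive_on_s3fs': True,            # enables the s3fs code path
        'key': 'ACCESS_KEY',                # S3 access key (placeholder)
        'secret': 'SECRET_KEY',             # S3 secret key (placeholder)
        'endpoint_url': 'https://minio.example.org',
        'bucket_name': 'archive-bucket',
    }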