chg: Avoid directory listing as much as possible in archiver, allow shutdown

pull/751/head
Raphaël Vinot 2023-08-04 14:02:45 +02:00
parent 54674f6c5b
commit c203aa91b9
2 changed files with 35 additions and 9 deletions
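One part of this change is the self.shutdown_requested() check added at the top of every long-running loop, so the daemons can stop between iterations instead of blocking until a full pass over the captures completes. As a minimal sketch of how such a cooperative check might work, assuming a plain threading.Event behind the flag (the real AbstractManager may track it differently):

import threading


class Manager:
    def __init__(self) -> None:
        # Flag flipped by a signal handler or a supervisor thread.
        self._shutdown = threading.Event()

    def request_shutdown(self) -> None:
        self._shutdown.set()

    def shutdown_requested(self) -> bool:
        # Cheap, non-blocking check; safe to call once per loop iteration.
        return self._shutdown.is_set()

    def run(self, items: list) -> None:
        for item in items:
            if self.shutdown_requested():
                # Stop between items, never mid-operation.
                break
            print(f'processing {item}')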


@@ -8,7 +8,7 @@ import shutil
 from collections import defaultdict
 from collections.abc import Mapping
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, date
 from pathlib import Path
 from typing import Dict, List, Optional
@@ -88,6 +88,9 @@ class Archiver(AbstractManager):
         self.logger.info('Update archives indexes')
         directories_to_index = {capture_dir.parent.parent for capture_dir in self.archived_captures_dir.glob('*/*/*/uuid')}
         for directory_to_index in directories_to_index:
+            if self.shutdown_requested():
+                self.logger.warning('Shutdown requested, breaking.')
+                break
             self.logger.debug(f'Updating index for {directory_to_index}')
             self._update_index(directory_to_index)
         self.logger.info('Archived indexes updated')
@@ -100,7 +103,18 @@
         # Format:
         # { 2020: { 12: [(directory, uuid)] } }
         to_archive: Dict[int, Dict[int, List[Path]]] = defaultdict(lambda: defaultdict(list))
-        for capture_uuid in get_captures_dir().glob('*/*/*/uuid'):
-            try:
-                timestamp = datetime.strptime(capture_uuid.parent.name, '%Y-%m-%dT%H:%M:%S.%f')
-            except ValueError:
+        # In order to avoid scanning the complete directory on each run, we check if year and month are
+        # older than the cut time.
+        for index in get_captures_dir().glob('*/*/index'):
+            if self.shutdown_requested():
+                self.logger.warning('Shutdown requested, breaking.')
+                break
+            month = int(index.parent.name)
+            year = int(index.parent.parent.name)
+            if date(year, month, 1) >= cut_time:
+                continue
+            for capture_uuid in index.parent.glob('*/uuid'):
+                try:
+                    timestamp = datetime.strptime(capture_uuid.parent.name, '%Y-%m-%dT%H:%M:%S.%f')
+                except ValueError:
@@ -131,6 +145,9 @@
     def _compress_hars(self):
         self.logger.info('Compressing archived captures')
         for index in self.archived_captures_dir.glob('*/*/index'):
+            if self.shutdown_requested():
+                self.logger.warning('Shutdown requested, breaking.')
+                break
             with index.open('r') as _f:
                 for uuid, dirname in csv.reader(_f):
                     for har in (index.parent / dirname).rglob('*.har'):
@@ -145,6 +162,10 @@
     def _load_indexes(self):
         # Initialize archives
         for index in get_captures_dir().glob('*/*/index'):
+            if self.shutdown_requested():
+                self.logger.warning('Shutdown requested, breaking.')
+                break
             with index.open('r') as _f:
                 recent_uuids: Mapping = {uuid: str(index.parent / dirname) for uuid, dirname in csv.reader(_f) if (index.parent / dirname).exists()}
             if recent_uuids:
@@ -155,6 +176,9 @@
         # Initialize archives
         for index in self.archived_captures_dir.glob('*/*/index'):
+            if self.shutdown_requested():
+                self.logger.warning('Shutdown requested, breaking.')
+                break
             with index.open('r') as _f:
                 archived_uuids: Mapping = {uuid: str(index.parent / dirname) for uuid, dirname in csv.reader(_f) if (index.parent / dirname).exists()}
             if archived_uuids:
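The hunks above rely on captures being laid out as <captures>/<year>/<month>/<capture>/, with one index file per month. A standalone sketch of the cut-off logic (months_to_archive and its arguments are hypothetical names), showing why whole month buckets can be skipped without listing a single capture:

from datetime import date
from pathlib import Path
from typing import List


def months_to_archive(captures_dir: Path, cut_time: date) -> List[Path]:
    # cut_time: first day of the oldest month that must stay live.
    buckets: List[Path] = []
    for index in captures_dir.glob('*/*/index'):
        month = int(index.parent.name)        # .../2023/08/index -> 8
        year = int(index.parent.parent.name)  # .../2023/08/index -> 2023
        if date(year, month, 1) >= cut_time:
            continue  # bucket too recent: skipped without globbing its captures
        buckets.append(index.parent)
    return buckets


# e.g. months_to_archive(Path('/data/captures'), date(2023, 5, 1)) returns
# every year/month directory strictly older than May 2023.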


@@ -84,6 +84,8 @@ class BackgroundIndexer(AbstractManager):
                 self.logger.warning(f'Unable to find {uuid}. That should not happen.')
             except NoValidHarFile as e:
                 self.logger.critical(f'There are no HAR files in the capture {uuid}: {uuid_path.parent.name} - {e}')
+            except FileNotFoundError:
+                self.logger.warning(f'Capture {uuid} disappeared during processing, probably archived.')
             except Exception:
                 self.logger.exception(f'Unable to build pickle for {uuid}: {uuid_path.parent.name}')
             # The capture is not working, moving it away.
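The second file covers the race this commit widens: with the archiver running alongside it and moving captures away, the background indexer can lose a capture directory mid-read. A sketch of the pattern (process_capture is a hypothetical name), assuming only that another process may relocate the directory at any time:

import logging
from pathlib import Path

logger = logging.getLogger(__name__)


def process_capture(uuid: str, capture_dir: Path) -> None:
    try:
        for har in capture_dir.rglob('*.har'):
            har.read_bytes()  # stand-in for the real pickle-building work
    except FileNotFoundError:
        # Another process moved the capture away mid-iteration: not an error,
        # it was most likely just archived.
        logger.warning(f'Capture {uuid} disappeared during processing, probably archived.')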