mirror of https://github.com/CIRCL/lookyloo

chg: Make archiver an index generator

parent 1bff8f1529
commit 117500b777
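In short, the archiver now owns the per-month `index` files: every year/month capture directory gets a CSV index mapping each capture UUID to its directory name, and `_load_indexes()` loads those mappings into the `lookup_dirs` (and archived equivalent) Redis hashes. Below is a minimal sketch of reading one of these index files, assuming a hypothetical path such as archived_captures/2020/12/index (the path is illustrative, not taken from the diff):

import csv
from pathlib import Path

# Hypothetical index location; real paths come from get_captures_dir() or the
# archived_captures directory handled by the Archiver.
index_file = Path('archived_captures') / '2020' / '12' / 'index'

if index_file.exists():
    with index_file.open() as f:
        # One `uuid,dirname` row per capture, the same format the archiver's csv.writer emits.
        uuid_to_dir = {uuid: dirname for uuid, dirname in csv.reader(f)}
    for uuid, dirname in uuid_to_dir.items():
        print(uuid, '->', index_file.parent / dirname)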
@@ -5,7 +5,7 @@ from collections import defaultdict
import csv
from datetime import datetime, timedelta
import logging
from typing import Dict, List, Tuple
from typing import Dict, List
from pathlib import Path

from redis import Redis
@@ -28,10 +28,55 @@ class Archiver(AbstractManager):
        self.archived_captures_dir = get_homedir() / 'archived_captures'
        self.archived_captures_dir.mkdir(parents=True, exist_ok=True)

        self._load_archives()
        self._load_indexes()

    def _to_run_forever(self):
        self._archive()
        self._update_all_capture_indexes()
        self._load_indexes()

    def _update_index(self, root_dir: Path) -> None:
        current_index: Dict[str, str]

        index_file = root_dir / 'index'
        if index_file.exists():
            # Skip index if the directory has been archived.
            existing_captures = index_file.parent.iterdir()
            with index_file.open('r') as _f:
                current_index = {uuid: dirname for uuid, dirname in csv.reader(_f) if (index_file.parent / dirname) in existing_captures}
            if not current_index:
                index_file.unlink()
        else:
            current_index = {}

        for uuid_file in root_dir.glob('*/uuid'):
            if uuid_file.parent.name in current_index.values():
                # The path is already in the index file, no need to read the uuid file
                continue
            with uuid_file.open() as _f:
                current_index[_f.read().strip()] = uuid_file.parent.name

        if not current_index:
            # The directory has been archived.
            root_dir.unlink()
            return

        with index_file.open('w') as _f:
            index_writer = csv.writer(_f)
            for uuid, dirname in current_index.items():
                index_writer.writerow([uuid, dirname])

    def _update_all_capture_indexes(self):
        '''Run that after the captures are in the proper directories'''
        # Recent captures
        directories_to_index = set(capture_dir.parent.parent for capture_dir in get_captures_dir().glob('**/uuid'))
        for directory_to_index in directories_to_index:
            self._update_index(directory_to_index)

        # Archived captures
        directories_to_index = set(capture_dir.parent.parent for capture_dir in self.archived_captures_dir.glob('**/uuid'))
        for directory_to_index in directories_to_index:
            self._update_index(directory_to_index)

    def _archive(self):
        archive_interval = timedelta(days=get_config('generic', 'archive'))
@@ -40,54 +85,40 @@ class Archiver(AbstractManager):

        # Format:
        # { 2020: { 12: [(directory, uuid)] } }
        to_archive: Dict[int, Dict[int, List[Tuple[Path, str]]]] = defaultdict(lambda: defaultdict(list))
        to_archive: Dict[int, Dict[int, List[Path]]] = defaultdict(lambda: defaultdict(list))
        for capture_uuid in get_captures_dir().glob('**/uuid'):
            timestamp = datetime.strptime(capture_uuid.parent.name, '%Y-%m-%dT%H:%M:%S.%f')
            if timestamp.date() >= cut_time:
                # do not archive.
                continue
            with capture_uuid.open() as _f:
                uuid = _f.read().strip()
            to_archive[timestamp.year][timestamp.month].append((capture_uuid.parent, uuid))
            to_archive[timestamp.year][timestamp.month].append(capture_uuid.parent)
            self.logger.info(f'Archiving {capture_uuid.parent}.')

        if not to_archive:
            self.logger.info('Nothing to archive.')
            return

        archived_uuids = {}
        p = self.redis.pipeline()
        for year, month_captures in to_archive.items():
            for month, captures in month_captures.items():
                dest_dir = self.archived_captures_dir / str(year) / f'{month:02}'
                dest_dir.mkdir(parents=True, exist_ok=True)
                if (dest_dir / 'index').exists():
                    with (dest_dir / 'index').open('r') as _f:
                        current_index = {uuid: dirname for uuid, dirname in csv.reader(_f)}
                else:
                    current_index = {}
                for capture_path, uuid in captures:
                    current_index[uuid] = capture_path.name
                for capture_path in captures:
                    p.delete(str(capture_path))
                    capture_path.rename(dest_dir / capture_path.name)
                    archived_uuids[uuid] = str(dest_dir / capture_path.name)
                with (dest_dir / 'index').open('w') as _f:
                    index_writer = csv.writer(_f)
                    for uuid, dirname in current_index.items():
                        index_writer.writerow([uuid, dirname])
        p.execute()

        # Clear empty

        if archived_uuids:
            p = self.redis.pipeline()
            for dir_key in self.redis.hmget('lookup_dirs', *archived_uuids.keys()):
                # Clear cache
                if dir_key:
                    p.delete(dir_key)
            p.hdel('lookup_dirs', *archived_uuids.keys())
            p.hmset('lookup_dirs_archived', archived_uuids)  # type: ignore
            p.execute()
        self.logger.info('Archiving done.')

    def _load_archives(self):
    def _load_indexes(self):
        # Initialize archives
        for index in get_captures_dir().glob('**/index'):
            with index.open('r') as _f:
                recent_uuids: Dict[str, str] = {uuid: str(index.parent / dirname) for uuid, dirname in csv.reader(_f)}
            self.redis.hmset('lookup_dirs', recent_uuids)  # type: ignore

        # Initialize archives
        self.redis.delete('lookup_dirs_archived')
        for index in self.archived_captures_dir.glob('**/index'):
            with index.open('r') as _f:
                archived_uuids: Dict[str, str] = {uuid: str(index.parent / dirname) for uuid, dirname in csv.reader(_f)}
@@ -96,7 +127,7 @@ class Archiver(AbstractManager):

def main():
    a = Archiver()
    a.run(sleep_in_sec=3600 * 24)
    a.run(sleep_in_sec=3600)


if __name__ == '__main__':
@@ -37,7 +37,7 @@ class Processing(AbstractManager):
        safe_create_dir(self_generated_ua_file_path)
        self_generated_ua_file = self_generated_ua_file_path / f'{yesterday.isoformat()}.json'
        if self_generated_ua_file.exists():
            self.logger.info('User-agent file for {yesterday} already exists.')
            self.logger.info(f'User-agent file for {yesterday} already exists.')
            return
        self.logger.info(f'Generating user-agent file for {yesterday}')
        redis = Redis(unix_socket_path=get_socket_path('cache'), decode_responses=True)
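Two smaller changes ride along: the logger call in `Processing` gains its missing f-string prefix, and the archiver's run interval drops from daily (3600 * 24) to hourly (3600), so indexes are regenerated far more often. A rough sketch of what run(sleep_in_sec=...) is assumed to do here; the actual AbstractManager implementation is not part of this diff:

import time

def run_forever(job, sleep_in_sec: int) -> None:
    # Assumed behaviour: call the worker method, sleep for the interval, repeat.
    while True:
        job()
        time.sleep(sleep_in_sec)

# With sleep_in_sec=3600, the archive/index pass happens roughly once an hour.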
bin/start.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-

from subprocess import run, Popen
from lookyloo.helpers import get_homedir, get_config, reload_uuids_index
from lookyloo.helpers import get_homedir, get_config


def main():
@@ -12,11 +12,8 @@ def main():
    p = run(['run_backend', '--start'])
    p.check_returncode()
    print('done.')
    print('Reload UUIDs index...')
    print('If this is taking too long, it means you have a lot of captures.')
    print('You should run tools/change_captures_dir.py to re-organize the capture directory by year and month.')
    print('You may also want to archive more captures.')
    reload_uuids_index()
    print('Start archiving process...')
    Popen(['archiver'])
    print('done.')
    print('Start asynchronous ingestor...')
    for _ in range(get_config('generic', 'async_capture_processes')):
@@ -28,9 +25,6 @@ def main():
    print('Start background processing...')
    Popen(['processing'])
    print('done.')
    print('Start archiving process...')
    Popen(['archiver'])
    print('done.')
    print('Start website...')
    Popen(['start_website'])
    print('done.')
@@ -264,21 +264,6 @@ def get_useragent_for_requests():
    return f'Lookyloo / {version}'


def reload_uuids_index() -> None:
    recent_uuids: Dict[str, str] = {}
    for uuid_path in get_captures_dir().glob('**/uuid'):
        with uuid_path.open() as f:
            uuid = f.read()
        recent_uuids[uuid] = str(uuid_path.parent)
    if not recent_uuids:
        return None
    r = Redis(unix_socket_path=get_socket_path('cache'))
    p = r.pipeline()
    p.delete('lookup_dirs')
    p.hmset('lookup_dirs', recent_uuids)  # type: ignore
    p.execute()


def get_capture_status(capture_uuid: str, /) -> CaptureStatus:
    r = Redis(unix_socket_path=get_socket_path('cache'))
    if r.zrank('to_capture', capture_uuid) is not None: