BGP-Ranking/bin/parser.py

125 lines
5.0 KiB
Python
Raw Permalink Normal View History

2018-07-13 14:51:00 +02:00
#!/usr/bin/env python3
2018-03-29 22:37:28 +02:00
# -*- coding: utf-8 -*-
2021-12-06 14:30:08 +01:00
import importlib
import json
2018-03-29 22:37:28 +02:00
import logging
2021-12-06 14:30:08 +01:00
import re
import types
from datetime import datetime
from io import BytesIO
from logging import Logger
2018-03-29 22:37:28 +02:00
from pathlib import Path
2021-12-06 14:30:08 +01:00
from typing import List, Union, Tuple
from uuid import uuid4
from redis import Redis
from bgpranking.default import AbstractManager, safe_create_dir, get_socket_path
from bgpranking.helpers import get_modules, get_data_dir, get_modules_dir
2018-03-30 14:33:33 +02:00
2018-03-29 22:37:28 +02:00
# Root logging configuration for this script: INFO level, timestamped lines.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(name)s %(levelname)s:%(message)s',
)
2018-03-29 22:37:28 +02:00
2021-12-06 14:30:08 +01:00
class RawFilesParser():
    '''Turn the raw files collected for one list into entries of the intake Redis.

    One instance is built per module config file. Every regular file found
    directly in the list's data directory is parsed; each extracted IP is
    queued in the intake database, then the file is moved to ``archive``
    (success) or ``unparsable`` (failure).'''

    def __init__(self, config_file: Path, logger: Logger) -> None:
        '''Load the module configuration and prepare directories and Redis.

        :param config_file: JSON module config. It must contain ``vendor`` and
            ``name``; if it contains ``parser``, that module is imported with
            ``importlib.import_module(..., 'bgpranking')`` (so a relative name
            such as ``.parsers.x`` resolves inside the ``bgpranking`` package)
            and its ``parse_raw_file`` replaces the default one.
        :param logger: logger used for every message of this parser.
        '''
        self.logger = logger
        with open(config_file, 'r') as f:
            module_parameters = json.load(f)
        self.vendor = module_parameters['vendor']
        self.listname = module_parameters['name']
        if 'parser' in module_parameters:
            # Bind the module-level function as a method of this instance.
            self.parse_raw_file = types.MethodType(importlib.import_module(module_parameters['parser'], 'bgpranking').parse_raw_file, self)  # type: ignore
        self.source = f'{self.vendor}-{self.listname}'
        self.directory = get_data_dir() / self.vendor / self.listname
        safe_create_dir(self.directory)
        self.unparsable_dir = self.directory / 'unparsable'
        safe_create_dir(self.unparsable_dir)
        # Parsed files are moved here. Creating it up front guarantees the
        # rename in _archive cannot fail and push a good file to unparsable.
        self.archive_dir = self.directory / 'archive'
        safe_create_dir(self.archive_dir)
        self.redis_intake = Redis(unix_socket_path=get_socket_path('intake'), db=0)
        self.logger.debug(f'{self.source}: Starting intake.')

    @property
    def files_to_parse(self) -> List[Path]:
        '''Regular files waiting in the list directory, sorted by path in reverse.

        Sub-directories (such as ``archive`` and ``unparsable``) are skipped.'''
        return sorted([f for f in self.directory.iterdir() if f.is_file()], reverse=True)

    def extract_ipv4(self, bytestream: bytes) -> List[Union[bytes, Tuple[bytes, datetime]]]:
        '''Return every dotted-quad-looking token (4 dot-separated digit runs) in *bytestream*.

        Octet values are not range-checked and leading zeros are kept.'''
        return re.findall(rb'[0-9]+(?:\.[0-9]+){3}', bytestream)

    def strip_leading_zeros(self, ips: List[bytes]) -> List[bytes]:
        '''Helper to get rid of leading 0s in an IP list.
        Only run it when needed, it is nasty and slow'''
        return ['.'.join(str(int(part)) for part in ip.split(b'.')).encode() for ip in ips]

    def parse_raw_file(self, f: BytesIO) -> List[Union[bytes, Tuple[bytes, datetime]]]:
        '''Default parser: extract IPv4 addresses, dated today at midnight (local time).

        A module config may replace this method (see ``__init__``).'''
        # If the list doesn't provide a time, fallback to current day, midnight
        self.datetime = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        return self.extract_ipv4(f.getvalue())

    def parse_raw_files(self) -> None:
        '''Parse every pending file and queue the extracted IPs in the intake.

        Each file is handled on its own. If a file fails, the failure is
        logged, the file is moved to ``unparsable`` and the remaining files
        are still processed in the same run.'''
        nb_unparsable_files = len([f for f in self.unparsable_dir.iterdir() if f.is_file()])
        if nb_unparsable_files:
            self.logger.warning(f'{self.source}: Was unable to parse {nb_unparsable_files} files.')
        # List the directory once instead of once per file.
        files = self.files_to_parse
        for index, filepath in enumerate(files, start=1):
            self.logger.debug(f'{self.source}: Parsing {filepath}, {len(files) - index} to go.')
            try:
                self._push_to_intake(filepath)
                # Archiving stays inside the try: if it fails after the data
                # was pushed, moving the file to unparsable avoids pushing the
                # same entries again on the next run.
                self._archive(filepath)
            except Exception as e:
                self.logger.warning(f"{self.source}: That didn't go well: {e}")
                self._unparsable(filepath)

    def _push_to_intake(self, filepath: Path) -> None:
        '''Parse one file and queue one intake entry per extracted IP.

        Commands are buffered in a Redis pipeline and only sent by
        ``execute()`` once the whole file has been parsed.'''
        with open(filepath, 'rb') as f:
            to_parse = BytesIO(f.read())
        p = self.redis_intake.pipeline()
        for line in self.parse_raw_file(to_parse):
            if isinstance(line, tuple):
                ip, timestamp = line
            else:
                # Bare IP: fall back to self.datetime (the default
                # parse_raw_file sets it; custom parsers presumably do too).
                ip = line
                timestamp = self.datetime
            key = str(uuid4())
            p.hset(key, mapping={'ip': ip, 'source': self.source,
                                 'datetime': timestamp.isoformat()})
            p.sadd('intake', key)
        p.execute()

    def _archive(self, filepath: Path) -> None:
        '''After processing, move file to the archive directory'''
        filepath.rename(self.archive_dir / filepath.name)

    def _unparsable(self, filepath: Path) -> None:
        '''Move a file that could not be processed to the unparsable directory'''
        filepath.rename(self.unparsable_dir / filepath.name)
2018-03-30 14:33:33 +02:00
class ParserManager(AbstractManager):
    '''Daemon that runs every configured RawFilesParser on each iteration.'''

    def __init__(self, loglevel: int = logging.DEBUG):
        '''Build one RawFilesParser per module config returned by get_modules().

        A broken config raises here, so the daemon refuses to start with it.'''
        super().__init__(loglevel)
        self.script_name = 'parser'
        self.modules_paths = get_modules()
        # self.logger is set up by AbstractManager.__init__.
        self.modules = [RawFilesParser(path, self.logger) for path in self.modules_paths]

    def _load_new_modules(self) -> None:
        '''Instantiate a parser for every module config not loaded yet.

        A config that fails to load is logged and skipped. Its path is not
        recorded, so it is retried on the next iteration and a fixed file is
        picked up without a restart. Working configs are never blocked by it.'''
        for modulepath in get_modules_dir().glob('*.json'):
            if modulepath in self.modules_paths:
                continue
            try:
                module = RawFilesParser(modulepath, self.logger)
            except Exception as e:
                self.logger.warning(f'Unable to load parser config {modulepath}: {e}')
                continue
            self.modules.append(module)
            self.modules_paths.append(modulepath)

    def _to_run_forever(self) -> None:
        '''One iteration: pick up new config files, then run every parser.'''
        # Check if there are new config files
        self._load_new_modules()
        if self.modules:
            for module in self.modules:
                module.parse_raw_files()
        else:
            self.logger.warning('No config files were found so there are no parsers running yet. Will try again later.')
2018-03-29 22:37:28 +02:00
2021-12-06 14:30:08 +01:00
def main():
    '''Script entry point: create the parser daemon and start its run loop.

    ``sleep_in_sec=120`` is handed to AbstractManager.run, presumably as the
    pause between iterations.'''
    ParserManager().run(sleep_in_sec=120)


if __name__ == '__main__':
    main()