mirror of https://github.com/CIRCL/lookyloo

commit c54d29601d
parent a172c1f58a

chg: Fix typing
@@ -29,8 +29,6 @@ except ImportError:
 
 from .exceptions import MissingEnv, CreateDirectoryException, ConfigError
 
-from urllib.parse import urlparse
-
 configs: Dict[str, Dict[str, Any]] = {}
 logger = logging.getLogger('Lookyloo - Helpers')
 
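The annotations and `# type: ignore` comments this commit adds target a PEP 484 checker such as mypy. A minimal sketch of checking the touched module through mypy's Python API (the file path is illustrative; the project's actual CI invocation is not shown in this diff):

    # Run mypy programmatically; api.run() takes CLI-style arguments and
    # returns (report, errors, exit_status). The path is an assumption.
    from mypy import api

    report, errors, exit_status = api.run(['lookyloo/helpers.py'])
    print(report)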
@@ -315,6 +313,7 @@ def remove_pickle_tree(capture_dir: Path) -> None:
     if pickle_file.exists():
         pickle_file.unlink()
 
+
 def uniq_domains(uniq_urls):
     domains = set()
     for url in uniq_urls:
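The hunk's context cuts off inside `uniq_domains`. As a hedged sketch of what a function with this signature typically does (the loop body below is an assumption, not the commit's code):

    from typing import Iterable, Set
    from urllib.parse import urlsplit

    def uniq_domains(uniq_urls: Iterable[str]) -> Set[str]:
        # Deduplicate by hostname; skip URLs that do not parse to one.
        domains = set()
        for url in uniq_urls:
            hostname = urlsplit(url).hostname
            if hostname:
                domains.add(hostname)
        return domains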
@@ -488,17 +488,17 @@ class Lookyloo():
             all_cache.append(c)
         return sorted(all_cache, key=operator.itemgetter('timestamp'), reverse=True)
 
-    def capture_cache(self, capture_uuid: str) -> Dict[str, Union[str, Path]]:
+    def capture_cache(self, capture_uuid: str) -> Dict[str, Union[str, Path, List]]:
         capture_dir = self.lookup_capture_dir(capture_uuid)
         if not capture_dir:
             raise MissingUUID(f'Unable to find UUID {capture_uuid} in the cache')
         if self.redis.hget(str(capture_dir), 'incomplete_redirects') == '1':
             # try to rebuild the cache
             self._set_capture_cache(capture_dir, force=True)
-        cached: Dict[str, Union[str, Path]] = self.redis.hgetall(str(capture_dir)) # type: ignore
+        cached: Dict[str, Union[str, Path, List]] = self.redis.hgetall(str(capture_dir)) # type: ignore
         if all(key in cached.keys() for key in ['uuid', 'title', 'timestamp', 'url', 'redirects', 'capture_dir']):
             cached['redirects'] = json.loads(cached['redirects']) # type: ignore
-            cached['capture_dir'] = Path(cached['capture_dir'])
+            cached['capture_dir'] = Path(cached['capture_dir']) # type: ignore
             if 'categories' in cached:
                 cached['categories'] = json.loads(cached['categories']) # type: ignore
         return cached
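Why `List` joins the Union and the `Path` conversion needs a suppression: redis-py's `hgetall` returns a flat string-to-string mapping, so structured fields are stored serialized and rebuilt on read, and the dict's static type stops matching its runtime values. A minimal sketch of that round trip, assuming a local Redis reachable with `decode_responses=True` (key and values invented for the example):

    import json
    from pathlib import Path
    from redis import Redis

    r = Redis(decode_responses=True)  # assumption: local default instance

    # A Redis hash only holds strings, so the list is stored as JSON...
    r.hset('demo_capture', mapping={'url': 'https://example.com',
                                    'redirects': json.dumps(['https://example.com/a']),
                                    'capture_dir': '/tmp/demo'})

    # ...and must be rebuilt after hgetall(); mutating the values in place
    # is what forces the widened Union and the type: ignore comments above.
    cached = r.hgetall('demo_capture')
    cached['redirects'] = json.loads(cached['redirects'])
    cached['capture_dir'] = Path(cached['capture_dir'])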
@@ -997,60 +997,60 @@ class Lookyloo():
             urls.append(to_append)
         return hostnode, urls
 
-    def get_stats(self):
-        stats = {}
+    def get_stats(self) -> Dict[str, List]:
+        stats: Dict[int, Dict[int, Dict[str, Any]]] = {}
         today = date.today()
         calendar_week = today.isocalendar()[1]
-        weeks_stats = {calendar_week - 1: {'analysis': 0, 'analysis_with_redirects': 0, 'redirects': 0, 'uniq_urls': set()},
-                       calendar_week: {'analysis': 0, 'analysis_with_redirects': 0, 'redirects': 0, 'uniq_urls': set()}}
-        statistics: Dict[str, Any] = {'weeks': [],'years':[]}
+        weeks_stats: Dict[int, Dict] = {calendar_week - 1: {'analysis': 0, 'analysis_with_redirects': 0, 'redirects': 0, 'uniq_urls': set()},
+                                        calendar_week: {'analysis': 0, 'analysis_with_redirects': 0, 'redirects': 0, 'uniq_urls': set()}}
+        statistics: Dict[str, List] = {'weeks': [], 'years': []}
         for uuid in self.capture_uuids:
             cache = self.capture_cache(uuid)
             if 'timestamp' not in cache:
                 continue
-            date_analysis = datetime.fromisoformat(cache['timestamp'].rstrip('Z'))
+            date_analysis: datetime = datetime.fromisoformat(cache['timestamp'].rstrip('Z')) # type: ignore
             if date_analysis.year not in stats:
                 stats[date_analysis.year] = {}
             if date_analysis.month not in stats[date_analysis.year]:
-                stats[date_analysis.year][date_analysis.month] = {'analysis': 0, 'analysis_with_redirects' :0, 'redirects': 0, 'uniq_urls': set()}
+                stats[date_analysis.year][date_analysis.month] = {'analysis': 0, 'analysis_with_redirects': 0, 'redirects': 0, 'uniq_urls': set()}
             stats[date_analysis.year][date_analysis.month]['analysis'] += 1
-            if len(cache['redirects']) > 0:
+            if len(cache['redirects']) > 0: # type: ignore
                 stats[date_analysis.year][date_analysis.month]['analysis_with_redirects'] += 1
-                stats[date_analysis.year][date_analysis.month]['redirects'] += len(cache['redirects'])
+                stats[date_analysis.year][date_analysis.month]['redirects'] += len(cache['redirects']) # type: ignore
                 stats[date_analysis.year][date_analysis.month]['uniq_urls'].update(cache['redirects'])
             stats[date_analysis.year][date_analysis.month]['uniq_urls'].add(cache['url'])
             if date_analysis.isocalendar()[1] in weeks_stats:
                 weeks_stats[date_analysis.isocalendar()[1]]['analysis'] += 1
-                if len(cache['redirects']) > 0:
+                if len(cache['redirects']) > 0: # type: ignore
                     weeks_stats[date_analysis.isocalendar()[1]]['analysis_with_redirects'] += 1
-                    weeks_stats[date_analysis.isocalendar()[1]]['redirects'] += len(cache['redirects'])
+                    weeks_stats[date_analysis.isocalendar()[1]]['redirects'] += len(cache['redirects']) # type: ignore
                     weeks_stats[date_analysis.isocalendar()[1]]['uniq_urls'].update(cache['redirects'])
                 weeks_stats[date_analysis.isocalendar()[1]]['uniq_urls'].add(cache['url'])
         for week_number, week_stat in weeks_stats.items():
-            week={}
-            week['week']= week_number
-            week['analysis']= week_stat['analysis']
-            week['analysis_with_redirects']= week_stat['analysis_with_redirects']
-            week['redirects']= week_stat['redirects']
+            week = {}
+            week['week'] = week_number
+            week['analysis'] = week_stat['analysis']
+            week['analysis_with_redirects'] = week_stat['analysis_with_redirects']
+            week['redirects'] = week_stat['redirects']
             week['uniq_urls'] = len(week_stat['uniq_urls'])
-            week['uniq_domains'] =len(uniq_domains(week_stat['uniq_urls']))
+            week['uniq_domains'] = len(uniq_domains(week_stat['uniq_urls']))
             statistics['weeks'].append(week)
         for year, data in stats.items():
-            years={}
-            years['year']=year
+            years: Dict[str, Union[Dict, int]] = {}
+            years['year'] = year
             yearly_analysis = 0
             yearly_redirects = 0
             for month in sorted(data.keys()):
-                stats = data[month]
+                _stats = data[month]
                 mstats = {}
                 mstats['month'] = month
-                mstats['analysys'] = stats['analysis']
-                mstats['analysis_with_redirects'] = stats['analysis_with_redirects']
-                mstats['redirects'] = stats['redirects']
-                mstats['uniq_url'] = len(stats['uniq_urls'])
-                mstats['uniq_domains'] = len(uniq_domains(stats['uniq_urls']))
-                yearly_analysis += stats['analysis']
-                yearly_redirects += stats['redirects']
+                mstats['analysys'] = _stats['analysis']
+                mstats['analysis_with_redirects'] = _stats['analysis_with_redirects']
+                mstats['redirects'] = _stats['redirects']
+                mstats['uniq_url'] = len(_stats['uniq_urls'])
+                mstats['uniq_domains'] = len(uniq_domains(_stats['uniq_urls']))
+                yearly_analysis += _stats['analysis']
+                yearly_redirects += _stats['redirects']
                 years[calendar.month_name[month]] = mstats
             years['yearly_analysis'] = yearly_analysis
             years['yearly_redirects'] = yearly_redirects
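Beyond the annotations, the `stats` to `_stats` rename in the month loop matters: once `stats` carries a `Dict[int, ...]` annotation, rebinding it to a per-month `Dict[str, Any]` becomes an assignment-type error. A small self-contained illustration (names and numbers invented for the example):

    from typing import Any, Dict

    stats: Dict[int, Dict[int, Dict[str, Any]]] = {2020: {1: {'analysis': 4}}}

    for year, data in stats.items():
        for month in sorted(data.keys()):
            # stats = data[month]   # would rebind the annotated outer dict
            #                       # to a Dict[str, Any]: a mypy error
            _stats = data[month]    # a fresh name keeps both types intact
            print(year, month, _stats['analysis'])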
@ -678,5 +678,5 @@ def json_hostname_info():
|
||||||
|
|
||||||
@app.route('/json/stats', methods=['GET'])
|
@app.route('/json/stats', methods=['GET'])
|
||||||
def json_stats():
|
def json_stats():
|
||||||
to_return=lookyloo.get_stats()
|
to_return = lookyloo.get_stats()
|
||||||
return jsonify(to_return)
|
return jsonify(to_return)
|
||||||
|
|
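A hypothetical client call against the endpoint above (host and port are assumptions for a local instance; adjust to your deployment):

    import requests

    resp = requests.get('http://127.0.0.1:5100/json/stats')
    resp.raise_for_status()
    stats = resp.json()
    print(sorted(stats.keys()))  # expected top-level keys: 'weeks', 'years'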