From c6656a1a2eed6e180dfeb190bdaa9ceb0571c6f7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?=
Date: Tue, 24 Mar 2020 13:25:41 +0100
Subject: [PATCH] chg: Add option to aggregate by country

---
Review notes (ignored by git-am, not part of the commit message):

* Adds the module-level switch `aggregate_by_country` (set to True).  When
  it is set, rows are summed per country by country_aggregate() and one
  'covid19-csse-daily-report' object is added per country; otherwise one
  object is added per CSV row and the event title gains the word "detailed".
* get_country_region(), get_last_update() and add_detailed_object() accept
  both header spellings handled in the code ('Country/Region' and
  'Country_Region', 'Last Update' and 'Last_Update', 'Province/State' and
  'Province_State') plus the optional Lat, Long_/Long and Active columns.
* The helpers print `p` when no known header is found; that name is the
  module-level loop variable of `for p in path.glob(...)`, so the helpers
  only work when called from inside that loop.
* defaultdict is used without a default factory here, so it behaves like a
  plain dict (missing keys still raise KeyError).

 examples/covid19/import_csse_covid19_daily.py | 119 +++++++++++++++---
 1 file changed, 100 insertions(+), 19 deletions(-)

diff --git a/examples/covid19/import_csse_covid19_daily.py b/examples/covid19/import_csse_covid19_daily.py
index 2f6cf16..4d3561f 100755
--- a/examples/covid19/import_csse_covid19_daily.py
+++ b/examples/covid19/import_csse_covid19_daily.py
@@ -3,18 +3,95 @@
 
 from pathlib import Path
 from csv import DictReader
-from pymisp import MISPEvent, MISPOrganisation, PyMISP
+from pymisp import MISPEvent, MISPOrganisation, PyMISP, MISPObject
 from datetime import datetime
 from dateutil.parser import parse
 import json
 from pymisp.tools import feed_meta_generator
 from io import BytesIO
+from collections import defaultdict
 
 make_feed = False
 
+aggregate_by_country = True
+
 path = Path('/home/raphael/gits/COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/')
 
 
+def get_country_region(row):
+    if 'Country/Region' in row:
+        return row['Country/Region']
+    elif 'Country_Region' in row:
+        return row['Country_Region']
+    else:
+        print(p, row.keys())
+        raise Exception()
+
+
+def get_last_update(row):
+    if 'Last_Update' in row:
+        return parse(row['Last_Update'])
+    elif 'Last Update' in row:
+        return parse(row['Last Update'])
+    else:
+        print(p, row.keys())
+        raise Exception()
+
+
+def add_detailed_object(obj, row):
+    if 'Province/State' in row:
+        if row['Province/State']:
+            obj.add_attribute('province-state', row['Province/State'])
+    elif '\ufeffProvince/State' in row:
+        if row['\ufeffProvince/State']:
+            obj.add_attribute('province-state', row['\ufeffProvince/State'])
+    elif 'Province_State' in row:
+        if row['Province_State']:
+            obj.add_attribute('province-state', row['Province_State'])
+    else:
+        print(p, row.keys())
+        raise Exception()
+
+    obj.add_attribute('country-region', get_country_region(row))
+
+    obj.add_attribute('update', get_last_update(row))
+
+    if 'Lat' in row:
+        obj.add_attribute('latitude', row['Lat'])
+
+    if 'Long_' in row:
+        obj.add_attribute('longitude', row['Long_'])
+    elif 'Long' in row:
+        obj.add_attribute('longitude', row['Long'])
+
+    if row['Confirmed']:
+        obj.add_attribute('confirmed', int(row['Confirmed']))
+    if row['Deaths']:
+        obj.add_attribute('death', int(row['Deaths']))
+    if row['Recovered']:
+        obj.add_attribute('recovered', int(row['Recovered']))
+    if 'Active' in row and row['Active']:
+        obj.add_attribute('active', int(row['Active']))
+
+
+def country_aggregate(aggregate, row):
+    c = get_country_region(row)
+    if c not in aggregate:
+        aggregate[c] = defaultdict(active=0, death=0, recovered=0, confirmed=0, update=datetime.fromtimestamp(0))
+    if row['Confirmed']:
+        aggregate[c]['confirmed'] += int(row['Confirmed'])
+    if row['Deaths']:
+        aggregate[c]['death'] += int(row['Deaths'])
+    if row['Recovered']:
+        aggregate[c]['recovered'] += int(row['Recovered'])
+    if 'Active' in row and row['Active']:
+        aggregate[c]['active'] += int(row['Active'])
+
+    update = get_last_update(row)
+    if update > aggregate[c]['update']:
+        aggregate[c]['update'] = update
+
+
 if make_feed:
     org = MISPOrganisation()
     org.name = 'CIRCL'
@@ -26,7 +103,10 @@ else:
 for p in path.glob('**/*.csv'):
     d = datetime.strptime(p.name[:-4], '%m-%d-%Y').date()
     event = MISPEvent()
-    event.info = f"[{d.isoformat()}] CSSE COVID-19 daily report"
+    if aggregate_by_country:
+        event.info = f"[{d.isoformat()}] CSSE COVID-19 daily report"
+    else:
+        event.info = f"[{d.isoformat()}] CSSE COVID-19 detailed daily report"
     event.date = d
     event.distribution = 3
     event.add_tag('tlp:white')
@@ -39,27 +119,28 @@ for p in path.glob('**/*.csv'):
             continue
     event.add_attribute('attachment', p.name, data=BytesIO(p.open('rb').read()))
     event.add_attribute('link', f'https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports/{p.name}', comment='Source')
+    if aggregate_by_country:
+        aggregate = defaultdict()
     with p.open() as f:
         reader = DictReader(f)
         for row in reader:
-            obj = event.add_object(name='covid19-csse-daily-report', standalone=False)
-            if 'Province/State' in row:
-                if row['Province/State']:
-                    obj.add_attribute('province-state', row['Province/State'])
-            elif '\ufeffProvince/State' in row:
-                if row['\ufeffProvince/State']:
-                    obj.add_attribute('province-state', row['\ufeffProvince/State'])
+            if aggregate_by_country:
+                country_aggregate(aggregate, row)
             else:
-                print(p, row.keys())
-                raise Exception()
-            obj.add_attribute('country-region', row['Country/Region'])
-            obj.add_attribute('update', parse(row['Last Update']))
-            if row['Confirmed']:
-                obj.add_attribute('confirmed', int(row['Confirmed']))
-            if row['Deaths']:
-                obj.add_attribute('death', int(row['Deaths']))
-            if row['Recovered']:
-                obj.add_attribute('recovered', int(row['Recovered']))
+                obj = MISPObject(name='covid19-csse-daily-report')
+                add_detailed_object(obj, row)
+                event.add_object(obj)
+
+    if aggregate_by_country:
+        for country, values in aggregate.items():
+            obj = event.add_object(name='covid19-csse-daily-report', standalone=False)
+            obj.add_attribute('country-region', country)
+            obj.add_attribute('update', values['update'])
+            obj.add_attribute('confirmed', values['confirmed'])
+            obj.add_attribute('death', values['death'])
+            obj.add_attribute('recovered', values['recovered'])
+            obj.add_attribute('active', values['active'])
+
     if make_feed:
         with (Path('output') / f'{event.uuid}.json').open('w') as _w:
             json.dump(event.to_feed(), _w)