chg: Add option to aggregate by country

pull/562/head
Raphaël Vinot 2020-03-24 13:25:41 +01:00
parent a9ad33e8e3
commit c6656a1a2e
1 changed files with 100 additions and 19 deletions

View File

@ -3,18 +3,95 @@
from pathlib import Path from pathlib import Path
from csv import DictReader from csv import DictReader
from pymisp import MISPEvent, MISPOrganisation, PyMISP from pymisp import MISPEvent, MISPOrganisation, PyMISP, MISPObject
from datetime import datetime from datetime import datetime
from dateutil.parser import parse from dateutil.parser import parse
import json import json
from pymisp.tools import feed_meta_generator from pymisp.tools import feed_meta_generator
from io import BytesIO from io import BytesIO
from collections import defaultdict
make_feed = False make_feed = False
aggregate_by_country = True
path = Path('/home/raphael/gits/COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/') path = Path('/home/raphael/gits/COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/')
def get_country_region(row):
    """Return the country/region value of one CSV row.

    Two header spellings are accepted, checked in this order:
    ``'Country/Region'`` then ``'Country_Region'``.

    :param row: mapping of column header to cell value.
    :returns: the value stored under the first matching column.
    :raises ValueError: if ``row`` contains neither column.
    """
    for column in ('Country/Region', 'Country_Region'):
        if column in row:
            return row[column]
    # `p` is the module-level loop variable naming the CSV file being read.
    # Look it up defensively so that calling this helper outside that loop
    # reports the missing column rather than failing with a NameError.
    source = globals().get('p', '<unknown file>')
    raise ValueError(f'{source}: no country/region column in {list(row.keys())}')
def get_last_update(row):
    """Return the parsed last-update timestamp of one CSV row.

    Two header spellings are accepted, checked in this order:
    ``'Last_Update'`` then ``'Last Update'``. The cell is handed to
    ``dateutil.parser.parse`` (imported at file level as ``parse``).

    :param row: mapping of column header to cell value.
    :returns: whatever ``parse`` returns for the matching cell.
    :raises ValueError: if ``row`` contains neither column.
    """
    for column in ('Last_Update', 'Last Update'):
        if column in row:
            return parse(row[column])
    # `p` is the module-level loop variable naming the CSV file being read.
    # Look it up defensively so that calling this helper outside that loop
    # reports the missing column rather than failing with a NameError.
    source = globals().get('p', '<unknown file>')
    raise ValueError(f'{source}: no last-update column in {list(row.keys())}')
def add_detailed_object(obj, row):
    """Fill ``obj`` with the attributes of one detailed (per-province) CSV row.

    Attributes are added in this order: province-state, country-region,
    update, latitude, longitude, confirmed, death, recovered, active.

    :param obj: object exposing ``add_attribute(relation, value)``.
    :param row: mapping of column header to cell value.
    :raises ValueError: if ``row`` has no province/state column at all, or if
        the country or last-update column is missing (raised by the helpers).
    """
    # The province column must exist under one of these spellings (including
    # one prefixed with a byte-order mark), but an empty cell is fine and
    # simply produces no attribute.
    for column in ('Province/State', '\ufeffProvince/State', 'Province_State'):
        if column in row:
            if row[column]:
                obj.add_attribute('province-state', row[column])
            break
    else:
        # `p` is the module-level loop variable naming the CSV file being
        # read; looked up defensively so the error is still meaningful when
        # this function is called outside that loop.
        source = globals().get('p', '<unknown file>')
        raise ValueError(f'{source}: no province/state column in {list(row.keys())}')

    obj.add_attribute('country-region', get_country_region(row))
    obj.add_attribute('update', get_last_update(row))

    # Coordinates are optional; the first header spelling found wins.
    # NOTE(review): 'Latitude'/'Longitude' are accepted in addition to the
    # original 'Lat'/'Long_'/'Long' because some report versions are believed
    # to use those headers - confirm against the data set.
    coordinates = (
        ('latitude', ('Lat', 'Latitude')),
        ('longitude', ('Long_', 'Long', 'Longitude')),
    )
    for relation, columns in coordinates:
        for column in columns:
            if column in row:
                obj.add_attribute(relation, row[column])
                break

    # These three columns are mandatory; empty cells are skipped.
    counters = (
        ('confirmed', 'Confirmed'),
        ('death', 'Deaths'),
        ('recovered', 'Recovered'),
    )
    for relation, column in counters:
        if row[column]:
            obj.add_attribute(relation, int(row[column]))

    # 'Active' is optional: the column may be absent or the cell empty.
    if 'Active' in row and row['Active']:
        obj.add_attribute('active', int(row['Active']))
def country_aggregate(aggregate, row):
    """Accumulate one CSV row into the per-country totals held in ``aggregate``.

    ``aggregate`` is modified in place. Each country maps to a dict with the
    keys ``active``, ``death``, ``recovered``, ``confirmed`` (running sums)
    and ``update`` (the most recent last-update timestamp seen so far).

    :param aggregate: mapping of country name to its totals dict.
    :param row: mapping of column header to cell value.
    :returns: None.
    """
    country = get_country_region(row)
    if country not in aggregate:
        # A plain dict is enough here: every key is created up front, so no
        # default factory is ever needed.
        aggregate[country] = {
            'active': 0,
            'death': 0,
            'recovered': 0,
            'confirmed': 0,
            # Epoch start acts as "older than any real update".
            'update': datetime.fromtimestamp(0),
        }
    totals = aggregate[country]

    # These three columns are mandatory; empty cells contribute nothing.
    counters = (
        ('confirmed', 'Confirmed'),
        ('death', 'Deaths'),
        ('recovered', 'Recovered'),
    )
    for key, column in counters:
        if row[column]:
            totals[key] += int(row[column])

    # 'Active' is optional: the column may be absent or the cell empty.
    if 'Active' in row and row['Active']:
        totals['active'] += int(row['Active'])

    # Keep the newest timestamp among all rows of this country.
    update = get_last_update(row)
    if update > totals['update']:
        totals['update'] = update
if make_feed: if make_feed:
org = MISPOrganisation() org = MISPOrganisation()
org.name = 'CIRCL' org.name = 'CIRCL'
@ -26,7 +103,10 @@ else:
for p in path.glob('**/*.csv'): for p in path.glob('**/*.csv'):
d = datetime.strptime(p.name[:-4], '%m-%d-%Y').date() d = datetime.strptime(p.name[:-4], '%m-%d-%Y').date()
event = MISPEvent() event = MISPEvent()
event.info = f"[{d.isoformat()}] CSSE COVID-19 daily report" if aggregate_by_country:
event.info = f"[{d.isoformat()}] CSSE COVID-19 daily report"
else:
event.info = f"[{d.isoformat()}] CSSE COVID-19 detailed daily report"
event.date = d event.date = d
event.distribution = 3 event.distribution = 3
event.add_tag('tlp:white') event.add_tag('tlp:white')
@ -39,27 +119,28 @@ for p in path.glob('**/*.csv'):
continue continue
event.add_attribute('attachment', p.name, data=BytesIO(p.open('rb').read())) event.add_attribute('attachment', p.name, data=BytesIO(p.open('rb').read()))
event.add_attribute('link', f'https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports/{p.name}', comment='Source') event.add_attribute('link', f'https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports/{p.name}', comment='Source')
if aggregate_by_country:
aggregate = defaultdict()
with p.open() as f: with p.open() as f:
reader = DictReader(f) reader = DictReader(f)
for row in reader: for row in reader:
obj = event.add_object(name='covid19-csse-daily-report', standalone=False) if aggregate_by_country:
if 'Province/State' in row: country_aggregate(aggregate, row)
if row['Province/State']:
obj.add_attribute('province-state', row['Province/State'])
elif '\ufeffProvince/State' in row:
if row['\ufeffProvince/State']:
obj.add_attribute('province-state', row['\ufeffProvince/State'])
else: else:
print(p, row.keys()) obj = MISPObject(name='covid19-csse-daily-report')
raise Exception() add_detailed_object(obj, row)
obj.add_attribute('country-region', row['Country/Region']) event.add_object(obj)
obj.add_attribute('update', parse(row['Last Update']))
if row['Confirmed']: if aggregate_by_country:
obj.add_attribute('confirmed', int(row['Confirmed'])) for country, values in aggregate.items():
if row['Deaths']: obj = event.add_object(name='covid19-csse-daily-report', standalone=False)
obj.add_attribute('death', int(row['Deaths'])) obj.add_attribute('country-region', country)
if row['Recovered']: obj.add_attribute('update', values['update'])
obj.add_attribute('recovered', int(row['Recovered'])) obj.add_attribute('confirmed', values['confirmed'])
obj.add_attribute('death', values['death'])
obj.add_attribute('recovered', values['recovered'])
obj.add_attribute('active', values['active'])
if make_feed: if make_feed:
with (Path('output') / f'{event.uuid}.json').open('w') as _w: with (Path('output') / f'{event.uuid}.json').open('w') as _w:
json.dump(event.to_feed(), _w) json.dump(event.to_feed(), _w)