mirror of https://github.com/MISP/PyMISP
278 lines
8.5 KiB
Python
278 lines
8.5 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import json
|
|
from json import JSONDecoder
|
|
import math
|
|
import random
|
|
import pygal
|
|
from pygal.style import Style
|
|
import pandas as pd
|
|
from datetime import datetime
|
|
from datetime import timedelta
|
|
from dateutil.parser import parse
|
|
import sys
|
|
|
|
################ Errors ################
|
|
|
|
class DateError(Exception):
    '''
    Error raised by the date consistency checks of this module.

    The object given at construction is kept in ``value``; converting the
    exception to a string yields the ``repr`` of that object.
    '''

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return '{!r}'.format(self.value)
|
|
|
|
################ Tools ################
|
|
|
|
def buildDoubleIndex(index1, index2, datatype):
    '''
    Build a two-level pandas MultiIndex named ('event', datatype).

    index2 is walked in order; every time one of its entries equals 0 the
    cursor into index1 moves to the next element. Each entry of index2 is
    paired with the element of index1 currently under the cursor.
    '''
    cursor = -1
    outerLevel = []
    for inner in index2:
        # A 0 in the inner index marks the start of the next outer element.
        if inner == 0:
            cursor += 1
        outerLevel.append(index1[cursor])
    pairs = list(zip(outerLevel, index2))
    return pd.MultiIndex.from_tuples(pairs, names=['event', datatype])
|
|
|
|
def buildNewColumn(index2, column):
    '''
    Expand column so that it has one entry per element of index2.

    index2 is walked in order; each entry equal to 0 moves on to the next
    element of column, and the element currently selected is emitted once
    for every entry of index2.
    '''
    cursor = -1
    expanded = []
    for position in index2:
        if position == 0:
            # A 0 marks the first row of the next group.
            cursor = cursor + 1
        expanded.append(column[cursor])
    return expanded
|
|
|
|
def dateInRange(datetimeTested, begin=None, end=None):
    '''
    Tell whether datetimeTested lies in the closed interval [begin, end].

    begin defaults to 1970-01-01 and end defaults to the current local
    time when they are None. Both bounds are inclusive.
    '''
    # Identity test: "== None" would call the bound's own __eq__.
    if begin is None:
        begin = datetime(1970, 1, 1)
    if end is None:
        end = datetime.now()
    return begin <= datetimeTested <= end
|
|
|
|
def addColumn(dataframe, columnList, columnName):
    '''
    Store columnList in dataframe under the column columnName, in place.

    The values are wrapped in a Series carrying the dataframe's own index
    before being assigned. Nothing is returned.
    '''
    newValues = pd.Series(columnList, index=dataframe.index)
    dataframe.loc[:, columnName] = newValues
|
|
|
|
# NOTE(review): this redefinition replaces the dateInRange with the same
# signature defined earlier in this module; one of the two can be removed.
def dateInRange(datetimeTested, begin=None, end=None):
    '''
    Tell whether datetimeTested lies in the closed interval [begin, end].

    begin defaults to 1970-01-01 and end defaults to the current local
    time when they are None. Both bounds are inclusive.
    '''
    # Identity test: "== None" would call the bound's own __eq__.
    if begin is None:
        begin = datetime(1970, 1, 1)
    if end is None:
        end = datetime.now()
    return begin <= datetimeTested <= end
|
|
|
|
def toDatetime(date):
    '''
    Convert date to a datetime by handing it to dateutil's parse().
    '''
    parsed = parse(date)
    return parsed
|
|
|
|
def checkDateConsistancy(begindate, enddate, lastdate):
    '''
    Validate the date bounds and terminate the program when they are
    inconsistent.

    begindate and enddate are date strings (or None to skip the checks that
    involve them); lastdate is compared against the parsed enddate.

    The checks run in this order:
      1. begindate must not be after enddate;
      2. enddate must not be before lastdate;
      3. begindate must not be after the current time.

    On the first failing check a 'DateError: ...' message is printed and the
    process exits with status 1. Nothing is returned otherwise.
    '''
    try:
        if begindate is not None and enddate is not None:
            # Compare parsed dates, like the two checks below do: ordering
            # the raw strings is only right for zero-padded ISO dates.
            if toDatetime(begindate) > toDatetime(enddate):
                raise DateError('begindate (' + begindate + ') cannot be after enddate (' + enddate + ')')

        if enddate is not None:
            if toDatetime(enddate) < lastdate:
                raise DateError('enddate (' + enddate + ') cannot be before lastdate (' + str(lastdate) + ')' )

        if begindate is not None:
            if toDatetime(begindate) > datetime.now():
                raise DateError('begindate (' + begindate + ') cannot be after today (' + str(datetime.now().date()) + ')')
    except DateError as e:
        # A single handler: every check reports and exits the same way.
        print('DateError: ' + e.value)
        sys.exit(1)
|
|
|
|
def setBegindate(begindate, lastdate):
    '''
    Return the later of begindate and lastdate (begindate when they are equal).
    '''
    if lastdate > begindate:
        return lastdate
    return begindate
|
|
|
|
def setEnddate(enddate):
    '''
    Return enddate, capped at the current local time.
    '''
    now = datetime.now()
    if now < enddate:
        return now
    return enddate
|
|
|
|
def getLastdate(last):
    '''
    Return midnight of the day that lies `last` days before now.

    last is converted with int(), so a numeric string is accepted.
    '''
    daysBack = timedelta(days=int(last))
    shifted = datetime.now() - daysBack
    return shifted.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
|
|
################ Formatting ################
|
|
|
|
def eventsListBuildFromList(filename):
    '''
    Build the events DataFrame from a file holding several JSON documents
    written one after another.

    Each document is turned into a DataFrame with
    DataFrame.from_dict(orient='index'); the frames are concatenated, the
    'attribute_count' column is converted to integers and the 'id' column
    becomes the index of the returned DataFrame.
    '''
    def toCount(value):
        # A missing count is either None (JSON null) or NaN (the key was
        # absent in that event and filled in by the concatenation). It is
        # stored as the integer 0 so that the column holds a single type.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return 0
        return int(value)

    with open(filename, 'r') as myfile:
        content = myfile.read()

    decoder = JSONDecoder()
    length = len(content)
    frames = []
    position = 0
    while True:
        # raw_decode() rejects leading whitespace, so anything separating
        # two documents (or trailing the last one) is skipped by hand.
        while position < length and content[position].isspace():
            position += 1
        if position >= length:
            break
        event, position = decoder.raw_decode(content, position)
        frames.append(pd.DataFrame.from_dict(event, orient='index'))

    Events = pd.concat(frames)
    # Whole-column assignment instead of per-cell chained assignment, which
    # pandas does not guarantee to write through to the frame.
    Events['attribute_count'] = [toCount(value) for value in Events['attribute_count']]
    return Events.set_index('id')
|
|
|
|
def eventsListBuildFromArray(filename):
    '''
    returns a structure listing all primary events in the sample

    The file must contain one JSON object whose 'response' entry is the
    list of events. Each event is turned into a DataFrame with
    DataFrame.from_dict(orient='index'); the frames are concatenated, the
    'attribute_count' column is converted to integers and the 'id' column
    becomes the index of the returned DataFrame.
    '''
    def toCount(value):
        # A missing count is either None (JSON null) or NaN (the key was
        # absent in that event and filled in by the concatenation). It is
        # stored as the integer 0 so that the column holds a single type.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return 0
        return int(value)

    # Context manager so the file handle is closed once the JSON is loaded.
    with open(filename) as myfile:
        jdata = json.load(myfile)

    frames = [pd.DataFrame.from_dict(event, orient='index') for event in jdata['response']]
    Events = pd.concat(frames)
    # Whole-column assignment instead of per-cell chained assignment, which
    # pandas does not guarantee to write through to the frame.
    Events['attribute_count'] = [toCount(value) for value in Events['attribute_count']]
    return Events.set_index('id')
|
|
|
|
def attributesListBuild(Events):
    '''
    Concatenate the 'Attribute' entry of every event into one DataFrame.

    Each entry of Events['Attribute'] is converted to its own DataFrame
    before the concatenation, so the per-event row labels are kept.
    '''
    frames = [pd.DataFrame(eventAttributes) for eventAttributes in Events['Attribute']]
    return pd.concat(frames)
|
|
|
|
def tagsListBuild(Events):
    '''
    Build the DataFrame of all tags carried by the events.

    Events must provide the 'Tag' and 'date' columns. Entries of 'Tag' that
    are not lists are ignored. The result has one row per tag, a 'date'
    column holding the date of the owning event, and a two-level index
    named ('event', 'tag') made of the event's index label and the position
    of the tag inside that event.
    '''
    frames = []
    dates = []
    eventIds = []
    for eventId, eventTags, eventDate in zip(Events.index, Events['Tag'], Events['date']):
        if not isinstance(eventTags, list):
            continue
        frame = pd.DataFrame(eventTags)
        frames.append(frame)
        # Record the owner of every tag row right here: deriving it later
        # by counting the rows labelled 0 attributes tags to the wrong event
        # as soon as one event has no tag list.
        dates.extend([eventDate] * len(frame))
        eventIds.extend([eventId] * len(frame))

    Tags = pd.concat(frames)
    positions = Tags.index
    Tags['date'] = dates
    index = pd.MultiIndex.from_tuples(list(zip(eventIds, positions)), names=['event', 'tag'])
    return Tags.set_index(index)
|
|
|
|
def selectInRange(Events, begin=None, end=None):
    '''
    Return the events whose 'date' falls between begin and end.

    The 'date' of every row is parsed and tested with dateInRange(). The
    returned DataFrame has the same columns as Events; it is rebuilt from
    the row values, so the index of Events is not carried over.
    '''
    inRange = [
        Event.tolist()
        for _, Event in Events.iterrows()
        if dateInRange(parse(Event['date']), begin, end)
    ]
    # Give the columns to the constructor: assigning them afterwards fails
    # with a length mismatch when no event is in range.
    return pd.DataFrame(inRange, columns=Events.columns.tolist())
|
|
|
|
def isTagIn(dataframe, tag):
    '''
    List the first-level index values of the rows whose 'name' contains tag.

    Each value appears once, in order of first appearance. The match is
    done with Series.str.contains().
    '''
    matching = dataframe['name'].str.contains(tag)
    found = []
    for entry in dataframe[matching].index.tolist():
        firstLevel = entry[0]
        if firstLevel not in found:
            found.append(firstLevel)
    return found
|
|
|
|
################ Basic Stats ################
|
|
|
|
def getNbitems(dataframe):
    '''
    Return the number of entries in the index of dataframe.
    '''
    rowLabels = dataframe.index
    return len(rowLabels)
|
|
|
|
def getNbAttributePerEventCategoryType(Attributes):
    '''
    Count the 'id' values per (event_id, category, type) group.

    Returns the 'id' column of the grouped count.
    '''
    grouped = Attributes.groupby(['event_id', 'category', 'type'])
    counts = grouped.count()
    return counts['id']
|
|
|
|
def getNbOccurenceTags(Tags):
    '''
    Count the 'id' values per tag 'name'.

    Returns the 'id' column of the grouped count.
    '''
    perName = Tags.groupby('name')
    counts = perName.count()
    return counts['id']
|
|
|
|
################ Charts ################
|
|
|
|
def createStyle(indexlevels):
    '''
    Create a pygal Style with one random colour per entry of indexlevels[0].

    Returns the tuple (style, colorsList), where colorsList holds the
    generated '#RRGGBB' strings in the order they were drawn.
    '''
    nbColors = len(indexlevels[0])
    colorsList = ["#%06X" % random.randint(0, 0xFFFFFF) for _ in range(nbColors)]
    styleOptions = dict(
        background='transparent',
        plot_background='#FFFFFF',
        foreground='#111111',
        foreground_strong='#111111',
        foreground_subtle='#111111',
        opacity='.6',
        opacity_hover='.9',
        transition='400ms ease-in',
        colors=tuple(colorsList),
    )
    style = Style(**styleOptions)
    return style, colorsList
|
|
|
|
def createLabelsTreemap(indexlevels, indexlabels):
    '''
    Group the type names of a two-level index by category.

    indexlevels holds the level values (categories first, types second) and
    indexlabels the matching integer codes. A new group is started, and an
    internal category counter incremented by one, whenever the category
    code of a row differs from that counter.

    Returns (indexlevels[0], groups), where groups is the list of type-name
    lists.
    '''
    categories_levels, types_levels = indexlevels[0], indexlevels[1]
    categoryCodes, typeCodes = indexlabels[0], indexlabels[1]

    groups = []
    currentTypes = []
    expectedCode = 0
    for position, categoryCode in enumerate(categoryCodes):
        if categoryCode != expectedCode:
            # Close the running group and move on to the next category.
            groups.append(currentTypes)
            currentTypes = []
            expectedCode += 1
        currentTypes.append(types_levels[typeCodes[position]])
    groups.append(currentTypes)

    return categories_levels, groups
|
|
|
|
|
|
def createTable(data, title, tablename, colorsList):
    '''
    Write an HTML page with one pygal table per category of data.

    data is indexed by a two-level (category, type) MultiIndex and is read
    row by row in index order. title is accepted but not used. tablename is
    the output path; None selects 'attribute_table.html'. colorsList gives
    the heading colour of each category, by position.
    '''
    if tablename is None:
        tablename = 'attribute_table.html'

    index = data.index
    # MultiIndex.labels was renamed MultiIndex.codes in pandas 0.24 and
    # removed in 1.0; fall back to the old name on older versions.
    codes = index.codes if hasattr(index, 'codes') else index.labels
    categories, types = createLabelsTreemap(index.levels, codes)

    # Context manager so the file is closed even when rendering fails;
    # opening in 'w' mode already empties an existing file.
    with open(tablename, 'w') as target:
        target.write('<!DOCTYPE html>\n<html>\n<head>\n<link rel="stylesheet" href="style.css">\n</head>\n<body>')
        it = 0
        for i in range(len(categories)):
            table = pygal.Treemap(pretty_print=True)
            target.write('\n <h1 style="color:'+ colorsList[i]+ ';">' + categories[i] + '</h1>\n')
            for typ in types[i]:
                # Explicit positional access: data is labelled by tuples.
                table.add(typ, data.iloc[it])
                it += 1
            target.write(table.render_table(transpose=True))
        target.write('\n</body>\n</html>')
|
|
|
|
|
|
def createTreemap(data, title, treename = 'attribute_treemap.svg', tablename = 'attribute_table.html'):
    '''
    Render data as a pygal treemap and write the matching HTML table.

    data is indexed by a two-level (category, type) MultiIndex and is read
    row by row in index order. title becomes the chart title. treename is
    the SVG output path (None selects 'attribute_treemap.svg'); tablename
    is passed on to createTable() together with the generated colours.
    '''
    index = data.index
    style, colorsList = createStyle(index.levels)
    treemap = pygal.Treemap(pretty_print=True, legend_at_bottom=True, style = style)
    treemap.title = title
    treemap.print_values = True
    treemap.print_labels = True

    # MultiIndex.labels was renamed MultiIndex.codes in pandas 0.24 and
    # removed in 1.0; fall back to the old name on older versions.
    codes = index.codes if hasattr(index, 'codes') else index.labels
    categories, types = createLabelsTreemap(index.levels, codes)
    it = 0

    for i in range(len(categories)):
        types_labels = []
        for typ in types[i]:
            # Explicit positional access: data is labelled by tuples.
            types_labels.append({'label': typ, 'value': data.iloc[it]})
            it += 1
        treemap.add(categories[i], types_labels)

    createTable(data, 'Attribute Distribution', tablename, colorsList)
    if treename is None:
        treename = 'attribute_treemap.svg'
    treemap.render_to_file(treename)
|