Merge branch 'master' into travisfix

pull/53/head
iglocska 2017-02-21 10:02:39 +01:00
commit 0f6d7907c2
17 changed files with 745 additions and 169 deletions

View File

@ -31,6 +31,6 @@ if __name__ == '__main__':
attribute = temp
break
misp.add_tag(attribute, args.tag, True)
misp.add_tag(attribute, args.tag, attribute=True)
else:
misp.add_tag(event['Event'], args.tag)

View File

@ -5,7 +5,7 @@ from pymisp import PyMISP
from keys import misp_url, misp_key, misp_verifycert
import argparse
import tools
import pygal_tools
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Take a sample of events (based on last.py of searchall.py) and create a treemap epresenting the distribution of attributes in this sample.')
@ -26,6 +26,6 @@ if __name__ == '__main__':
attributes = tools.attributesListBuild(events)
temp = tools.getNbAttributePerEventCategoryType(attributes)
temp = temp.groupby(level=['category', 'type']).sum()
tools.createTreemap(temp, 'Attributes Distribution', 'attribute_treemap.svg', 'attribute_table.html')
pygal_tools.createTreemap(temp, 'Attributes Distribution', 'attribute_treemap.svg', 'attribute_table.html')
else:
print ('There is no event answering the research criteria')

View File

@ -0,0 +1,33 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from bokeh.plotting import figure, output_file, show, ColumnDataSource
from bokeh.models import HoverTool
import date_tools
def tagsDistributionScatterPlot(NbTags, dates, plotname='Tags Distribution Plot'):
    """Render a scatter plot of events' tag counts over time and write it
    to '<plotname>.html' via bokeh.

    :param NbTags: dict mapping a tag-count (as a string key) to the list of
                   y-values (number of tags) to plot for that count
    :param dates: dict with the same keys, mapping to the matching datetime
                  x-values (one per y-value) — assumed parallel lists
    :param plotname: base name of the HTML file written by bokeh
    """
    output_file(plotname + ".html")
    counts = {}
    glyphs = {}
    desc = {}
    hover = HoverTool()
    plot = figure(plot_width=800, plot_height=800, x_axis_type="datetime",
                  x_axis_label='Date', y_axis_label='Number of tags', tools=[hover])
    # Tooltip is the same for every renderer, so set it once outside the loop.
    hover.tooltips = [("date", "@desc")]
    for name in NbTags.keys():
        desc[name] = []
        for date in dates[name]:
            desc[name].append(date_tools.datetimeToString(date, "%Y-%m-%d"))
        counts[name] = plot.circle(dates[name], NbTags[name],
                                   legend="Number of events with y tags",
                                   source=ColumnDataSource(data=dict(desc=desc[name])))
        glyphs[name] = counts[name].glyph
        glyphs[name].size = int(name) * 2
        if int(name) != 0:
            # Use true division: under Python 2 (this file's shebang is the
            # unversioned `python`), 1/int(name) is integer division and
            # yields 0 — fully transparent points — for any count > 1.
            glyphs[name].fill_alpha = 1.0 / int(name)
    show(plot)

View File

@ -0,0 +1,70 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime
from datetime import timedelta
from dateutil.parser import parse
class DateError(Exception):
    """Raised when begin/end/last dates contradict each other."""

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)
# ############### Date Tools ################
def dateInRange(datetimeTested, begin=None, end=None):
    """Return True if datetimeTested lies in [begin, end], inclusive.

    A missing lower bound defaults to the Unix epoch; a missing upper
    bound defaults to the current time.
    """
    lower = datetime(1970, 1, 1) if begin is None else begin
    upper = datetime.now() if end is None else end
    return lower <= datetimeTested <= upper
def toDatetime(date):
    """Parse a date string into a datetime using dateutil's flexible parser."""
    return parse(date)
def datetimeToString(datetime, formatstring):
    """Return `datetime` rendered with the strftime pattern `formatstring`.

    NOTE(review): the first parameter shadows the module-level `datetime`
    import within this function's scope; kept for interface compatibility.
    """
    return datetime.strftime(formatstring)
def checkDateConsistancy(begindate, enddate, lastdate):
    """Validate the user-supplied date range.

    Raises DateError when begindate is after enddate, when enddate is
    before lastdate, or when begindate lies in the future. `begindate`
    and `enddate` are strings (parsed via toDatetime); `lastdate` is a
    datetime.
    """
    if begindate is not None and enddate is not None and begindate > enddate:
        raise DateError('begindate ({}) cannot be after enddate ({})'.format(begindate, enddate))
    if enddate is not None and toDatetime(enddate) < lastdate:
        raise DateError('enddate ({}) cannot be before lastdate ({})'.format(enddate, lastdate))
    if begindate is not None and toDatetime(begindate) > datetime.now():
        raise DateError('begindate ({}) cannot be after today ({})'.format(begindate, datetime.now().date()))
def setBegindate(begindate, lastdate):
    """Return the later of the requested begin date and the last-N-days cutoff."""
    return begindate if begindate > lastdate else lastdate
def setEnddate(enddate):
    """Cap the requested end date at the current moment."""
    now = datetime.now()
    return enddate if enddate < now else now
def getLastdate(last):
    """Return midnight of the day `last` days ago.

    `last` may be an int or a numeric string (it is passed through int()).
    """
    cutoff = datetime.now() - timedelta(days=int(last))
    return cutoff.replace(hour=0, minute=0, second=0, microsecond=0)
def getNDaysBefore(date, days):
    """Return midnight of the day `days` days before `date`."""
    earlier = date - timedelta(days=days)
    return earlier.replace(hour=0, minute=0, second=0, microsecond=0)
def getToday():
    """Return today's date at midnight, as a datetime."""
    now = datetime.now()
    return now.replace(hour=0, minute=0, second=0, microsecond=0)
def days_between(date_1, date_2):
    """Return the absolute number of whole days between two datetimes."""
    delta = date_2 - date_1
    return abs(delta.days)

View File

@ -0,0 +1,54 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pygal
from pygal.style import Style
import pandas
import random
def createTable(colors, categ_types_hash, tablename='attribute_table.html'):
    """Write an HTML page with one table per attribute category.

    :param colors: dict mapping category name -> '#RRGGBB' colour string
                   used for that category's heading
    :param categ_types_hash: dict mapping category name -> list of
                             {'label': ..., 'value': ...} dicts
    :param tablename: path of the HTML file to (over)write
    """
    with open(tablename, 'w') as target:
        target.write('<!DOCTYPE html>\n<html>\n<head>\n<link rel="stylesheet" href="style.css">\n</head>\n<body>')
        for categ_name, types in categ_types_hash.items():
            # pygal's Treemap can render its data set as an HTML table.
            table = pygal.Treemap(pretty_print=True)
            target.write('\n <h1 style="color:{};">{}</h1>\n'.format(colors[categ_name], categ_name))
            for entry in types:
                table.add(entry['label'], entry['value'])
            target.write(table.render_table(transpose=True))
        target.write('\n</body>\n</html>')
def createTreemap(data, title, treename='attribute_treemap.svg', tablename='attribute_table.html'):
    """Render a pygal treemap of attribute counts grouped by (category, type),
    plus a companion HTML table.

    :param data: pandas Series of counts indexed by a two-level
                 (category, type) MultiIndex
    :param title: title shown on the treemap
    :param treename: output SVG file path
    :param tablename: output HTML table path (forwarded to createTable)
    """
    index = data.index
    # MultiIndex.labels was renamed to .codes in pandas 0.24; support both
    # so the function keeps working with old and new pandas releases.
    level_codes = getattr(index, 'codes', None)
    if level_codes is None:
        level_codes = index.labels
    labels_categ = level_codes[0]
    labels_types = level_codes[1]
    names_categ = index.levels[0]
    names_types = index.levels[1]
    categ_types_hash = {}
    for categ_id, type_val, total in zip(labels_categ, labels_types, data):
        categ_name = names_categ[categ_id]
        if not categ_types_hash.get(categ_name):
            categ_types_hash[categ_name] = []
        categ_types_hash[categ_name].append({'label': names_types[type_val], 'value': total})
    # One random colour per category, shared by the treemap and the HTML table.
    colors = {categ: "#%06X" % random.randint(0, 0xFFFFFF) for categ in categ_types_hash.keys()}
    style = Style(background='transparent',
                  plot_background='#FFFFFF',
                  foreground='#111111',
                  foreground_strong='#111111',
                  foreground_subtle='#111111',
                  opacity='.6',
                  opacity_hover='.9',
                  transition='400ms ease-in',
                  colors=tuple(colors.values()))
    treemap = pygal.Treemap(pretty_print=True, legend_at_bottom=True, style=style)
    treemap.title = title
    treemap.print_values = True
    treemap.print_labels = True
    for categ_name, types in categ_types_hash.items():
        treemap.add(categ_name, types)
    # BUG FIX: the original dropped the tablename argument here, so a caller
    # supplying a custom table path silently got the default file instead.
    createTable(colors, categ_types_hash, tablename)
    treemap.render_to_file(treename)

View File

@ -0,0 +1,71 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key, misp_verifycert
import argparse
import numpy
import tools
import date_tools
import bokeh_tools
import time
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Show the evolution of trend of tags.')
    parser.add_argument("-d", "--days", type=int, required=True, help='')
    parser.add_argument("-s", "--begindate", required=True, help='format yyyy-mm-dd')
    parser.add_argument("-e", "--enddate", required=True, help='format yyyy-mm-dd')
    args = parser.parse_args()

    misp = PyMISP(misp_url, misp_key, misp_verifycert)
    result = misp.search(date_from=args.begindate, date_to=args.enddate, metadata=False)

    # Getting data
    if 'response' in result:
        events = tools.eventsListBuildFromArray(result)
        NbTags = []
        dates = []
        enddate = date_tools.toDatetime(args.enddate)
        begindate = date_tools.toDatetime(args.begindate)
        # Walk backwards from enddate in windows of `args.days` days and
        # record, per window, the tag count of every event in that window.
        for i in range(round(date_tools.days_between(enddate, begindate) / args.days)):
            begindate = date_tools.getNDaysBefore(enddate, args.days)
            eventstemp = tools.selectInRange(events, begindate, enddate)
            if eventstemp is not None:
                for event in eventstemp.iterrows():
                    if 'Tag' in event[1]:
                        dates.append(enddate)
                        if isinstance(event[1]['Tag'], list):
                            NbTags.append(len(event[1]['Tag']))
                        else:
                            NbTags.append(0)
            enddate = begindate

        # Prepare plot: group identical (tag-count, date) pairs, keyed by how
        # often each pair occurs; -1 marks entries already consumed.
        NbTagsPlot = {}
        datesPlot = {}
        for i in range(len(NbTags)):
            if NbTags[i] == -1:
                continue
            count = 1
            for j in range(i + 1, len(NbTags)):
                if NbTags[i] == NbTags[j] and dates[i] == dates[j]:
                    count = count + 1
                    NbTags[j] = -1
            if str(count) in NbTagsPlot:
                NbTagsPlot[str(count)].append(NbTags[i])
                datesPlot[str(count)].append(dates[i])
            else:
                NbTagsPlot[str(count)] = [NbTags[i]]
                datesPlot[str(count)] = [dates[i]]
            NbTags[i] = -1

        # Plot
        bokeh_tools.tagsDistributionScatterPlot(NbTagsPlot, datesPlot)

View File

@ -6,6 +6,7 @@ from keys import misp_url, misp_key, misp_verifycert
from datetime import datetime
import argparse
import tools
import date_tools
def init(url, key):
@ -29,17 +30,17 @@ if __name__ == '__main__':
args.days = 7
result = misp.search(last='{}d'.format(args.days), metadata=True)
tools.checkDateConsistancy(args.begindate, args.enddate, tools.getLastdate(args.days))
date_tools.checkDateConsistancy(args.begindate, args.enddate, date_tools.getLastdate(args.days))
if args.begindate is None:
args.begindate = tools.getLastdate(args.days)
args.begindate = date_tools.getLastdate(args.days)
else:
args.begindate = tools.setBegindate(tools.toDatetime(args.begindate), tools.getLastdate(args.days))
args.begindate = date_tools.setBegindate(date_tools.toDatetime(args.begindate), tools.getLastdate(args.days))
if args.enddate is None:
args.enddate = datetime.now()
else:
args.enddate = tools.setEnddate(tools.toDatetime(args.enddate))
args.enddate = date_tools.setEnddate(date_tools.toDatetime(args.enddate))
if 'response' in result:
events = tools.selectInRange(tools.eventsListBuildFromArray(result), begin=args.begindate, end=args.enddate)

View File

@ -6,6 +6,7 @@ from keys import misp_url, misp_key, misp_verifycert
from datetime import datetime
import argparse
import tools
import date_tools
def init(url, key):
@ -28,17 +29,17 @@ if __name__ == '__main__':
args.days = 7
result = misp.search(last='{}d'.format(args.days), metadata=True)
tools.checkDateConsistancy(args.begindate, args.enddate, tools.getLastdate(args.days))
date_tools.checkDateConsistancy(args.begindate, args.enddate, date_tools.getLastdate(args.days))
if args.begindate is None:
args.begindate = tools.getLastdate(args.days)
args.begindate = date_tools.getLastdate(args.days)
else:
args.begindate = tools.setBegindate(tools.toDatetime(args.begindate), tools.getLastdate(args.days))
args.begindate = date_tools.setBegindate(date_tools.toDatetime(args.begindate), date_tools.getLastdate(args.days))
if args.enddate is None:
args.enddate = datetime.now()
else:
args.enddate = tools.setEnddate(tools.toDatetime(args.enddate))
args.enddate = date_tools.setEnddate(date_tools.toDatetime(args.enddate))
if 'response' in result:
events = tools.selectInRange(tools.eventsListBuildFromArray(result), begin=args.begindate, end=args.enddate)

View File

@ -5,6 +5,8 @@ from pymisp import PyMISP
from keys import misp_url, misp_key, misp_verifycert
import argparse
import tools
import date_tools
import bokeh_tools
def formattingDataframe(dataframe, dates, NanValue):
@ -54,12 +56,12 @@ if __name__ == '__main__':
events = tools.eventsListBuildFromArray(result)
result = []
dates = []
enddate = tools.getToday()
enddate = date_tools.getToday()
colourDict = {}
faketag = False
for i in range(split):
begindate = tools.getNDaysBefore(enddate, size)
begindate = date_tools.getNDaysBefore(enddate, size)
dates.append(str(enddate.date()))
eventstemp = tools.selectInRange(events, begin=begindate, end=enddate)
if eventstemp is not None:

View File

@ -6,7 +6,6 @@
height: 746px;
margin-top: 100px;
}
#treemap
{
width: 1000px;

View File

@ -2,13 +2,9 @@
# -*- coding: utf-8 -*-
from json import JSONDecoder
import random
import pygal
from pygal.style import Style
import pandas
from datetime import datetime
from datetime import timedelta
from dateutil.parser import parse
import numpy
from scipy import stats
from pytaxonomies import Taxonomies
@ -16,67 +12,25 @@ import re
import matplotlib.pyplot as plt
from matplotlib import pylab
import os
class DateError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
# ############### Date Tools ################
def dateInRange(datetimeTested, begin=None, end=None):
if begin is None:
begin = datetime(1970, 1, 1)
if end is None:
end = datetime.now()
return begin <= datetimeTested <= end
def toDatetime(date):
return parse(date)
def checkDateConsistancy(begindate, enddate, lastdate):
if begindate is not None and enddate is not None:
if begindate > enddate:
raise DateError('begindate ({}) cannot be after enddate ({})'.format(begindate, enddate))
if enddate is not None:
if toDatetime(enddate) < lastdate:
raise DateError('enddate ({}) cannot be before lastdate ({})'.format(enddate, lastdate))
if begindate is not None:
if toDatetime(begindate) > datetime.now():
raise DateError('begindate ({}) cannot be after today ({})'.format(begindate, datetime.now().date()))
def setBegindate(begindate, lastdate):
return max(begindate, lastdate)
def setEnddate(enddate):
return min(enddate, datetime.now())
def getLastdate(last):
return (datetime.now() - timedelta(days=int(last))).replace(hour=0, minute=0, second=0, microsecond=0)
def getNDaysBefore(date, days):
return (date - timedelta(days=days)).replace(hour=0, minute=0, second=0, microsecond=0)
def getToday():
return (datetime.now()).replace(hour=0, minute=0, second=0, microsecond=0)
import date_tools
from dateutil.parser import parse
# ############### Tools ################
def selectInRange(Events, begin=None, end=None):
inRange = []
for i, Event in Events.iterrows():
if date_tools.dateInRange(parse(Event['date']), begin, end):
inRange.append(Event.tolist())
inRange = pandas.DataFrame(inRange)
temp = Events.columns.tolist()
if inRange.empty:
return None
inRange.columns = temp
return inRange
def getTaxonomies(dataframe):
taxonomies = Taxonomies()
taxonomies = list(taxonomies.keys())
@ -233,19 +187,6 @@ def tagsListBuild(Events):
return Tags
def selectInRange(Events, begin=None, end=None):
inRange = []
for i, Event in Events.iterrows():
if dateInRange(parse(Event['date']), begin, end):
inRange.append(Event.tolist())
inRange = pandas.DataFrame(inRange)
temp = Events.columns.tolist()
if inRange.empty:
return None
inRange.columns = temp
return inRange
def isTagIn(dataframe, tag):
temp = dataframe[dataframe['name'].str.contains(tag)].index.tolist()
index = []
@ -277,56 +218,10 @@ def getNbAttributePerEventCategoryType(attributes):
def getNbOccurenceTags(Tags):
return Tags.groupby('name').count()['id']
# ############### Charts ################
def createTable(colors, categ_types_hash, tablename='attribute_table.html'):
with open(tablename, 'w') as target:
target.write('<!DOCTYPE html>\n<html>\n<head>\n<link rel="stylesheet" href="style.css">\n</head>\n<body>')
for categ_name, types in categ_types_hash.items():
table = pygal.Treemap(pretty_print=True)
target.write('\n <h1 style="color:{};">{}</h1>\n'.format(colors[categ_name], categ_name))
for d in types:
table.add(d['label'], d['value'])
target.write(table.render_table(transpose=True))
target.write('\n</body>\n</html>')
def createTreemap(data, title, treename='attribute_treemap.svg', tablename='attribute_table.html'):
labels_categ = data.index.labels[0]
labels_types = data.index.labels[1]
names_categ = data.index.levels[0]
names_types = data.index.levels[1]
categ_types_hash = {}
for categ_id, type_val, total in zip(labels_categ, labels_types, data):
if not categ_types_hash.get(names_categ[categ_id]):
categ_types_hash[names_categ[categ_id]] = []
dict_to_print = {'label': names_types[type_val], 'value': total}
categ_types_hash[names_categ[categ_id]].append(dict_to_print)
colors = {categ: "#%06X" % random.randint(0, 0xFFFFFF) for categ in categ_types_hash.keys()}
style = Style(background='transparent',
plot_background='#FFFFFF',
foreground='#111111',
foreground_strong='#111111',
foreground_subtle='#111111',
opacity='.6',
opacity_hover='.9',
transition='400ms ease-in',
colors=tuple(colors.values()))
treemap = pygal.Treemap(pretty_print=True, legend_at_bottom=True, style=style)
treemap.title = title
treemap.print_values = True
treemap.print_labels = True
for categ_name, types in categ_types_hash.items():
treemap.add(categ_name, types)
createTable(colors, categ_types_hash)
treemap.render_to_file(treename)
def tagsToLineChart(dataframe, title, dates, colourDict):
style = createTagsPlotStyle(dataframe, colourDict)
line_chart = pygal.Line(x_label_rotation=20, style=style, show_legend=False)

93
examples/yara_dump.py Executable file
View File

@ -0,0 +1,93 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
YARA dumper for MISP
by Christophe Vandeplas
'''
import keys
from pymisp import PyMISP
import yara
import re
def dirty_cleanup(value):
    """Replace characters that frequently break YARA compilation.

    Smart quotes are mapped to plain double quotes, backticks to single
    quotes, and carriage returns are dropped.

    :param value: the raw YARA rule text from a MISP attribute
    :returns: (cleaned_value, changed) where `changed` is True if any
              substitution was applied
    """
    changed = False
    # BUG FIX: the first element of the first three pairs had been garbled to
    # the empty string; '' is contained in every string, so replace('', '"')
    # would insert a quote between every character. These pairs target the
    # Unicode smart quotes that word processors substitute into rules.
    substitutions = (('\u201c', '"'),   # left double quotation mark
                     ('\u201d', '"'),   # right double quotation mark
                     ('\u201e', '"'),   # double low-9 quotation mark
                     ('`', "'"),
                     ('\r', '')
                     # ('$ ', '$'), # this breaks rules
                     # ('\t\t', '\n'), # this breaks rules
                     )
    for old, new in substitutions:
        if old in value:
            changed = True
            value = value.replace(old, new)
    return value, changed
misp = PyMISP(keys.misp_url, keys.misp_key, keys.misp_verify, 'json')
result = misp.search(controller='attributes', type_attribute='yara')

attr_cnt = 0
attr_cnt_invalid = 0
attr_cnt_duplicate = 0
attr_cnt_changed = 0
yara_rules = []
yara_rule_names = []
if 'response' in result and 'Attribute' in result['response']:
    for attribute in result['response']['Attribute']:
        value = attribute['value']
        event_id = attribute['event_id']
        attribute_id = attribute['id']
        # Prefix every rule name with its event id to reduce name collisions.
        value = re.sub('^[ \t]*rule ', 'rule misp_e{}_'.format(event_id), value, flags=re.MULTILINE)
        value, changed = dirty_cleanup(value)
        if changed:
            attr_cnt_changed += 1
        if 'global rule' in value:  # refuse any global rules as they might disable everything
            continue
        # compile the yara rule to confirm it's validity
        # if valid, ignore duplicate rules
        try:
            attr_cnt += 1
            yara.compile(source=value)
            yara_rules.append(value)
            # print("Rule e{} a{} OK".format(event_id, attribute_id))
        except yara.SyntaxError as e:
            attr_cnt_invalid += 1
            # print("Rule e{} a{} NOK - {}".format(event_id, attribute_id, e))
        except yara.Error as e:
            attr_cnt_invalid += 1
            print(e)
            import traceback
            print(traceback.format_exc())

# remove duplicates - process the full yara rule list and process errors to eliminate duplicate rule names
all_yara_rules = '\n'.join(yara_rules)
while True:
    try:
        yara.compile(source=all_yara_rules)
    except yara.SyntaxError as e:
        if 'duplicated identifier' in e.args[0]:
            duplicate_rule_names = re.findall('duplicated identifier "(.*)"', e.args[0])
            for item in duplicate_rule_names:
                all_yara_rules = all_yara_rules.replace('rule {}'.format(item), 'rule duplicate_{}'.format(item), 1)
                attr_cnt_duplicate += 1
            continue
        else:
            # This should never happen as all rules were processed before separately. So logically we should only have duplicates.
            exit("ERROR SyntaxError in rules: {}".format(e.args))
    break

# save to a file
fname = 'misp.yara'
with open(fname, 'w') as f_out:
    f_out.write(all_yara_rules)

print("")
print("MISP attributes with YARA rules: total={} valid={} invalid={} duplicate={} changed={}.".format(attr_cnt, attr_cnt - attr_cnt_invalid, attr_cnt_invalid, attr_cnt_duplicate, attr_cnt_changed))
print("Valid YARA rule file save to file '{}'. Invalid rules/attributes were ignored.".format(fname))

View File

@ -1,4 +1,4 @@
__version__ = '2.4.62'
__version__ = '2.4.65'
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey
from .api import PyMISP

View File

@ -33,9 +33,11 @@ from .mispevent import MISPEvent, MISPAttribute, EncodeUpdate
# Least dirty way to support python 2 and 3
try:
basestring
unicode
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.4")
except NameError:
basestring = str
unicode = str
class distributions(object):
@ -100,18 +102,19 @@ class PyMISP(object):
try:
# Make sure the MISP instance is working and the URL is valid
response = self.get_version()
misp_version = response['version'].split('.')
pymisp_version = __version__.split('.')
for a, b in zip(misp_version, pymisp_version):
if a == b:
continue
elif a < b:
warnings.warn("Remote MISP instance (v{}) older than PyMISP (v{}). You should update your MISP instance, or install an older PyMISP version.".format(response['version'], __version__))
else: # a > b
# NOTE: That can happen and should not be blocking
warnings.warn("Remote MISP instance (v{}) newer than PyMISP (v{}). Please check if a newer version of PyMISP is available.".format(response['version'], __version__))
continue
response = self.get_recommended_api_version()
if not response.get('version'):
warnings.warn("Unable to check the recommended PyMISP version (MISP <2.4.60), please upgrade.")
else:
recommended_pymisp_version = response['version'].split('.')
for a, b in zip(pymisp_version, recommended_pymisp_version):
if a == b:
continue
elif a > b:
warnings.warn("The version of PyMISP recommended by the MISP instance ({}) is older than the one you're using now ({}). Please upgrade the MISP instance or use an older PyMISP version.".format(response['version'], __version__))
else: # a < b
warnings.warn("The version of PyMISP recommended by the MISP instance ({}) is newer than the one you're using now ({}). Please upgrade PyMISP.".format(response['version'], __version__))
except Exception as e:
raise PyMISPError('Unable to connect to MISP ({}). Please make sure the API key and the URL are correct (http/https is required): {}'.format(self.root_url, e))
@ -174,7 +177,7 @@ class PyMISP(object):
for e in errors:
if not e:
continue
if isinstance(e, str):
if isinstance(e, basestring):
messages.append(e)
continue
for type_e, msgs in e.items():
@ -349,23 +352,35 @@ class PyMISP(object):
if e.published:
return {'error': 'Already published'}
e.publish()
return self.update(event)
return self.update(e)
def change_threat_level(self, event, threat_level_id):
e = self._make_mispevent(event)
e.threat_level_id = threat_level_id
return self.update(event)
return self.update(e)
def change_sharing_group(self, event, sharing_group_id):
e = self._make_mispevent(event)
e.distribution = 4 # Needs to be 'Sharing group'
e.sharing_group_id = sharing_group_id
return self.update(event)
return self.update(e)
def new_event(self, distribution=None, threat_level_id=None, analysis=None, info=None, date=None, published=False, orgc_id=None, org_id=None, sharing_group_id=None):
misp_event = self._prepare_full_event(distribution, threat_level_id, analysis, info, date, published, orgc_id, org_id, sharing_group_id)
return self.add_event(json.dumps(misp_event, cls=EncodeUpdate))
def tag(self, uuid, tag):
session = self.__prepare_session()
path = '/tags/attachTagToObject/{}/{}/'.format(uuid, tag)
response = session.post(urljoin(self.root_url, path))
return self._check_response(response)
def untag(self, uuid, tag):
session = self.__prepare_session()
path = '/tags/removeTagFromObject/{}/{}/'.format(uuid, tag)
response = session.post(urljoin(self.root_url, path))
return self._check_response(response)
def add_tag(self, event, tag, attribute=False):
# FIXME: this is dirty, this function needs to be deprecated with something tagging a UUID
session = self.__prepare_session()
@ -373,6 +388,9 @@ class PyMISP(object):
to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}}
path = 'attributes/addTag'
else:
# Allow for backwards-compat with old style
if "Event" in event:
event = event["Event"]
to_post = {'request': {'Event': {'id': event['id'], 'tag': tag}}}
path = 'events/addTag'
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
@ -411,7 +429,7 @@ class PyMISP(object):
e = MISPEvent(self.describe_types)
e.load(event)
e.attributes += attributes
response = self.update(event)
response = self.update(e)
return response
def add_named_attribute(self, event, type_value, value, category=None, to_ids=False, comment=None, distribution=None, proposal=False, **kwargs):
@ -464,7 +482,7 @@ class PyMISP(object):
# It's a file handle - we can read it
fileData = attachment.read()
elif isinstance(attachment, str):
elif isinstance(attachment, basestring):
# It can either be the b64 encoded data or a file path
if os.path.exists(attachment):
# It's a path!
@ -1042,6 +1060,13 @@ class PyMISP(object):
else:
return {'error': 'Impossible to retrieve the version of the master branch.'}
def get_recommended_api_version(self):
"""Returns the recommended API version from the server"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'servers/getPyMISPVersion.json')
response = session.get(url)
return self._check_response(response)
def get_version(self):
"""Returns the version of the instance."""
session = self.__prepare_session()
@ -1060,10 +1085,10 @@ class PyMISP(object):
# ############## Export Attributes in text ####################################
def get_all_attributes_txt(self, type_attr):
"""Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported."""
def get_all_attributes_txt(self, type_attr, tags=False, eventId=False, allowNonIDS=False, date_from=False, date_to=False, last=False, enforceWarninglist=False, allowNotPublished=False):
"""Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported, except if stated otherwise."""
session = self.__prepare_session('txt')
url = urljoin(self.root_url, 'attributes/text/download/%s' % type_attr)
url = urljoin(self.root_url, 'attributes/text/download/%s/%s/%s/%s/%s/%s/%s/%s/%s' % (type_attr, tags, eventId, allowNonIDS, date_from, date_to, last, enforceWarninglist, allowNotPublished))
response = session.get(url)
return response
@ -1110,13 +1135,18 @@ class PyMISP(object):
response = session.post(url)
return self._check_response(response)
def sighting_per_json(self, json_file):
def set_sightings(self, sightings):
if isinstance(sightings, dict):
sightings = json.dumps(sightings)
session = self.__prepare_session()
url = urljoin(self.root_url, 'sightings/add/')
response = session.post(url, data=sightings)
return self._check_response(response)
def sighting_per_json(self, json_file):
with open(json_file) as f:
jdata = json.load(f)
url = urljoin(self.root_url, 'sightings/add/')
response = session.post(url, data=json.dumps(jdata))
return self._check_response(response)
return self.set_sightings(jdata)
# ############## Sharing Groups ##################

View File

@ -340,6 +340,190 @@
"x509-fingerprint-sha1": {
"default_category": "Network activity",
"to_ids": 1
},
"dns-soa-email": {
"default_category": "Attribution",
"to_ids": 0
},
"size-in-bytes": {
"default_category": "Other",
"to_ids": 0
},
"counter": {
"default_category": "Other",
"to_ids": 0
},
"datetime": {
"default_category": "Other",
"to_ids": 0
},
"cpe": {
"default_category": "Other",
"to_ids": 0
},
"port": {
"default_category": "Network activity",
"to_ids": 0
},
"ip-dst|port": {
"default_category": "Network activity",
"to_ids": 1
},
"ip-src|port": {
"default_category": "Network activity",
"to_ids": 1
},
"hostname|port": {
"default_category": "Network activity",
"to_ids": 1
},
"email-dst-display-name": {
"default_category": "Payload delivery",
"to_ids": 0
},
"email-src-display-name": {
"default_category": "Payload delivery",
"to_ids": 0
},
"email-header": {
"default_category": "Payload delivery",
"to_ids": 0
},
"email-reply-to": {
"default_category": "Payload delivery",
"to_ids": 0
},
"email-x-mailer": {
"default_category": "Payload delivery",
"to_ids": 0
},
"email-mime-boundary": {
"default_category": "Payload delivery",
"to_ids": 0
},
"email-thread-index": {
"default_category": "Payload delivery",
"to_ids": 0
},
"email-message-id": {
"default_category": "",
"to_ids": 0
},
"github-username": {
"default_category": "Social network",
"to_ids": 0
},
"github-repository": {
"default_category": "Social network",
"to_ids": 0
},
"github-organisation": {
"default_category": "Social network",
"to_ids": 0
},
"jabber-id": {
"default_category": "Social network",
"to_ids": 0
},
"twitter-id": {
"default_category": "Social network",
"to_ids": 0
},
"first-name": {
"default_category": "Person",
"to_ids": 0
},
"middle-name": {
"default_category": "Person",
"to_ids": 0
},
"last-name": {
"default_category": "Person",
"to_ids": 0
},
"date-of-birth": {
"default_category": "Person",
"to_ids": 0
},
"place-of-birth": {
"default_category": "Person",
"to_ids": 0
},
"gender": {
"default_category": "",
"to_ids": 0
},
"passport-number": {
"default_category": "Person",
"to_ids": 0
},
"passport-country": {
"default_category": "Person",
"to_ids": 0
},
"passport-expiration": {
"default_category": "Person",
"to_ids": 0
},
"redress-number": {
"default_category": "Person",
"to_ids": 0
},
"nationality": {
"default_category": "Person",
"to_ids": 0
},
"visa-number": {
"default_category": "Person",
"to_ids": 0
},
"issue-date-of-the-visa": {
"default_category": "Person",
"to_ids": 0
},
"primary-residence": {
"default_category": "Person",
"to_ids": 0
},
"country-of-residence": {
"default_category": "Person",
"to_ids": 0
},
"special-service-request": {
"default_category": "Person",
"to_ids": 0
},
"frequent-flyer-number": {
"default_category": "Person",
"to_ids": 0
},
"travel-details": {
"default_category": "Person",
"to_ids": 0
},
"payment-details": {
"default_category": "Person",
"to_ids": 0
},
"place-port-of-original-embarkation": {
"default_category": "Person",
"to_ids": 0
},
"place-port-of-clearance": {
"default_category": "Person",
"to_ids": 0
},
"place-port-of-onward-foreign-destination": {
"default_category": "Person",
"to_ids": 0
},
"passenger-name-record-locator-number": {
"default_category": "Person",
"to_ids": 0
},
"mobile-application-id": {
"default_category": "Payload delivery",
"to_ids": 1
}
},
"types": [
@ -427,7 +611,53 @@
"whois-registrant-name",
"whois-registrar",
"whois-creation-date",
"x509-fingerprint-sha1"
"x509-fingerprint-sha1",
"dns-soa-email",
"size-in-bytes",
"counter",
"datetime",
"cpe",
"port",
"ip-dst|port",
"ip-src|port",
"hostname|port",
"email-dst-display-name",
"email-src-display-name",
"email-header",
"email-reply-to",
"email-x-mailer",
"email-mime-boundary",
"email-thread-index",
"email-message-id",
"github-username",
"github-repository",
"github-organisation",
"jabber-id",
"twitter-id",
"first-name",
"middle-name",
"last-name",
"date-of-birth",
"place-of-birth",
"gender",
"passport-number",
"passport-country",
"passport-expiration",
"redress-number",
"nationality",
"visa-number",
"issue-date-of-the-visa",
"primary-residence",
"country-of-residence",
"special-service-request",
"frequent-flyer-number",
"travel-details",
"payment-details",
"place-port-of-original-embarkation",
"place-port-of-clearance",
"place-port-of-onward-foreign-destination",
"passenger-name-record-locator-number",
"mobile-application-id"
],
"categories": [
"Internal reference",
@ -442,6 +672,9 @@
"Attribution",
"External analysis",
"Financial fraud",
"Support Tool",
"Social network",
"Person",
"Other"
],
"category_type_mappings": {
@ -497,6 +730,8 @@
"filename|pehash",
"ip-src",
"ip-dst",
"ip-dst|port",
"ip-src|port",
"hostname",
"domain",
"email-src",
@ -517,7 +752,19 @@
"text",
"vulnerability",
"x509-fingerprint-sha1",
"other"
"other",
"ip-dst|port",
"ip-src|port",
"hostname|port",
"email-dst-display-name",
"email-src-display-name",
"email-header",
"email-reply-to",
"email-x-mailer",
"email-mime-boundary",
"email-thread-index",
"email-message-id",
"mobile-application-id"
],
"Artifacts dropped": [
"md5",
@ -602,6 +849,7 @@
"comment",
"text",
"x509-fingerprint-sha1",
"mobile-application-id",
"other"
],
"Persistence mechanism": [
@ -615,6 +863,8 @@
"Network activity": [
"ip-src",
"ip-dst",
"ip-dst|port",
"ip-src|port",
"hostname",
"domain",
"domain|ip",
@ -662,6 +912,8 @@
"filename|sha256",
"ip-src",
"ip-dst",
"ip-dst|port",
"ip-src|port",
"hostname",
"domain",
"domain|ip",
@ -681,6 +933,7 @@
"comment",
"text",
"x509-fingerprint-sha1",
"github-repository",
"other"
],
"Financial fraud": [
@ -696,7 +949,60 @@
"text",
"other"
],
"Support Tool": [
"link",
"text",
"attachment",
"comment",
"text",
"other"
],
"Social network": [
"github-username",
"github-repository",
"github-organisation",
"jabber-id",
"twitter-id",
"email-src",
"email-dst",
"comment",
"text",
"other"
],
"Person": [
"first-name",
"middle-name",
"last-name",
"date-of-birth",
"place-of-birth",
"gender",
"passport-number",
"passport-country",
"passport-expiration",
"redress-number",
"nationality",
"visa-number",
"issue-date-of-the-visa",
"primary-residence",
"country-of-residence",
"special-service-request",
"frequent-flyer-number",
"travel-details",
"payment-details",
"place-port-of-original-embarkation",
"place-port-of-clearance",
"place-port-of-onward-foreign-destination",
"passenger-name-record-locator-number",
"comment",
"text",
"other"
],
"Other": [
"size-in-bytes",
"counter",
"datetime",
"cpe",
"port",
"comment",
"text",
"other"

View File

@ -101,6 +101,9 @@ class MISPAttribute(object):
def delete(self):
self.deleted = True
def add_tag(self, tag):
self.Tag.append({'name': tag})
def verify(self, gpg_uid):
if not has_pyme:
raise Exception('pyme is required, please install: pip install --pre pyme3. You will also need libgpg-error-dev and libgpgme11-dev.')
@ -116,7 +119,7 @@ class MISPAttribute(object):
def set_all_values(self, **kwargs):
if kwargs.get('type') and kwargs.get('category'):
if kwargs['type'] not in self.category_type_mapping[kwargs['category']]:
raise NewAttributeError('{} and {} is an invalid combinaison, type for this category has to be in {}'.format(self.type, self.category, (', '.join(self.category_type_mapping[kwargs['category']]))))
raise NewAttributeError('{} and {} is an invalid combinaison, type for this category has to be in {}'.format(kwargs.get('type'), kwargs.get('category'), (', '.join(self.category_type_mapping[kwargs['category']]))))
# Required
if kwargs.get('type'):
self.type = kwargs['type']
@ -174,7 +177,7 @@ class MISPAttribute(object):
if kwargs.get('sig'):
self.sig = kwargs['sig']
if kwargs.get('Tag'):
self.Tag = kwargs['Tag']
self.Tag = [t for t in kwargs['Tag'] if t]
# If the user wants to disable correlation, let them. Defaults to False.
self.disable_correlation = kwargs.get("disable_correlation", False)
@ -214,6 +217,8 @@ class MISPAttribute(object):
to_return = {'type': self.type, 'category': self.category, 'to_ids': self.to_ids,
'distribution': self.distribution, 'value': self.value,
'comment': self.comment, 'disable_correlation': self.disable_correlation}
if self.uuid:
to_return['uuid'] = self.uuid
if self.sig:
to_return['sig'] = self.sig
if self.sharing_group_id:
@ -231,9 +236,8 @@ class MISPAttribute(object):
to_return = self._json()
if self.id:
to_return['id'] = self.id
if self.uuid:
to_return['uuid'] = self.uuid
if self.timestamp:
# Should never be set on an update, MISP will automatically set it to now
to_return['timestamp'] = int(time.mktime(self.timestamp.timetuple()))
if self.deleted is not None:
to_return['deleted'] = self.deleted
@ -436,6 +440,8 @@ class MISPEvent(object):
if self.analysis not in [0, 1, 2]:
raise NewEventError('{} is invalid, the analysis has to be in 0, 1, 2'.format(self.analysis))
if kwargs.get('published') is not None:
self.unpublish()
if kwargs.get("published") == True:
self.publish()
if kwargs.get('date'):
self.set_date(kwargs['date'])
@ -481,7 +487,7 @@ class MISPEvent(object):
if kwargs.get('Galaxy'):
self.Galaxy = kwargs['Galaxy']
if kwargs.get('Tag'):
self.Tag = kwargs['Tag']
self.Tag = [t for t in kwargs['Tag'] if t]
if kwargs.get('sig'):
self.sig = kwargs['sig']
if kwargs.get('global_sig'):
@ -542,6 +548,7 @@ class MISPEvent(object):
if self.publish_timestamp:
to_return['Event']['publish_timestamp'] = int(time.mktime(self.publish_timestamp.timetuple()))
if self.timestamp:
# Should never be set on an update, MISP will automatically set it to now
to_return['Event']['timestamp'] = int(time.mktime(self.timestamp.timetuple()))
to_return['Event'] = _int_to_str(to_return['Event'])
if self.attributes:
@ -549,6 +556,19 @@ class MISPEvent(object):
jsonschema.validate(to_return, self.json_schema)
return to_return
def add_tag(self, tag):
self.Tag.append({'name': tag})
def add_attribute_tag(self, tag, attribute_identifier):
attribute = None
for a in self.attributes:
if a.id == attribute_identifier or a.uuid == attribute_identifier or attribute_identifier in a.value:
a.add_tag(tag)
attribute = a
if not attribute:
raise Exception('No attribute with identifier {} found.'.format(attribute_identifier))
return attribute
def publish(self):
self.published = True

View File

@ -38,7 +38,8 @@ class TestOffline(unittest.TestCase):
def initURI(self, m):
m.register_uri('GET', self.domain + 'events/1', json=self.auth_error_msg, status_code=403)
m.register_uri('GET', self.domain + 'servers/getVersion.json', json={"version": "2.4.56"})
m.register_uri('GET', self.domain + 'servers/getVersion.json', json={"version": "2.4.62"})
m.register_uri('GET', self.domain + 'servers/getPyMISPVersion.json', json={"version": "2.4.62"})
m.register_uri('GET', self.domain + 'sharing_groups.json', json=self.sharing_groups)
m.register_uri('GET', self.domain + 'attributes/describeTypes.json', json=self.types)
m.register_uri('GET', self.domain + 'events/2', json=self.event)
@ -97,7 +98,7 @@ class TestOffline(unittest.TestCase):
api_version = pymisp.get_api_version()
self.assertEqual(api_version, {'version': pm.__version__})
server_version = pymisp.get_version()
self.assertEqual(server_version, {"version": "2.4.56"})
self.assertEqual(server_version, {"version": "2.4.62"})
def test_getSharingGroups(self, m):
self.initURI(m)