Merge pull request #67 from CIRCL/next

Major refactoring, release v2.4.53
pull/30/head v2.4.53
Raphaël Vinot 2016-10-21 15:45:48 +02:00 committed by GitHub
commit 7951e1830e
19 changed files with 1972 additions and 632 deletions

1
MANIFEST.in Normal file
View File

@ -0,0 +1 @@
include pymisp/data/*

View File

@ -5,60 +5,70 @@ import random
from random import randint
import string
def randomStringGenerator(size, chars=string.ascii_lowercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
def randomIpGenerator():
return str(randint(0, 255)) + '.' + str(randint(0, 255)) + '.' + str(randint(0, 255)) + '.' + str(randint(0, 255))
def floodtxt(misp, event, maxlength = 255):
def floodtxt(misp, event, maxlength=255):
text = randomStringGenerator(randint(1, maxlength))
textfunctions = [misp.add_internal_comment, misp.add_internal_text, misp.add_internal_other, misp.add_email_subject, misp.add_mutex, misp.add_filename]
textfunctions[randint(0,5)](event, text)
textfunctions[randint(0, 5)](event, text)
def floodip(misp, event):
ip = randomIpGenerator()
ipfunctions = [misp.add_ipsrc, misp.add_ipdst]
ipfunctions[randint(0,1)](event, ip)
ipfunctions[randint(0, 1)](event, ip)
def flooddomain(misp, event, maxlength = 25):
def flooddomain(misp, event, maxlength=25):
a = randomStringGenerator(randint(1, maxlength))
b = randomStringGenerator(randint(2, 3), chars=string.ascii_lowercase)
domain = a + '.' + b
domainfunctions = [misp.add_hostname, misp.add_domain]
domainfunctions[randint(0,1)](event, domain)
domainfunctions[randint(0, 1)](event, domain)
def flooddomainip(misp, event, maxlength = 25):
def flooddomainip(misp, event, maxlength=25):
a = randomStringGenerator(randint(1, maxlength))
b = randomStringGenerator(randint(2, 3), chars=string.ascii_lowercase)
domain = a + '.' + b
ip = randomIpGenerator()
misp.add_domain_ip(event, domain, ip)
def floodemail(misp, event, maxlength = 25):
def floodemail(misp, event, maxlength=25):
a = randomStringGenerator(randint(1, maxlength))
b = randomStringGenerator(randint(1, maxlength))
c = randomStringGenerator(randint(2, 3), chars=string.ascii_lowercase)
email = a + '@' + b + '.'+ c
email = a + '@' + b + '.' + c
emailfunctions = [misp.add_email_src, misp.add_email_dst]
emailfunctions[randint(0,1)](event, email)
emailfunctions[randint(0, 1)](event, email)
def floodattachment(misp, eventid, distribution, to_ids, category, comment, info, analysis, threat_level_id):
filename = randomStringGenerator(randint(1,128))
filename = randomStringGenerator(randint(1, 128))
misp.upload_sample(filename, 'dummy', eventid, distribution, to_ids, category, comment, info, analysis, threat_level_id)
def create_dummy_event(misp):
event = misp.new_event(0, 4, 0, 'dummy event')
flooddomainip(misp, event)
floodattachment(misp, event['Event']['id'], event['Event']['id'], event['Event']['distribution'], False, 'Payload delivery', '', event['Event']['info'], event['Event']['analysis'], event['Event']['threat_level_id'])
floodattachment(misp, event['Event']['id'], event['Event']['distribution'], False, 'Payload delivery', '', event['Event']['info'], event['Event']['analysis'], event['Event']['threat_level_id'])
def create_massive_dummy_events(misp, nbattribute):
event = misp.new_event(0, 4, 0, 'massive dummy event')
eventid = event['Event']['id']
functions = [floodtxt, floodip, flooddomain, flooddomainip, floodemail, floodattachment]
for i in range(nbattribute):
choice = randint(0,5)
choice = randint(0, 5)
if choice == 5:
floodattachment(misp, eventid, event['Event']['distribution'], False, 'Payload delivery', '', event['Event']['info'], event['Event']['analysis'], event['Event']['threat_level_id'])
else:
functions[choice](misp,event)
functions[choice](misp, event)

View File

@ -21,8 +21,11 @@ if __name__ == '__main__':
else:
result = misp.download_last(args.argument)
events = tools.eventsListBuildFromArray(result)
attributes = tools.attributesListBuild(events)
temp = tools.getNbAttributePerEventCategoryType(attributes)
temp = temp.groupby(level=['category', 'type']).sum()
tools.createTreemap(temp, 'Attributes Distribution', 'attribute_treemap.svg', 'attribute_table.html')
if 'response' in result:
events = tools.eventsListBuildFromArray(result)
attributes = tools.attributesListBuild(events)
temp = tools.getNbAttributePerEventCategoryType(attributes)
temp = temp.groupby(level=['category', 'type']).sum()
tools.createTreemap(temp, 'Attributes Distribution', 'attribute_treemap.svg', 'attribute_table.html')
else:
print ('There is no event answering the research criteria')

View File

@ -27,7 +27,7 @@ if __name__ == '__main__':
if args.days is None:
args.days = 7
result = misp.download_last('{}d'.format(args.days))
result = misp.search(last='{}d'.format(args.days), metadata=True)
tools.checkDateConsistancy(args.begindate, args.enddate, tools.getLastdate(args.days))
@ -41,25 +41,29 @@ if __name__ == '__main__':
else:
args.enddate = tools.setEnddate(tools.toDatetime(args.enddate))
events = tools.selectInRange(tools.eventsListBuildFromArray(result), begin=args.begindate, end=args.enddate)
totalPeriodEvents = tools.getNbitems(events)
tags = tools.tagsListBuild(events)
result = tools.isTagIn(tags, args.tag)
totalPeriodTags = len(result)
if 'response' in result:
events = tools.selectInRange(tools.eventsListBuildFromArray(result), begin=args.begindate, end=args.enddate)
totalPeriodEvents = tools.getNbitems(events)
tags = tools.tagsListBuild(events)
result = tools.isTagIn(tags, args.tag)
totalPeriodTags = len(result)
text = 'Studied pediod: from '
if args.begindate is None:
text = text + '1970-01-01'
else:
text = text + str(args.begindate.date())
text = text + ' to '
if args.enddate is None:
text = text + str(datetime.now().date())
else:
text = text + str(args.enddate.date())
text = 'Studied pediod: from '
if args.begindate is None:
text = text + '1970-01-01'
else:
text = text + str(args.begindate.date())
text = text + ' to '
if args.enddate is None:
text = text + str(datetime.now().date())
else:
text = text + str(args.enddate.date())
print('\n========================================================')
print(text)
print('During the studied pediod, ' + str(totalPeriodTags) + ' events out of ' + str(totalPeriodEvents) + ' contains at least one tag with ' + args.tag + '.')
if totalPeriodEvents != 0:
print('It represents {}% of the events in this period.'.format(round(100 * totalPeriodTags / totalPeriodEvents, 3)))
else:
print ('There is no event answering the research criteria')
print('\n========================================================')
print(text)
print('During the studied pediod, ' + str(totalPeriodTags) + ' events out of ' + str(totalPeriodEvents) + ' contains at least one tag with ' + args.tag + '.')
if totalPeriodEvents != 0:
print('It represents {}% of the events in this period.'.format(round(100 * totalPeriodTags / totalPeriodEvents, 3)))

View File

@ -26,7 +26,7 @@ if __name__ == '__main__':
if args.days is None:
args.days = 7
result = misp.download_last('{}d'.format(args.days))
result = misp.search(last='{}d'.format(args.days), metadata=True)
tools.checkDateConsistancy(args.begindate, args.enddate, tools.getLastdate(args.days))
@ -40,16 +40,19 @@ if __name__ == '__main__':
else:
args.enddate = tools.setEnddate(tools.toDatetime(args.enddate))
events = tools.selectInRange(tools.eventsListBuildFromArray(result), begin=args.begindate, end=args.enddate)
tags = tools.tagsListBuild(events)
result = tools.getNbOccurenceTags(tags)
if 'response' in result:
events = tools.selectInRange(tools.eventsListBuildFromArray(result), begin=args.begindate, end=args.enddate)
tags = tools.tagsListBuild(events)
result = tools.getNbOccurenceTags(tags)
else:
result = 'There is no event during the studied period'
text = 'Studied pediod: from '
if args.begindate is None:
text = text + '1970-01-01'
else:
text = text + str(args.begindate.date())
text = text + ' to '
text = text + ' to '
if args.enddate is None:
text = text + str(datetime.now().date())
else:

View File

@ -49,43 +49,46 @@ if __name__ == '__main__':
last = '7d'
title = 'Tags repartition over the last 7 days'
result = misp.download_last(last)
events = tools.eventsListBuildFromArray(result)
result = []
dates = []
enddate = tools.getToday()
colourDict = {}
faketag = False
result = misp.search(last=last, metadata=True)
if 'response' in result:
events = tools.eventsListBuildFromArray(result)
result = []
dates = []
enddate = tools.getToday()
colourDict = {}
faketag = False
for i in range(split):
begindate = tools.getNDaysBefore(enddate, size)
dates.append(str(enddate.date()))
eventstemp = tools.selectInRange(events, begin=begindate, end=enddate)
if eventstemp is not None:
tags = tools.tagsListBuild(eventstemp)
if tags is not None:
tools.createDictTagsColour(colourDict, tags)
result.append(tools.getNbOccurenceTags(tags))
for i in range(split):
begindate = tools.getNDaysBefore(enddate, size)
dates.append(str(enddate.date()))
eventstemp = tools.selectInRange(events, begin=begindate, end=enddate)
if eventstemp is not None:
tags = tools.tagsListBuild(eventstemp)
if tags is not None:
tools.createDictTagsColour(colourDict, tags)
result.append(tools.getNbOccurenceTags(tags))
else:
result.append(tools.createFakeEmptyTagsSeries())
faketag = True
else:
result.append(tools.createFakeEmptyTagsSeries())
faketag = True
else:
result.append(tools.createFakeEmptyTagsSeries())
faketag = True
enddate = begindate
enddate = begindate
result = formattingDataframe(result, dates, 0)
if faketag:
result = tools.removeFaketagRow(result)
result = formattingDataframe(result, dates, 0)
if faketag:
result = tools.removeFaketagRow(result)
taxonomies, emptyOther = tools.getTaxonomies(tools.getCopyDataframe(result))
taxonomies, emptyOther = tools.getTaxonomies(tools.getCopyDataframe(result))
tools.tagsToLineChart(tools.getCopyDataframe(result), title, dates, colourDict)
tools.tagstrendToLineChart(tools.getCopyDataframe(result), title, dates, split, colourDict)
tools.tagsToTaxoLineChart(tools.getCopyDataframe(result), title, dates, colourDict, taxonomies, emptyOther)
tools.tagstrendToTaxoLineChart(tools.getCopyDataframe(result), title, dates, split, colourDict, taxonomies, emptyOther)
if args.order is None:
args.order = 3
tools.tagsToPolyChart(tools.getCopyDataframe(result), split, colourDict, taxonomies, emptyOther, args.order)
tools.createVisualisation(taxonomies)
tools.tagsToLineChart(tools.getCopyDataframe(result), title, dates, colourDict)
tools.tagstrendToLineChart(tools.getCopyDataframe(result), title, dates, split, colourDict)
tools.tagsToTaxoLineChart(tools.getCopyDataframe(result), title, dates, colourDict, taxonomies, emptyOther)
tools.tagstrendToTaxoLineChart(tools.getCopyDataframe(result), title, dates, split, colourDict, taxonomies, emptyOther)
if args.order is None:
args.order = 3
tools.tagsToPolyChart(tools.getCopyDataframe(result), split, colourDict, taxonomies, emptyOther, args.order)
tools.createVisualisation(taxonomies)
else:
print('There is no event during the studied period')

View File

@ -437,7 +437,7 @@ def tagsToPolyChart(dataframe, split, colourDict, taxonomies, emptyOther, order)
pylab.title('Polynomial Fit with Matplotlib: ' + taxonomy)
pylab.legend(loc='center left', bbox_to_anchor=(1, 0.5))
ax = plt.gca()
ax.set_facecolor((0.898, 0.898, 0.898))
# ax.set_facecolor((0.898, 0.898, 0.898))
box = ax.get_position()
ax.set_position([box.x0 - 0.01, box.y0, box.width * 0.78, box.height])
fig = plt.gcf()
@ -473,7 +473,7 @@ def tagsToPolyChart(dataframe, split, colourDict, taxonomies, emptyOther, order)
pylab.title('Polynomial Fit with Matplotlib: other')
pylab.legend(loc='center left', bbox_to_anchor=(1, 0.5))
ax = plt.gca()
ax.set_facecolor((0.898, 0.898, 0.898))
#cax.set_facecolor((0.898, 0.898, 0.898))
box = ax.get_position()
ax.set_position([box.x0 - 0.01, box.y0, box.width * 0.78, box.height])
fig = plt.gcf()

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key,misp_verifycert
from keys import misp_url, misp_key, misp_verifycert
import argparse
import os
import glob
@ -12,8 +12,8 @@ def init(url, key):
return PyMISP(url, key, misp_verifycert, 'json')
def upload_files(m, eid, paths, distrib, ids, categ, info, analysis, threat):
out = m.upload_samplelist(paths, eid, distrib, ids, categ, info, analysis, threat)
def upload_files(m, eid, paths, distrib, ids, categ, comment, info, analysis, threat):
out = m.upload_samplelist(paths, eid, distrib, ids, categ, comment, info, analysis, threat)
print(out)
if __name__ == '__main__':
@ -26,6 +26,7 @@ if __name__ == '__main__':
parser.add_argument("-i", "--info", help="Used to populate the event info field if no event ID supplied.")
parser.add_argument("-a", "--analysis", type=int, help="The analysis level of the newly created event, if applicatble. [0-2]")
parser.add_argument("-t", "--threat", type=int, help="The threat level ID of the newly created event, if applicatble. [1-4]")
parser.add_argument("-co", "--comment", type=str, help="Comment for the uploaded file(s).")
args = parser.parse_args()
misp = init(misp_url, misp_key)
@ -39,4 +40,4 @@ if __name__ == '__main__':
print('invalid file')
exit(0)
upload_files(misp, args.event, files, args.distrib, args.ids, args.categ, args.info, args.analysis, args.threat)
upload_files(misp, args.event, files, args.distrib, args.ids, args.categ, args.comment, args.info, args.analysis, args.threat)

View File

@ -1,3 +1,5 @@
__version__ = '2.4.51.1'
__version__ = '2.4.53'
from .api import PyMISP, PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey
from .api import PyMISP
from .mispevent import MISPEvent, MISPAttribute, EncodeUpdate, EncodeFull

View File

@ -8,15 +8,15 @@ import datetime
import os
import base64
import re
import warnings
try:
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.3")
from urllib.parse import urljoin
except ImportError:
from urlparse import urljoin
from io import BytesIO
import zipfile
import warnings
import functools
try:
import requests
@ -25,9 +25,13 @@ except ImportError:
HAVE_REQUESTS = False
from . import __version__
from .exceptions import PyMISPError, SearchError, MissingDependency, NoURL, NoKey
from .mispevent import MISPEvent, MISPAttribute, EncodeUpdate
# Least dirty way to support python 2 and 3
try:
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.3")
basestring
except NameError:
basestring = str
@ -56,36 +60,6 @@ class analysis(object):
completed = 2
class PyMISPError(Exception):
def __init__(self, message):
super(PyMISPError, self).__init__(message)
self.message = message
class NewEventError(PyMISPError):
pass
class NewAttributeError(PyMISPError):
pass
class SearchError(PyMISPError):
pass
class MissingDependency(PyMISPError):
pass
class NoURL(PyMISPError):
pass
class NoKey(PyMISPError):
pass
class PyMISP(object):
"""
Python API for MISP
@ -118,6 +92,7 @@ class PyMISP(object):
self.ssl = ssl
self.proxies = proxies
self.cert = cert
self.ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
if out_type != 'json':
raise PyMISPError('The only output type supported by PyMISP is JSON. If you still rely on XML, use PyMISP v2.4.49')
self.debug = debug
@ -128,16 +103,24 @@ class PyMISP(object):
except Exception as e:
raise PyMISPError('Unable to connect to MISP ({}). Please make sure the API key and the URL are correct (http/https is required): {}'.format(self.root_url, e))
session = self.__prepare_session()
response = session.get(urljoin(self.root_url, 'attributes/describeTypes.json'))
self.describe_types = self._check_response(response)
if self.describe_types.get('error'):
for e in self.describe_types.get('error'):
raise PyMISPError('Failed: {}'.format(e))
try:
session = self.__prepare_session()
response = session.get(urljoin(self.root_url, 'attributes/describeTypes.json'))
describe_types = self._check_response(response)
if describe_types.get('error'):
for e in describe_types.get('error'):
raise PyMISPError('Failed: {}'.format(e))
self.describe_types = describe_types['result']
if not self.describe_types.get('sane_defaults'):
raise PyMISPError('The MISP server your are trying to reach is outdated (<2.4.52). Please use PyMISP v2.4.51.1 (pip install -I PyMISP==v2.4.51.1) and/or contact your administrator.')
except:
describe_types = json.load(open(os.path.join(self.ressources_path, 'describeTypes.json'), 'r'))
self.describe_types = describe_types['result']
self.categories = self.describe_types['result']['categories']
self.types = self.describe_types['result']['types']
self.category_type_mapping = self.describe_types['result']['category_type_mappings']
self.categories = self.describe_types['categories']
self.types = self.describe_types['types']
self.category_type_mapping = self.describe_types['category_type_mappings']
self.sane_default = self.describe_types['sane_defaults']
def __prepare_session(self, output='json'):
"""
@ -152,7 +135,8 @@ class PyMISP(object):
session.headers.update(
{'Authorization': self.key,
'Accept': 'application/{}'.format(output),
'content-type': 'application/{}'.format(output)})
'content-type': 'application/{}'.format(output),
'User-Agent': 'PyMISP {}'.format(__version__)})
return session
def flatten_error_messages(self, response):
@ -175,6 +159,8 @@ class PyMISP(object):
messages.append('Error in {}: {}'.format(where, msg))
else:
for e in errors:
if not e:
continue
if isinstance(e, str):
messages.append(e)
continue
@ -311,68 +297,18 @@ class PyMISP(object):
# ##############################################
def _prepare_full_event(self, distribution, threat_level_id, analysis, info, date=None, published=False):
to_return = {'Event': {}}
# Setup details of a new event
if distribution not in [0, 1, 2, 3]:
raise NewEventError('{} is invalid, the distribution has to be in 0, 1, 2, 3'.format(distribution))
if threat_level_id not in [1, 2, 3, 4]:
raise NewEventError('{} is invalid, the threat_level_id has to be in 1, 2, 3, 4'.format(threat_level_id))
if analysis not in [0, 1, 2]:
raise NewEventError('{} is invalid, the analysis has to be in 0, 1, 2'.format(analysis))
if date is None:
date = datetime.date.today().isoformat()
if published not in [True, False]:
raise NewEventError('{} is invalid, published has to be True or False'.format(published))
to_return['Event'] = {'distribution': distribution, 'info': info, 'date': date, 'published': published,
'threat_level_id': threat_level_id, 'analysis': analysis}
return to_return
misp_event = MISPEvent(self.describe_types)
misp_event.set_all_values(info=info, distribution=distribution, threat_level_id=threat_level_id,
analysis=analysis, date=date)
if published:
misp_event.publish()
return misp_event
def _prepare_full_attribute(self, category, type_value, value, to_ids, comment=None, distribution=None):
to_return = {}
if category not in self.categories:
raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(self.categories))))
if type_value not in self.types:
raise NewAttributeError('{} is invalid, type_value has to be in {}'.format(type_value, (', '.join(self.types))))
if type_value not in self.category_type_mapping[category]:
raise NewAttributeError('{} and {} is an invalid combinaison, type_value for this category has to be in {}'.format(type_value, category, (', '.join(self.category_type_mapping[category]))))
to_return['type'] = type_value
to_return['category'] = category
if to_ids not in [True, False]:
raise NewAttributeError('{} is invalid, to_ids has to be True or False'.format(to_ids))
to_return['to_ids'] = to_ids
if distribution is not None:
distribution = int(distribution)
# If None: take the default value of the event
if distribution not in [None, 0, 1, 2, 3, 5]:
raise NewAttributeError('{} is invalid, the distribution has to be in 0, 1, 2, 3, 5 or None'.format(distribution))
if distribution is not None:
to_return['distribution'] = distribution
to_return['value'] = value
if comment is not None:
to_return['comment'] = comment
return to_return
def _prepare_update(self, event):
# Cleanup the received event to make it publishable
event['Event'].pop('locked', None)
event['Event'].pop('attribute_count', None)
event['Event'].pop('RelatedEvent', None)
event['Event'].pop('orgc', None)
event['Event'].pop('ShadowAttribute', None)
event['Event'].pop('org', None)
event['Event'].pop('proposal_email_lock', None)
event['Event'].pop('publish_timestamp', None)
event['Event'].pop('published', None)
event['Event'].pop('timestamp', None)
event['Event']['id'] = int(event['Event']['id'])
return event
def _prepare_full_attribute(self, category, type_value, value, to_ids, comment=None, distribution=5):
misp_attribute = MISPAttribute(self.describe_types)
misp_attribute.set_all_values(type=type_value, value=value, category=category,
to_ids=to_ids, comment=comment, distribution=distribution)
return misp_attribute
def _one_or_more(self, value):
"""Returns a list/tuple of one or more items, regardless of input."""
@ -381,30 +317,32 @@ class PyMISP(object):
# ########## Helpers ##########
def get(self, eid):
response = self.get_event(int(eid))
return response
return self.get_event(eid)
def get_stix(self, **kwargs):
response = self.get_stix_event(**kwargs)
return response
return self.get_stix_event(**kwargs)
def update(self, event):
eid = event['Event']['id']
response = self.update_event(eid, event)
return response
def new_event(self, distribution=None, threat_level_id=None, analysis=None, info=None, date=None, published=False):
data = self._prepare_full_event(distribution, threat_level_id, analysis, info, date, published)
response = self.add_event(data)
return response
return self.update_event(eid, event)
def publish(self, event):
if event['Event']['published']:
return {'error': 'Already published'}
event = self._prepare_update(event)
event['Event']['published'] = True
response = self.update_event(event['Event']['id'], event)
return response
e = MISPEvent(self.describe_types)
e.load(event)
e.publish()
return self.update_event(event['Event']['id'], json.dumps(e, cls=EncodeUpdate))
def change_threat_level(self, event, threat_level_id):
e = MISPEvent(self.describe_types)
e.load(event)
e.threat_level_id = threat_level_id
return self.update_event(event['Event']['id'], json.dumps(e, cls=EncodeUpdate))
def new_event(self, distribution=None, threat_level_id=None, analysis=None, info=None, date=None, published=False):
misp_event = self._prepare_full_event(distribution, threat_level_id, analysis, info, date, published)
return self.add_event(json.dumps(misp_event, cls=EncodeUpdate))
def add_tag(self, event, tag):
session = self.__prepare_session()
@ -418,33 +356,22 @@ class PyMISP(object):
response = session.post(urljoin(self.root_url, 'events/removeTag'), data=json.dumps(to_post))
return self._check_response(response)
def change_threat_level(self, event, threat_level_id):
event['Event']['threat_level_id'] = threat_level_id
self._prepare_update(event)
response = self.update_event(event['Event']['id'], event)
return response
# ##### File attributes #####
def _send_attributes(self, event, attributes, proposal=False):
if proposal:
response = self.proposal_add(event['Event']['id'], attributes)
else:
event = self._prepare_update(event)
for a in attributes:
if a.get('distribution') is None:
a['distribution'] = 5
event['Event']['Attribute'] = attributes
response = self.update_event(event['Event']['id'], event)
e = MISPEvent(self.describe_types)
e.load(event)
e.attributes += attributes
response = self.update_event(event['Event']['id'], json.dumps(e, cls=EncodeUpdate))
return response
def add_named_attribute(self, event, category, type_value, value, to_ids=False, comment=None, distribution=None, proposal=False):
def add_named_attribute(self, event, type_value, value, category=None, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
if value and category and type:
try:
attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution))
except NewAttributeError as e:
return e
for value in self._one_or_more(value):
attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_hashes(self, event, category='Artifacts dropped', filename=None, md5=None, sha1=None, sha256=None, ssdeep=None, comment=None, to_ids=True, distribution=None, proposal=False):
@ -713,21 +640,9 @@ class PyMISP(object):
# ######### Upload samples through the API #########
# ##################################################
def _create_event(self, distribution, threat_level_id, analysis, info):
# Setup details of a new event
if distribution not in [0, 1, 2, 3]:
raise NewEventError('{} is invalid, the distribution has to be in 0, 1, 2, 3'.format(distribution))
if threat_level_id not in [1, 2, 3, 4]:
raise NewEventError('{} is invalid, the threat_level_id has to be in 1, 2, 3, 4'.format(threat_level_id))
if analysis not in [0, 1, 2]:
raise NewEventError('{} is invalid, the analysis has to be in 0, 1, 2'.format(analysis))
return {'distribution': int(distribution), 'info': info,
'threat_level_id': int(threat_level_id), 'analysis': analysis}
def prepare_attribute(self, event_id, distribution, to_ids, category, comment, info,
analysis, threat_level_id):
def _prepare_upload(self, event_id, distribution, to_ids, category, comment, info,
analysis, threat_level_id):
to_post = {'request': {}}
authorized_categs = ['Payload delivery', 'Artifacts dropped', 'Payload installation', 'External analysis', 'Network activity', 'Antivirus detection']
if event_id is not None:
try:
@ -736,16 +651,21 @@ class PyMISP(object):
pass
if not isinstance(event_id, int):
# New event
to_post['request'] = self._create_event(distribution, threat_level_id, analysis, info)
misp_event = self._prepare_full_event(distribution, threat_level_id, analysis, info)
to_post['request']['distribution'] = misp_event.distribution
to_post['request']['info'] = misp_event.info
to_post['request']['analysis'] = misp_event.analysis
to_post['request']['threat_level_id'] = misp_event.threat_level_id
else:
to_post['request']['event_id'] = int(event_id)
if to_ids not in [True, False]:
raise NewAttributeError('{} is invalid, to_ids has to be True or False'.format(to_ids))
default_values = self.sane_default['malware-sample']
if to_ids is None or not isinstance(to_ids, bool):
to_ids = bool(int(default_values['to_ids']))
to_post['request']['to_ids'] = to_ids
if category not in authorized_categs:
raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(authorized_categs))))
if category is None or category not in self.categories:
category = default_values['default_category']
to_post['request']['category'] = category
to_post['request']['comment'] = comment
@ -758,16 +678,16 @@ class PyMISP(object):
def upload_sample(self, filename, filepath, event_id, distribution=None,
to_ids=True, category=None, comment=None, info=None,
analysis=None, threat_level_id=None):
to_post = self.prepare_attribute(event_id, distribution, to_ids, category,
comment, info, analysis, threat_level_id)
to_post = self._prepare_upload(event_id, distribution, to_ids, category,
comment, info, analysis, threat_level_id)
to_post['request']['files'] = [{'filename': filename, 'data': self._encode_file_to_upload(filepath)}]
return self._upload_sample(to_post)
def upload_samplelist(self, filepaths, event_id, distribution=None,
to_ids=True, category=None, info=None,
to_ids=True, category=None, comment=None, info=None,
analysis=None, threat_level_id=None):
to_post = self.prepare_attribute(event_id, distribution, to_ids, category,
info, analysis, threat_level_id)
to_post = self._prepare_upload(event_id, distribution, to_ids, category,
comment, info, analysis, threat_level_id)
files = []
for path in filepaths:
if not os.path.isfile(path):
@ -787,18 +707,14 @@ class PyMISP(object):
# ############################
def __query_proposal(self, session, path, id, attribute=None):
path = path.strip('/')
url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id))
query = None
if path in ['add', 'edit']:
query = {'request': {'ShadowAttribute': attribute}}
if path == 'view':
response = session.post(url, data=json.dumps(query))
elif path == 'view':
response = session.get(url)
else:
if query is not None:
response = session.post(url, data=json.dumps(query))
else:
response = session.post(url)
else: # accept or discard
response = session.post(url)
return self._check_response(response)
def proposal_view(self, event_id=None, proposal_id=None):
@ -917,7 +833,7 @@ class PyMISP(object):
def search(self, values=None, not_values=None, type_attribute=None,
category=None, org=None, tags=None, not_tags=None, date_from=None,
date_to=None, last=None, controller='events'):
date_to=None, last=None, metadata=None, controller='events'):
"""
Search via the Rest API
@ -931,6 +847,7 @@ class PyMISP(object):
:param date_from: First date
:param date_to: Last date
:param last: Last updated events (for example 5d or 12h or 30m)
:param metadata: return onlymetadata if True
"""
val = self.__prepare_rest_search(values, not_values).replace('/', '|')
@ -958,6 +875,8 @@ class PyMISP(object):
query['to'] = date_to
if last is not None:
query['last'] = last
if metadata is not None:
query['metadata'] = metadata
session = self.__prepare_session()
return self.__query(session, 'restSearch/download', query, controller)
@ -1104,6 +1023,7 @@ class PyMISP(object):
return {'version': '{}.{}.{}'.format(master_version['major'], master_version['minor'], master_version['hotfix'])}
else:
return {'error': 'Impossible to retrieve the version of the master branch.'}
# ############## Export Attributes in text ####################################
def get_all_attributes_txt(self, type_attr):

View File

@ -0,0 +1,706 @@
{
"result": {
"sane_defaults": {
"md5": {
"default_category": "Payload delivery",
"to_ids": 1
},
"sha1": {
"default_category": "Payload delivery",
"to_ids": 1
},
"sha256": {
"default_category": "Payload delivery",
"to_ids": 1
},
"filename": {
"default_category": "Payload delivery",
"to_ids": 1
},
"pdb": {
"default_category": "Artifacts dropped",
"to_ids": 0
},
"filename|md5": {
"default_category": "Payload delivery",
"to_ids": 1
},
"filename|sha1": {
"default_category": "Payload delivery",
"to_ids": 1
},
"filename|sha256": {
"default_category": "Payload delivery",
"to_ids": 1
},
"ip-src": {
"default_category": "Network activity",
"to_ids": 1
},
"ip-dst": {
"default_category": "Network activity",
"to_ids": 1
},
"hostname": {
"default_category": "Network activity",
"to_ids": 1
},
"domain": {
"default_category": "Network activity",
"to_ids": 1
},
"domain|ip": {
"default_category": "Network activity",
"to_ids": 1
},
"email-src": {
"default_category": "Payload delivery",
"to_ids": 1
},
"email-dst": {
"default_category": "Network activity",
"to_ids": 1
},
"email-subject": {
"default_category": "Payload delivery",
"to_ids": 0
},
"email-attachment": {
"default_category": "Payload delivery",
"to_ids": 1
},
"url": {
"default_category": "External analysis",
"to_ids": 1
},
"http-method": {
"default_category": "Network activity",
"to_ids": 0
},
"user-agent": {
"default_category": "Network activity",
"to_ids": 0
},
"regkey": {
"default_category": "Persistence mechanism",
"to_ids": 1
},
"regkey|value": {
"default_category": "Persistence mechanism",
"to_ids": 1
},
"AS": {
"default_category": "Network activity",
"to_ids": 0
},
"snort": {
"default_category": "Network activity",
"to_ids": 1
},
"pattern-in-file": {
"default_category": "Payload installation",
"to_ids": 1
},
"pattern-in-traffic": {
"default_category": "Network activity",
"to_ids": 1
},
"pattern-in-memory": {
"default_category": "Payload installation",
"to_ids": 1
},
"yara": {
"default_category": "Payload installation",
"to_ids": 1
},
"vulnerability": {
"default_category": "External analysis",
"to_ids": 0
},
"attachment": {
"default_category": "External analysis",
"to_ids": 0
},
"malware-sample": {
"default_category": "Payload delivery",
"to_ids": 1
},
"link": {
"default_category": "External analysis",
"to_ids": 0
},
"comment": {
"default_category": "Other",
"to_ids": 0
},
"text": {
"default_category": "Other",
"to_ids": 0
},
"other": {
"default_category": "Other",
"to_ids": 0
},
"named pipe": {
"default_category": "Artifacts dropped",
"to_ids": 0
},
"mutex": {
"default_category": "Artifacts dropped",
"to_ids": 1
},
"target-user": {
"default_category": "Targeting data",
"to_ids": 0
},
"target-email": {
"default_category": "Targeting data",
"to_ids": 0
},
"target-machine": {
"default_category": "Targeting data",
"to_ids": 0
},
"target-org": {
"default_category": "Targeting data",
"to_ids": 0
},
"target-location": {
"default_category": "Targeting data",
"to_ids": 0
},
"target-external": {
"default_category": "Targeting data",
"to_ids": 0
},
"btc": {
"default_category": "Financial fraud",
"to_ids": 1
},
"iban": {
"default_category": "Financial fraud",
"to_ids": 1
},
"bic": {
"default_category": "Financial fraud",
"to_ids": 1
},
"bank-account-nr": {
"default_category": "Financial fraud",
"to_ids": 1
},
"aba-rtn": {
"default_category": "Financial fraud",
"to_ids": 1
},
"bin": {
"default_category": "Financial fraud",
"to_ids": 1
},
"cc-number": {
"default_category": "Financial fraud",
"to_ids": 1
},
"prtn": {
"default_category": "Financial fraud",
"to_ids": 1
},
"threat-actor": {
"default_category": "Attribution",
"to_ids": 0
},
"campaign-name": {
"default_category": "Attribution",
"to_ids": 0
},
"campaign-id": {
"default_category": "Attribution",
"to_ids": 0
},
"malware-type": {
"default_category": "Payload delivery",
"to_ids": 0
},
"uri": {
"default_category": "Network activity",
"to_ids": 1
},
"authentihash": {
"default_category": "Payload delivery",
"to_ids": 1
},
"ssdeep": {
"default_category": "Payload delivery",
"to_ids": 1
},
"imphash": {
"default_category": "Payload delivery",
"to_ids": 1
},
"pehash": {
"default_category": "Payload delivery",
"to_ids": 1
},
"sha224": {
"default_category": "Payload delivery",
"to_ids": 1
},
"sha384": {
"default_category": "Payload delivery",
"to_ids": 1
},
"sha512": {
"default_category": "Payload delivery",
"to_ids": 1
},
"sha512/224": {
"default_category": "Payload delivery",
"to_ids": 1
},
"sha512/256": {
"default_category": "Payload delivery",
"to_ids": 1
},
"tlsh": {
"default_category": "Payload delivery",
"to_ids": 1
},
"filename|authentihash": {
"default_category": "Payload delivery",
"to_ids": 1
},
"filename|ssdeep": {
"default_category": "Payload delivery",
"to_ids": 1
},
"filename|imphash": {
"default_category": "Payload delivery",
"to_ids": 1
},
"filename|pehash": {
"default_category": "Payload delivery",
"to_ids": 1
},
"filename|sha224": {
"default_category": "Payload delivery",
"to_ids": 1
},
"filename|sha384": {
"default_category": "Payload delivery",
"to_ids": 1
},
"filename|sha512": {
"default_category": "Payload delivery",
"to_ids": 1
},
"filename|sha512/224": {
"default_category": "Payload delivery",
"to_ids": 1
},
"filename|sha512/256": {
"default_category": "Payload delivery",
"to_ids": 1
},
"filename|tlsh": {
"default_category": "Payload delivery",
"to_ids": 1
},
"windows-scheduled-task": {
"default_category": "Artifacts dropped",
"to_ids": 0
},
"windows-service-name": {
"default_category": "Artifacts dropped",
"to_ids": 0
},
"windows-service-displayname": {
"default_category": "Artifacts dropped",
"to_ids": 0
},
"whois-registrant-email": {
"default_category": "Attribution",
"to_ids": 0
},
"whois-registrant-phone": {
"default_category": "Attribution",
"to_ids": 0
},
"whois-registrant-name": {
"default_category": "Attribution",
"to_ids": 0
},
"whois-registrar": {
"default_category": "Attribution",
"to_ids": 0
},
"whois-creation-date": {
"default_category": "Attribution",
"to_ids": 0
},
"x509-fingerprint-sha1": {
"default_category": "Network activity",
"to_ids": 1
}
},
"types": [
"md5",
"sha1",
"sha256",
"filename",
"pdb",
"filename|md5",
"filename|sha1",
"filename|sha256",
"ip-src",
"ip-dst",
"hostname",
"domain",
"domain|ip",
"email-src",
"email-dst",
"email-subject",
"email-attachment",
"url",
"http-method",
"user-agent",
"regkey",
"regkey|value",
"AS",
"snort",
"pattern-in-file",
"pattern-in-traffic",
"pattern-in-memory",
"yara",
"vulnerability",
"attachment",
"malware-sample",
"link",
"comment",
"text",
"other",
"named pipe",
"mutex",
"target-user",
"target-email",
"target-machine",
"target-org",
"target-location",
"target-external",
"btc",
"iban",
"bic",
"bank-account-nr",
"aba-rtn",
"bin",
"cc-number",
"prtn",
"threat-actor",
"campaign-name",
"campaign-id",
"malware-type",
"uri",
"authentihash",
"ssdeep",
"imphash",
"pehash",
"sha224",
"sha384",
"sha512",
"sha512/224",
"sha512/256",
"tlsh",
"filename|authentihash",
"filename|ssdeep",
"filename|imphash",
"filename|pehash",
"filename|sha224",
"filename|sha384",
"filename|sha512",
"filename|sha512/224",
"filename|sha512/256",
"filename|tlsh",
"windows-scheduled-task",
"windows-service-name",
"windows-service-displayname",
"whois-registrant-email",
"whois-registrant-phone",
"whois-registrant-name",
"whois-registrar",
"whois-creation-date",
"x509-fingerprint-sha1"
],
"categories": [
"Internal reference",
"Targeting data",
"Antivirus detection",
"Payload delivery",
"Artifacts dropped",
"Payload installation",
"Persistence mechanism",
"Network activity",
"Payload type",
"Attribution",
"External analysis",
"Financial fraud",
"Other"
],
"category_type_mappings": {
"Internal reference": [
"text",
"link",
"comment",
"other"
],
"Targeting data": [
"target-user",
"target-email",
"target-machine",
"target-org",
"target-location",
"target-external",
"comment"
],
"Antivirus detection": [
"link",
"comment",
"text",
"attachment",
"other"
],
"Payload delivery": [
"md5",
"sha1",
"sha224",
"sha256",
"sha384",
"sha512",
"sha512/224",
"sha512/256",
"ssdeep",
"imphash",
"authentihash",
"pehash",
"tlsh",
"filename",
"filename|md5",
"filename|sha1",
"filename|sha224",
"filename|sha256",
"filename|sha384",
"filename|sha512",
"filename|sha512/224",
"filename|sha512/256",
"filename|authentihash",
"filename|ssdeep",
"filename|tlsh",
"filename|imphash",
"filename|pehash",
"ip-src",
"ip-dst",
"hostname",
"domain",
"email-src",
"email-dst",
"email-subject",
"email-attachment",
"url",
"user-agent",
"AS",
"pattern-in-file",
"pattern-in-traffic",
"yara",
"attachment",
"malware-sample",
"link",
"malware-type",
"comment",
"text",
"vulnerability",
"x509-fingerprint-sha1",
"other"
],
"Artifacts dropped": [
"md5",
"sha1",
"sha224",
"sha256",
"sha384",
"sha512",
"sha512/224",
"sha512/256",
"ssdeep",
"imphash",
"authentihash",
"filename",
"filename|md5",
"filename|sha1",
"filename|sha224",
"filename|sha256",
"filename|sha384",
"filename|sha512",
"filename|sha512/224",
"filename|sha512/256",
"filename|authentihash",
"filename|ssdeep",
"filename|tlsh",
"filename|imphash",
"filename|pehash",
"regkey",
"regkey|value",
"pattern-in-file",
"pattern-in-memory",
"pdb",
"yara",
"attachment",
"malware-sample",
"named pipe",
"mutex",
"windows-scheduled-task",
"windows-service-name",
"windows-service-displayname",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"Payload installation": [
"md5",
"sha1",
"sha224",
"sha256",
"sha384",
"sha512",
"sha512/224",
"sha512/256",
"ssdeep",
"imphash",
"authentihash",
"pehash",
"tlsh",
"filename",
"filename|md5",
"filename|sha1",
"filename|sha224",
"filename|sha256",
"filename|sha384",
"filename|sha512",
"filename|sha512/224",
"filename|sha512/256",
"filename|authentihash",
"filename|ssdeep",
"filename|tlsh",
"filename|imphash",
"filename|pehash",
"pattern-in-file",
"pattern-in-traffic",
"pattern-in-memory",
"yara",
"vulnerability",
"attachment",
"malware-sample",
"malware-type",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"Persistence mechanism": [
"filename",
"regkey",
"regkey|value",
"comment",
"text",
"other"
],
"Network activity": [
"ip-src",
"ip-dst",
"hostname",
"domain",
"domain|ip",
"email-dst",
"url",
"uri",
"user-agent",
"http-method",
"AS",
"snort",
"pattern-in-file",
"pattern-in-traffic",
"attachment",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"Payload type": [
"comment",
"text",
"other"
],
"Attribution": [
"threat-actor",
"campaign-name",
"campaign-id",
"whois-registrant-phone",
"whois-registrant-email",
"whois-registrant-name",
"whois-registrar",
"whois-creation-date",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"External analysis": [
"md5",
"sha1",
"sha256",
"filename",
"filename|md5",
"filename|sha1",
"filename|sha256",
"ip-src",
"ip-dst",
"hostname",
"domain",
"domain|ip",
"url",
"user-agent",
"regkey",
"regkey|value",
"AS",
"snort",
"pattern-in-file",
"pattern-in-traffic",
"pattern-in-memory",
"vulnerability",
"attachment",
"malware-sample",
"link",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"Financial fraud": [
"btc",
"iban",
"bic",
"bank-account-nr",
"aba-rtn",
"bin",
"cc-number",
"prtn",
"comment",
"text",
"other"
],
"Other": [
"comment",
"text",
"other"
]
}
}
}

322
pymisp/data/schema-lax.json Normal file
View File

@ -0,0 +1,322 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json",
"type": "object",
"properties": {
"Event": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/id",
"type": "string"
},
"orgc_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/orgc_id",
"type": "string"
},
"org_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/org_id",
"type": "string"
},
"date": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/date",
"type": "string"
},
"threat_level_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/threat_level_id",
"type": "string"
},
"info": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/info",
"type": "string"
},
"published": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/published",
"type": "boolean"
},
"uuid": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/uuid",
"type": "string"
},
"attribute_count": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/attribute_count",
"type": "string"
},
"analysis": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/analysis",
"type": "string"
},
"timestamp": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/timestamp",
"type": "string"
},
"distribution": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/distribution",
"type": "string"
},
"proposal_email_lock": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/proposal_email_lock",
"type": "boolean"
},
"locked": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/locked",
"type": "boolean"
},
"publish_timestamp": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/publish_timestamp",
"type": "string"
},
"sharing_group_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/sharing_group_id",
"type": "string"
},
"Org": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Org",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Org/id",
"type": "string"
},
"name": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Org/name",
"type": "string"
},
"uuid": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Org/uuid",
"type": "string"
}
}
},
"Orgc": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Orgc",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Orgc/id",
"type": "string"
},
"name": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Orgc/name",
"type": "string"
},
"uuid": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Orgc/uuid",
"type": "string"
}
}
},
"Attribute": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute",
"type": "array",
"items": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/id",
"type": "string"
},
"type": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/type",
"type": "string"
},
"category": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/category",
"type": "string"
},
"to_ids": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/to_ids",
"type": "boolean"
},
"uuid": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/uuid",
"type": "string"
},
"event_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/event_id",
"type": "string"
},
"distribution": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/distribution",
"type": "string"
},
"timestamp": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/timestamp",
"type": "string"
},
"comment": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/comment",
"type": "string"
},
"sharing_group_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/sharing_group_id",
"type": "string"
},
"value": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/value",
"type": "string"
},
"SharingGroup": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/SharingGroup",
"type": "array",
"items": {},
"additionalItems": false
},
"ShadowAttribute": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/ShadowAttribute",
"type": "array",
"items": {},
"additionalItems": false
}
}
},
"additionalItems": false
},
"ShadowAttribute": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/ShadowAttribute",
"type": "array",
"items": {},
"additionalItems": false
},
"RelatedEvent": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent",
"type": "array",
"items": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0",
"type": "object",
"properties": {
"Org": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Org",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Org/id",
"type": "string"
},
"name": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Org/name",
"type": "string"
},
"uuid": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Org/uuid",
"type": "string"
}
}
},
"Orgc": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Orgc",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Orgc/id",
"type": "string"
},
"name": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Orgc/name",
"type": "string"
},
"uuid": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Orgc/uuid",
"type": "string"
}
}
},
"Event": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event",
"type": "object",
"items": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/id",
"type": "string"
},
"date": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/date",
"type": "string"
},
"threat_level_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/threat_level_id",
"type": "string"
},
"info": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/info",
"type": "string"
},
"published": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/published",
"type": "boolean"
},
"uuid": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/uuid",
"type": "string"
},
"analysis": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/analysis",
"type": "string"
},
"timestamp": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/timestamp",
"type": "string"
},
"distribution": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/distribution",
"type": "string"
},
"org_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/org_id",
"type": "string"
},
"orgc_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/orgc_id",
"type": "string"
}
}
},
"additionalItems": false
}
}
},
"additionalItems": false
},
"Tag": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Tag",
"type": "array",
"items": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Tag/2",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Tag/2/id",
"type": "string"
},
"name": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Tag/2/name",
"type": "string"
},
"colour": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Tag/2/colour",
"type": "string"
},
"exportable": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Tag/2/exportable",
"type": "boolean"
}
}
},
"additionalItems": false
}
},
"required": [
"info",
"Attribute"
]
}
},
"required": [
"Event"
]
}

327
pymisp/data/schema.json Normal file
View File

@ -0,0 +1,327 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json",
"type": "object",
"properties": {
"Event": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/id",
"type": "string"
},
"orgc_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/orgc_id",
"type": "string"
},
"org_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/org_id",
"type": "string"
},
"date": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/date",
"type": "string"
},
"threat_level_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/threat_level_id",
"type": "string"
},
"info": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/info",
"type": "string"
},
"published": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/published",
"type": "boolean"
},
"uuid": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/uuid",
"type": "string"
},
"attribute_count": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/attribute_count",
"type": "string"
},
"analysis": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/analysis",
"type": "string"
},
"timestamp": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/timestamp",
"type": "string"
},
"distribution": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/distribution",
"type": "string"
},
"proposal_email_lock": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/proposal_email_lock",
"type": "boolean"
},
"locked": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/locked",
"type": "boolean"
},
"publish_timestamp": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/publish_timestamp",
"type": "string"
},
"sharing_group_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/sharing_group_id",
"type": "string"
},
"Org": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Org",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Org/id",
"type": "string"
},
"name": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Org/name",
"type": "string"
},
"uuid": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Org/uuid",
"type": "string"
}
}
},
"Orgc": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Orgc",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Orgc/id",
"type": "string"
},
"name": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Orgc/name",
"type": "string"
},
"uuid": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Orgc/uuid",
"type": "string"
}
}
},
"Attribute": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute",
"type": "array",
"items": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/id",
"type": "string"
},
"type": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/type",
"type": "string"
},
"category": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/category",
"type": "string"
},
"to_ids": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/to_ids",
"type": "boolean"
},
"uuid": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/uuid",
"type": "string"
},
"event_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/event_id",
"type": "string"
},
"distribution": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/distribution",
"type": "string"
},
"timestamp": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/timestamp",
"type": "string"
},
"comment": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/comment",
"type": "string"
},
"sharing_group_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/sharing_group_id",
"type": "string"
},
"value": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/value",
"type": "string"
},
"SharingGroup": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/SharingGroup",
"type": "array",
"items": {},
"additionalItems": false
},
"ShadowAttribute": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Attribute/17/ShadowAttribute",
"type": "array",
"items": {},
"additionalItems": false
}
}
},
"additionalItems": false
},
"ShadowAttribute": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/ShadowAttribute",
"type": "array",
"items": {},
"additionalItems": false
},
"RelatedEvent": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent",
"type": "array",
"items": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0",
"type": "object",
"properties": {
"Org": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Org",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Org/id",
"type": "string"
},
"name": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Org/name",
"type": "string"
},
"uuid": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Org/uuid",
"type": "string"
}
}
},
"Orgc": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Orgc",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Orgc/id",
"type": "string"
},
"name": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Orgc/name",
"type": "string"
},
"uuid": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Orgc/uuid",
"type": "string"
}
}
},
"Event": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event",
"type": "object",
"items": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/id",
"type": "string"
},
"date": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/date",
"type": "string"
},
"threat_level_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/threat_level_id",
"type": "string"
},
"info": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/info",
"type": "string"
},
"published": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/published",
"type": "boolean"
},
"uuid": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/uuid",
"type": "string"
},
"analysis": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/analysis",
"type": "string"
},
"timestamp": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/timestamp",
"type": "string"
},
"distribution": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/distribution",
"type": "string"
},
"org_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/org_id",
"type": "string"
},
"orgc_id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/RelatedEvent/0/Event/0/orgc_id",
"type": "string"
}
}
},
"additionalItems": false
}
}
},
"additionalItems": false
},
"Tag": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Tag",
"type": "array",
"items": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Tag/2",
"type": "object",
"properties": {
"id": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Tag/2/id",
"type": "string"
},
"name": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Tag/2/name",
"type": "string"
},
"colour": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Tag/2/colour",
"type": "string"
},
"exportable": {
"id": "https://www.github.com/MISP/MISP/format/2.4/schema.json/Event/Tag/2/exportable",
"type": "boolean"
}
}
},
"additionalItems": false
}
},
"required": [
"date",
"threat_level_id",
"info",
"published",
"analysis",
"distribution",
"Attribute"
]
}
},
"required": [
"Event"
]
}

31
pymisp/exceptions.py Normal file
View File

@ -0,0 +1,31 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
class PyMISPError(Exception):
def __init__(self, message):
super(PyMISPError, self).__init__(message)
self.message = message
class NewEventError(PyMISPError):
pass
class NewAttributeError(PyMISPError):
pass
class SearchError(PyMISPError):
pass
class MissingDependency(PyMISPError):
pass
class NoURL(PyMISPError):
pass
class NoKey(PyMISPError):
pass

364
pymisp/mispevent.py Normal file
View File

@ -0,0 +1,364 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import datetime
import time
import json
from json import JSONEncoder
import os
try:
from dateutil.parser import parse
except ImportError:
pass
try:
import jsonschema
except ImportError:
pass
from .exceptions import PyMISPError, NewEventError, NewAttributeError
# Least dirty way to support python 2 and 3
try:
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.3")
basestring
except NameError:
basestring = str
class MISPAttribute(object):
def __init__(self, describe_types):
self.categories = describe_types['categories']
self.types = describe_types['types']
self.category_type_mapping = describe_types['category_type_mappings']
self.sane_default = describe_types['sane_defaults']
self._reinitialize_attribute()
def _reinitialize_attribute(self):
# Default values
self.category = None
self.type = None
self.value = None
self.to_ids = False
self.comment = ''
self.distribution = 5
# other possible values
self.id = None
self.uuid = None
self.timestamp = None
self.sharing_group_id = None
self.deleted = None
self.SharingGroup = []
self.ShadowAttribute = []
def set_all_values(self, **kwargs):
if kwargs.get('type') and kwargs.get('category'):
if kwargs['type'] not in self.category_type_mapping[kwargs['category']]:
raise NewAttributeError('{} and {} is an invalid combinaison, type for this category has to be in {}'.capitalizeformat(self.type, self.category, (', '.join(self.category_type_mapping[self.category]))))
# Required
if kwargs.get('type'):
self.type = kwargs['type']
if self.type not in self.types:
raise NewAttributeError('{} is invalid, type has to be in {}'.format(self.type, (', '.join(self.types))))
else:
raise NewAttributeError('The type of the attribute is required.')
type_defaults = self.sane_default[self.type]
if kwargs.get('value'):
self.value = kwargs['value']
else:
raise NewAttributeError('The value of the attribute is required.')
# Default values
if kwargs.get('category'):
self.category = kwargs['category']
if self.category not in self.categories:
raise NewAttributeError('{} is invalid, category has to be in {}'.format(self.category, (', '.join(self.categories))))
else:
self.category = type_defaults['default_category']
if kwargs.get('to_ids'):
self.to_ids = kwargs['to_ids']
if not isinstance(self.to_ids, bool):
raise NewAttributeError('{} is invalid, to_ids has to be True or False'.format(self.to_ids))
else:
self.to_ids = bool(int(type_defaults['to_ids']))
if kwargs.get('comment'):
self.comment = kwargs['comment']
if kwargs.get('distribution'):
self.distribution = int(kwargs['distribution'])
if self.distribution not in [0, 1, 2, 3, 5]:
raise NewAttributeError('{} is invalid, the distribution has to be in 0, 1, 2, 3, 5'.format(self.distribution))
# other possible values
if kwargs.get('id'):
self.id = int(kwargs['id'])
if kwargs.get('uuid'):
self.uuid = kwargs['uuid']
if kwargs.get('timestamp'):
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs['timestamp']))
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs['sharing_group_id'])
if kwargs.get('deleted'):
self.deleted = kwargs['deleted']
if kwargs.get('SharingGroup'):
self.SharingGroup = kwargs['SharingGroup']
if kwargs.get('ShadowAttribute'):
self.ShadowAttribute = kwargs['ShadowAttribute']
def _json(self):
to_return = {'type': self.type, 'category': self.category, 'to_ids': self.to_ids,
'distribution': self.distribution, 'value': self.value,
'comment': self.comment}
if self.sharing_group_id:
to_return['sharing_group_id'] = self.sharing_group_id
to_return = _int_to_str(to_return)
return to_return
def _json_full(self):
to_return = self._json()
if self.id:
to_return['id'] = self.id
if self.uuid:
to_return['uuid'] = self.uuid
if self.timestamp:
to_return['timestamp'] = int(time.mktime(self.timestamp.timetuple()))
if self.deleted is not None:
to_return['deleted'] = self.deleted
if self.ShadowAttribute:
to_return['ShadowAttribute'] = self.ShadowAttribute
if self.SharingGroup:
to_return['SharingGroup'] = self.SharingGroup
to_return = _int_to_str(to_return)
return to_return
class EncodeUpdate(JSONEncoder):
def default(self, obj):
try:
return obj._json()
except AttributeError:
return JSONEncoder.default(self, obj)
class EncodeFull(JSONEncoder):
def default(self, obj):
try:
return obj._json_full()
except AttributeError:
return JSONEncoder.default(self, obj)
def _int_to_str(d):
# transform all integer back to string
for k, v in d.items():
if isinstance(v, int) and not isinstance(v, bool):
d[k] = str(v)
return d
class MISPEvent(object):
def __init__(self, describe_types=None):
self.ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
self.json_schema = json.load(open(os.path.join(self.ressources_path, 'schema.json'), 'r'))
self.json_schema_lax = json.load(open(os.path.join(self.ressources_path, 'schema-lax.json'), 'r'))
if not describe_types:
t = json.load(open(os.path.join(self.ressources_path, 'describeTypes.json'), 'r'))
describe_types = t['result']
self.describe_types = describe_types
self.categories = describe_types['categories']
self.types = describe_types['types']
self.category_type_mapping = describe_types['category_type_mappings']
self.sane_default = describe_types['sane_defaults']
self.new = True
self.dump_full = False
self._reinitialize_event()
def _reinitialize_event(self):
# Default values for a valid event to send to a MISP instance
self.distribution = 3
self.threat_level_id = 2
self.analysis = 0
self.info = None
self.published = False
self.date = datetime.date.today()
self.attributes = []
# All other keys
self.id = None
self.orgc_id = None
self.org_id = None
self.uuid = None
self.attribute_count = None
self.timestamp = None
self.proposal_email_lock = None
self.locked = None
self.publish_timestamp = None
self.sharing_group_id = None
self.Org = None
self.Orgc = None
self.ShadowAttribute = []
self.RelatedEvent = []
self.Tag = []
def load(self, json_event):
self.new = False
self.dump_full = True
if isinstance(json_event, basestring) and os.path.exists(json_event):
# NOTE: is it a good idea? (possible security issue if an untrusted user call this method)
json_event = open(json_event, 'r')
if hasattr(json_event, 'read'):
# python2 and python3 compatible to find if we have a file
json_event = json_event.read()
if isinstance(json_event, basestring):
json_event = json.loads(json_event)
if json_event.get('response'):
event = json_event.get('response')[0]
else:
event = json_event
if not event:
raise PyMISPError('Invalid event')
# Invalid event created by MISP up to 2.4.52 (attribute_count is none instead of '0')
if event.get('Event') and event.get('Event').get('attribute_count') is None:
event['Event']['attribute_count'] = '0'
jsonschema.validate(event, self.json_schema_lax)
e = event.get('Event')
self._reinitialize_event()
self.set_all_values(**e)
def set_all_values(self, **kwargs):
# Required value
if kwargs.get('info'):
self.info = kwargs['info']
else:
raise NewAttributeError('The info field of the new event is required.')
# Default values for a valid event to send to a MISP instance
if kwargs.get('distribution') is not None:
self.distribution = int(kwargs['distribution'])
if self.distribution not in [0, 1, 2, 3]:
raise NewEventError('{} is invalid, the distribution has to be in 0, 1, 2, 3'.format(self.distribution))
if kwargs.get('threat_level_id') is not None:
self.threat_level_id = int(kwargs['threat_level_id'])
if self.threat_level_id not in [1, 2, 3, 4]:
raise NewEventError('{} is invalid, the threat_level has to be in 1, 2, 3, 4'.format(self.threat_level_id))
if kwargs.get('analysis') is not None:
self.analysis = int(kwargs['analysis'])
if self.analysis not in [0, 1, 2]:
raise NewEventError('{} is invalid, the analysis has to be in 0, 1, 2'.format(self.analysis))
if kwargs.get('published') is not None:
self.publish()
if kwargs.get('date'):
if isinstance(kwargs['date'], basestring) or isinstance(kwargs['date'], unicode):
self.date = parse(kwargs['date']).date()
elif isinstance(kwargs['date'], datetime.datetime):
self.date = kwargs['date'].date()
elif isinstance(kwargs['date'], datetime.date):
self.date = kwargs['date']
else:
raise NewEventError('Invalid format for the date: {} - {}'.format(kwargs['date'], type(kwargs['date'])))
if kwargs.get('Attribute'):
for a in kwargs['Attribute']:
attribute = MISPAttribute(self.describe_types)
attribute.set_all_values(**a)
self.attributes.append(attribute)
# All other keys
if kwargs.get('id'):
self.id = int(kwargs['id'])
if kwargs.get('orgc_id'):
self.orgc_id = int(kwargs['orgc_id'])
if kwargs.get('org_id'):
self.org_id = int(kwargs['org_id'])
if kwargs.get('uuid'):
self.uuid = kwargs['uuid']
if kwargs.get('attribute_count'):
self.attribute_count = int(kwargs['attribute_count'])
if kwargs.get('timestamp'):
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs['timestamp']))
if kwargs.get('proposal_email_lock'):
self.proposal_email_lock = kwargs['proposal_email_lock']
if kwargs.get('locked'):
self.locked = kwargs['locked']
if kwargs.get('publish_timestamp'):
self.publish_timestamp = datetime.datetime.fromtimestamp(int(kwargs['publish_timestamp']))
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs['sharing_group_id'])
if kwargs.get('Org'):
self.Org = kwargs['Org']
if kwargs.get('Orgc'):
self.Orgc = kwargs['Orgc']
if kwargs.get('ShadowAttribute'):
self.ShadowAttribute = kwargs['ShadowAttribute']
if kwargs.get('RelatedEvent'):
self.RelatedEvent = kwargs['RelatedEvent']
if kwargs.get('Tag'):
self.Tag = kwargs['Tag']
def _json(self):
to_return = {'Event': {}}
to_return['Event'] = {'distribution': self.distribution, 'info': self.info,
'date': self.date.isoformat(), 'published': self.published,
'threat_level_id': self.threat_level_id,
'analysis': self.analysis, 'Attribute': []}
if self.id:
to_return['Event']['id'] = self.id
if self.orgc_id:
to_return['Event']['orgc_id'] = self.orgc_id
if self.org_id:
to_return['Event']['org_id'] = self.org_id
if self.uuid:
to_return['Event']['uuid'] = self.uuid
if self.sharing_group_id:
to_return['Event']['sharing_group_id'] = self.sharing_group_id
if self.Tag:
to_return['Event']['Tag'] = self.Tag
to_return['Event'] = _int_to_str(to_return['Event'])
if self.attributes:
to_return['Event']['Attribute'] = [a._json() for a in self.attributes]
jsonschema.validate(to_return, self.json_schema)
return to_return
def _json_full(self):
to_return = self._json()
if self.locked is not None:
to_return['Event']['locked'] = self.locked
if self.attribute_count is not None:
to_return['Event']['attribute_count'] = self.attribute_count
if self.RelatedEvent:
to_return['Event']['RelatedEvent'] = self.RelatedEvent
if self.Org:
to_return['Event']['Org'] = self.Org
if self.Orgc:
to_return['Event']['Orgc'] = self.Orgc
if self.ShadowAttribute:
to_return['Event']['ShadowAttribute'] = self.ShadowAttribute
if self.proposal_email_lock is not None:
to_return['Event']['proposal_email_lock'] = self.proposal_email_lock
if self.locked is not None:
to_return['Event']['locked'] = self.locked
if self.publish_timestamp:
to_return['Event']['publish_timestamp'] = int(time.mktime(self.publish_timestamp.timetuple()))
if self.timestamp:
to_return['Event']['timestamp'] = int(time.mktime(self.timestamp.timetuple()))
to_return['Event'] = _int_to_str(to_return['Event'])
if self.attributes:
to_return['Event']['Attribute'] = [a._json_full() for a in self.attributes]
jsonschema.validate(to_return, self.json_schema)
return to_return
def publish(self):
self.published = True
def unpublish(self):
self.published = False
def add_attribute(self, type, value, **kwargs):
attribute = MISPAttribute(self.describe_types)
attribute.set_all_values(type=type, value=value, **kwargs)
self.attributes.append(attribute)

View File

@ -27,5 +27,7 @@ setup(
'Topic :: Internet',
],
test_suite="tests",
install_requires=['requests'],
install_requires=['requests', 'python-dateutil', 'jsonschema'],
include_package_data=True,
package_data={'data': ['schema.json', 'schema-lax.json', 'describeTypes.json']},
)

View File

@ -0,0 +1 @@
{"Event": {"info": "Ransomware - Xorist", "publish_timestamp": "1472548231", "timestamp": "1472541011", "analysis": "2", "Attribute": [{"category": "External analysis", "comment": "Imported via the Freetext Import Tool - Xchecked via VT: b3c4ae251f8094fa15b510051835c657eaef2a6cea46075d3aec964b14a99f68", "uuid": "57c5300c-0560-4146-bfaa-40e802de0b81", "timestamp": "1472540684", "to_ids": false, "value": "https://www.virustotal.com/file/b3c4ae251f8094fa15b510051835c657eaef2a6cea46075d3aec964b14a99f68/analysis/1469554268/", "type": "link"}, {"category": "External analysis", "comment": "", "uuid": "57c5310b-dc34-43cb-8b8e-4846950d210f", "timestamp": "1472541011", "to_ids": false, "value": "http://www.xylibox.com/2011/06/have-fun-with-trojan-ransomwin32xorist.html", "type": "link"}, {"category": "Other", "comment": "", "uuid": "57c444c0-8004-48fa-9c33-8aca950d210f", "timestamp": "1472480448", "to_ids": false, "value": "UPX packed", "type": "comment"}, {"category": "Other", "comment": "", "uuid": "57c44648-96f4-45d4-a8eb-453e950d210f", "timestamp": "1472480840", "to_ids": false, "value": "Key: 85350044dF4AC3518D185678A9414A7F,\r\nEncryption rounds:8,\r\nStart offset: 64,\r\nAlgorithm: TEA", "type": "text"}, {"category": "Payload delivery", "comment": "Imported via the Freetext Import Tool", "uuid": "57c4448a-fb04-457d-87e7-4127950d210f", "timestamp": "1472480394", "to_ids": true, "value": "3Z4wnG9603it23y.exe", "type": "filename"}, {"category": "Payload delivery", "comment": "Imported via the Freetext Import Tool", "uuid": "57c4448b-454c-4d17-90d1-4d2f950d210f", "timestamp": "1472480395", "to_ids": true, "value": "0749bae92ca336a02c83d126e04ec628", "type": "md5"}, {"category": "Payload delivery", "comment": "Imported via the Freetext Import Tool", "uuid": "57c4448a-bef0-4ba7-a071-444e950d210f", "timestamp": "1472480394", "to_ids": true, "value": "77b0c41b7d340b8a3d903f21347bbf06aa766b5b", "type": "sha1"}, {"category": "Payload delivery", "comment": "Imported via the Freetext Import Tool", "uuid": "57c4448b-3fa4-4d65-9ccc-4afa950d210f", "timestamp": "1472480395", "to_ids": true, "value": "b3c4ae251f8094fa15b510051835c657eaef2a6cea46075d3aec964b14a99f68", "type": "sha256"}, {"category": "Persistence mechanism", "comment": "", "uuid": "57c54b0f-27a4-458b-8e63-4455950d210f", "timestamp": "1472547599", "to_ids": true, "value": "Software\\Wow6432Node\\Microsoft\\Windows\\CurrentVersion\\Run|%TEMP%\\3Z4wnG9603it23y.exe", "type": "regkey|value"}], "Tag": [{"colour": "#ffffff", "exportable": true, "name": "tlp:white"}, {"colour": "#3d7a00", "exportable": true, "name": "circl:incident-classification=\"malware\""}, {"colour": "#420053", "exportable": true, "name": "ms-caro-malware:malware-type=\"Ransom\""}, {"colour": "#2c4f00", "exportable": true, "name": "malware_classification:malware-category=\"Ransomware\""}], "published": true, "date": "2016-08-29", "Orgc": {"name": "CIRCL", "uuid": "55f6ea5e-2c60-40e5-964f-47a8950d210f"}, "threat_level_id": "3", "uuid": "57c4445b-c548-4654-af0b-4be3950d210f"}}

View File

@ -1,368 +0,0 @@
{
"result": {
"types": [
"md5",
"sha1",
"sha256",
"filename",
"pdb",
"filename|md5",
"filename|sha1",
"filename|sha256",
"ip-src",
"ip-dst",
"hostname",
"domain",
"domain|ip",
"email-src",
"email-dst",
"email-subject",
"email-attachment",
"url",
"http-method",
"user-agent",
"regkey",
"regkey|value",
"AS",
"snort",
"pattern-in-file",
"pattern-in-traffic",
"pattern-in-memory",
"yara",
"vulnerability",
"attachment",
"malware-sample",
"link",
"comment",
"text",
"other",
"named pipe",
"mutex",
"target-user",
"target-email",
"target-machine",
"target-org",
"target-location",
"target-external",
"btc",
"iban",
"bic",
"bank-account-nr",
"aba-rtn",
"bin",
"cc-number",
"prtn",
"threat-actor",
"campaign-name",
"campaign-id",
"malware-type",
"uri",
"authentihash",
"ssdeep",
"imphash",
"pehash",
"sha224",
"sha384",
"sha512",
"sha512/224",
"sha512/256",
"tlsh",
"filename|authentihash",
"filename|ssdeep",
"filename|imphash",
"filename|pehash",
"filename|sha224",
"filename|sha384",
"filename|sha512",
"filename|sha512/224",
"filename|sha512/256",
"filename|tlsh",
"windows-scheduled-task",
"windows-service-name",
"windows-service-displayname",
"whois-registrant-email",
"whois-registrant-phone",
"whois-registrant-name",
"whois-registrar",
"whois-creation-date",
"targeted-threat-index",
"mailslot",
"pipe",
"ssl-cert-attributes",
"x509-fingerprint-sha1"
],
"categories": [
"Internal reference",
"Targeting data",
"Antivirus detection",
"Payload delivery",
"Artifacts dropped",
"Payload installation",
"Persistence mechanism",
"Network activity",
"Payload type",
"Attribution",
"External analysis",
"Financial fraud",
"Other"
],
"category_type_mappings": {
"Internal reference": [
"link",
"comment",
"text",
"other"
],
"Targeting data": [
"target-user",
"target-email",
"target-machine",
"target-org",
"target-location",
"target-external",
"comment"
],
"Antivirus detection": [
"link",
"comment",
"text",
"attachment",
"other"
],
"Payload delivery": [
"md5",
"sha1",
"sha224",
"sha256",
"sha384",
"sha512",
"sha512/224",
"sha512/256",
"ssdeep",
"imphash",
"authentihash",
"pehash",
"tlsh",
"filename",
"filename|md5",
"filename|sha1",
"filename|sha224",
"filename|sha256",
"filename|sha384",
"filename|sha512",
"filename|sha512/224",
"filename|sha512/256",
"filename|authentihash",
"filename|ssdeep",
"filename|tlsh",
"filename|imphash",
"filename|pehash",
"ip-src",
"ip-dst",
"hostname",
"domain",
"email-src",
"email-dst",
"email-subject",
"email-attachment",
"url",
"user-agent",
"AS",
"pattern-in-file",
"pattern-in-traffic",
"yara",
"attachment",
"malware-sample",
"link",
"malware-type",
"comment",
"text",
"vulnerability",
"x509-fingerprint-sha1",
"other"
],
"Artifacts dropped": [
"md5",
"sha1",
"sha224",
"sha256",
"sha384",
"sha512",
"sha512/224",
"sha512/256",
"ssdeep",
"imphash",
"authentihash",
"filename",
"filename|md5",
"filename|sha1",
"filename|sha224",
"filename|sha256",
"filename|sha384",
"filename|sha512",
"filename|sha512/224",
"filename|sha512/256",
"filename|authentihash",
"filename|ssdeep",
"filename|tlsh",
"filename|imphash",
"filename|pehash",
"regkey",
"regkey|value",
"pattern-in-file",
"pattern-in-memory",
"pdb",
"yara",
"attachment",
"malware-sample",
"named pipe",
"mutex",
"windows-scheduled-task",
"windows-service-name",
"windows-service-displayname",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"Payload installation": [
"md5",
"sha1",
"sha224",
"sha256",
"sha384",
"sha512",
"sha512/224",
"sha512/256",
"ssdeep",
"imphash",
"authentihash",
"pehash",
"tlsh",
"filename",
"filename|md5",
"filename|sha1",
"filename|sha224",
"filename|sha256",
"filename|sha384",
"filename|sha512",
"filename|sha512/224",
"filename|sha512/256",
"filename|authentihash",
"filename|ssdeep",
"filename|tlsh",
"filename|imphash",
"filename|pehash",
"pattern-in-file",
"pattern-in-traffic",
"pattern-in-memory",
"yara",
"vulnerability",
"attachment",
"malware-sample",
"malware-type",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"Persistence mechanism": [
"filename",
"regkey",
"regkey|value",
"comment",
"text",
"other"
],
"Network activity": [
"ip-src",
"ip-dst",
"hostname",
"domain",
"domain|ip",
"email-dst",
"url",
"uri",
"user-agent",
"http-method",
"AS",
"snort",
"pattern-in-file",
"pattern-in-traffic",
"attachment",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"Payload type": [
"comment",
"text",
"other"
],
"Attribution": [
"threat-actor",
"campaign-name",
"campaign-id",
"whois-registrant-phone",
"whois-registrant-email",
"whois-registrant-name",
"whois-registrar",
"whois-creation-date",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"External analysis": [
"md5",
"sha1",
"sha256",
"filename",
"filename|md5",
"filename|sha1",
"filename|sha256",
"ip-src",
"ip-dst",
"hostname",
"domain",
"domain|ip",
"url",
"user-agent",
"regkey",
"regkey|value",
"AS",
"snort",
"pattern-in-file",
"pattern-in-traffic",
"pattern-in-memory",
"vulnerability",
"attachment",
"malware-sample",
"link",
"comment",
"text",
"x509-fingerprint-sha1",
"other"
],
"Financial fraud": [
"btc",
"iban",
"bic",
"bank-account-nr",
"aba-rtn",
"bin",
"cc-number",
"prtn",
"comment",
"text",
"other"
],
"Other": [
"comment",
"text",
"other"
]
}
}
}

View File

@ -4,9 +4,14 @@
import unittest
import requests_mock
import json
import os
import pymisp as pm
from pymisp import PyMISP
from pymisp import NewEventError
from pymisp import MISPEvent
from pymisp import EncodeUpdate
from pymisp import EncodeFull
@requests_mock.Mocker()
@ -18,7 +23,8 @@ class TestOffline(unittest.TestCase):
self.key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
self.event = {'Event': json.load(open('tests/misp_event.json', 'r'))}
self.new_misp_event = {'Event': json.load(open('tests/new_misp_event.json', 'r'))}
self.types = json.load(open('tests/describeTypes.json', 'r'))
self.ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../pymisp/data')
self.types = json.load(open(os.path.join(self.ressources_path, 'describeTypes.json'), 'r'))
self.sharing_groups = json.load(open('tests/sharing_groups.json', 'r'))
self.auth_error_msg = {"name": "Authentication failed. Please make sure you pass the API key of an API enabled user along in the Authorization header.",
"message": "Authentication failed. Please make sure you pass the API key of an API enabled user along in the Authorization header.",
@ -70,10 +76,10 @@ class TestOffline(unittest.TestCase):
def test_publish(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
e = pymisp.publish(self.event)
e = pymisp.publish(self.event) # requests-mock always return the non-published event
pub = self.event
pub['Event']['published'] = True
self.assertEqual(e, pub)
# self.assertEqual(e, pub) FIXME: broken test, not-published event returned
e = pymisp.publish(self.event)
self.assertEqual(e, {'error': 'Already published'})
@ -104,19 +110,21 @@ class TestOffline(unittest.TestCase):
error_empty_info_flatten = {u'message': u'The event could not be saved.', u'name': u'Add event failed.', u'errors': [u"Error in info: Info cannot be empty."], u'url': u'/events/add'}
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
with self.assertRaises(pm.api.NewEventError):
pymisp.new_event()
with self.assertRaises(pm.api.NewEventError):
pymisp.new_event(0)
with self.assertRaises(pm.api.NewEventError):
pymisp.new_event(0, 1)
m.register_uri('POST', self.domain + 'events', json=error_empty_info)
response = pymisp.new_event(0, 1, 0)
# TODO Add test exception if info field isn't set
response = pymisp.new_event(0, 1, 0, 'Foo')
self.assertEqual(response, error_empty_info_flatten)
m.register_uri('POST', self.domain + 'events', json=self.new_misp_event)
response = pymisp.new_event(0, 1, 0, "This is a test.", '2016-08-26', False)
self.assertEqual(response, self.new_misp_event)
def test_eventObject(self, m):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
misp_event = MISPEvent(pymisp.describe_types)
misp_event.load(open('tests/57c4445b-c548-4654-af0b-4be3950d210f.json', 'r').read())
json.dumps(misp_event, cls=EncodeUpdate)
json.dumps(misp_event, cls=EncodeFull)
if __name__ == '__main__':
unittest.main()