|
|
|
@ -47,6 +47,10 @@ class NewAttributeError(PyMISPError):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SearchError(PyMISPError):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MissingDependency(PyMISPError):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
@ -112,7 +116,7 @@ class PyMISP(object):
|
|
|
|
|
'mutex', 'vulnerability', 'attachment', 'malware-sample', 'link', 'comment',
|
|
|
|
|
'text', 'email-src', 'email-dst', 'email-subject', 'email-attachment',
|
|
|
|
|
'yara', 'target-user', 'target-email', 'target-machine', 'target-org',
|
|
|
|
|
'target-location', 'target-external', 'other']
|
|
|
|
|
'target-location', 'target-external', 'other', 'threat-actor']
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# Make sure the MISP instance is working and the URL is valid
|
|
|
|
@ -170,6 +174,8 @@ class PyMISP(object):
|
|
|
|
|
raise PyMISPError('Unknown error: {}'.format(response.text))
|
|
|
|
|
|
|
|
|
|
errors = []
|
|
|
|
|
if isinstance(to_return, list):
|
|
|
|
|
to_return = {'response': to_return}
|
|
|
|
|
if to_return.get('error'):
|
|
|
|
|
if not isinstance(to_return['error'], list):
|
|
|
|
|
errors.append(to_return['error'])
|
|
|
|
@ -202,7 +208,7 @@ class PyMISP(object):
|
|
|
|
|
url = urljoin(self.root_url, 'events/index')
|
|
|
|
|
if filters is not None:
|
|
|
|
|
filters = json.dumps(filters)
|
|
|
|
|
print filters
|
|
|
|
|
print(filters)
|
|
|
|
|
return session.post(url, data=filters)
|
|
|
|
|
else:
|
|
|
|
|
return session.get(url)
|
|
|
|
@ -303,8 +309,8 @@ class PyMISP(object):
|
|
|
|
|
if distribution is not None:
|
|
|
|
|
distribution = int(distribution)
|
|
|
|
|
# If None: take the default value of the event
|
|
|
|
|
if distribution not in [None, 0, 1, 2, 3]:
|
|
|
|
|
raise NewAttributeError('{} is invalid, the distribution has to be in 0, 1, 2, 3 or None'.format(distribution))
|
|
|
|
|
if distribution not in [None, 0, 1, 2, 3, 5]:
|
|
|
|
|
raise NewAttributeError('{} is invalid, the distribution has to be in 0, 1, 2, 3, 5 or None'.format(distribution))
|
|
|
|
|
if distribution is not None:
|
|
|
|
|
to_return['distribution'] = distribution
|
|
|
|
|
|
|
|
|
@ -354,6 +360,20 @@ class PyMISP(object):
|
|
|
|
|
response = self.update_event(event['Event']['id'], event, 'json')
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def add_tag(self, event, tag):
|
|
|
|
|
session = self.__prepare_session('json')
|
|
|
|
|
to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}}
|
|
|
|
|
response = session.post(urljoin(self.root_url, 'events/addTag'), data=json.dumps(to_post))
|
|
|
|
|
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def change_threat_level(self, event, threat_level_id):
|
|
|
|
|
event['Event']['threat_level_id'] = threat_level_id
|
|
|
|
|
self._prepare_update(event)
|
|
|
|
|
response = self.update_event(event['Event']['id'], event)
|
|
|
|
|
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
# ##### File attributes #####
|
|
|
|
|
|
|
|
|
|
def _send_attributes(self, event, attributes, proposal=False):
|
|
|
|
@ -527,6 +547,13 @@ class PyMISP(object):
|
|
|
|
|
attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution))
|
|
|
|
|
return self._send_attributes(event, attributes, proposal)
|
|
|
|
|
|
|
|
|
|
# ##### Attribution attributes #####
|
|
|
|
|
|
|
|
|
|
def add_threat_actor(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
|
|
|
|
|
attributes = []
|
|
|
|
|
attributes.append(self._prepare_full_attribute('Attribution', 'threat-actor', target, to_ids, comment, distribution))
|
|
|
|
|
return self._send_attributes(event, attributes, proposal)
|
|
|
|
|
|
|
|
|
|
# ##################################################
|
|
|
|
|
# ######### Upload samples through the API #########
|
|
|
|
|
# ##################################################
|
|
|
|
@ -545,7 +572,7 @@ class PyMISP(object):
|
|
|
|
|
def prepare_attribute(self, event_id, distribution, to_ids, category, info,
|
|
|
|
|
analysis, threat_level_id):
|
|
|
|
|
to_post = {'request': {}}
|
|
|
|
|
authorized_categs = ['Payload delivery', 'Artifacts dropped', 'Payload Installation', 'External Analysis']
|
|
|
|
|
authorized_categs = ['Payload delivery', 'Artifacts dropped', 'Payload Installation', 'External Analysis', 'Network activity', 'Antivirus detection']
|
|
|
|
|
|
|
|
|
|
if event_id is not None:
|
|
|
|
|
try:
|
|
|
|
@ -654,6 +681,51 @@ class PyMISP(object):
|
|
|
|
|
response = session.post(url, data=json.dumps(query))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def search_index(self, published=None, eventid=None, tag=None, datefrom=None,
|
|
|
|
|
dateto=None, eventinfo=None, threatlevel=None, distribution=None,
|
|
|
|
|
analysis=None, attribute=None, org=None):
|
|
|
|
|
"""
|
|
|
|
|
Search only at the index level. Use ! infront of value as NOT, default OR
|
|
|
|
|
|
|
|
|
|
:param published: Published (0,1)
|
|
|
|
|
:param eventid: Evend ID(s) | str or list
|
|
|
|
|
:param tag: Tag(s) | str or list
|
|
|
|
|
:param datefrom: First date, in format YYYY-MM-DD
|
|
|
|
|
:param datefrom: Last date, in format YYYY-MM-DD
|
|
|
|
|
:param eventinfo: Event info(s) to match | str or list
|
|
|
|
|
:param threatlevel: Threat level(s) (1,2,3,4) | str or list
|
|
|
|
|
:param distribution: Distribution level(s) (0,1,2,3) | str or list
|
|
|
|
|
:param analysis: Analysis level(s) (0,1,2) | str or list
|
|
|
|
|
:param org: Organisation(s) | str or list
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
allowed = {'published': published, 'eventid': eventid, 'tag': tag, 'Dateto': dateto,
|
|
|
|
|
'Datefrom': datefrom, 'eventinfo': eventinfo, 'threatlevel': threatlevel,
|
|
|
|
|
'distribution': distribution, 'analysis': analysis, 'attribute': attribute,
|
|
|
|
|
'org': org}
|
|
|
|
|
rule_levels = {'distribution': ["0", "1", "2", "3", "!0", "!1", "!2", "!3"],
|
|
|
|
|
'threatlevel': ["1", "2", "3", "4", "!1", "!2", "!3", "!4"],
|
|
|
|
|
'analysis': ["0", "1", "2", "!0", "!1", "!2"]}
|
|
|
|
|
buildup_url = "events/index"
|
|
|
|
|
|
|
|
|
|
for rule in allowed.keys():
|
|
|
|
|
if allowed[rule] is not None:
|
|
|
|
|
if not isinstance(allowed[rule], list):
|
|
|
|
|
allowed[rule] = [allowed[rule]]
|
|
|
|
|
allowed[rule] = map(str, allowed[rule])
|
|
|
|
|
if rule in rule_levels:
|
|
|
|
|
if not set(allowed[rule]).issubset(rule_levels[rule]):
|
|
|
|
|
raise SearchError('Values in your {} are invalid, has to be in {}'.format(rule, ', '.join(str(x) for x in rule_levels[rule])))
|
|
|
|
|
if type(allowed[rule]) == list:
|
|
|
|
|
joined = '|'.join(str(x) for x in allowed[rule])
|
|
|
|
|
buildup_url += '/search{}:{}'.format(rule, joined)
|
|
|
|
|
else:
|
|
|
|
|
buildup_url += '/search{}:{}'.format(rule, allowed[rule])
|
|
|
|
|
session = self.__prepare_session('json')
|
|
|
|
|
url = urljoin(self.root_url, buildup_url)
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def search_all(self, value):
|
|
|
|
|
query = {'value': value, 'searchall': 1}
|
|
|
|
|
session = self.__prepare_session('json')
|
|
|
|
@ -824,6 +896,13 @@ class PyMISP(object):
|
|
|
|
|
to_return.append(tag['name'])
|
|
|
|
|
return to_return
|
|
|
|
|
|
|
|
|
|
def new_tag(self, name=None, colour="#00ace6", exportable=False):
|
|
|
|
|
to_post = {'Tag': {'name': name, 'colour': colour, 'exportable': exportable}}
|
|
|
|
|
session = self.__prepare_session('json')
|
|
|
|
|
url = urljoin(self.root_url, 'tags/add')
|
|
|
|
|
response = session.post(url, data=json.dumps(to_post))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
# ########## Version ##########
|
|
|
|
|
|
|
|
|
|
def get_api_version(self):
|
|
|
|
@ -862,7 +941,14 @@ class PyMISP(object):
|
|
|
|
|
return {'version': '{}.{}.{}'.format(master_version['major'], master_version['minor'], master_version['hotfix'])}
|
|
|
|
|
else:
|
|
|
|
|
return {'error': 'Impossible to retrieve the version of the master branch.'}
|
|
|
|
|
# ############## Export Attributes in text ####################################
|
|
|
|
|
|
|
|
|
|
def get_all_attributes_txt(self, type_attr):
|
|
|
|
|
|
|
|
|
|
session = self.__prepare_session('txt')
|
|
|
|
|
url = urljoin(self.root_url, 'attributes/text/download/%s' % type_attr)
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
return response
|
|
|
|
|
# ############## Deprecated (Pure XML API should not be used) ##################
|
|
|
|
|
|
|
|
|
|
@deprecated
|
|
|
|
|