mirror of https://github.com/MISP/PyMISP
new: [Search] Add a few new options in rest search
parent
edb17ab092
commit
66ccc7d082
|
@ -1260,7 +1260,7 @@ class ExpandedPyMISP(PyMISP):
|
|||
enforce_warninglist: Optional[bool]=None, enforceWarninglist: Optional[bool]=None,
|
||||
to_ids: Optional[Union[ToIDSType, List[ToIDSType]]]=None,
|
||||
deleted: Optional[str]=None,
|
||||
include_event_uuid: Optional[str]=None, includeEventUuid: Optional[str]=None,
|
||||
include_event_uuid: Optional[bool]=None, includeEventUuid: Optional[bool]=None,
|
||||
event_timestamp: Optional[DateTypes]=None,
|
||||
sg_reference_only: Optional[bool]=None,
|
||||
eventinfo: Optional[str]=None,
|
||||
|
@ -1268,6 +1268,8 @@ class ExpandedPyMISP(PyMISP):
|
|||
requested_attributes: Optional[str]=None,
|
||||
include_context: Optional[bool]=None, includeContext: Optional[bool]=None,
|
||||
headerless: Optional[bool]=None,
|
||||
include_sightings: Optional[bool]=None, includeSightings: Optional[bool]=None,
|
||||
include_correlations: Optional[bool]=None, includeCorrelations: Optional[bool]=None,
|
||||
pythonify: Optional[bool]=False,
|
||||
**kwargs):
|
||||
'''Search in the MISP instance
|
||||
|
@ -1299,8 +1301,10 @@ class ExpandedPyMISP(PyMISP):
|
|||
:param eventinfo: Filter on the event's info field.
|
||||
:param searchall: Search for a full or a substring (delimited by % for substrings) in the event info, event tags, attribute tags, attribute values or attribute comment fields.
|
||||
:param requested_attributes: [CSV only] Select the fields that you wish to include in the CSV export. By setting event level fields additionally, includeContext is not required to get event metadata.
|
||||
:param include_context: [CSV Only] Include the event data with each attribute.
|
||||
:param include_context: [Attribute only] Include the event data with each attribute.
|
||||
:param headerless: [CSV Only] The CSV created when this setting is set to true will not contain the header row.
|
||||
:param include_sightings: [JSON Only - Attribute] Include the sightings of the matching attributes.
|
||||
:param include_correlations: [JSON Only - attribute] Include the correlations of the matching attributes.
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
||||
|
||||
Deprecated:
|
||||
|
@ -1332,7 +1336,10 @@ class ExpandedPyMISP(PyMISP):
|
|||
include_event_uuid = includeEventUuid
|
||||
if includeContext is not None:
|
||||
include_context = includeContext
|
||||
|
||||
if includeCorrelations is not None:
|
||||
include_correlations = includeCorrelations
|
||||
if includeSightings is not None:
|
||||
include_sightings = includeSightings
|
||||
# Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized.
|
||||
# They are passed as-is.
|
||||
query = kwargs
|
||||
|
@ -1352,8 +1359,8 @@ class ExpandedPyMISP(PyMISP):
|
|||
query['from'] = self._make_timestamp(date_from)
|
||||
query['to'] = self._make_timestamp(date_to)
|
||||
query['eventid'] = eventid
|
||||
query['withAttachments'] = with_attachments
|
||||
query['metadata'] = metadata
|
||||
query['withAttachments'] = self._make_misp_bool(with_attachments)
|
||||
query['metadata'] = self._make_misp_bool(metadata)
|
||||
query['uuid'] = uuid
|
||||
if publish_timestamp is not None:
|
||||
if isinstance(publish_timestamp, (list, tuple)):
|
||||
|
@ -1366,24 +1373,26 @@ class ExpandedPyMISP(PyMISP):
|
|||
else:
|
||||
query['timestamp'] = self._make_timestamp(timestamp)
|
||||
query['published'] = published
|
||||
query['enforceWarninglist'] = enforce_warninglist
|
||||
query['enforceWarninglist'] = self._make_misp_bool(enforce_warninglist)
|
||||
if to_ids is not None:
|
||||
if int(to_ids) not in [0, 1]:
|
||||
raise ValueError('to_ids has to be in {}'.format(', '.join([0, 1])))
|
||||
query['to_ids'] = to_ids
|
||||
query['deleted'] = deleted
|
||||
query['includeEventUuid'] = include_event_uuid
|
||||
query['includeEventUuid'] = self._make_misp_bool(include_event_uuid)
|
||||
if event_timestamp is not None:
|
||||
if isinstance(event_timestamp, (list, tuple)):
|
||||
query['event_timestamp'] = (self._make_timestamp(event_timestamp[0]), self._make_timestamp(event_timestamp[1]))
|
||||
else:
|
||||
query['event_timestamp'] = self._make_timestamp(event_timestamp)
|
||||
query['sgReferenceOnly'] = sg_reference_only
|
||||
query['sgReferenceOnly'] = self._make_misp_bool(sg_reference_only)
|
||||
query['eventinfo'] = eventinfo
|
||||
query['searchall'] = searchall
|
||||
query['requested_attributes'] = requested_attributes
|
||||
query['includeContext'] = include_context
|
||||
query['headerless'] = headerless
|
||||
query['includeContext'] = self._make_misp_bool(include_context)
|
||||
query['headerless'] = self._make_misp_bool(headerless)
|
||||
query['includeSightings'] = self._make_misp_bool(include_sightings)
|
||||
query['includeCorrelations'] = self._make_misp_bool(include_correlations)
|
||||
url = urljoin(self.root_url, f'{controller}/restSearch')
|
||||
response = self._prepare_request('POST', url, data=query)
|
||||
if return_format == 'json':
|
||||
|
@ -1404,9 +1413,32 @@ class ExpandedPyMISP(PyMISP):
|
|||
me.load(e)
|
||||
to_return.append(me)
|
||||
elif controller == 'attributes':
|
||||
# FIXME: obvs, this is hurting my soul. We need something generic.
|
||||
for a in normalized_response.get('Attribute'):
|
||||
ma = MISPAttribute()
|
||||
ma.from_dict(**a)
|
||||
if 'Event' in ma:
|
||||
me = MISPEvent()
|
||||
me.from_dict(**ma.Event)
|
||||
ma.Event = me
|
||||
if 'RelatedAttribute' in ma:
|
||||
related_attributes = []
|
||||
for ra in ma.RelatedAttribute:
|
||||
r_attribute = MISPAttribute()
|
||||
r_attribute.from_dict(**ra)
|
||||
if 'Event' in r_attribute:
|
||||
me = MISPEvent()
|
||||
me.from_dict(**r_attribute.Event)
|
||||
r_attribute.Event = me
|
||||
related_attributes.append(r_attribute)
|
||||
ma.RelatedAttribute = related_attributes
|
||||
if 'Sighting' in ma:
|
||||
sightings = []
|
||||
for sighting in ma.Sighting:
|
||||
s = MISPSighting()
|
||||
s.from_dict(**sighting)
|
||||
sightings.append(s)
|
||||
ma.Sighting = sightings
|
||||
to_return.append(ma)
|
||||
elif controller == 'objects':
|
||||
raise PyMISPNotImplementedYet('Not implemented yet')
|
||||
|
@ -1764,6 +1796,12 @@ class ExpandedPyMISP(PyMISP):
|
|||
return obj['id']
|
||||
return obj['uuid']
|
||||
|
||||
def _make_misp_bool(self, parameter: Union[bool, str, None]):
|
||||
'''MISP wants 0 or 1 for bool, so we avoid True/False '0', '1' '''
|
||||
if parameter is None:
|
||||
return 0
|
||||
return 1 if int(parameter) else 0
|
||||
|
||||
def _make_timestamp(self, value: DateTypes):
|
||||
'''Catch-all method to normalize anything that can be converted to a timestamp'''
|
||||
if isinstance(value, datetime):
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit d2f955bc74eefdbe76fd8dabb835c5b9345b212b
|
||||
Subproject commit 8c445fe1a42ec88bf5e990ffcc48153c433c43e4
|
|
@ -593,7 +593,7 @@ class MISPEvent(AbstractMISP):
|
|||
if to_return.get('publish_timestamp'):
|
||||
to_return['publish_timestamp'] = self._datetime_to_timestamp(self.publish_timestamp)
|
||||
|
||||
return {'Event': _int_to_str(to_return)}
|
||||
return to_return
|
||||
|
||||
def add_proposal(self, shadow_attribute=None, **kwargs):
|
||||
"""Alias for add_shadow_attribute"""
|
||||
|
|
|
@ -102,7 +102,7 @@ class TestComprehensive(unittest.TestCase):
|
|||
first_event.threat_level_id = ThreatLevel.low
|
||||
first_event.analysis = Analysis.completed
|
||||
first_event.set_date("2017-12-31")
|
||||
first_event.add_attribute('text', str(uuid4()))
|
||||
first_event.add_attribute('text', 'FIRST_EVENT' + str(uuid4()))
|
||||
first_event.attributes[0].add_tag('admin_only')
|
||||
first_event.attributes[0].add_tag('tlp:white___test')
|
||||
first_event.add_attribute('text', str(uuid4()))
|
||||
|
@ -114,7 +114,7 @@ class TestComprehensive(unittest.TestCase):
|
|||
second_event.threat_level_id = ThreatLevel.medium
|
||||
second_event.analysis = Analysis.ongoing
|
||||
second_event.set_date("Aug 18 2018")
|
||||
second_event.add_attribute('text', str(uuid4()))
|
||||
second_event.add_attribute('text', 'SECOND_EVENT' + str(uuid4()))
|
||||
second_event.attributes[0].add_tag('tlp:white___test')
|
||||
second_event.add_attribute('ip-dst', '1.1.1.1')
|
||||
second_event.attributes[1].add_tag('tlp:amber___test')
|
||||
|
@ -128,7 +128,7 @@ class TestComprehensive(unittest.TestCase):
|
|||
third_event.analysis = Analysis.initial
|
||||
third_event.set_date("Jun 25 2018")
|
||||
third_event.add_tag('tlp:white___test')
|
||||
third_event.add_attribute('text', str(uuid4()))
|
||||
third_event.add_attribute('text', 'THIRD_EVENT' + str(uuid4()))
|
||||
third_event.attributes[0].add_tag('tlp:amber___test')
|
||||
third_event.attributes[0].add_tag('foo_double___test')
|
||||
third_event.add_attribute('ip-src', '8.8.8.8')
|
||||
|
@ -186,6 +186,34 @@ class TestComprehensive(unittest.TestCase):
|
|||
# Non-existing value
|
||||
attributes = self.user_misp_connector.search(controller='attributes', value=str(uuid4()))
|
||||
self.assertEqual(attributes, [])
|
||||
|
||||
# Include context - search as user (can only see one event)
|
||||
attributes = self.user_misp_connector.search(controller='attributes', value=first.attributes[0].value, include_context=True, pythonify=True)
|
||||
self.assertTrue(isinstance(attributes[0].Event, MISPEvent))
|
||||
self.assertEqual(attributes[0].Event.uuid, second.uuid)
|
||||
|
||||
# Include context - search as admin (can see both event)
|
||||
attributes = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value, include_context=True, pythonify=True)
|
||||
self.assertTrue(isinstance(attributes[0].Event, MISPEvent))
|
||||
self.assertEqual(attributes[0].Event.uuid, first.uuid)
|
||||
self.assertEqual(attributes[1].Event.uuid, second.uuid)
|
||||
|
||||
# Include correlations - search as admin (can see both event)
|
||||
attributes = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value, include_correlations=True, pythonify=True)
|
||||
self.assertTrue(isinstance(attributes[0].Event, MISPEvent))
|
||||
self.assertEqual(attributes[0].Event.uuid, first.uuid)
|
||||
self.assertEqual(attributes[1].Event.uuid, second.uuid)
|
||||
self.assertEqual(attributes[0].RelatedAttribute[0].Event.uuid, second.uuid)
|
||||
self.assertEqual(attributes[1].RelatedAttribute[0].Event.uuid, first.uuid)
|
||||
|
||||
# Include sightings - search as admin (can see both event)
|
||||
self.admin_misp_connector.add_sighting({'value': first.attributes[0].value})
|
||||
attributes = self.admin_misp_connector.search(controller='attributes', value=first.attributes[0].value, include_sightings=True, pythonify=True)
|
||||
self.assertTrue(isinstance(attributes[0].Event, MISPEvent))
|
||||
self.assertEqual(attributes[0].Event.uuid, first.uuid)
|
||||
self.assertEqual(attributes[1].Event.uuid, second.uuid)
|
||||
self.assertTrue(isinstance(attributes[0].Sighting[0], MISPSighting))
|
||||
|
||||
finally:
|
||||
# Delete event
|
||||
self.admin_misp_connector.delete_event(first.id)
|
||||
|
@ -863,10 +891,8 @@ class TestComprehensive(unittest.TestCase):
|
|||
|
||||
# headerless
|
||||
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', headerless=True)
|
||||
# FIXME: The header is here.
|
||||
# print(csv)
|
||||
# Expects 2 lines after removing the empty ones.
|
||||
# self.assertEqual(len(csv.strip().split('\n')), 2)
|
||||
self.assertEqual(len(csv.strip().split('\n')), 2)
|
||||
|
||||
# include_context
|
||||
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', include_context=True)
|
||||
|
@ -876,15 +902,16 @@ class TestComprehensive(unittest.TestCase):
|
|||
|
||||
# requested_attributes
|
||||
columns = ['value', 'event_id']
|
||||
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', requested_attributes=columns)
|
||||
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01',
|
||||
date_to='2018-09-02', requested_attributes=columns)
|
||||
self.assertEqual(len(csv[0].keys()), 2)
|
||||
for k in columns:
|
||||
self.assertTrue(k in csv[0])
|
||||
|
||||
finally:
|
||||
# FIXME Publish is async, if we delete the event too fast, we have an empty one.
|
||||
# https://github.com/MISP/MISP/issues/4886
|
||||
time.sleep(10)
|
||||
finally:
|
||||
time.sleep(5)
|
||||
# Delete event
|
||||
self.admin_misp_connector.delete_event(first.id)
|
||||
self.admin_misp_connector.delete_event(second.id)
|
||||
|
|
Loading…
Reference in New Issue