mirror of https://github.com/MISP/PyMISP
Allow search by 'event_tags' and improve the handling of galaxy clusters.
Changes: - Add 'event_tags' parameter when searching for events - Add new method to search for galaxies by value - Add new parameter to fetch cluster information when retrieving clusters - Add new parameter to hard-delete object referencespull/994/head
parent
d488ba3716
commit
565447ce36
|
@ -46,7 +46,7 @@ pip3 install pymisp[virustotal,email]
|
|||
```
|
||||
git clone https://github.com/MISP/PyMISP.git && cd PyMISP
|
||||
git submodule update --init
|
||||
poetry install -E fileobjects -E openioc -E virustotal -E docs -E pdfexport
|
||||
poetry install -E fileobjects -E openioc -E virustotal -E docs -E pdfexport -E email
|
||||
```
|
||||
|
||||
### Running the tests
|
||||
|
|
|
@ -643,13 +643,17 @@ class PyMISP:
|
|||
ref.from_dict(**object_reference)
|
||||
return ref
|
||||
|
||||
def delete_object_reference(self, object_reference: Union[MISPObjectReference, int, str, UUID]) -> Dict:
|
||||
"""Delete a reference to an object
|
||||
|
||||
:param object_reference: object reference
|
||||
"""
|
||||
def delete_object_reference(
|
||||
self,
|
||||
object_reference: Union[MISPObjectReference, int, str, UUID],
|
||||
hard: bool = False,
|
||||
) -> Dict:
|
||||
"""Delete a reference to an object."""
|
||||
object_reference_id = get_uuid_or_id_from_abstract_misp(object_reference)
|
||||
response = self._prepare_request('POST', f'objectReferences/delete/{object_reference_id}')
|
||||
query_url = f"objectReferences/delete/{object_reference_id}"
|
||||
if hard:
|
||||
query_url += "/true"
|
||||
response = self._prepare_request("POST", query_url)
|
||||
return self._check_json_response(response)
|
||||
|
||||
# Object templates
|
||||
|
@ -1446,7 +1450,11 @@ class PyMISP:
|
|||
|
||||
# ## BEGIN Galaxy ###
|
||||
|
||||
def galaxies(self, pythonify: bool = False) -> Union[Dict, List[MISPGalaxy]]:
|
||||
def galaxies(
|
||||
self,
|
||||
withCluster: bool = False,
|
||||
pythonify: bool = False,
|
||||
) -> Union[Dict, List[MISPGalaxy]]:
|
||||
"""Get all the galaxies: https://www.misp-project.org/openapi/#tag/Galaxies/operation/getGalaxies
|
||||
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
||||
|
@ -1458,7 +1466,25 @@ class PyMISP:
|
|||
to_return = []
|
||||
for galaxy in galaxies:
|
||||
g = MISPGalaxy()
|
||||
g.from_dict(**galaxy)
|
||||
g.from_dict(**galaxy, withCluster=withCluster)
|
||||
to_return.append(g)
|
||||
return to_return
|
||||
|
||||
def search_galaxy(
|
||||
self,
|
||||
value: str,
|
||||
withCluster: bool = False,
|
||||
pythonify: bool = False,
|
||||
) -> Union[Dict, List[MISPGalaxy]]:
|
||||
"""Text search to find a matching galaxy name, namespace, description, or uuid."""
|
||||
r = self._prepare_request("POST", "galaxies", data={"value": value})
|
||||
galaxies = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or "errors" in galaxies:
|
||||
return galaxies
|
||||
to_return = []
|
||||
for galaxy in galaxies:
|
||||
g = MISPGalaxy()
|
||||
g.from_dict(**galaxy, withCluster=withCluster)
|
||||
to_return.append(g)
|
||||
return to_return
|
||||
|
||||
|
@ -2475,6 +2501,7 @@ class PyMISP:
|
|||
category: Optional[SearchParameterTypes] = None,
|
||||
org: Optional[SearchParameterTypes] = None,
|
||||
tags: Optional[SearchParameterTypes] = None,
|
||||
event_tags: Optional[SearchParameterTypes] = None,
|
||||
quick_filter: Optional[str] = None, quickFilter: Optional[str] = None,
|
||||
date_from: Optional[Union[datetime, date, int, str, float, None]] = None,
|
||||
date_to: Optional[Union[datetime, date, int, str, float, None]] = None,
|
||||
|
@ -2532,6 +2559,7 @@ class PyMISP:
|
|||
:param category: The attribute category, any valid MISP attribute category is accepted.
|
||||
:param org: Search by the creator organisation by supplying the organisation identifier.
|
||||
:param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query`
|
||||
:param event_tags: Tags to search or to exclude at the event level. You can pass a list, or the output of `build_complex_query`
|
||||
:param quick_filter: The string passed to this field will ignore all of the other arguments. MISP will return an xml / json (depending on the header sent) of all events that have a sub-string match on value in the event info, event orgc, or any of the attribute value1 / value2 fields, or in the attribute comment.
|
||||
:param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event.
|
||||
:param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event.
|
||||
|
@ -2619,6 +2647,7 @@ class PyMISP:
|
|||
query['category'] = category
|
||||
query['org'] = org
|
||||
query['tags'] = tags
|
||||
query['event_tags'] = event_tags
|
||||
query['quickFilter'] = quick_filter
|
||||
query['from'] = self._make_timestamp(date_from)
|
||||
query['to'] = self._make_timestamp(date_to)
|
||||
|
|
|
@ -1596,6 +1596,47 @@ class TestComprehensive(unittest.TestCase):
|
|||
# Delete event
|
||||
self.admin_misp_connector.delete_event(first)
|
||||
|
||||
def test_add_event_with_attachment_object_controller__hard(self):
|
||||
first = self.create_simple_event()
|
||||
try:
|
||||
first = self.user_misp_connector.add_event(first)
|
||||
fo, peo, seos = make_binary_objects('tests/viper-test-files/test_files/whoami.exe')
|
||||
for s in seos:
|
||||
r = self.user_misp_connector.add_object(first, s)
|
||||
self.assertEqual(r.name, 'pe-section', r)
|
||||
|
||||
r = self.user_misp_connector.add_object(first, peo, pythonify=True)
|
||||
self.assertEqual(r.name, 'pe', r)
|
||||
for ref in peo.ObjectReference:
|
||||
r = self.user_misp_connector.add_object_reference(ref)
|
||||
self.assertEqual(r.object_uuid, peo.uuid, r.to_json())
|
||||
|
||||
r = self.user_misp_connector.add_object(first, fo)
|
||||
obj_attrs = r.get_attributes_by_relation('ssdeep')
|
||||
self.assertEqual(len(obj_attrs), 1, obj_attrs)
|
||||
self.assertEqual(r.name, 'file', r)
|
||||
|
||||
# Test break_on_duplicate at object level
|
||||
fo_dup, peo_dup, _ = make_binary_objects('tests/viper-test-files/test_files/whoami.exe')
|
||||
r = self.user_misp_connector.add_object(first, peo_dup, break_on_duplicate=True)
|
||||
self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r)
|
||||
|
||||
# Test break on duplicate with breakOnDuplicate key in object
|
||||
fo_dup.breakOnDuplicate = True
|
||||
r = self.user_misp_connector.add_object(first, fo_dup)
|
||||
self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r)
|
||||
|
||||
# Test refs
|
||||
r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0])
|
||||
self.assertEqual(r.object_uuid, fo.uuid, r.to_json())
|
||||
self.assertEqual(r.referenced_uuid, peo.uuid, r.to_json())
|
||||
r = self.user_misp_connector.delete_object_reference(r, hard=True)
|
||||
self.assertEqual(r['message'], 'ObjectReference deleted')
|
||||
# TODO: verify that the reference is not soft-deleted instead
|
||||
finally:
|
||||
# Delete event
|
||||
self.admin_misp_connector.delete_event(first)
|
||||
|
||||
def test_lief_and_sign(self):
|
||||
first = self.create_simple_event()
|
||||
try:
|
||||
|
@ -3008,6 +3049,13 @@ class TestComprehensive(unittest.TestCase):
|
|||
self.user_misp_connector.delete_event(event)
|
||||
self.user_misp_connector.delete_event_report(new_event_report)
|
||||
|
||||
def test_search_galaxy(self):
|
||||
self.admin_misp_connector.toggle_global_pythonify()
|
||||
galaxy = self.admin_misp_connector.galaxies()[0]
|
||||
ret = self.admin_misp_connector.search_galaxy(value=galaxy.name)
|
||||
self.assertEqual(len(ret), 1)
|
||||
self.admin_misp_connector.toggle_global_pythonify()
|
||||
|
||||
def test_galaxy_cluster(self):
|
||||
self.admin_misp_connector.toggle_global_pythonify()
|
||||
galaxy = self.admin_misp_connector.galaxies()[0]
|
||||
|
|
Loading…
Reference in New Issue