From 31958dd160668ded7f98ed422e46b3c152e5a207 Mon Sep 17 00:00:00 2001 From: Tom King Date: Thu, 21 Apr 2022 10:38:52 +0100 Subject: [PATCH 1/2] chg: Add ability to filter by sharing group for RestSearch for MISP >= v2.4.158 --- pymisp/api.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pymisp/api.py b/pymisp/api.py index ef25be3..ae96745 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -2413,6 +2413,7 @@ class PyMISP: include_decay_score: Optional[bool] = None, includeDecayScore: Optional[bool] = None, object_name: Optional[str] = None, exclude_decayed: Optional[bool] = None, + sharinggroup: Optional[Union[int, List[int]]] = None, pythonify: Optional[bool] = False, **kwargs) -> Union[Dict, str, List[Union[MISPEvent, MISPAttribute, MISPObject]]]: '''Search in the MISP instance @@ -2453,6 +2454,7 @@ class PyMISP: :param include_correlations: [JSON Only - attribute] Include the correlations of the matching attributes. :param object_name: [objects controller only] Search for objects with that name :param exclude_decayed: [attributes controller only] Exclude the decayed attributes from the response + :param sharinggroup: Filter by sharing group ID(s) :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM Deprecated: @@ -2553,6 +2555,8 @@ class PyMISP: query['includeCorrelations'] = self._make_misp_bool(include_correlations) query['object_name'] = object_name query['excludeDecayed'] = self._make_misp_bool(exclude_decayed) + query['sharinggroup'] = sharinggroup + url = urljoin(self.root_url, f'{controller}/restSearch') if return_format == 'stix-xml': response = self._prepare_request('POST', url, data=query, output_type='xml') From 1ac66a927a0c06fd0769df0f286e59edf18c85fb Mon Sep 17 00:00:00 2001 From: Tom King <15731689+tomking2@users.noreply.github.com> Date: Mon, 6 Jun 2022 11:51:41 +0100 Subject: [PATCH 2/2] chg: Add in test case --- tests/testlive_comprehensive.py | 45 +++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index ca19d56..2e149e9 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -2235,6 +2235,51 @@ class TestComprehensive(unittest.TestCase): finally: self.admin_misp_connector.delete_sharing_group(sharing_group.id) + def test_sharing_group_search(self): + # Add sharing group + sg = MISPSharingGroup() + sg.name = 'Testcases SG' + sg.releasability = 'Testing' + sharing_group = self.admin_misp_connector.add_sharing_group(sg, pythonify=True) + # Add the org to the sharing group + self.admin_misp_connector.add_org_to_sharing_group( + sharing_group, + self.test_org, extend=True + ) + # Add event + event = self.create_simple_event() + event.distribution = Distribution.sharing_group + event.sharing_group_id = sharing_group.id + # Create two attributes, one specifically for the sharing group, + # another which inherits the event's SG + event.add_attribute('ip-dst', '8.8.8.8', distribution=4, sharing_group_id=sharing_group.id) + event.add_attribute('ip-dst', '9.9.9.9') + event = self.user_misp_connector.add_event(event) + attribute_ids = {a.id for a in event.attributes} + try: + # Try to query for the event + events = self.user_misp_connector.search(sharinggroup=sharing_group.id, controller="events") + # There should be one event + self.assertTrue(len(events) == 1) + # This event should be the one we added + self.assertEqual(events[0].id, event.id) + # Make sure the search isn't just returning everything + events = self.user_misp_connector.search(sharinggroup=99999, controller="events") + + self.assertTrue(len(events) == 0) + + # Try to query for the attributes + attributes = self.user_misp_connector.search(sharinggroup=sharing_group.id, controller="attributes") + searched_attribute_ids = {a.id for a in attributes} + # There should be two attributes + # The extra 1 is the random UUID now created in the event + self.assertTrue(len(attributes) == 2 + 1) + # We should not be missing any of the attributes + self.assertFalse(attribute_ids.difference(searched_attribute_ids)) + finally: + self.admin_misp_connector.delete_sharing_group(sharing_group.id) + self.user_misp_connector.delete_event(event.id) + def test_feeds(self): # Add feed = MISPFeed()