diff --git a/pymisp/api.py b/pymisp/api.py index ef25be3..ae96745 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -2413,6 +2413,7 @@ class PyMISP: include_decay_score: Optional[bool] = None, includeDecayScore: Optional[bool] = None, object_name: Optional[str] = None, exclude_decayed: Optional[bool] = None, + sharinggroup: Optional[Union[int, List[int]]] = None, pythonify: Optional[bool] = False, **kwargs) -> Union[Dict, str, List[Union[MISPEvent, MISPAttribute, MISPObject]]]: '''Search in the MISP instance @@ -2453,6 +2454,7 @@ class PyMISP: :param include_correlations: [JSON Only - attribute] Include the correlations of the matching attributes. :param object_name: [objects controller only] Search for objects with that name :param exclude_decayed: [attributes controller only] Exclude the decayed attributes from the response + :param sharinggroup: Filter by sharing group ID(s) :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM Deprecated: @@ -2553,6 +2555,8 @@ class PyMISP: query['includeCorrelations'] = self._make_misp_bool(include_correlations) query['object_name'] = object_name query['excludeDecayed'] = self._make_misp_bool(exclude_decayed) + query['sharinggroup'] = sharinggroup + url = urljoin(self.root_url, f'{controller}/restSearch') if return_format == 'stix-xml': response = self._prepare_request('POST', url, data=query, output_type='xml') diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index ca19d56..2e149e9 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -2235,6 +2235,51 @@ class TestComprehensive(unittest.TestCase): finally: self.admin_misp_connector.delete_sharing_group(sharing_group.id) + def test_sharing_group_search(self): + # Add sharing group + sg = MISPSharingGroup() + sg.name = 'Testcases SG' + sg.releasability = 'Testing' + sharing_group = self.admin_misp_connector.add_sharing_group(sg, pythonify=True) + # Add the org to the sharing group + self.admin_misp_connector.add_org_to_sharing_group( + sharing_group, + self.test_org, extend=True + ) + # Add event + event = self.create_simple_event() + event.distribution = Distribution.sharing_group + event.sharing_group_id = sharing_group.id + # Create two attributes, one specifically for the sharing group, + # another which inherits the event's SG + event.add_attribute('ip-dst', '8.8.8.8', distribution=4, sharing_group_id=sharing_group.id) + event.add_attribute('ip-dst', '9.9.9.9') + event = self.user_misp_connector.add_event(event) + attribute_ids = {a.id for a in event.attributes} + try: + # Try to query for the event + events = self.user_misp_connector.search(sharinggroup=sharing_group.id, controller="events") + # There should be one event + self.assertTrue(len(events) == 1) + # This event should be the one we added + self.assertEqual(events[0].id, event.id) + # Make sure the search isn't just returning everything + events = self.user_misp_connector.search(sharinggroup=99999, controller="events") + + self.assertTrue(len(events) == 0) + + # Try to query for the attributes + attributes = self.user_misp_connector.search(sharinggroup=sharing_group.id, controller="attributes") + searched_attribute_ids = {a.id for a in attributes} + # There should be two attributes + # The extra 1 is the random UUID now created in the event + self.assertTrue(len(attributes) == 2 + 1) + # We should not be missing any of the attributes + self.assertFalse(attribute_ids.difference(searched_attribute_ids)) + finally: + self.admin_misp_connector.delete_sharing_group(sharing_group.id) + self.user_misp_connector.delete_event(event.id) + def test_feeds(self): # Add feed = MISPFeed()