diff --git a/README.md b/README.md index 899becf..39f2855 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ pip3 install pymisp[virustotal,email] ``` git clone https://github.com/MISP/PyMISP.git && cd PyMISP git submodule update --init -poetry install -E fileobjects -E openioc -E virustotal -E docs -E pdfexport +poetry install -E fileobjects -E openioc -E virustotal -E docs -E pdfexport -E email ``` ### Running the tests diff --git a/pymisp/api.py b/pymisp/api.py index f7b053c..8cebd6e 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -643,13 +643,17 @@ class PyMISP: ref.from_dict(**object_reference) return ref - def delete_object_reference(self, object_reference: Union[MISPObjectReference, int, str, UUID]) -> Dict: - """Delete a reference to an object - - :param object_reference: object reference - """ + def delete_object_reference( + self, + object_reference: Union[MISPObjectReference, int, str, UUID], + hard: bool = False, + ) -> Dict: + """Delete a reference to an object.""" object_reference_id = get_uuid_or_id_from_abstract_misp(object_reference) - response = self._prepare_request('POST', f'objectReferences/delete/{object_reference_id}') + query_url = f"objectReferences/delete/{object_reference_id}" + if hard: + query_url += "/true" + response = self._prepare_request("POST", query_url) return self._check_json_response(response) # Object templates @@ -1446,7 +1450,11 @@ class PyMISP: # ## BEGIN Galaxy ### - def galaxies(self, pythonify: bool = False) -> Union[Dict, List[MISPGalaxy]]: + def galaxies( + self, + withCluster: bool = False, + pythonify: bool = False, + ) -> Union[Dict, List[MISPGalaxy]]: """Get all the galaxies: https://www.misp-project.org/openapi/#tag/Galaxies/operation/getGalaxies :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM @@ -1458,7 +1466,25 @@ class PyMISP: to_return = [] for galaxy in galaxies: g = MISPGalaxy() - g.from_dict(**galaxy) + g.from_dict(**galaxy, withCluster=withCluster) + to_return.append(g) + return to_return + + def search_galaxy( + self, + value: str, + withCluster: bool = False, + pythonify: bool = False, + ) -> Union[Dict, List[MISPGalaxy]]: + """Text search to find a matching galaxy name, namespace, description, or uuid.""" + r = self._prepare_request("POST", "galaxies", data={"value": value}) + galaxies = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or "errors" in galaxies: + return galaxies + to_return = [] + for galaxy in galaxies: + g = MISPGalaxy() + g.from_dict(**galaxy, withCluster=withCluster) to_return.append(g) return to_return @@ -2475,6 +2501,7 @@ class PyMISP: category: Optional[SearchParameterTypes] = None, org: Optional[SearchParameterTypes] = None, tags: Optional[SearchParameterTypes] = None, + event_tags: Optional[SearchParameterTypes] = None, quick_filter: Optional[str] = None, quickFilter: Optional[str] = None, date_from: Optional[Union[datetime, date, int, str, float, None]] = None, date_to: Optional[Union[datetime, date, int, str, float, None]] = None, @@ -2532,6 +2559,7 @@ class PyMISP: :param category: The attribute category, any valid MISP attribute category is accepted. :param org: Search by the creator organisation by supplying the organisation identifier. :param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query` + :param event_tags: Tags to search or to exclude at the event level. You can pass a list, or the output of `build_complex_query` :param quick_filter: The string passed to this field will ignore all of the other arguments. MISP will return an xml / json (depending on the header sent) of all events that have a sub-string match on value in the event info, event orgc, or any of the attribute value1 / value2 fields, or in the attribute comment. :param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event. :param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event. @@ -2619,6 +2647,7 @@ class PyMISP: query['category'] = category query['org'] = org query['tags'] = tags + query['event_tags'] = event_tags query['quickFilter'] = quick_filter query['from'] = self._make_timestamp(date_from) query['to'] = self._make_timestamp(date_to) diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index bf511fe..9995955 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -1596,6 +1596,47 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first) + def test_add_event_with_attachment_object_controller__hard(self): + first = self.create_simple_event() + try: + first = self.user_misp_connector.add_event(first) + fo, peo, seos = make_binary_objects('tests/viper-test-files/test_files/whoami.exe') + for s in seos: + r = self.user_misp_connector.add_object(first, s) + self.assertEqual(r.name, 'pe-section', r) + + r = self.user_misp_connector.add_object(first, peo, pythonify=True) + self.assertEqual(r.name, 'pe', r) + for ref in peo.ObjectReference: + r = self.user_misp_connector.add_object_reference(ref) + self.assertEqual(r.object_uuid, peo.uuid, r.to_json()) + + r = self.user_misp_connector.add_object(first, fo) + obj_attrs = r.get_attributes_by_relation('ssdeep') + self.assertEqual(len(obj_attrs), 1, obj_attrs) + self.assertEqual(r.name, 'file', r) + + # Test break_on_duplicate at object level + fo_dup, peo_dup, _ = make_binary_objects('tests/viper-test-files/test_files/whoami.exe') + r = self.user_misp_connector.add_object(first, peo_dup, break_on_duplicate=True) + self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r) + + # Test break on duplicate with breakOnDuplicate key in object + fo_dup.breakOnDuplicate = True + r = self.user_misp_connector.add_object(first, fo_dup) + self.assertTrue("Duplicate object found" in r['errors'][1]['errors'], r) + + # Test refs + r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0]) + self.assertEqual(r.object_uuid, fo.uuid, r.to_json()) + self.assertEqual(r.referenced_uuid, peo.uuid, r.to_json()) + r = self.user_misp_connector.delete_object_reference(r, hard=True) + self.assertEqual(r['message'], 'ObjectReference deleted') + # TODO: verify that the reference is not soft-deleted instead + finally: + # Delete event + self.admin_misp_connector.delete_event(first) + def test_lief_and_sign(self): first = self.create_simple_event() try: @@ -3008,6 +3049,13 @@ class TestComprehensive(unittest.TestCase): self.user_misp_connector.delete_event(event) self.user_misp_connector.delete_event_report(new_event_report) + def test_search_galaxy(self): + self.admin_misp_connector.toggle_global_pythonify() + galaxy = self.admin_misp_connector.galaxies()[0] + ret = self.admin_misp_connector.search_galaxy(value=galaxy.name) + self.assertEqual(len(ret), 1) + self.admin_misp_connector.toggle_global_pythonify() + def test_galaxy_cluster(self): self.admin_misp_connector.toggle_global_pythonify() galaxy = self.admin_misp_connector.galaxies()[0]