diff --git a/pymisp/api.py b/pymisp/api.py index 33f2793..c41edd5 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -3027,9 +3027,6 @@ class PyMISP: else: raise MISPServerError("please fill path or data parameter") - if isinstance(to_post, bytes): - to_post = to_post.decode() - if str(version) == '1': url = urljoin(self.root_url, 'events/upload_stix') response = self._prepare_request('POST', url, data=to_post, output_type='xml', content_type='xml') # type: ignore @@ -3505,21 +3502,20 @@ class PyMISP: def __repr__(self): return f'<{self.__class__.__name__}(url={self.root_url})' - def _prepare_request(self, request_type: str, url: str, data: Union[str, Iterable, Mapping, AbstractMISP] = {}, params: Mapping = {}, + def _prepare_request(self, request_type: str, url: str, data: Union[Iterable, Mapping, AbstractMISP, bytes] = {}, params: Mapping = {}, kw_params: Mapping = {}, output_type: str = 'json', content_type: str = 'json') -> requests.Response: '''Prepare a request for python-requests''' if url[0] == '/': # strip it: it will fail if MISP is in a sub directory url = url[1:] url = urljoin(self.root_url, url) - if data == {} or isinstance(data, str): + if data == {} or isinstance(data, bytes): d = data elif data: - if not isinstance(data, str): # Else, we already have a text blob to send - if isinstance(data, dict): # Else, we can directly json encode. - # Remove None values. - data = {k: v for k, v in data.items() if v is not None} - d = json.dumps(data, default=pymisp_json_default) + if isinstance(data, dict): # Else, we can directly json encode. + # Remove None values. + data = {k: v for k, v in data.items() if v is not None} + d = json.dumps(data, default=pymisp_json_default) logger.debug(f'{request_type} - {url}') if d is not None: diff --git a/tests/stix1.xml-utf8 b/tests/stix1.xml-utf8 new file mode 100644 index 0000000..4c4e149 --- /dev/null +++ b/tests/stix1.xml-utf8 @@ -0,0 +1,110 @@ + + + Export from ORGNAME MISP + Threat Report + + + + + + Export from ORGNAME MISP © YADA YADA + Threat Report + + + + Test Stix + 612 + + 2021-08-24T00:00:00 + 2021-08-24T10:53:28 + + + + ORGNAME + + + New + + + Network activity + + + + 8.8.8.8 + + + + + + + + Event Threat Level: High + + + MISP Tag: misp:tool="misp2stix" + + + + + ORGNAME + + + + + + + + diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 6949dfd..f372ec7 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -955,8 +955,9 @@ class TestComprehensive(unittest.TestCase): response = self.user_misp_connector.add_event(event, metadata=True) self.assertEqual(len(response.attributes), 0) # response should contains zero attributes - event.info = "New name" + event.info = "New name ©" response = self.user_misp_connector.update_event(event, metadata=True) + self.assertEqual(response.info, event.info) self.assertEqual(len(response.attributes), 0) # response should contains zero attributes finally: # cleanup self.admin_misp_connector.delete_event(event) @@ -2512,11 +2513,15 @@ class TestComprehensive(unittest.TestCase): def test_upload_stix(self): # FIXME https://github.com/MISP/MISP/issues/4892 try: - # r1 = self.user_misp_connector.upload_stix('tests/stix1.xml', version='1') - # print('stix 1', r1.json()) - # event_stix_one = MISPEvent() - # event_stix_one.load(r.) + r1 = self.user_misp_connector.upload_stix('tests/stix1.xml-utf8', version='1') + print(r1.text) + event_stix_one = MISPEvent() + event_stix_one.load(r1.json()) # self.assertEqual(event_stix_one.attributes[0], '8.8.8.8') + self.admin_misp_connector.delete_event(event_stix_one) + bl = self.admin_misp_connector.delete_event_blocklist(event_stix_one.uuid) + self.assertTrue(bl['success']) + r2 = self.user_misp_connector.upload_stix('tests/stix2.json', version='2') print(json.dumps(r2.json(), indent=2)) event_stix_two = MISPEvent() @@ -2528,10 +2533,11 @@ class TestComprehensive(unittest.TestCase): bl = self.admin_misp_connector.delete_event_blocklist(event_stix_two.uuid) self.assertTrue(bl['success']) finally: - # try: - # self.admin_misp_connector.delete_event(event_stix_one) - # except Exception: - # pass + try: + self.admin_misp_connector.delete_event(event_stix_one) + self.admin_misp_connector.delete_event_blocklist(event_stix_one.uuid) + except Exception: + pass try: self.admin_misp_connector.delete_event(event_stix_two) self.admin_misp_connector.delete_event_blocklist(event_stix_two.uuid)