diff --git a/pymisp/aping.py b/pymisp/aping.py
index 1e9409a..790153a 100644
--- a/pymisp/aping.py
+++ b/pymisp/aping.py
@@ -301,6 +301,12 @@ class ExpandedPyMISP(PyMISP):
 :param pythonify: Returns a list of dictionaries instead of the plain CSV
 '''
+ # Deprecated stuff / synonyms
+ if includeContext is not None:
+ include_context = includeContext
+ if enforceWarninglist is not None:
+ enforce_warninglist = enforceWarninglist
+
 # Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized.
 # They are passed as-is.
 query = kwargs
@@ -316,8 +322,6 @@ class ExpandedPyMISP(PyMISP):
 query['type'] = type_attribute
 if include_context is not None:
 query['includeContext'] = include_context
- if includeContext is not None:
- query['includeContext'] = includeContext
 if date_from is not None:
 query['from'] = self.make_timestamp(date_from)
 if date_to is not None:
@@ -331,9 +335,6 @@ class ExpandedPyMISP(PyMISP):
 query['headerless'] = headerless
 if enforce_warninglist is not None:
 query['enforceWarninglist'] = enforce_warninglist
- if enforceWarninglist is not None:
- # Alias for enforce_warninglist
- query['enforceWarninglist'] = enforceWarninglist
 url = urljoin(self.root_url, '/events/csv/download/')
 response = self._prepare_request('POST', url, data=json.dumps(query))
diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py
index f836604..e9b8fc3 100644
--- a/tests/testlive_comprehensive.py
+++ b/tests/testlive_comprehensive.py
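Review bot: In the first pymisp/aping.py hunk (@@ -301), the deprecated camelCase arguments silently overwrite their snake_case counterparts, so a call that passes both `enforce_warninglist=True` and `enforceWarninglist=False` sends the alias value with no signal to the caller. Because this value is forwarded as the `enforceWarninglist` query key of the CSV export, the precedence should be explicit: apply the alias only when the new name is unset, `if enforceWarninglist is not None and enforce_warninglist is None:`, or raise on conflicting values; the same applies to `includeContext`. A `DeprecationWarning` on the alias path would also tell callers to migrate. The method's signature starts outside the hunk, so this assumes both spellings are keyword parameters defaulting to `None`.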
@@ -452,14 +452,16 @@ class TestComprehensive(unittest.TestCase):
 * deleted
 * to_ids
 * include_event_uuid
- missing: attachments, warning list
+ warning list
 '''
 first = self.create_simple_event()
 first.info = 'foo bar blah'
+ # First has one text attribute
 second = self.create_simple_event()
 second.info = 'foo blah'
 second.set_date('2018-09-01')
 second.add_attribute('ip-src', '8.8.8.8')
+ # second has two attributes: text and ip-src
 try:
 first = self.user_misp_connector.add_event(first)
 second = self.user_misp_connector.add_event(second)
@@ -517,6 +519,8 @@ class TestComprehensive(unittest.TestCase):
 # FIXME: should return one event
 # self.assertEqual(len(events), 1)
 # self.assertEqual(events[0].id, second.id)
+
+ # date_from / date_to
 events = self.user_misp_connector.search(timestamp=timeframe, date_from=date.today().isoformat(), pythonify=True)
 self.assertEqual(len(events), 1)
 self.assertEqual(events[0].id, first.id)
@@ -535,15 +539,15 @@ class TestComprehensive(unittest.TestCase):
 events = self.user_misp_connector.search(timestamp=timeframe, to_ids='0', pythonify=True)
 self.assertEqual(len(events), 2)
 events = self.user_misp_connector.search(timestamp=timeframe, to_ids='1', pythonify=True)
- # FIXME: should only return second
- # self.assertEqual(len(events), 1)
- # self.assertEqual(events[0].id, second.id)
- # self.assertEqual(len(events[0].attributes), 1)
+ self.assertEqual(len(events), 2)
+ self.assertEqual(len(events[0].attributes), 0)
+ self.assertEqual(events[1].id, second.id)
+ self.assertEqual(len(events[1].attributes), 1)
 events = self.user_misp_connector.search(timestamp=timeframe, to_ids='exclude', pythonify=True)
 self.assertEqual(len(events), 2)
- # FIXME: Should have one attribute
+ # FIXME: exclude == 1
 # self.assertEqual(len(events[0].attributes), 1)
- self.assertEqual(len(events[1].attributes), 1)
+ # self.assertEqual(len(events[1].attributes), 1)
 # deleted
 second.attributes[1].delete()
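Review bot: In the @@ -535 hunk, the new `to_ids='1'` assertions depend on the order in which the server returns the two events: `events[0]` is assumed to be `first` but its id is never checked, so a different ordering fails the test for a reason unrelated to the filter. Looking the events up by id, `by_id = {e.id: e for e in events}`, and asserting on `by_id[first.id].attributes` and `by_id[second.id].attributes` removes that dependency. In the same hunk the `to_ids='exclude'` case now has both attribute assertions commented out, so it checks only the event count and would still pass if the filter were ignored. The test method starts and ends outside the hunk.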
@@ -582,13 +586,10 @@ class TestComprehensive(unittest.TestCase):
 first.add_attribute('malware-sample', value='testfile.py', data=BytesIO(f.read()))
 first = self.user_misp_connector.update_event(first)
- # time.sleep(30)
 events = self.user_misp_connector.search(timestamp=first.timestamp.timestamp(), with_attachments=True, pythonify=True)
 self.assertEqual(len(events), 1)
- # print(events[0].attributes[-1].to_json())
- # FIXME: the attachment isn't there.
- # self.assertIs(type(events[0].attributes[-1].malware_binary), BytesIO)
+ self.assertIs(type(events[0].attributes[-1].malware_binary), BytesIO)
 events = self.user_misp_connector.search(timestamp=first.timestamp.timestamp(), with_attachments=False, pythonify=True)
 self.assertEqual(len(events), 1)
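Review bot: The re-enabled check `self.assertIs(type(events[0].attributes[-1].malware_binary), BytesIO)` only proves that some `BytesIO` object came back; it would also pass for an empty or wrong payload. The sample is uploaded from `f.read()` in the first context line, so keeping those bytes in a local and comparing them with `malware_binary.getvalue()` would verify the round-trip, assuming `malware_binary` yields the original file content. `assertIsInstance` is also the usual form for the type check. The `with_attachments=False` call that follows asserts only the event count within this hunk, so nothing visible here checks that the binary is withheld in that case; the enclosing test method begins and ends outside the hunk.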
@@ -615,17 +616,27 @@ class TestComprehensive(unittest.TestCase):
 def test_get_csv(self):
 first = self.create_simple_event()
+ second = self.create_simple_event()
+ second.info = 'foo blah'
+ second.set_date('2018-09-01')
+ second.add_attribute('ip-src', '8.8.8.8')
 try:
 first.attributes[0].comment = 'This is the original comment'
 first = self.user_misp_connector.add_event(first)
 response = self.user_misp_connector.fast_publish(first.id, alert=False)
 self.assertEqual(response['errors'][0][1]['message'], 'You do not have permission to use this functionality.')
+ # default search, all attributes with to_ids == False
 self.admin_misp_connector.fast_publish(first.id, alert=False)
 csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, pythonify=True)
 # FIXME: Should not return anything (to_ids is False)
 # self.assertEqual(len(csv), 0)
+ # Also export attributes with to_ids set to false
+ csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, ignore=True, pythonify=True)
+ self.assertEqual(len(csv), 1)
+
+ # Default search, attribute with to_ids == True
 first.attributes[0].to_ids = True
 first = self.user_misp_connector.update_event(first)
 self.admin_misp_connector.fast_publish(first.id, alert=False)
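Review bot: The result of the default `get_csv(publish_timestamp=...)` call is now overwritten by the new `ignore=True` call before anything is asserted on it, so the default export's `to_ids` filtering stays unverified behind the `# FIXME`. If the server behaviour is known to differ from the expectation, asserting what is actually returned, or moving the case into its own test marked `@unittest.expectedFailure`, would keep a regression in either direction visible. The new `ignore=True` case checks only the row count; adding `self.assertEqual(csv[0]['value'], first.attributes[0].value)`, as the later cases do, would tie it to the right attribute. The test body continues outside the hunk.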
@@ -633,9 +644,48 @@ class TestComprehensive(unittest.TestCase):
 self.assertEqual(len(csv), 1)
 self.assertEqual(csv[0]['value'], first.attributes[0].value)
+ # eventid
+ csv = self.user_misp_connector.get_csv(eventid=first.id, pythonify=True)
+ self.assertEqual(len(csv), 1)
+ self.assertEqual(csv[0]['value'], first.attributes[0].value)
+
+ # category
+ csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), category='Other', pythonify=True)
+ self.assertEqual(len(csv), 1)
+ self.assertEqual(csv[0]['value'], first.attributes[0].value)
+ csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), category='Person', pythonify=True)
+ self.assertEqual(len(csv), 0)
+
+ # type_attribute
+ csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), type_attribute='text', pythonify=True)
+ self.assertEqual(len(csv), 1)
+ self.assertEqual(csv[0]['value'], first.attributes[0].value)
+ csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), type_attribute='ip-src', pythonify=True)
+ self.assertEqual(len(csv), 0)
+
+ # context
+ csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), include_context=True, pythonify=True)
+ self.assertEqual(len(csv), 1)
+ # print(csv[0])
+ # FIXME: there is no context.
+
+ # date_from date_to
+ second = self.user_misp_connector.add_event(second)
+ csv = self.user_misp_connector.get_csv(date_from=date.today().isoformat(), pythonify=True)
+ self.assertEqual(len(csv), 1)
+ self.assertEqual(csv[0]['value'], first.attributes[0].value)
+ csv = self.user_misp_connector.get_csv(date_from='2018-09-01', date_to='2018-09-02', pythonify=True)
+ self.assertEqual(len(csv), 2)
+
+ # headerless
+ csv = self.user_misp_connector.get_csv(date_from='2018-09-01', date_to='2018-09-02', headerless=True)
+ # FIXME: The header is here.
+ # print(csv)
+
 finally:
 # Delete event
 self.admin_misp_connector.delete_event(first.id)
+ self.admin_misp_connector.delete_event(second.id)
 @unittest.skip("Currently failing")
 def test_search_type_event_csv(self):
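Review bot: `second` is only sent to the server in the `# date_from date_to` step, yet the `finally` block now calls `self.admin_misp_connector.delete_event(second.id)` unconditionally. If an earlier assertion fails, `second` is still the local object from `create_simple_event()` and presumably has no server-side id, so the cleanup would raise or send a delete for a meaningless id and obscure the original failure. A guard such as `if getattr(second, 'id', None): self.admin_misp_connector.delete_event(second.id)` avoids that. Separately, the `include_context` and `headerless` cases end in `# FIXME` with no assertion on content, so they only confirm the call does not fail. The `try:` that this `finally` closes opens in the previous hunk.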