diff --git a/pymisp/__init__.py b/pymisp/__init__.py index 69ca780..ff609da 100644 --- a/pymisp/__init__.py +++ b/pymisp/__init__.py @@ -32,7 +32,7 @@ def deprecated(func): try: - from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError # noqa + from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse # noqa from .api import PyMISP # noqa from .abstract import AbstractMISP, MISPEncode, MISPTag, Distribution, ThreatLevel, Analysis # noqa from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting # noqa diff --git a/pymisp/aping.py b/pymisp/aping.py index 8b91fe7..b2d0647 100644 --- a/pymisp/aping.py +++ b/pymisp/aping.py @@ -1,11 +1,12 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from .exceptions import MISPServerError, NewEventError, UpdateEventError, UpdateAttributeError +from .exceptions import MISPServerError, NewEventError, UpdateEventError, UpdateAttributeError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse from .api import PyMISP, everything_broken, MISPEvent, MISPAttribute from typing import TypeVar, Optional, Tuple, List, Dict from datetime import date, datetime import json +import csv import logging from urllib.parse import urljoin @@ -194,5 +195,76 @@ class ExpandedPyMISP(PyMISP): ma.from_dict(**a) to_return.append(ma) elif controller == 'objects': - raise Exception('Not implemented yet') + raise PyMISPNotImplementedYet('Not implemented yet') return to_return + + def get_csv(self, + eventid: Optional[SearchType]=None, + ignore: Optional[bool]=None, + tags: Optional[SearchParameterTypes]=None, + category: Optional[SearchParameterTypes]=None, + type_attribute: Optional[SearchParameterTypes]=None, + include_context: Optional[bool]=None, includeContext: Optional[bool]=None, + date_from: Optional[DateTypes]=None, date_to: Optional[DateTypes]=None, + publish_timestamp: Optional[DateInterval]=None, # converted internally to last (consistent with search) + headerless: Optional[bool]=None, + enforce_warninglist: Optional[bool]=None, enforceWarninglist: Optional[bool]=None, + pythonify: Optional[bool]=False, + **kwargs): + + # Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized. + # They are passed as-is. + query = kwargs + if eventid is not None: + query['eventid'] = eventid + if ignore is not None: + query['ignore'] = ignore + if tags is not None: + query['tags'] = tags + if category is not None: + query['category'] = category + if type_attribute is not None: + query['type'] = type_attribute + if include_context is not None: + query['includeContext'] = include_context + if includeContext is not None: + query['includeContext'] = includeContext + if date_from is not None: + query['from'] = self.make_timestamp(date_from) + if date_to is not None: + query['to'] = self.make_timestamp(date_to) + if publish_timestamp is not None: + if isinstance(publish_timestamp, (list, tuple)): + query['last'] = (self.make_timestamp(publish_timestamp[0]), self.make_timestamp(publish_timestamp[1])) + else: + query['last'] = self.make_timestamp(publish_timestamp) + if headerless is not None: + query['headerless'] = headerless + if enforce_warninglist is not None: + query['enforceWarninglist'] = enforce_warninglist + if enforceWarninglist is not None: + # Alias for enforce_warninglist + query['enforceWarninglist'] = enforceWarninglist + + url = urljoin(self.root_url, '/events/csv/download/') + response = self._prepare_request('POST', url, data=json.dumps(query)) + normalized_response = self._check_response(response) + if isinstance(normalized_response, str): + if pythonify: + # Make it a list of dict + fieldnames, lines = normalized_response.split('\n', 1) + fieldnames = fieldnames.split(',') + to_return = [] + for line in csv.reader(lines.split('\n')): + if line: + to_return.append({fname: value for fname, value in zip(fieldnames, line)}) + return to_return + + return normalized_response + elif isinstance(normalized_response, dict): + # The server returned a dictionary, it contains the error message. + logger.critical(f'The server should have returned a CSV file as text. instead it returned an error message:\n{normalized_response}') + return normalized_response + else: + # Should not happen... + raise PyMISPUnexpectedResponse(f'The server should have returned a CSV file as text. instead it returned:\n{normalized_response}') diff --git a/pymisp/exceptions.py b/pymisp/exceptions.py index 4e94c29..481720b 100644 --- a/pymisp/exceptions.py +++ b/pymisp/exceptions.py @@ -59,3 +59,11 @@ class PyMISPInvalidFormat(PyMISPError): class MISPServerError(PyMISPError): pass + + +class PyMISPNotImplementedYet(PyMISPError): + pass + + +class PyMISPUnexpectedResponse(PyMISPError): + pass diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 3184abb..976b7a1 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -487,6 +487,21 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first.id) + def test_get_csv(self): + first = self.create_simple_event() + try: + first.attributes[0].comment = 'This is the original comment' + first = self.user_misp_connector.add_event(first) + + response = self.user_misp_connector.fast_publish(first.id, alert=False) + self.assertEqual(response['errors'][0][1]['message'], 'You do not have permission to use this functionality.') + self.admin_misp_connector.fast_publish(first.id, alert=False) + csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, pythonify=True) + self.assertEqual(csv[0]['value'], first.attributes[0].value) + finally: + # Delete event + self.admin_misp_connector.delete_event(first.id) + if __name__ == '__main__': unittest.main()