mirror of https://github.com/MISP/PyMISP
New: use new CSV interface, add test cases
parent
ceedb6e95c
commit
cd1de8c6bf
|
@ -32,7 +32,7 @@ def deprecated(func):
|
|||
|
||||
|
||||
try:
|
||||
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError # noqa
|
||||
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse # noqa
|
||||
from .api import PyMISP # noqa
|
||||
from .abstract import AbstractMISP, MISPEncode, MISPTag, Distribution, ThreatLevel, Analysis # noqa
|
||||
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting # noqa
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from .exceptions import MISPServerError, NewEventError, UpdateEventError, UpdateAttributeError
|
||||
from .exceptions import MISPServerError, NewEventError, UpdateEventError, UpdateAttributeError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse
|
||||
from .api import PyMISP, everything_broken, MISPEvent, MISPAttribute
|
||||
from typing import TypeVar, Optional, Tuple, List, Dict
|
||||
from datetime import date, datetime
|
||||
import json
|
||||
import csv
|
||||
|
||||
import logging
|
||||
from urllib.parse import urljoin
|
||||
|
@ -194,5 +195,76 @@ class ExpandedPyMISP(PyMISP):
|
|||
ma.from_dict(**a)
|
||||
to_return.append(ma)
|
||||
elif controller == 'objects':
|
||||
raise Exception('Not implemented yet')
|
||||
raise PyMISPNotImplementedYet('Not implemented yet')
|
||||
return to_return
|
||||
|
||||
def get_csv(self,
|
||||
eventid: Optional[SearchType]=None,
|
||||
ignore: Optional[bool]=None,
|
||||
tags: Optional[SearchParameterTypes]=None,
|
||||
category: Optional[SearchParameterTypes]=None,
|
||||
type_attribute: Optional[SearchParameterTypes]=None,
|
||||
include_context: Optional[bool]=None, includeContext: Optional[bool]=None,
|
||||
date_from: Optional[DateTypes]=None, date_to: Optional[DateTypes]=None,
|
||||
publish_timestamp: Optional[DateInterval]=None, # converted internally to last (consistent with search)
|
||||
headerless: Optional[bool]=None,
|
||||
enforce_warninglist: Optional[bool]=None, enforceWarninglist: Optional[bool]=None,
|
||||
pythonify: Optional[bool]=False,
|
||||
**kwargs):
|
||||
|
||||
# Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized.
|
||||
# They are passed as-is.
|
||||
query = kwargs
|
||||
if eventid is not None:
|
||||
query['eventid'] = eventid
|
||||
if ignore is not None:
|
||||
query['ignore'] = ignore
|
||||
if tags is not None:
|
||||
query['tags'] = tags
|
||||
if category is not None:
|
||||
query['category'] = category
|
||||
if type_attribute is not None:
|
||||
query['type'] = type_attribute
|
||||
if include_context is not None:
|
||||
query['includeContext'] = include_context
|
||||
if includeContext is not None:
|
||||
query['includeContext'] = includeContext
|
||||
if date_from is not None:
|
||||
query['from'] = self.make_timestamp(date_from)
|
||||
if date_to is not None:
|
||||
query['to'] = self.make_timestamp(date_to)
|
||||
if publish_timestamp is not None:
|
||||
if isinstance(publish_timestamp, (list, tuple)):
|
||||
query['last'] = (self.make_timestamp(publish_timestamp[0]), self.make_timestamp(publish_timestamp[1]))
|
||||
else:
|
||||
query['last'] = self.make_timestamp(publish_timestamp)
|
||||
if headerless is not None:
|
||||
query['headerless'] = headerless
|
||||
if enforce_warninglist is not None:
|
||||
query['enforceWarninglist'] = enforce_warninglist
|
||||
if enforceWarninglist is not None:
|
||||
# Alias for enforce_warninglist
|
||||
query['enforceWarninglist'] = enforceWarninglist
|
||||
|
||||
url = urljoin(self.root_url, '/events/csv/download/')
|
||||
response = self._prepare_request('POST', url, data=json.dumps(query))
|
||||
normalized_response = self._check_response(response)
|
||||
if isinstance(normalized_response, str):
|
||||
if pythonify:
|
||||
# Make it a list of dict
|
||||
fieldnames, lines = normalized_response.split('\n', 1)
|
||||
fieldnames = fieldnames.split(',')
|
||||
to_return = []
|
||||
for line in csv.reader(lines.split('\n')):
|
||||
if line:
|
||||
to_return.append({fname: value for fname, value in zip(fieldnames, line)})
|
||||
return to_return
|
||||
|
||||
return normalized_response
|
||||
elif isinstance(normalized_response, dict):
|
||||
# The server returned a dictionary, it contains the error message.
|
||||
logger.critical(f'The server should have returned a CSV file as text. instead it returned an error message:\n{normalized_response}')
|
||||
return normalized_response
|
||||
else:
|
||||
# Should not happen...
|
||||
raise PyMISPUnexpectedResponse(f'The server should have returned a CSV file as text. instead it returned:\n{normalized_response}')
|
||||
|
|
|
@ -59,3 +59,11 @@ class PyMISPInvalidFormat(PyMISPError):
|
|||
|
||||
class MISPServerError(PyMISPError):
|
||||
pass
|
||||
|
||||
|
||||
class PyMISPNotImplementedYet(PyMISPError):
|
||||
pass
|
||||
|
||||
|
||||
class PyMISPUnexpectedResponse(PyMISPError):
|
||||
pass
|
||||
|
|
|
@ -487,6 +487,21 @@ class TestComprehensive(unittest.TestCase):
|
|||
# Delete event
|
||||
self.admin_misp_connector.delete_event(first.id)
|
||||
|
||||
def test_get_csv(self):
|
||||
first = self.create_simple_event()
|
||||
try:
|
||||
first.attributes[0].comment = 'This is the original comment'
|
||||
first = self.user_misp_connector.add_event(first)
|
||||
|
||||
response = self.user_misp_connector.fast_publish(first.id, alert=False)
|
||||
self.assertEqual(response['errors'][0][1]['message'], 'You do not have permission to use this functionality.')
|
||||
self.admin_misp_connector.fast_publish(first.id, alert=False)
|
||||
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, pythonify=True)
|
||||
self.assertEqual(csv[0]['value'], first.attributes[0].value)
|
||||
finally:
|
||||
# Delete event
|
||||
self.admin_misp_connector.delete_event(first.id)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Loading…
Reference in New Issue