mirror of https://github.com/MISP/PyMISP
new: page/limit in search
parent
cb2dbbd481
commit
a8a7193059
|
@ -116,8 +116,8 @@ class ExpandedPyMISP(PyMISP):
|
|||
a.from_dict(**updated_attribute)
|
||||
return a
|
||||
|
||||
# TODO: Make that thing async & test it.
|
||||
def search(self, controller: str='events', return_format: str='json',
|
||||
page: Optional[int]=None, limit: Optional[int]=None,
|
||||
value: Optional[SearchParameterTypes]=None,
|
||||
type_attribute: Optional[SearchParameterTypes]=None,
|
||||
category: Optional[SearchParameterTypes]=None,
|
||||
|
@ -147,6 +147,8 @@ class ExpandedPyMISP(PyMISP):
|
|||
Search in the MISP instance
|
||||
|
||||
:param returnFormat: Set the return format of the search (Currently supported: json, xml, openioc, suricata, snort - more formats are being moved to restSearch with the goal being that all searches happen through this API). Can be passed as the first parameter after restSearch or via the JSON payload.
|
||||
:param page: Page number (depending on the limit).
|
||||
:param limit: Upper limit of elements to return per page.
|
||||
:param value: Search for the given value in the attributes' value field.
|
||||
:param type_attribute: The attribute type, any valid MISP attribute type is accepted.
|
||||
:param category: The attribute category, any valid MISP attribute category is accepted.
|
||||
|
@ -181,6 +183,8 @@ class ExpandedPyMISP(PyMISP):
|
|||
|
||||
'''
|
||||
|
||||
return_formats = ['openioc', 'json', 'xml', 'suricata', 'snort', 'text', 'rpz', 'csv', 'cache']
|
||||
|
||||
if controller not in ['events', 'attributes', 'objects']:
|
||||
raise ValueError('controller has to be in {}'.format(', '.join(['events', 'attributes', 'objects'])))
|
||||
|
||||
|
@ -198,9 +202,13 @@ class ExpandedPyMISP(PyMISP):
|
|||
# They are passed as-is.
|
||||
query = kwargs
|
||||
if return_format is not None:
|
||||
if return_format not in ['json', 'xml', 'openioc', 'suricata', 'snort']:
|
||||
raise ValueError('return_format has to be in {}'.format(', '.join(['json', 'xml', 'openioc', 'suricata', 'snort'])))
|
||||
if return_format not in return_formats:
|
||||
raise ValueError('return_format has to be in {}'.format(', '.join(return_formats)))
|
||||
query['returnFormat'] = return_format
|
||||
if page is not None:
|
||||
query['page'] = page
|
||||
if limit is not None:
|
||||
query['limit'] = limit
|
||||
if value is not None:
|
||||
query['value'] = value
|
||||
if type_attribute is not None:
|
||||
|
|
|
@ -602,6 +602,15 @@ class TestComprehensive(unittest.TestCase):
|
|||
response = self.admin_misp_connector.toggle_warninglist(warninglist_name='%dns resolv%') # disable ipv4 DNS.
|
||||
self.assertDictEqual(response, {'saved': True, 'success': '3 warninglist(s) toggled'})
|
||||
|
||||
# Page / limit
|
||||
attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=1, limit=3, pythonify=True)
|
||||
print(attributes)
|
||||
self.assertEqual(len(attributes), 3)
|
||||
|
||||
attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=2, limit=3, pythonify=True)
|
||||
print(attributes)
|
||||
self.assertEqual(len(attributes), 1)
|
||||
|
||||
time.sleep(1) # make sure the next attribute is added one at least one second later
|
||||
|
||||
# attachments
|
||||
|
|
Loading…
Reference in New Issue