mirror of https://github.com/MISP/PyMISP
chg: Add in sort/desc for sorting results and limit/page for pagination
parent
289ce47d64
commit
eb33a9deb9
|
@ -2659,6 +2659,10 @@ class PyMISP:
|
|||
]] = None,
|
||||
sharinggroup: Optional[List[SearchType]] = None,
|
||||
minimal: Optional[bool] = None,
|
||||
sort: Optional[str] = None,
|
||||
desc: Optional[bool] = None,
|
||||
limit: Optional[int] = None,
|
||||
page: Optional[int] = None,
|
||||
pythonify: Optional[bool] = None) -> Union[Dict, List[MISPEvent]]:
|
||||
"""Search event metadata shown on the event index page. Using ! in front of a value
|
||||
means NOT, except for parameters date_from, date_to and timestamp which cannot be negated.
|
||||
|
@ -2690,6 +2694,10 @@ class PyMISP:
|
|||
:param publish_timestamp: Filter on event's publish timestamp.
|
||||
:param sharinggroup: Restrict by a sharing group | list
|
||||
:param minimal: Return only event ID, UUID, timestamp, sighting_timestamp and published.
|
||||
:param sort: The field to sort the events by, such as 'id', 'date', 'attribute_count'.
|
||||
:param desc: Whether to sort events ascending (default) or descending.
|
||||
:param limit: Limit the number of events returned
|
||||
:param page: If a limit is set, sets the page to be returned. page 3, limit 100 will return records 201->300).
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output.
|
||||
Warning: it might use a lot of RAM
|
||||
"""
|
||||
|
@ -2708,7 +2716,8 @@ class PyMISP:
|
|||
query['timestamp'] = (self._make_timestamp(timestamp[0]), self._make_timestamp(timestamp[1]))
|
||||
else:
|
||||
query['timestamp'] = self._make_timestamp(timestamp)
|
||||
|
||||
if query.get("sort"):
|
||||
query["direction"] = "desc" if desc else "asc"
|
||||
url = urljoin(self.root_url, 'events/index')
|
||||
response = self._prepare_request('POST', url, data=query)
|
||||
normalized_response = self._check_json_response(response)
|
||||
|
@ -3588,7 +3597,6 @@ class PyMISP:
|
|||
# CakePHP params in URL
|
||||
to_append_url = '/'.join([f'{k}:{v}' for k, v in kw_params.items()])
|
||||
url = f'{url}/{to_append_url}'
|
||||
|
||||
req = requests.Request(request_type, url, data=d, params=params)
|
||||
user_agent = f'PyMISP {__version__} - Python {".".join(str(x) for x in sys.version_info[:2])}'
|
||||
if self.tool:
|
||||
|
|
|
@ -300,6 +300,35 @@ class TestComprehensive(unittest.TestCase):
|
|||
self.admin_misp_connector.delete_event(second)
|
||||
self.admin_misp_connector.delete_event(third)
|
||||
|
||||
def test_search_index(self):
|
||||
try:
|
||||
first, second, third = self.environment()
|
||||
# Search as admin
|
||||
events = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), pythonify=True)
|
||||
self.assertEqual(len(events), 3)
|
||||
for e in events:
|
||||
self.assertIn(e.id, [first.id, second.id, third.id])
|
||||
|
||||
# Test limit and pagination
|
||||
event_one = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), limit=1, page=1, pythonify=True)[0]
|
||||
event_two = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), limit=1, page=2, pythonify=True)[0]
|
||||
self.assertTrue(event_one.id != event_two.id)
|
||||
two_events = self.admin_misp_connector.search_index(limit=2)
|
||||
self.assertTrue(len(two_events), 2)
|
||||
|
||||
# Test ordering by the Info field. Can't use timestamp as each will likely have the same
|
||||
event = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), sort="info", desc=True, limit=1, pythonify=True)[0]
|
||||
# First|Second|*Third* event
|
||||
self.assertEqual(event.id, third.id)
|
||||
# *First*|Second|Third event
|
||||
event = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), sort="info", desc=False, limit=1, pythonify=True)[0]
|
||||
self.assertEqual(event.id, first.id)
|
||||
finally:
|
||||
# Delete event
|
||||
self.admin_misp_connector.delete_event(first)
|
||||
self.admin_misp_connector.delete_event(second)
|
||||
self.admin_misp_connector.delete_event(third)
|
||||
|
||||
def test_search_objects(self):
|
||||
'''Search for objects'''
|
||||
try:
|
||||
|
|
Loading…
Reference in New Issue