Merge pull request #858 from tomking2/feature/index_sort_limit

Sort/desc for sorting results and limit/page for pagination
pull/860/head
Raphaël Vinot 2022-09-09 11:16:26 +02:00 committed by GitHub
commit c47e235a4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 2 deletions

View File

@ -2666,6 +2666,10 @@ class PyMISP:
]] = None,
sharinggroup: Optional[List[SearchType]] = None,
minimal: Optional[bool] = None,
sort: Optional[str] = None,
desc: Optional[bool] = None,
limit: Optional[int] = None,
page: Optional[int] = None,
pythonify: Optional[bool] = None) -> Union[Dict, List[MISPEvent]]:
"""Search event metadata shown on the event index page. Using ! in front of a value
means NOT, except for parameters date_from, date_to and timestamp which cannot be negated.
@ -2697,6 +2701,10 @@ class PyMISP:
:param publish_timestamp: Filter on event's publish timestamp.
:param sharinggroup: Restrict by a sharing group | list
:param minimal: Return only event ID, UUID, timestamp, sighting_timestamp and published.
:param sort: The field to sort the events by, such as 'id', 'date', 'attribute_count'.
:param desc: Whether to sort events ascending (default) or descending.
:param limit: Limit the number of events returned
:param page: If a limit is set, sets the page to be returned. page 3, limit 100 will return records 201->300).
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output.
Warning: it might use a lot of RAM
"""
@ -2715,7 +2723,8 @@ class PyMISP:
query['timestamp'] = (self._make_timestamp(timestamp[0]), self._make_timestamp(timestamp[1]))
else:
query['timestamp'] = self._make_timestamp(timestamp)
if query.get("sort"):
query["direction"] = "desc" if desc else "asc"
url = urljoin(self.root_url, 'events/index')
response = self._prepare_request('POST', url, data=query)
normalized_response = self._check_json_response(response)
@ -3595,7 +3604,6 @@ class PyMISP:
# CakePHP params in URL
to_append_url = '/'.join([f'{k}:{v}' for k, v in kw_params.items()])
url = f'{url}/{to_append_url}'
req = requests.Request(request_type, url, data=d, params=params)
user_agent = f'PyMISP {__version__} - Python {".".join(str(x) for x in sys.version_info[:2])}'
if self.tool:

View File

@ -300,6 +300,35 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(second)
self.admin_misp_connector.delete_event(third)
def test_search_index(self):
try:
first, second, third = self.environment()
# Search as admin
events = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), pythonify=True)
self.assertEqual(len(events), 3)
for e in events:
self.assertIn(e.id, [first.id, second.id, third.id])
# Test limit and pagination
event_one = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), limit=1, page=1, pythonify=True)[0]
event_two = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), limit=1, page=2, pythonify=True)[0]
self.assertTrue(event_one.id != event_two.id)
two_events = self.admin_misp_connector.search_index(limit=2)
self.assertTrue(len(two_events), 2)
# Test ordering by the Info field. Can't use timestamp as each will likely have the same
event = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), sort="info", desc=True, limit=1, pythonify=True)[0]
# First|Second|*Third* event
self.assertEqual(event.id, third.id)
# *First*|Second|Third event
event = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), sort="info", desc=False, limit=1, pythonify=True)[0]
self.assertEqual(event.id, first.id)
finally:
# Delete event
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
self.admin_misp_connector.delete_event(third)
def test_search_objects(self):
'''Search for objects'''
try: