mirror of https://github.com/MISP/PyMISP
fix: [search_index] Update date_from and date_to parameters to format as YYYY-MM-DD
parent
f4438dbe3a
commit
cf27fc6b75
|
@ -3139,9 +3139,9 @@ class PyMISP:
|
||||||
query.pop('self')
|
query.pop('self')
|
||||||
query.pop('pythonify')
|
query.pop('pythonify')
|
||||||
if query.get('date_from'):
|
if query.get('date_from'):
|
||||||
query['datefrom'] = self._make_timestamp(query.pop('date_from'))
|
query['datefrom'] = self._make_datestr(query.pop('date_from'))
|
||||||
if query.get('date_to'):
|
if query.get('date_to'):
|
||||||
query['dateuntil'] = self._make_timestamp(query.pop('date_to'))
|
query['dateuntil'] = self._make_datestr(query.pop('date_to'))
|
||||||
if isinstance(query.get('sharinggroup'), list):
|
if isinstance(query.get('sharinggroup'), list):
|
||||||
query['sharinggroup'] = '|'.join([str(sg) for sg in query['sharinggroup']])
|
query['sharinggroup'] = '|'.join([str(sg) for sg in query['sharinggroup']])
|
||||||
if query.get('timestamp') is not None:
|
if query.get('timestamp') is not None:
|
||||||
|
@ -3966,6 +3966,29 @@ class PyMISP:
|
||||||
return value
|
return value
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
def _make_datestr(self, value: datetime | date | int | str | float | None) -> str | None:
|
||||||
|
"""Catch-all method to normalize anything that can be converted to a YYYY-MM-DD date string"""
|
||||||
|
if not value:
|
||||||
|
return None
|
||||||
|
if isinstance(value, str):
|
||||||
|
if value.isdigit():
|
||||||
|
value = int(value)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
value = datetime.strptime(value, "%Y-%m-%d")
|
||||||
|
except Exception:
|
||||||
|
raise PyMISPError(f"Unable to parse {value} as date in format YYYY-MM-DD")
|
||||||
|
|
||||||
|
if isinstance(value, (int, float)):
|
||||||
|
try:
|
||||||
|
value = datetime.fromtimestamp(value)
|
||||||
|
except Exception:
|
||||||
|
raise PyMISPError(f"Unable to parse {value} as timestamp")
|
||||||
|
|
||||||
|
if isinstance(value, (datetime, date)):
|
||||||
|
return value.strftime("%Y-%m-%d")
|
||||||
|
raise PyMISPError(f"{value} could not be resolved as a date")
|
||||||
|
|
||||||
def _check_json_response(self, response: requests.Response) -> dict[str, Any] | list[dict[str, Any]]:
|
def _check_json_response(self, response: requests.Response) -> dict[str, Any] | list[dict[str, Any]]:
|
||||||
r = self._check_response(response, expect_json=True)
|
r = self._check_response(response, expect_json=True)
|
||||||
if isinstance(r, (dict, list)):
|
if isinstance(r, (dict, list)):
|
||||||
|
|
|
@ -344,6 +344,32 @@ class TestComprehensive(unittest.TestCase):
|
||||||
# *First*|Second|Third event
|
# *First*|Second|Third event
|
||||||
event = self.admin_misp_connector.search_index(timestamp=ts, sort="info", desc=False, limit=1, pythonify=True)[0] # type: ignore[index,assignment]
|
event = self.admin_misp_connector.search_index(timestamp=ts, sort="info", desc=False, limit=1, pythonify=True)[0] # type: ignore[index,assignment]
|
||||||
self.assertEqual(event.id, first.id)
|
self.assertEqual(event.id, first.id)
|
||||||
|
|
||||||
|
# Test date_from (which in turn proves date_to)
|
||||||
|
# Represented as date
|
||||||
|
event: MISPEvent = self.admin_misp_connector.search_index(
|
||||||
|
date_from=first.date,
|
||||||
|
eventinfo=first.info, limit=1, pythonify=True
|
||||||
|
)[0]
|
||||||
|
self.assertEqual(event.id, first.id)
|
||||||
|
# Represented as timestamp
|
||||||
|
event: MISPEvent = self.admin_misp_connector.search_index(
|
||||||
|
date_from=datetime.combine(first.date, datetime.min.time()).timestamp(),
|
||||||
|
eventinfo=first.info, limit=1, pythonify=True
|
||||||
|
)[0]
|
||||||
|
self.assertEqual(event.id, first.id)
|
||||||
|
# Represented as string
|
||||||
|
event: MISPEvent = self.admin_misp_connector.search_index(
|
||||||
|
date_from=first.date.strftime("%Y-%m-%d"),
|
||||||
|
eventinfo=first.info, limit=1, pythonify=True
|
||||||
|
)[0]
|
||||||
|
self.assertEqual(event.id, first.id)
|
||||||
|
# Check it actually works (increase date to lose match)
|
||||||
|
no_events = self.admin_misp_connector.search_index(
|
||||||
|
date_from=first.date + timedelta(days=1),
|
||||||
|
eventinfo=first.info, limit=1, pythonify=True
|
||||||
|
)
|
||||||
|
self.assertTrue(len(no_events) == 0)
|
||||||
finally:
|
finally:
|
||||||
# Delete event
|
# Delete event
|
||||||
self.admin_misp_connector.delete_event(first)
|
self.admin_misp_connector.delete_event(first)
|
||||||
|
|
Loading…
Reference in New Issue