2017-08-31 10:40:18 +02:00
|
|
|
#!/usr/bin/env python
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
from .. import MISPObject
|
2018-03-27 14:57:07 +02:00
|
|
|
from ..exceptions import InvalidMISPObject
|
2018-03-28 09:47:36 +02:00
|
|
|
from datetime import datetime, date
|
2018-03-27 14:57:07 +02:00
|
|
|
from dateutil.parser import parse
|
2020-01-23 10:27:40 +01:00
|
|
|
from typing import Union, Optional
|
2017-08-31 10:40:18 +02:00
|
|
|
|
2020-01-30 11:44:13 +01:00
|
|
|
|
2017-08-31 10:40:18 +02:00
|
|
|
class AbstractMISPObjectGenerator(MISPObject):
|
|
|
|
|
2020-01-23 10:27:40 +01:00
|
|
|
def _detect_epoch(self, timestamp: Union[str, int, float]) -> bool:
|
2018-03-28 09:47:36 +02:00
|
|
|
try:
|
2018-03-28 10:02:47 +02:00
|
|
|
tmp = float(timestamp)
|
|
|
|
if tmp < 30000000:
|
|
|
|
# Assuming the user doesn't want to report anything before datetime(1970, 12, 14, 6, 20)
|
|
|
|
# The date is most probably in the format 20180301
|
|
|
|
return False
|
2018-03-28 09:47:36 +02:00
|
|
|
return True
|
|
|
|
except ValueError:
|
|
|
|
return False
|
|
|
|
|
2020-01-23 10:27:40 +01:00
|
|
|
def _sanitize_timestamp(self, timestamp: Optional[Union[datetime, date, dict, str, int, float]]=None) -> datetime:
|
2018-03-27 14:57:07 +02:00
|
|
|
if not timestamp:
|
|
|
|
return datetime.now()
|
2018-03-28 09:47:36 +02:00
|
|
|
|
|
|
|
if isinstance(timestamp, datetime):
|
|
|
|
return timestamp
|
|
|
|
elif isinstance(timestamp, date):
|
|
|
|
return datetime.combine(timestamp, datetime.min.time())
|
2018-03-27 14:57:07 +02:00
|
|
|
elif isinstance(timestamp, dict):
|
|
|
|
if not isinstance(timestamp['value'], datetime):
|
|
|
|
timestamp['value'] = parse(timestamp['value'])
|
2020-01-23 10:27:40 +01:00
|
|
|
return timestamp['value']
|
|
|
|
else: # Supported: float/int/string
|
|
|
|
if isinstance(timestamp, (str, int, float)) and self._detect_epoch(timestamp):
|
2020-05-26 15:37:24 +02:00
|
|
|
# It converts to the *local* datetime, which is consistent with the rest of the code.
|
2018-03-28 09:47:36 +02:00
|
|
|
return datetime.fromtimestamp(float(timestamp))
|
2020-01-23 10:27:40 +01:00
|
|
|
elif isinstance(timestamp, str):
|
|
|
|
return parse(timestamp)
|
|
|
|
else:
|
|
|
|
raise Exception(f'Unable to convert {timestamp} to a datetime.')
|
2018-03-27 14:57:07 +02:00
|
|
|
|
2017-08-31 10:40:18 +02:00
|
|
|
def generate_attributes(self):
|
|
|
|
"""Contains the logic where all the values of the object are gathered"""
|
2018-03-27 14:57:07 +02:00
|
|
|
if hasattr(self, '_parameters'):
|
|
|
|
for object_relation in self._definition['attributes']:
|
|
|
|
value = self._parameters.pop(object_relation, None)
|
|
|
|
if not value:
|
|
|
|
continue
|
|
|
|
if isinstance(value, dict):
|
|
|
|
self.add_attribute(object_relation, **value)
|
2019-04-09 17:54:12 +02:00
|
|
|
elif isinstance(value, list):
|
|
|
|
self.add_attributes(object_relation, *value)
|
2018-03-27 14:57:07 +02:00
|
|
|
else:
|
|
|
|
# Assume it is the value only
|
|
|
|
self.add_attribute(object_relation, value=value)
|
|
|
|
if self._strict and self._known_template and self._parameters:
|
|
|
|
raise InvalidMISPObject('Some object relations are unknown in the template and could not be attached: {}'.format(', '.join(self._parameters)))
|