mirror of https://github.com/MISP/PyMISP
parent
97d960883c
commit
32445973bd
|
@ -1,7 +1,7 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import datetime
|
from datetime import date, datetime, timezone
|
||||||
|
|
||||||
from deprecated import deprecated # type: ignore
|
from deprecated import deprecated # type: ignore
|
||||||
from json import JSONEncoder
|
from json import JSONEncoder
|
||||||
|
@ -86,7 +86,7 @@ class MISPEncode(JSONEncoder):
|
||||||
def default(self, obj):
|
def default(self, obj):
|
||||||
if isinstance(obj, AbstractMISP):
|
if isinstance(obj, AbstractMISP):
|
||||||
return obj.jsonable()
|
return obj.jsonable()
|
||||||
elif isinstance(obj, (datetime.datetime, datetime.date)):
|
elif isinstance(obj, (datetime, date)):
|
||||||
return obj.isoformat()
|
return obj.isoformat()
|
||||||
elif isinstance(obj, Enum):
|
elif isinstance(obj, Enum):
|
||||||
return obj.value
|
return obj.value
|
||||||
|
@ -189,6 +189,12 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
val = self._datetime_to_timestamp(val)
|
val = self._datetime_to_timestamp(val)
|
||||||
|
if (attribute in ['first_seen', 'last_seen', 'datetime']
|
||||||
|
and isinstance(val, datetime)
|
||||||
|
and not val.tzinfo):
|
||||||
|
# Need to make sure the timezone is set. Otherwise, it will be processed as UTC on the server
|
||||||
|
val = val.astimezone()
|
||||||
|
|
||||||
to_return[attribute] = val
|
to_return[attribute] = val
|
||||||
to_return = _int_to_str(to_return)
|
to_return = _int_to_str(to_return)
|
||||||
return to_return
|
return to_return
|
||||||
|
@ -207,7 +213,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
|
||||||
if getattr(self, field, None) is not None:
|
if getattr(self, field, None) is not None:
|
||||||
if field in ['timestamp', 'publish_timestamp']:
|
if field in ['timestamp', 'publish_timestamp']:
|
||||||
to_return[field] = self._datetime_to_timestamp(getattr(self, field))
|
to_return[field] = self._datetime_to_timestamp(getattr(self, field))
|
||||||
elif isinstance(getattr(self, field), (datetime.datetime, datetime.date)):
|
elif isinstance(getattr(self, field), (datetime, date)):
|
||||||
to_return[field] = getattr(self, field).isoformat()
|
to_return[field] = getattr(self, field).isoformat()
|
||||||
else:
|
else:
|
||||||
to_return[field] = getattr(self, field)
|
to_return[field] = getattr(self, field)
|
||||||
|
@ -274,8 +280,8 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
|
||||||
self.__edited = True
|
self.__edited = True
|
||||||
super().__setattr__(name, value)
|
super().__setattr__(name, value)
|
||||||
|
|
||||||
def _datetime_to_timestamp(self, d: Union[int, float, str, datetime.datetime]) -> int:
|
def _datetime_to_timestamp(self, d: Union[int, float, str, datetime]) -> int:
|
||||||
"""Convert a datetime.datetime object to a timestamp (int)"""
|
"""Convert a datetime object to a timestamp (int)"""
|
||||||
if isinstance(d, (int, float, str)):
|
if isinstance(d, (int, float, str)):
|
||||||
# Assume we already have a timestamp
|
# Assume we already have a timestamp
|
||||||
return int(d)
|
return int(d)
|
||||||
|
@ -346,20 +352,20 @@ class MISPTag(AbstractMISP):
|
||||||
|
|
||||||
|
|
||||||
if HAS_RAPIDJSON:
|
if HAS_RAPIDJSON:
|
||||||
def pymisp_json_default(obj: Union[AbstractMISP, datetime.datetime, datetime.date, Enum, UUID]) -> Union[dict, str]:
|
def pymisp_json_default(obj: Union[AbstractMISP, datetime, date, Enum, UUID]) -> Union[dict, str]:
|
||||||
if isinstance(obj, AbstractMISP):
|
if isinstance(obj, AbstractMISP):
|
||||||
return obj.jsonable()
|
return obj.jsonable()
|
||||||
elif isinstance(obj, (datetime.datetime, datetime.date)):
|
elif isinstance(obj, (datetime, date)):
|
||||||
return obj.isoformat()
|
return obj.isoformat()
|
||||||
elif isinstance(obj, Enum):
|
elif isinstance(obj, Enum):
|
||||||
return obj.value
|
return obj.value
|
||||||
elif isinstance(obj, UUID):
|
elif isinstance(obj, UUID):
|
||||||
return str(obj)
|
return str(obj)
|
||||||
else:
|
else:
|
||||||
def pymisp_json_default(obj: Union[AbstractMISP, datetime.datetime, datetime.date, Enum, UUID]) -> Union[dict, str]:
|
def pymisp_json_default(obj: Union[AbstractMISP, datetime, date, Enum, UUID]) -> Union[dict, str]:
|
||||||
if isinstance(obj, AbstractMISP):
|
if isinstance(obj, AbstractMISP):
|
||||||
return obj.jsonable()
|
return obj.jsonable()
|
||||||
elif isinstance(obj, (datetime.datetime, datetime.date)):
|
elif isinstance(obj, (datetime, date)):
|
||||||
return obj.isoformat()
|
return obj.isoformat()
|
||||||
elif isinstance(obj, Enum):
|
elif isinstance(obj, Enum):
|
||||||
return obj.value
|
return obj.value
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import datetime
|
from datetime import timezone, datetime, date
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import base64
|
import base64
|
||||||
|
@ -88,6 +88,11 @@ class MISPSharingGroup(AbstractMISP):
|
||||||
|
|
||||||
class MISPShadowAttribute(AbstractMISP):
|
class MISPShadowAttribute(AbstractMISP):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
self.type: str
|
||||||
|
self.value: str
|
||||||
|
|
||||||
def from_dict(self, **kwargs):
|
def from_dict(self, **kwargs):
|
||||||
if 'ShadowAttribute' in kwargs:
|
if 'ShadowAttribute' in kwargs:
|
||||||
kwargs = kwargs['ShadowAttribute']
|
kwargs = kwargs['ShadowAttribute']
|
||||||
|
@ -95,12 +100,17 @@ class MISPShadowAttribute(AbstractMISP):
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
if hasattr(self, 'value'):
|
if hasattr(self, 'value'):
|
||||||
return f'<{self.__class__.__name__}(type={self.type}, value={self.value})' # type: ignore
|
return f'<{self.__class__.__name__}(type={self.type}, value={self.value})'
|
||||||
return f'<{self.__class__.__name__}(NotInitialized)'
|
return f'<{self.__class__.__name__}(NotInitialized)'
|
||||||
|
|
||||||
|
|
||||||
class MISPSighting(AbstractMISP):
|
class MISPSighting(AbstractMISP):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
self.id: int
|
||||||
|
self.value: str
|
||||||
|
|
||||||
def from_dict(self, **kwargs):
|
def from_dict(self, **kwargs):
|
||||||
"""Initialize the MISPSighting from a dictionary
|
"""Initialize the MISPSighting from a dictionary
|
||||||
:value: Value of the attribute the sighting is related too. Pushing this object
|
:value: Value of the attribute the sighting is related too. Pushing this object
|
||||||
|
@ -117,11 +127,11 @@ class MISPSighting(AbstractMISP):
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
if hasattr(self, 'value'):
|
if hasattr(self, 'value'):
|
||||||
return '<{self.__class__.__name__}(value={self.value})'.format(self=self) # type: ignore
|
return '<{self.__class__.__name__}(value={self.value})'.format(self=self)
|
||||||
if hasattr(self, 'id'):
|
if hasattr(self, 'id'):
|
||||||
return '<{self.__class__.__name__}(id={self.id})'.format(self=self) # type: ignore
|
return '<{self.__class__.__name__}(id={self.id})'.format(self=self)
|
||||||
if hasattr(self, 'uuid'):
|
if hasattr(self, 'uuid'):
|
||||||
return '<{self.__class__.__name__}(uuid={self.uuid})'.format(self=self) # type: ignore
|
return '<{self.__class__.__name__}(uuid={self.uuid})'.format(self=self)
|
||||||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||||
|
|
||||||
|
|
||||||
|
@ -142,6 +152,8 @@ class MISPAttribute(AbstractMISP):
|
||||||
self.__sane_default: dict = self.describe_types['sane_defaults']
|
self.__sane_default: dict = self.describe_types['sane_defaults']
|
||||||
self.__strict: bool = strict
|
self.__strict: bool = strict
|
||||||
self._data: Optional[BytesIO] = None
|
self._data: Optional[BytesIO] = None
|
||||||
|
self.first_seen: datetime
|
||||||
|
self.last_seen: datetime
|
||||||
self.uuid: str = str(uuid.uuid4())
|
self.uuid: str = str(uuid.uuid4())
|
||||||
self.ShadowAttribute: List[MISPShadowAttribute] = []
|
self.ShadowAttribute: List[MISPShadowAttribute] = []
|
||||||
self.SharingGroup: MISPSharingGroup
|
self.SharingGroup: MISPSharingGroup
|
||||||
|
@ -165,6 +177,30 @@ class MISPAttribute(AbstractMISP):
|
||||||
"""Set a list of prepared MISPTag."""
|
"""Set a list of prepared MISPTag."""
|
||||||
super()._set_tags(tags)
|
super()._set_tags(tags)
|
||||||
|
|
||||||
|
def __setattr__(self, name, value):
|
||||||
|
if name in ['first_seen', 'last_seen']:
|
||||||
|
if isinstance(value, (int, float)):
|
||||||
|
# Timestamp
|
||||||
|
value = datetime.fromtimestamp(value)
|
||||||
|
elif isinstance(value, str):
|
||||||
|
value = parse(value)
|
||||||
|
elif isinstance(value, date):
|
||||||
|
value = datetime.combine(falue, datetime.min.time())
|
||||||
|
elif isinstance(value, datetime):
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise PyMISPError(f'Invalid format for {name}: {type(value)}.')
|
||||||
|
|
||||||
|
if not value.tzinfo:
|
||||||
|
# set localtimezone if not present
|
||||||
|
value = value.astimezone()
|
||||||
|
|
||||||
|
if name == 'last_seen' and hasattr(self, 'first_seen') and self.first_seen > value:
|
||||||
|
raise PyMISPError('last_seen ({value}) has to be after first_seen ({self.first_seen})')
|
||||||
|
if name == 'first_seen' and hasattr(self, 'last_seen') and self.last_seen < value:
|
||||||
|
raise PyMISPError('first_seen ({value}) has to be before last_seen ({self.last_seen})')
|
||||||
|
super().__setattr__(name, value)
|
||||||
|
|
||||||
def hash_values(self, algorithm: str='sha512') -> List[str]:
|
def hash_values(self, algorithm: str='sha512') -> List[str]:
|
||||||
"""Compute the hash of every values for fast lookups"""
|
"""Compute the hash of every values for fast lookups"""
|
||||||
if algorithm not in hashlib.algorithms_available:
|
if algorithm not in hashlib.algorithms_available:
|
||||||
|
@ -188,14 +224,14 @@ class MISPAttribute(AbstractMISP):
|
||||||
if not hasattr(self, 'comment'):
|
if not hasattr(self, 'comment'):
|
||||||
self.comment = ''
|
self.comment = ''
|
||||||
if not hasattr(self, 'timestamp'):
|
if not hasattr(self, 'timestamp'):
|
||||||
self.timestamp = datetime.datetime.timestamp(datetime.datetime.now())
|
self.timestamp = datetime.timestamp(datetime.now())
|
||||||
|
|
||||||
def _to_feed(self) -> dict:
|
def _to_feed(self) -> dict:
|
||||||
to_return = super()._to_feed()
|
to_return = super()._to_feed()
|
||||||
if self.data:
|
if self.data:
|
||||||
to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
|
to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
|
||||||
if self.tags: # type: ignore
|
if self.tags:
|
||||||
to_return['Tag'] = list(filter(None, [tag._to_feed() for tag in self.tags])) # type: ignore
|
to_return['Tag'] = list(filter(None, [tag._to_feed() for tag in self.tags]))
|
||||||
return to_return
|
return to_return
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -298,10 +334,12 @@ class MISPAttribute(AbstractMISP):
|
||||||
raise NewAttributeError('The value of the attribute is required.')
|
raise NewAttributeError('The value of the attribute is required.')
|
||||||
if self.type == 'datetime' and isinstance(self.value, str):
|
if self.type == 'datetime' and isinstance(self.value, str):
|
||||||
try:
|
try:
|
||||||
if '.' in self.value:
|
if '+' in self.value or '-' in self.value:
|
||||||
self.value = datetime.datetime.strptime(self.value, "%Y-%m-%dT%H:%M:%S.%f")
|
self.value = datetime.strptime(self.value, "%Y-%m-%dT%H:%M:%S.%f%z")
|
||||||
|
elif '.' in self.value:
|
||||||
|
self.value = datetime.strptime(self.value, "%Y-%m-%dT%H:%M:%S.%f")
|
||||||
else:
|
else:
|
||||||
self.value = datetime.datetime.strptime(self.value, "%Y-%m-%dT%H:%M:%S")
|
self.value = datetime.strptime(self.value, "%Y-%m-%dT%H:%M:%S")
|
||||||
except ValueError:
|
except ValueError:
|
||||||
# Slower, but if the other ones fail, that's a good fallback
|
# Slower, but if the other ones fail, that's a good fallback
|
||||||
self.value = parse(self.value)
|
self.value = parse(self.value)
|
||||||
|
@ -338,10 +376,28 @@ class MISPAttribute(AbstractMISP):
|
||||||
self.event_id = int(kwargs.pop('event_id'))
|
self.event_id = int(kwargs.pop('event_id'))
|
||||||
if kwargs.get('timestamp'):
|
if kwargs.get('timestamp'):
|
||||||
ts = kwargs.pop('timestamp')
|
ts = kwargs.pop('timestamp')
|
||||||
if isinstance(ts, datetime.datetime):
|
if isinstance(ts, datetime):
|
||||||
self.timestamp = ts
|
self.timestamp = ts
|
||||||
else:
|
else:
|
||||||
self.timestamp = datetime.datetime.fromtimestamp(int(ts), datetime.timezone.utc)
|
self.timestamp = datetime.fromtimestamp(int(ts), timezone.utc)
|
||||||
|
if kwargs.get('first_seen'):
|
||||||
|
fs = kwargs.pop('first_seen')
|
||||||
|
try:
|
||||||
|
# Faster
|
||||||
|
self.first_seen = datetime.strptime(fs, "%Y-%m-%dT%H:%M:%S.%f%z")
|
||||||
|
except:
|
||||||
|
# Use __setattr__
|
||||||
|
self.first_seen = fs
|
||||||
|
|
||||||
|
if kwargs.get('last_seen'):
|
||||||
|
ls = kwargs.pop('last_seen')
|
||||||
|
try:
|
||||||
|
# Faster
|
||||||
|
self.last_seen = datetime.strptime(kwargs.pop('last_seen'), "%Y-%m-%dT%H:%M:%S.%f%z")
|
||||||
|
except:
|
||||||
|
# Use __setattr__
|
||||||
|
self.last_seen = ls
|
||||||
|
|
||||||
if kwargs.get('sharing_group_id'):
|
if kwargs.get('sharing_group_id'):
|
||||||
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
||||||
|
|
||||||
|
@ -490,7 +546,7 @@ class MISPObjectReference(AbstractMISP):
|
||||||
if not hasattr(self, 'comment'):
|
if not hasattr(self, 'comment'):
|
||||||
self.comment = ''
|
self.comment = ''
|
||||||
if not hasattr(self, 'timestamp'):
|
if not hasattr(self, 'timestamp'):
|
||||||
self.timestamp = datetime.datetime.timestamp(datetime.datetime.now())
|
self.timestamp = datetime.timestamp(datetime.now())
|
||||||
|
|
||||||
def from_dict(self, **kwargs):
|
def from_dict(self, **kwargs):
|
||||||
if 'ObjectReference' in kwargs:
|
if 'ObjectReference' in kwargs:
|
||||||
|
@ -531,6 +587,8 @@ class MISPObject(AbstractMISP):
|
||||||
self._set_template(kwargs.get('misp_objects_path_custom'))
|
self._set_template(kwargs.get('misp_objects_path_custom'))
|
||||||
|
|
||||||
self.uuid: str = str(uuid.uuid4())
|
self.uuid: str = str(uuid.uuid4())
|
||||||
|
self.first_seen: datetime
|
||||||
|
self.last_seen: datetime
|
||||||
self.__fast_attribute_access: dict = defaultdict(list) # Hashtable object_relation: [attributes]
|
self.__fast_attribute_access: dict = defaultdict(list) # Hashtable object_relation: [attributes]
|
||||||
self.ObjectReference: List[MISPObjectReference] = []
|
self.ObjectReference: List[MISPObjectReference] = []
|
||||||
self.Attribute: List[MISPAttribute] = []
|
self.Attribute: List[MISPAttribute] = []
|
||||||
|
@ -579,7 +637,7 @@ class MISPObject(AbstractMISP):
|
||||||
if not hasattr(self, 'comment'):
|
if not hasattr(self, 'comment'):
|
||||||
self.comment = ''
|
self.comment = ''
|
||||||
if not hasattr(self, 'timestamp'):
|
if not hasattr(self, 'timestamp'):
|
||||||
self.timestamp = datetime.datetime.timestamp(datetime.datetime.now())
|
self.timestamp = datetime.timestamp(datetime.now())
|
||||||
|
|
||||||
def _to_feed(self) -> dict:
|
def _to_feed(self) -> dict:
|
||||||
to_return = super(MISPObject, self)._to_feed()
|
to_return = super(MISPObject, self)._to_feed()
|
||||||
|
@ -587,6 +645,30 @@ class MISPObject(AbstractMISP):
|
||||||
to_return['ObjectReference'] = [reference._to_feed() for reference in self.references]
|
to_return['ObjectReference'] = [reference._to_feed() for reference in self.references]
|
||||||
return to_return
|
return to_return
|
||||||
|
|
||||||
|
def __setattr__(self, name, value):
|
||||||
|
if name in ['first_seen', 'last_seen']:
|
||||||
|
if isinstance(value, datetime):
|
||||||
|
pass
|
||||||
|
elif isinstance(value, (int, float)):
|
||||||
|
# Timestamp
|
||||||
|
value = datetime.fromtimestamp(value)
|
||||||
|
elif isinstance(value, str):
|
||||||
|
value = parse(value)
|
||||||
|
elif isinstance(value, date):
|
||||||
|
value = datetime.combine(falue, datetime.min.time())
|
||||||
|
else:
|
||||||
|
raise PyMISPError(f'Invalid format for {name}: {type(value)}.')
|
||||||
|
|
||||||
|
if not value.tzinfo:
|
||||||
|
# set localtimezone if not present
|
||||||
|
value = value.astimezone()
|
||||||
|
|
||||||
|
if name == 'last_seen' and hasattr(self, 'first_seen') and self.first_seen > value:
|
||||||
|
raise PyMISPError('last_seen ({value}) has to be after first_seen ({self.first_seen})')
|
||||||
|
if name == 'first_seen' and hasattr(self, 'last_seen') and self.last_seen < value:
|
||||||
|
raise PyMISPError('first_seen ({value}) has to be before last_seen ({self.last_seen})')
|
||||||
|
super().__setattr__(name, value)
|
||||||
|
|
||||||
def force_misp_objects_path_custom(self, misp_objects_path_custom: Union[Path, str], object_name: Optional[str]=None):
|
def force_misp_objects_path_custom(self, misp_objects_path_custom: Union[Path, str], object_name: Optional[str]=None):
|
||||||
if object_name:
|
if object_name:
|
||||||
self.name = object_name
|
self.name = object_name
|
||||||
|
@ -659,10 +741,29 @@ class MISPObject(AbstractMISP):
|
||||||
|
|
||||||
if kwargs.get('timestamp'):
|
if kwargs.get('timestamp'):
|
||||||
ts = kwargs.pop('timestamp')
|
ts = kwargs.pop('timestamp')
|
||||||
if isinstance(ts, datetime.datetime):
|
if isinstance(ts, datetime):
|
||||||
self.timestamp = ts
|
self.timestamp = ts
|
||||||
else:
|
else:
|
||||||
self.timestamp = datetime.datetime.fromtimestamp(int(ts), datetime.timezone.utc)
|
self.timestamp = datetime.fromtimestamp(int(ts), timezone.utc)
|
||||||
|
|
||||||
|
if kwargs.get('first_seen'):
|
||||||
|
fs = kwargs.pop('first_seen')
|
||||||
|
try:
|
||||||
|
# Faster
|
||||||
|
self.first_seen = datetime.strptime(fs, "%Y-%m-%dT%H:%M:%S.%f%z")
|
||||||
|
except:
|
||||||
|
# Use __setattr__
|
||||||
|
self.first_seen = fs
|
||||||
|
|
||||||
|
if kwargs.get('last_seen'):
|
||||||
|
ls = kwargs.pop('last_seen')
|
||||||
|
try:
|
||||||
|
# Faster
|
||||||
|
self.last_seen = datetime.strptime(kwargs.pop('last_seen'), "%Y-%m-%dT%H:%M:%S.%f%z")
|
||||||
|
except:
|
||||||
|
# Use __setattr__
|
||||||
|
self.last_seen = ls
|
||||||
|
|
||||||
if kwargs.get('Attribute'):
|
if kwargs.get('Attribute'):
|
||||||
[self.add_attribute(**a) for a in kwargs.pop('Attribute')]
|
[self.add_attribute(**a) for a in kwargs.pop('Attribute')]
|
||||||
if kwargs.get('ObjectReference'):
|
if kwargs.get('ObjectReference'):
|
||||||
|
@ -803,6 +904,7 @@ class MISPEvent(AbstractMISP):
|
||||||
# This variable is used in add_attribute in order to avoid duplicating the structure
|
# This variable is used in add_attribute in order to avoid duplicating the structure
|
||||||
self.describe_types = describe_types
|
self.describe_types = describe_types
|
||||||
|
|
||||||
|
self.date: date
|
||||||
self.Attribute: List[MISPAttribute] = []
|
self.Attribute: List[MISPAttribute] = []
|
||||||
self.Object: List[MISPObject] = []
|
self.Object: List[MISPObject] = []
|
||||||
self.RelatedEvent: List[MISPEvent] = []
|
self.RelatedEvent: List[MISPEvent] = []
|
||||||
|
@ -832,11 +934,11 @@ class MISPEvent(AbstractMISP):
|
||||||
if not hasattr(self, 'extends_uuid'):
|
if not hasattr(self, 'extends_uuid'):
|
||||||
self.extends_uuid = ''
|
self.extends_uuid = ''
|
||||||
if not hasattr(self, 'date'):
|
if not hasattr(self, 'date'):
|
||||||
self.set_date(datetime.date.today())
|
self.set_date(date.today())
|
||||||
if not hasattr(self, 'timestamp'):
|
if not hasattr(self, 'timestamp'):
|
||||||
self.timestamp = datetime.datetime.timestamp(datetime.datetime.now())
|
self.timestamp = datetime.timestamp(datetime.now())
|
||||||
if not hasattr(self, 'publish_timestamp'):
|
if not hasattr(self, 'publish_timestamp'):
|
||||||
self.publish_timestamp = datetime.datetime.timestamp(datetime.datetime.now())
|
self.publish_timestamp = datetime.timestamp(datetime.now())
|
||||||
if not hasattr(self, 'analysis'):
|
if not hasattr(self, 'analysis'):
|
||||||
# analysis: 0 means initial, 1 ongoing, 2 completed
|
# analysis: 0 means initial, 1 ongoing, 2 completed
|
||||||
self.analysis = 2
|
self.analysis = 2
|
||||||
|
@ -1006,21 +1108,28 @@ class MISPEvent(AbstractMISP):
|
||||||
if validate:
|
if validate:
|
||||||
jsonschema.validate(json.loads(self.to_json()), self.__json_schema)
|
jsonschema.validate(json.loads(self.to_json()), self.__json_schema)
|
||||||
|
|
||||||
def set_date(self, date: Union[str, int, datetime.datetime, datetime.date, None], ignore_invalid: bool=False):
|
def __setattr__(self, name, value):
|
||||||
"""Set a date for the event (string, datetime, or date object)"""
|
if name in ['date']:
|
||||||
if isinstance(date, str):
|
if isinstance(value, date):
|
||||||
self.date = parse(date).date()
|
pass
|
||||||
elif isinstance(date, int):
|
elif isinstance(value, str):
|
||||||
self.date = datetime.datetime.utcfromtimestamp(date).date()
|
value = parse(value).date()
|
||||||
elif isinstance(date, datetime.datetime):
|
elif isinstance(value, (int, float)):
|
||||||
self.date = date.date()
|
value = date.fromtimestamp(value)
|
||||||
elif isinstance(date, datetime.date):
|
elif isinstance(value, datetime):
|
||||||
self.date = date
|
value = value.date()
|
||||||
else:
|
|
||||||
if ignore_invalid:
|
|
||||||
self.date = datetime.date.today()
|
|
||||||
else:
|
else:
|
||||||
raise NewEventError('Invalid format for the date: {} - {}'.format(date, type(date)))
|
raise NewEventError(f'Invalid format for the date: {type(value)} - {value}')
|
||||||
|
super().__setattr__(name, value)
|
||||||
|
|
||||||
|
def set_date(self, d: Optional[Union[str, int, float, datetime, date]]=None, ignore_invalid: bool=False):
|
||||||
|
"""Set a date for the event (string, datetime, or date object)"""
|
||||||
|
if isinstance(d, (str, int, float, datetime, date)):
|
||||||
|
self.date = d # type: ignore
|
||||||
|
elif ignore_invalid:
|
||||||
|
self.date = date.today()
|
||||||
|
else:
|
||||||
|
raise NewEventError(f'Invalid format for the date: {type(d)} - {d}')
|
||||||
|
|
||||||
def from_dict(self, **kwargs):
|
def from_dict(self, **kwargs):
|
||||||
if 'Event' in kwargs:
|
if 'Event' in kwargs:
|
||||||
|
@ -1066,11 +1175,11 @@ class MISPEvent(AbstractMISP):
|
||||||
if kwargs.get('org_id'):
|
if kwargs.get('org_id'):
|
||||||
self.org_id = int(kwargs.pop('org_id'))
|
self.org_id = int(kwargs.pop('org_id'))
|
||||||
if kwargs.get('timestamp'):
|
if kwargs.get('timestamp'):
|
||||||
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('timestamp')), datetime.timezone.utc)
|
self.timestamp = datetime.fromtimestamp(int(kwargs.pop('timestamp')), timezone.utc)
|
||||||
if kwargs.get('publish_timestamp'):
|
if kwargs.get('publish_timestamp'):
|
||||||
self.publish_timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('publish_timestamp')), datetime.timezone.utc)
|
self.publish_timestamp = datetime.fromtimestamp(int(kwargs.pop('publish_timestamp')), timezone.utc)
|
||||||
if kwargs.get('sighting_timestamp'):
|
if kwargs.get('sighting_timestamp'):
|
||||||
self.sighting_timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('sighting_timestamp')), datetime.timezone.utc)
|
self.sighting_timestamp = datetime.fromtimestamp(int(kwargs.pop('sighting_timestamp')), timezone.utc)
|
||||||
if kwargs.get('sharing_group_id'):
|
if kwargs.get('sharing_group_id'):
|
||||||
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
||||||
if kwargs.get('RelatedEvent'):
|
if kwargs.get('RelatedEvent'):
|
||||||
|
@ -1097,7 +1206,7 @@ class MISPEvent(AbstractMISP):
|
||||||
to_return = super().to_dict()
|
to_return = super().to_dict()
|
||||||
|
|
||||||
if to_return.get('date'):
|
if to_return.get('date'):
|
||||||
if isinstance(self.date, datetime.datetime):
|
if isinstance(self.date, datetime):
|
||||||
self.date = self.date.date()
|
self.date = self.date.date()
|
||||||
to_return['date'] = self.date.isoformat()
|
to_return['date'] = self.date.isoformat()
|
||||||
if to_return.get('publish_timestamp'):
|
if to_return.get('publish_timestamp'):
|
||||||
|
|
|
@ -7,7 +7,7 @@ import sys
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from pymisp.tools import make_binary_objects
|
from pymisp.tools import make_binary_objects
|
||||||
from datetime import datetime, timedelta, date
|
from datetime import datetime, timedelta, date, timezone
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
import json
|
import json
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
@ -1977,7 +1977,6 @@ class TestComprehensive(unittest.TestCase):
|
||||||
self.admin_misp_connector.delete_user(test_roles_user)
|
self.admin_misp_connector.delete_user(test_roles_user)
|
||||||
self.admin_misp_connector.delete_tag(test_tag)
|
self.admin_misp_connector.delete_tag(test_tag)
|
||||||
|
|
||||||
@unittest.skipIf(sys.version_info < (3, 6), 'Not supported on python < 3.6')
|
|
||||||
def test_expansion(self):
|
def test_expansion(self):
|
||||||
first = self.create_simple_event()
|
first = self.create_simple_event()
|
||||||
try:
|
try:
|
||||||
|
@ -2046,7 +2045,6 @@ class TestComprehensive(unittest.TestCase):
|
||||||
self.admin_misp_connector.delete_event(first)
|
self.admin_misp_connector.delete_event(first)
|
||||||
self.admin_misp_connector.delete_event(second)
|
self.admin_misp_connector.delete_event(second)
|
||||||
|
|
||||||
@unittest.skipIf(sys.version_info < (3, 6), 'Not supported on python < 3.6')
|
|
||||||
def test_communities(self):
|
def test_communities(self):
|
||||||
communities = self.admin_misp_connector.communities(pythonify=True)
|
communities = self.admin_misp_connector.communities(pythonify=True)
|
||||||
self.assertEqual(communities[0].name, 'CIRCL Private Sector Information Sharing Community - aka MISPPRIV')
|
self.assertEqual(communities[0].name, 'CIRCL Private Sector Information Sharing Community - aka MISPPRIV')
|
||||||
|
@ -2080,6 +2078,30 @@ class TestComprehensive(unittest.TestCase):
|
||||||
self.admin_misp_connector.delete_event(first)
|
self.admin_misp_connector.delete_event(first)
|
||||||
self.admin_misp_connector.delete_event(second)
|
self.admin_misp_connector.delete_event(second)
|
||||||
|
|
||||||
|
def test_first_last_seen(self):
|
||||||
|
event = MISPEvent()
|
||||||
|
event.info = 'Test First Last seen'
|
||||||
|
event.add_attribute('ip-dst', '8.8.8.8', first_seen='2020-01-04', last_seen='2020-01-04T12:30:34.323242+8:00')
|
||||||
|
obj = event.add_object(name='file', first_seen=1580147259.268763, last_seen=1580147300)
|
||||||
|
attr = obj.add_attribute('filename', 'blah.exe')
|
||||||
|
attr.first_seen = '2022-01-30'
|
||||||
|
attr.last_seen = '2022-02-23'
|
||||||
|
try:
|
||||||
|
first = self.admin_misp_connector.add_event(event, pythonify=True)
|
||||||
|
# Simple attribute
|
||||||
|
self.assertEqual(first.attributes[0].first_seen, datetime(2020, 1, 3, 23, 0, tzinfo=timezone.utc))
|
||||||
|
self.assertEqual(first.attributes[0].last_seen, datetime(2020, 1, 4, 4, 30, 34, 323242, tzinfo=timezone.utc))
|
||||||
|
|
||||||
|
# Object
|
||||||
|
self.assertEqual(first.objects[0].first_seen, datetime(2020, 1, 27, 17, 47, 39, 268763, tzinfo=timezone.utc))
|
||||||
|
self.assertEqual(first.objects[0].last_seen, datetime(2020, 1, 27, 17, 48, 20, tzinfo=timezone.utc))
|
||||||
|
|
||||||
|
# Object attribute
|
||||||
|
self.assertEqual(first.objects[0].attributes[0].first_seen, datetime(2022, 1, 29, 23, 0, tzinfo=timezone.utc))
|
||||||
|
self.assertEqual(first.objects[0].attributes[0].last_seen, datetime(2022, 2, 22, 23, 0, tzinfo=timezone.utc))
|
||||||
|
finally:
|
||||||
|
self.admin_misp_connector.delete_event(first)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
Loading…
Reference in New Issue