From 32445973bdb967510e552e9cbeb2bb67d2f2d1f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Mon, 27 Jan 2020 19:07:40 +0100 Subject: [PATCH] new: Support for first_seen/last_seen Cleaner import of datetime --- pymisp/abstract.py | 24 +++-- pymisp/mispevent.py | 185 +++++++++++++++++++++++++------- tests/testlive_comprehensive.py | 28 ++++- 3 files changed, 187 insertions(+), 50 deletions(-) diff --git a/pymisp/abstract.py b/pymisp/abstract.py index 51b232a..6ac564d 100644 --- a/pymisp/abstract.py +++ b/pymisp/abstract.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import datetime +from datetime import date, datetime, timezone from deprecated import deprecated # type: ignore from json import JSONEncoder @@ -86,7 +86,7 @@ class MISPEncode(JSONEncoder): def default(self, obj): if isinstance(obj, AbstractMISP): return obj.jsonable() - elif isinstance(obj, (datetime.datetime, datetime.date)): + elif isinstance(obj, (datetime, date)): return obj.isoformat() elif isinstance(obj, Enum): return obj.value @@ -189,6 +189,12 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): continue else: val = self._datetime_to_timestamp(val) + if (attribute in ['first_seen', 'last_seen', 'datetime'] + and isinstance(val, datetime) + and not val.tzinfo): + # Need to make sure the timezone is set. Otherwise, it will be processed as UTC on the server + val = val.astimezone() + to_return[attribute] = val to_return = _int_to_str(to_return) return to_return @@ -207,7 +213,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): if getattr(self, field, None) is not None: if field in ['timestamp', 'publish_timestamp']: to_return[field] = self._datetime_to_timestamp(getattr(self, field)) - elif isinstance(getattr(self, field), (datetime.datetime, datetime.date)): + elif isinstance(getattr(self, field), (datetime, date)): to_return[field] = getattr(self, field).isoformat() else: to_return[field] = getattr(self, field) @@ -274,8 +280,8 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): self.__edited = True super().__setattr__(name, value) - def _datetime_to_timestamp(self, d: Union[int, float, str, datetime.datetime]) -> int: - """Convert a datetime.datetime object to a timestamp (int)""" + def _datetime_to_timestamp(self, d: Union[int, float, str, datetime]) -> int: + """Convert a datetime object to a timestamp (int)""" if isinstance(d, (int, float, str)): # Assume we already have a timestamp return int(d) @@ -346,20 +352,20 @@ class MISPTag(AbstractMISP): if HAS_RAPIDJSON: - def pymisp_json_default(obj: Union[AbstractMISP, datetime.datetime, datetime.date, Enum, UUID]) -> Union[dict, str]: + def pymisp_json_default(obj: Union[AbstractMISP, datetime, date, Enum, UUID]) -> Union[dict, str]: if isinstance(obj, AbstractMISP): return obj.jsonable() - elif isinstance(obj, (datetime.datetime, datetime.date)): + elif isinstance(obj, (datetime, date)): return obj.isoformat() elif isinstance(obj, Enum): return obj.value elif isinstance(obj, UUID): return str(obj) else: - def pymisp_json_default(obj: Union[AbstractMISP, datetime.datetime, datetime.date, Enum, UUID]) -> Union[dict, str]: + def pymisp_json_default(obj: Union[AbstractMISP, datetime, date, Enum, UUID]) -> Union[dict, str]: if isinstance(obj, AbstractMISP): return obj.jsonable() - elif isinstance(obj, (datetime.datetime, datetime.date)): + elif isinstance(obj, (datetime, date)): return obj.isoformat() elif isinstance(obj, Enum): return obj.value diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 83753ff..f9b49b1 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -import datetime +from datetime import timezone, datetime, date import json import os import base64 @@ -88,6 +88,11 @@ class MISPSharingGroup(AbstractMISP): class MISPShadowAttribute(AbstractMISP): + def __init__(self): + super().__init__() + self.type: str + self.value: str + def from_dict(self, **kwargs): if 'ShadowAttribute' in kwargs: kwargs = kwargs['ShadowAttribute'] @@ -95,12 +100,17 @@ class MISPShadowAttribute(AbstractMISP): def __repr__(self) -> str: if hasattr(self, 'value'): - return f'<{self.__class__.__name__}(type={self.type}, value={self.value})' # type: ignore + return f'<{self.__class__.__name__}(type={self.type}, value={self.value})' return f'<{self.__class__.__name__}(NotInitialized)' class MISPSighting(AbstractMISP): + def __init__(self): + super().__init__() + self.id: int + self.value: str + def from_dict(self, **kwargs): """Initialize the MISPSighting from a dictionary :value: Value of the attribute the sighting is related too. Pushing this object @@ -117,11 +127,11 @@ class MISPSighting(AbstractMISP): def __repr__(self) -> str: if hasattr(self, 'value'): - return '<{self.__class__.__name__}(value={self.value})'.format(self=self) # type: ignore + return '<{self.__class__.__name__}(value={self.value})'.format(self=self) if hasattr(self, 'id'): - return '<{self.__class__.__name__}(id={self.id})'.format(self=self) # type: ignore + return '<{self.__class__.__name__}(id={self.id})'.format(self=self) if hasattr(self, 'uuid'): - return '<{self.__class__.__name__}(uuid={self.uuid})'.format(self=self) # type: ignore + return '<{self.__class__.__name__}(uuid={self.uuid})'.format(self=self) return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) @@ -142,6 +152,8 @@ class MISPAttribute(AbstractMISP): self.__sane_default: dict = self.describe_types['sane_defaults'] self.__strict: bool = strict self._data: Optional[BytesIO] = None + self.first_seen: datetime + self.last_seen: datetime self.uuid: str = str(uuid.uuid4()) self.ShadowAttribute: List[MISPShadowAttribute] = [] self.SharingGroup: MISPSharingGroup @@ -165,6 +177,30 @@ class MISPAttribute(AbstractMISP): """Set a list of prepared MISPTag.""" super()._set_tags(tags) + def __setattr__(self, name, value): + if name in ['first_seen', 'last_seen']: + if isinstance(value, (int, float)): + # Timestamp + value = datetime.fromtimestamp(value) + elif isinstance(value, str): + value = parse(value) + elif isinstance(value, date): + value = datetime.combine(falue, datetime.min.time()) + elif isinstance(value, datetime): + pass + else: + raise PyMISPError(f'Invalid format for {name}: {type(value)}.') + + if not value.tzinfo: + # set localtimezone if not present + value = value.astimezone() + + if name == 'last_seen' and hasattr(self, 'first_seen') and self.first_seen > value: + raise PyMISPError('last_seen ({value}) has to be after first_seen ({self.first_seen})') + if name == 'first_seen' and hasattr(self, 'last_seen') and self.last_seen < value: + raise PyMISPError('first_seen ({value}) has to be before last_seen ({self.last_seen})') + super().__setattr__(name, value) + def hash_values(self, algorithm: str='sha512') -> List[str]: """Compute the hash of every values for fast lookups""" if algorithm not in hashlib.algorithms_available: @@ -188,14 +224,14 @@ class MISPAttribute(AbstractMISP): if not hasattr(self, 'comment'): self.comment = '' if not hasattr(self, 'timestamp'): - self.timestamp = datetime.datetime.timestamp(datetime.datetime.now()) + self.timestamp = datetime.timestamp(datetime.now()) def _to_feed(self) -> dict: to_return = super()._to_feed() if self.data: to_return['data'] = base64.b64encode(self.data.getvalue()).decode() - if self.tags: # type: ignore - to_return['Tag'] = list(filter(None, [tag._to_feed() for tag in self.tags])) # type: ignore + if self.tags: + to_return['Tag'] = list(filter(None, [tag._to_feed() for tag in self.tags])) return to_return @property @@ -298,10 +334,12 @@ class MISPAttribute(AbstractMISP): raise NewAttributeError('The value of the attribute is required.') if self.type == 'datetime' and isinstance(self.value, str): try: - if '.' in self.value: - self.value = datetime.datetime.strptime(self.value, "%Y-%m-%dT%H:%M:%S.%f") + if '+' in self.value or '-' in self.value: + self.value = datetime.strptime(self.value, "%Y-%m-%dT%H:%M:%S.%f%z") + elif '.' in self.value: + self.value = datetime.strptime(self.value, "%Y-%m-%dT%H:%M:%S.%f") else: - self.value = datetime.datetime.strptime(self.value, "%Y-%m-%dT%H:%M:%S") + self.value = datetime.strptime(self.value, "%Y-%m-%dT%H:%M:%S") except ValueError: # Slower, but if the other ones fail, that's a good fallback self.value = parse(self.value) @@ -338,10 +376,28 @@ class MISPAttribute(AbstractMISP): self.event_id = int(kwargs.pop('event_id')) if kwargs.get('timestamp'): ts = kwargs.pop('timestamp') - if isinstance(ts, datetime.datetime): + if isinstance(ts, datetime): self.timestamp = ts else: - self.timestamp = datetime.datetime.fromtimestamp(int(ts), datetime.timezone.utc) + self.timestamp = datetime.fromtimestamp(int(ts), timezone.utc) + if kwargs.get('first_seen'): + fs = kwargs.pop('first_seen') + try: + # Faster + self.first_seen = datetime.strptime(fs, "%Y-%m-%dT%H:%M:%S.%f%z") + except: + # Use __setattr__ + self.first_seen = fs + + if kwargs.get('last_seen'): + ls = kwargs.pop('last_seen') + try: + # Faster + self.last_seen = datetime.strptime(kwargs.pop('last_seen'), "%Y-%m-%dT%H:%M:%S.%f%z") + except: + # Use __setattr__ + self.last_seen = ls + if kwargs.get('sharing_group_id'): self.sharing_group_id = int(kwargs.pop('sharing_group_id')) @@ -490,7 +546,7 @@ class MISPObjectReference(AbstractMISP): if not hasattr(self, 'comment'): self.comment = '' if not hasattr(self, 'timestamp'): - self.timestamp = datetime.datetime.timestamp(datetime.datetime.now()) + self.timestamp = datetime.timestamp(datetime.now()) def from_dict(self, **kwargs): if 'ObjectReference' in kwargs: @@ -531,6 +587,8 @@ class MISPObject(AbstractMISP): self._set_template(kwargs.get('misp_objects_path_custom')) self.uuid: str = str(uuid.uuid4()) + self.first_seen: datetime + self.last_seen: datetime self.__fast_attribute_access: dict = defaultdict(list) # Hashtable object_relation: [attributes] self.ObjectReference: List[MISPObjectReference] = [] self.Attribute: List[MISPAttribute] = [] @@ -579,7 +637,7 @@ class MISPObject(AbstractMISP): if not hasattr(self, 'comment'): self.comment = '' if not hasattr(self, 'timestamp'): - self.timestamp = datetime.datetime.timestamp(datetime.datetime.now()) + self.timestamp = datetime.timestamp(datetime.now()) def _to_feed(self) -> dict: to_return = super(MISPObject, self)._to_feed() @@ -587,6 +645,30 @@ class MISPObject(AbstractMISP): to_return['ObjectReference'] = [reference._to_feed() for reference in self.references] return to_return + def __setattr__(self, name, value): + if name in ['first_seen', 'last_seen']: + if isinstance(value, datetime): + pass + elif isinstance(value, (int, float)): + # Timestamp + value = datetime.fromtimestamp(value) + elif isinstance(value, str): + value = parse(value) + elif isinstance(value, date): + value = datetime.combine(falue, datetime.min.time()) + else: + raise PyMISPError(f'Invalid format for {name}: {type(value)}.') + + if not value.tzinfo: + # set localtimezone if not present + value = value.astimezone() + + if name == 'last_seen' and hasattr(self, 'first_seen') and self.first_seen > value: + raise PyMISPError('last_seen ({value}) has to be after first_seen ({self.first_seen})') + if name == 'first_seen' and hasattr(self, 'last_seen') and self.last_seen < value: + raise PyMISPError('first_seen ({value}) has to be before last_seen ({self.last_seen})') + super().__setattr__(name, value) + def force_misp_objects_path_custom(self, misp_objects_path_custom: Union[Path, str], object_name: Optional[str]=None): if object_name: self.name = object_name @@ -659,10 +741,29 @@ class MISPObject(AbstractMISP): if kwargs.get('timestamp'): ts = kwargs.pop('timestamp') - if isinstance(ts, datetime.datetime): + if isinstance(ts, datetime): self.timestamp = ts else: - self.timestamp = datetime.datetime.fromtimestamp(int(ts), datetime.timezone.utc) + self.timestamp = datetime.fromtimestamp(int(ts), timezone.utc) + + if kwargs.get('first_seen'): + fs = kwargs.pop('first_seen') + try: + # Faster + self.first_seen = datetime.strptime(fs, "%Y-%m-%dT%H:%M:%S.%f%z") + except: + # Use __setattr__ + self.first_seen = fs + + if kwargs.get('last_seen'): + ls = kwargs.pop('last_seen') + try: + # Faster + self.last_seen = datetime.strptime(kwargs.pop('last_seen'), "%Y-%m-%dT%H:%M:%S.%f%z") + except: + # Use __setattr__ + self.last_seen = ls + if kwargs.get('Attribute'): [self.add_attribute(**a) for a in kwargs.pop('Attribute')] if kwargs.get('ObjectReference'): @@ -803,6 +904,7 @@ class MISPEvent(AbstractMISP): # This variable is used in add_attribute in order to avoid duplicating the structure self.describe_types = describe_types + self.date: date self.Attribute: List[MISPAttribute] = [] self.Object: List[MISPObject] = [] self.RelatedEvent: List[MISPEvent] = [] @@ -832,11 +934,11 @@ class MISPEvent(AbstractMISP): if not hasattr(self, 'extends_uuid'): self.extends_uuid = '' if not hasattr(self, 'date'): - self.set_date(datetime.date.today()) + self.set_date(date.today()) if not hasattr(self, 'timestamp'): - self.timestamp = datetime.datetime.timestamp(datetime.datetime.now()) + self.timestamp = datetime.timestamp(datetime.now()) if not hasattr(self, 'publish_timestamp'): - self.publish_timestamp = datetime.datetime.timestamp(datetime.datetime.now()) + self.publish_timestamp = datetime.timestamp(datetime.now()) if not hasattr(self, 'analysis'): # analysis: 0 means initial, 1 ongoing, 2 completed self.analysis = 2 @@ -1006,21 +1108,28 @@ class MISPEvent(AbstractMISP): if validate: jsonschema.validate(json.loads(self.to_json()), self.__json_schema) - def set_date(self, date: Union[str, int, datetime.datetime, datetime.date, None], ignore_invalid: bool=False): - """Set a date for the event (string, datetime, or date object)""" - if isinstance(date, str): - self.date = parse(date).date() - elif isinstance(date, int): - self.date = datetime.datetime.utcfromtimestamp(date).date() - elif isinstance(date, datetime.datetime): - self.date = date.date() - elif isinstance(date, datetime.date): - self.date = date - else: - if ignore_invalid: - self.date = datetime.date.today() + def __setattr__(self, name, value): + if name in ['date']: + if isinstance(value, date): + pass + elif isinstance(value, str): + value = parse(value).date() + elif isinstance(value, (int, float)): + value = date.fromtimestamp(value) + elif isinstance(value, datetime): + value = value.date() else: - raise NewEventError('Invalid format for the date: {} - {}'.format(date, type(date))) + raise NewEventError(f'Invalid format for the date: {type(value)} - {value}') + super().__setattr__(name, value) + + def set_date(self, d: Optional[Union[str, int, float, datetime, date]]=None, ignore_invalid: bool=False): + """Set a date for the event (string, datetime, or date object)""" + if isinstance(d, (str, int, float, datetime, date)): + self.date = d # type: ignore + elif ignore_invalid: + self.date = date.today() + else: + raise NewEventError(f'Invalid format for the date: {type(d)} - {d}') def from_dict(self, **kwargs): if 'Event' in kwargs: @@ -1066,11 +1175,11 @@ class MISPEvent(AbstractMISP): if kwargs.get('org_id'): self.org_id = int(kwargs.pop('org_id')) if kwargs.get('timestamp'): - self.timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('timestamp')), datetime.timezone.utc) + self.timestamp = datetime.fromtimestamp(int(kwargs.pop('timestamp')), timezone.utc) if kwargs.get('publish_timestamp'): - self.publish_timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('publish_timestamp')), datetime.timezone.utc) + self.publish_timestamp = datetime.fromtimestamp(int(kwargs.pop('publish_timestamp')), timezone.utc) if kwargs.get('sighting_timestamp'): - self.sighting_timestamp = datetime.datetime.fromtimestamp(int(kwargs.pop('sighting_timestamp')), datetime.timezone.utc) + self.sighting_timestamp = datetime.fromtimestamp(int(kwargs.pop('sighting_timestamp')), timezone.utc) if kwargs.get('sharing_group_id'): self.sharing_group_id = int(kwargs.pop('sharing_group_id')) if kwargs.get('RelatedEvent'): @@ -1097,7 +1206,7 @@ class MISPEvent(AbstractMISP): to_return = super().to_dict() if to_return.get('date'): - if isinstance(self.date, datetime.datetime): + if isinstance(self.date, datetime): self.date = self.date.date() to_return['date'] = self.date.isoformat() if to_return.get('publish_timestamp'): diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 5346035..69279e0 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -7,7 +7,7 @@ import sys import unittest from pymisp.tools import make_binary_objects -from datetime import datetime, timedelta, date +from datetime import datetime, timedelta, date, timezone from io import BytesIO import json from pathlib import Path @@ -1977,7 +1977,6 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_user(test_roles_user) self.admin_misp_connector.delete_tag(test_tag) - @unittest.skipIf(sys.version_info < (3, 6), 'Not supported on python < 3.6') def test_expansion(self): first = self.create_simple_event() try: @@ -2046,7 +2045,6 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) - @unittest.skipIf(sys.version_info < (3, 6), 'Not supported on python < 3.6') def test_communities(self): communities = self.admin_misp_connector.communities(pythonify=True) self.assertEqual(communities[0].name, 'CIRCL Private Sector Information Sharing Community - aka MISPPRIV') @@ -2080,6 +2078,30 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(second) + def test_first_last_seen(self): + event = MISPEvent() + event.info = 'Test First Last seen' + event.add_attribute('ip-dst', '8.8.8.8', first_seen='2020-01-04', last_seen='2020-01-04T12:30:34.323242+8:00') + obj = event.add_object(name='file', first_seen=1580147259.268763, last_seen=1580147300) + attr = obj.add_attribute('filename', 'blah.exe') + attr.first_seen = '2022-01-30' + attr.last_seen = '2022-02-23' + try: + first = self.admin_misp_connector.add_event(event, pythonify=True) + # Simple attribute + self.assertEqual(first.attributes[0].first_seen, datetime(2020, 1, 3, 23, 0, tzinfo=timezone.utc)) + self.assertEqual(first.attributes[0].last_seen, datetime(2020, 1, 4, 4, 30, 34, 323242, tzinfo=timezone.utc)) + + # Object + self.assertEqual(first.objects[0].first_seen, datetime(2020, 1, 27, 17, 47, 39, 268763, tzinfo=timezone.utc)) + self.assertEqual(first.objects[0].last_seen, datetime(2020, 1, 27, 17, 48, 20, tzinfo=timezone.utc)) + + # Object attribute + self.assertEqual(first.objects[0].attributes[0].first_seen, datetime(2022, 1, 29, 23, 0, tzinfo=timezone.utc)) + self.assertEqual(first.objects[0].attributes[0].last_seen, datetime(2022, 2, 22, 23, 0, tzinfo=timezone.utc)) + finally: + self.admin_misp_connector.delete_event(first) + if __name__ == '__main__': unittest.main()