PyMISP/pymisp/abstract.py

402 lines
14 KiB
Python
Raw Normal View History

2020-01-02 15:55:00 +01:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
2020-01-30 11:44:13 +01:00
from datetime import date, datetime
2020-01-02 15:55:00 +01:00
from deprecated import deprecated # type: ignore
from json import JSONEncoder
from uuid import UUID
2020-01-02 15:55:00 +01:00
from abc import ABCMeta
try:
2020-01-02 15:55:00 +01:00
from rapidjson import load # type: ignore
from rapidjson import loads # type: ignore
from rapidjson import dumps # type: ignore
HAS_RAPIDJSON = True
except ImportError:
from json import load
from json import loads
from json import dumps
HAS_RAPIDJSON = False
import logging
2018-08-09 18:11:45 +02:00
from enum import Enum
from typing import Union, Optional, Any, Dict, List, Set, Mapping
from .exceptions import PyMISPInvalidFormat, PyMISPError
2018-01-08 11:59:32 +01:00
2019-12-18 14:45:14 +01:00
from collections.abc import MutableMapping
from functools import lru_cache
from pathlib import Path
2019-12-18 14:45:14 +01:00
# Package-wide logger shared by the PyMISP modules.
logger = logging.getLogger('pymisp')

# Locations of the data files shipped next to this module.
resources_path = Path(__file__).parent / 'data'
misp_objects_path = resources_path / 'misp-objects' / 'objects'

# Load the bundled type description once at import time. The encoding is given
# explicitly so that the result does not depend on the platform's locale
# default (the same choice MISPFileCache._load_json makes for its JSON files).
with (resources_path / 'describeTypes.json').open('r', encoding='utf-8') as f:
    describe_types = load(f)['result']
2019-12-18 14:45:14 +01:00
class MISPFileCache(object):
    """Mixin providing a small, process-wide cache of parsed JSON files."""
    # cache up to 150 JSON structures in class attribute

    @staticmethod
    @lru_cache(maxsize=150)
    def _load_json(path: Path) -> Union[dict, None]:
        """Parse the JSON file at *path*, memoising the result per path.

        :param path: location of the JSON file to load.
        :return: the decoded JSON content, or ``None`` when *path* does not exist.

        The result is cached by ``lru_cache``: every caller asking for the same
        path receives the very same object (so it should be treated as
        read-only), and a ``None`` result for a missing file is cached as well.
        """
        if not path.exists():
            return None
        # The file is opened in text mode as UTF-8, so the parser receives
        # already-decoded text. No ``encoding`` argument must be passed to
        # ``load``: ``json.load`` rejects it with a TypeError since Python 3.9.
        with path.open('r', encoding='utf-8') as f:
            return load(f)
2017-08-28 19:01:53 +02:00
2018-08-09 18:11:45 +02:00
class Distribution(Enum):
    """Named distribution levels and the integer code of each one."""

    your_organisation_only = 0
    this_community_only = 1
    connected_communities = 2
    all_communities = 3
    sharing_group = 4
    inherit = 5
class ThreatLevel(Enum):
    """Named threat levels and the integer code of each one (1 = high ... 4 = undefined)."""

    high = 1
    medium = 2
    low = 3
    undefined = 4
class Analysis(Enum):
    """Named analysis states and the integer code of each one."""

    initial = 0
    ongoing = 1
    completed = 2
def _int_to_str(d: Dict[str, Any]) -> Dict[str, Any]:
    """Turn every integer value of *d* into its string form, in place.

    Nested dictionaries are processed recursively (also in place). Booleans
    are left untouched even though ``bool`` is a subclass of ``int``; values
    of any other type, including lists, are not inspected.

    :param d: the dictionary to convert; it is modified.
    :return: the same dictionary object that was passed in.
    """
    # Only values of existing keys are reassigned, so iterating over the
    # dictionary while updating it is safe.
    for key in d:
        value = d[key]
        if isinstance(value, dict):
            d[key] = _int_to_str(value)
            continue
        if isinstance(value, bool) or not isinstance(value, int):
            continue
        d[key] = str(value)
    return d
@deprecated(reason=" Use method default=pymisp_json_default instead of cls=MISPEncode", version='2.4.117', action='default')
class MISPEncode(JSONEncoder):
    """JSON encoder aware of AbstractMISP objects, dates, enums and UUIDs.

    Deprecated: the decorator above points users to ``pymisp_json_default``.
    """

    def default(self, obj):
        """Return a JSON-serialisable stand-in for *obj*.

        Anything that is not handled here is passed to ``JSONEncoder.default``.
        """
        if isinstance(obj, AbstractMISP):
            return obj.jsonable()
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        if isinstance(obj, Enum):
            return obj.value
        if isinstance(obj, UUID):
            return str(obj)
        return JSONEncoder.default(self, obj)
2020-01-02 15:55:00 +01:00
class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
    """Mapping-like base class whose public instance attributes are its items.

    Keys starting with an underscore and keys registered as "not jsonable"
    are hidden from the mapping interface. The class also keeps an ``edited``
    flag that is raised whenever an existing public attribute is overwritten.
    """
    __resources_path = resources_path
    __misp_objects_path = misp_objects_path
    __describe_types = describe_types

    def __init__(self, **kwargs):
        """Abstract class for all the MISP objects.
        NOTE: Every method in every classes inheriting this one are doing
        changes in memory and do not modify data on a remote MISP instance.
        To do so, you need to call the respective add_* or update_*
        methods in ExpandedPyMISP/PyMISP.

        :param force_timestamps: (keyword) when true, timestamps are kept in
            the exported dictionary even if the object is flagged as edited.
        """
        super().__init__()
        self.__edited: bool = True  # As we create a new object, we assume it is edited
        self.__not_jsonable: List[str] = []
        self._fields_for_feed: Set
        self.__self_defined_describe_types: Optional[Dict] = None
        self.uuid: str

        # Ignore the edited objects and keep the timestamps, but only when the
        # caller actually asked for it: an explicit ``force_timestamps=False``
        # (or the absence of the argument) leaves the feature disabled.
        self.__force_timestamps: bool = bool(kwargs.get('force_timestamps'))

    @property
    def describe_types(self) -> Dict:
        """The type description in use: the instance-specific one when a
        non-empty one was set, the module-wide default otherwise."""
        if self.__self_defined_describe_types:
            return self.__self_defined_describe_types
        return self.__describe_types

    @describe_types.setter
    def describe_types(self, describe_types: Dict):
        self.__self_defined_describe_types = describe_types

    @property
    def resources_path(self) -> Path:
        """Directory holding the bundled data files."""
        return self.__resources_path

    @property
    def misp_objects_path(self) -> Path:
        """Directory holding the object templates."""
        return self.__misp_objects_path

    @misp_objects_path.setter
    def misp_objects_path(self, misp_objects_path: Union[str, Path]):
        if isinstance(misp_objects_path, str):
            misp_objects_path = Path(misp_objects_path)
        self.__misp_objects_path = misp_objects_path

    def from_dict(self, **kwargs) -> None:
        """Loading all the parameters as class properties, if they aren't `None`.
        This method aims to be called when all the properties requiring a special
        treatment are processed.
        Note: This method is used when you initialize an object with existing data so by default,
        the class is flagged as not edited."""
        for prop, value in kwargs.items():
            if value is None:
                continue
            setattr(self, prop, value)
        # We load an existing dictionary, marking it as not-edited
        self.__edited = False

    def update_not_jsonable(self, *args) -> None:
        """Add entries to the __not_jsonable list"""
        self.__not_jsonable += args

    def set_not_jsonable(self, args: List[str]) -> None:
        """Set __not_jsonable to a new list"""
        self.__not_jsonable = args

    def _remove_from_not_jsonable(self, *args) -> None:
        """Remove the entries that are in the __not_jsonable list"""
        for entry in args:
            try:
                self.__not_jsonable.remove(entry)
            except ValueError:
                # The entry was not registered: nothing to remove.
                pass

    def from_json(self, json_string: str) -> None:
        """Load a JSON string"""
        self.from_dict(**loads(json_string))

    def to_dict(self) -> Dict:
        """Dump the class to a dictionary.
        This method automatically removes the timestamp recursively in every object
        that has been edited in order to let MISP update the event accordingly.

        ``None`` values and empty lists are skipped, strings are stripped and
        integers are converted to strings."""
        is_edited = self.edited
        to_return = {}
        for attribute, val in self.items():
            if val is None:
                continue
            elif isinstance(val, list) and len(val) == 0:
                continue
            elif isinstance(val, str):
                val = val.strip()
            if attribute == 'timestamp':
                if not self.__force_timestamps and is_edited:
                    # In order to be accepted by MISP, the timestamp of an object
                    # needs to be either newer, or None.
                    # If the current object is marked as edited, the easiest is to
                    # skip the timestamp and let MISP deal with it
                    continue
                else:
                    val = self._datetime_to_timestamp(val)
            if (attribute in ['first_seen', 'last_seen', 'datetime']
                    and isinstance(val, datetime)
                    and not val.tzinfo):
                # Need to make sure the timezone is set. Otherwise, it will be processed as UTC on the server
                val = val.astimezone()
            to_return[attribute] = val
        to_return = _int_to_str(to_return)
        return to_return

    def jsonable(self) -> Dict:
        """This method is used by the JSON encoder"""
        return self.to_dict()

    def _to_feed(self) -> Dict:
        """Export the fields listed in ``_fields_for_feed`` as a dictionary.

        :raises PyMISPError: when ``_fields_for_feed`` is missing or empty, or
            when a listed field has no value and is not one of the optional
            special fields.
        """
        if not hasattr(self, '_fields_for_feed') or not self._fields_for_feed:
            raise PyMISPError('Unable to export in the feed format, _fields_for_feed is missing.')
        if hasattr(self, '_set_default') and callable(self._set_default):  # type: ignore
            self._set_default()  # type: ignore
        to_return = {}
        for field in self._fields_for_feed:
            # Read the attribute once per field instead of once per branch.
            value = getattr(self, field, None)
            if value is None:
                if field in ['data', 'first_seen', 'last_seen', 'deleted']:
                    # special fields
                    continue
                raise PyMISPError('The field {} is required in {} when generating a feed.'.format(field, self.__class__.__name__))
            if field in ['timestamp', 'publish_timestamp']:
                to_return[field] = self._datetime_to_timestamp(value)
            elif isinstance(value, (datetime, date)):
                to_return[field] = value.isoformat()
            else:
                to_return[field] = value
        to_return = _int_to_str(to_return)
        return to_return

    def to_json(self, sort_keys: bool = False, indent: Optional[int] = None):
        """Dump recursively any class of type AbstractMISP to a json string"""
        return dumps(self, default=pymisp_json_default, sort_keys=sort_keys, indent=indent)

    def __getitem__(self, key):
        """Return the public attribute named *key*.

        :raises KeyError: for private keys (leading underscore), keys listed
            as not jsonable, unknown keys, and keys that are empty or not
            strings.
        """
        try:
            if (isinstance(key, str) and key and key[0] != '_'
                    and key not in self.__not_jsonable):
                return self.__dict__[key]
        except AttributeError:
            # Expected by pop and other dict-related methods
            pass
        raise KeyError(key)

    def __setitem__(self, key, value):
        setattr(self, key, value)

    def __delitem__(self, key):
        delattr(self, key)

    def __iter__(self):
        '''When we call **self, skip keys:
            * starting with _
            * in __not_jsonable
            * timestamp if the object is edited *unless* it is forced
        '''
        # Iterate over a snapshot of the keys so the instance may change
        # while the caller consumes the iterator.
        return iter([k for k in self.__dict__
                     if not (k[0] == '_'
                             or k in self.__not_jsonable
                             or (not self.__force_timestamps and (k == 'timestamp' and self.__edited)))])

    def __len__(self) -> int:
        # NOTE: unlike __iter__, a 'timestamp' key is always counted here.
        return len([k for k in self.__dict__.keys() if not (k[0] == '_' or k in self.__not_jsonable)])

    @property
    def edited(self) -> bool:
        """Recursively check if an object has been edited and update the flag accordingly
        to the parent objects"""
        if self.__edited:
            return self.__edited
        for val in self.values():
            if isinstance(val, AbstractMISP) and val.edited:
                self.__edited = True
                break
            elif isinstance(val, list) and all(isinstance(a, AbstractMISP) for a in val):
                if any(a.edited for a in val):
                    self.__edited = True
                    break
        return self.__edited

    @edited.setter
    def edited(self, val: bool):
        """Set the edit flag"""
        if isinstance(val, bool):
            self.__edited = val
        else:
            raise PyMISPError('edited can only be True or False')

    def __setattr__(self, name: str, value: Any):
        if name[0] != '_' and not self.__edited and name in self:
            # The private members don't matter
            # If we already have a key with that name, we're modifying it.
            self.__edited = True
        super().__setattr__(name, value)

    def _datetime_to_timestamp(self, d: Union[int, float, str, datetime]) -> int:
        """Convert a datetime object to a timestamp (int)"""
        if isinstance(d, (int, float, str)):
            # Assume we already have a timestamp
            return int(d)
        return int(d.timestamp())

    def _add_tag(self, tag: Optional[Union[str, 'MISPTag', Mapping]] = None, **kwargs):
        """Add a tag to the attribute (by name or a MISPTag object)

        :return: the MISPTag that was built or received.
        :raises PyMISPInvalidFormat: when neither a usable *tag* nor keyword
            arguments are given.
        """
        if isinstance(tag, str):
            misp_tag = MISPTag()
            misp_tag.from_dict(name=tag)
        elif isinstance(tag, MISPTag):
            misp_tag = tag
        elif isinstance(tag, dict):
            misp_tag = MISPTag()
            misp_tag.from_dict(**tag)
        elif kwargs:
            misp_tag = MISPTag()
            misp_tag.from_dict(**kwargs)
        else:
            raise PyMISPInvalidFormat(f"The tag is in an invalid format (can be either string, MISPTag, or an expanded dict): {tag}")
        # ``tags`` and ``Tag`` are not defined in this class; they are expected
        # to be provided by the concrete subclass.
        if misp_tag not in self.tags:  # type: ignore
            self.Tag.append(misp_tag)
            self.edited = True
        return misp_tag

    def _set_tags(self, tags: List['MISPTag']):
        """Set a list of prepared MISPTag."""
        if all(isinstance(x, MISPTag) for x in tags):
            self.Tag = tags
        else:
            raise PyMISPInvalidFormat('All the attributes have to be of type MISPTag.')

    def __eq__(self, other) -> bool:
        """Compare by exported dictionary; anything that is neither an
        AbstractMISP nor a dict is never equal."""
        if isinstance(other, AbstractMISP):
            return self.to_dict() == other.to_dict()
        elif isinstance(other, dict):
            return self.to_dict() == other
        else:
            return False

    def __repr__(self) -> str:
        return '<{self.__class__.__name__} - please define me>'.format(self=self)
2019-07-12 17:35:02 +02:00
2018-01-08 11:59:32 +01:00
class MISPTag(AbstractMISP):
    """A tag, exported to feeds through its ``name`` and ``colour`` fields."""

    _fields_for_feed: set = {'name', 'colour'}

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.name: str
        self.exportable: bool

    def from_dict(self, **kwargs):
        """Load the tag from keyword arguments, unwrapping a non-empty
        ``Tag`` entry when one is present."""
        nested = kwargs.get('Tag')
        if nested:
            kwargs = nested
        super().from_dict(**kwargs)

    def _set_default(self):
        """Give the tag a white colour when none has been set."""
        if hasattr(self, 'colour'):
            return
        self.colour = '#ffffff'

    def _to_feed(self) -> Dict:
        """Return the feed export, or an empty dict when the tag carries an
        ``exportable`` attribute that is false."""
        if not getattr(self, 'exportable', True):
            return {}
        return super()._to_feed()

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        if not hasattr(self, 'name'):
            return f'<{class_name}(NotInitialized)>'
        return f'<{class_name}(name={self.name})>'
2020-01-02 15:55:00 +01:00
def pymisp_json_default(obj: Union[AbstractMISP, datetime, date, Enum, UUID]) -> Union[Dict, str]:
    """``default`` hook for ``dumps``: convert *obj* to something serialisable.

    AbstractMISP objects are exported through ``jsonable()``, dates and
    datetimes through ``isoformat()``, enum members through their value and
    UUIDs through ``str()``.

    The same implementation serves both JSON backends, so it is defined once
    rather than separately for the rapidjson and the standard-library case.

    NOTE: an object of any other type falls through every branch, so the
    function implicitly returns ``None`` for it.
    """
    if isinstance(obj, AbstractMISP):
        return obj.jsonable()
    elif isinstance(obj, (datetime, date)):
        return obj.isoformat()
    elif isinstance(obj, Enum):
        return obj.value
    elif isinstance(obj, UUID):
        return str(obj)