mirror of https://github.com/MISP/PyMISP
new: Self registration, object level search (initial)
parent
e020bac5f6
commit
4a060b3c07
|
@ -24,14 +24,14 @@ Response (if any):
|
||||||
try:
|
try:
|
||||||
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
|
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
|
||||||
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
|
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
|
||||||
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting # noqa
|
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox # noqa
|
||||||
from .tools import AbstractMISPObjectGenerator # noqa
|
from .tools import AbstractMISPObjectGenerator # noqa
|
||||||
from .tools import Neo4j # noqa
|
from .tools import Neo4j # noqa
|
||||||
from .tools import stix # noqa
|
from .tools import stix # noqa
|
||||||
from .tools import openioc # noqa
|
from .tools import openioc # noqa
|
||||||
from .tools import ext_lookups # noqa
|
from .tools import ext_lookups # noqa
|
||||||
|
|
||||||
from .api import PyMISP # noqa
|
from .api import PyMISP, register_user # noqa
|
||||||
from .api import PyMISP as ExpandedPyMISP # noqa
|
from .api import PyMISP as ExpandedPyMISP # noqa
|
||||||
from .tools import load_warninglists # noqa
|
from .tools import load_warninglists # noqa
|
||||||
# Let's not bother with old python
|
# Let's not bother with old python
|
||||||
|
|
|
@ -21,7 +21,7 @@ except ImportError:
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from typing import Union, Optional
|
from typing import Union, Optional, Any, Dict, Iterable, List, Set
|
||||||
|
|
||||||
from .exceptions import PyMISPInvalidFormat, PyMISPError
|
from .exceptions import PyMISPInvalidFormat, PyMISPError
|
||||||
|
|
||||||
|
@ -76,7 +76,7 @@ class Analysis(Enum):
|
||||||
completed = 2
|
completed = 2
|
||||||
|
|
||||||
|
|
||||||
def _int_to_str(d: dict) -> dict:
|
def _int_to_str(d: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
# transform all integer back to string
|
# transform all integer back to string
|
||||||
for k, v in d.items():
|
for k, v in d.items():
|
||||||
if isinstance(v, dict):
|
if isinstance(v, dict):
|
||||||
|
@ -114,9 +114,9 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
|
||||||
"""
|
"""
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.__edited: bool = True # As we create a new object, we assume it is edited
|
self.__edited: bool = True # As we create a new object, we assume it is edited
|
||||||
self.__not_jsonable: list = []
|
self.__not_jsonable: List[str] = []
|
||||||
self._fields_for_feed: set
|
self._fields_for_feed: Set
|
||||||
self.__self_defined_describe_types: Union[dict, None] = None
|
self.__self_defined_describe_types: Optional[Dict] = None
|
||||||
self.uuid: str
|
self.uuid: str
|
||||||
|
|
||||||
if kwargs.get('force_timestamps') is not None:
|
if kwargs.get('force_timestamps') is not None:
|
||||||
|
@ -126,13 +126,13 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
|
||||||
self.__force_timestamps: bool = False
|
self.__force_timestamps: bool = False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def describe_types(self) -> dict:
|
def describe_types(self) -> Dict:
|
||||||
if self.__self_defined_describe_types:
|
if self.__self_defined_describe_types:
|
||||||
return self.__self_defined_describe_types
|
return self.__self_defined_describe_types
|
||||||
return self.__describe_types
|
return self.__describe_types
|
||||||
|
|
||||||
@describe_types.setter
|
@describe_types.setter
|
||||||
def describe_types(self, describe_types: dict):
|
def describe_types(self, describe_types: Dict):
|
||||||
self.__self_defined_describe_types = describe_types
|
self.__self_defined_describe_types = describe_types
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -166,7 +166,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
|
||||||
"""Add entries to the __not_jsonable list"""
|
"""Add entries to the __not_jsonable list"""
|
||||||
self.__not_jsonable += args
|
self.__not_jsonable += args
|
||||||
|
|
||||||
def set_not_jsonable(self, args: list) -> None:
|
def set_not_jsonable(self, args: List[str]) -> None:
|
||||||
"""Set __not_jsonable to a new list"""
|
"""Set __not_jsonable to a new list"""
|
||||||
self.__not_jsonable = args
|
self.__not_jsonable = args
|
||||||
|
|
||||||
|
@ -174,7 +174,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
|
||||||
"""Load a JSON string"""
|
"""Load a JSON string"""
|
||||||
self.from_dict(**loads(json_string))
|
self.from_dict(**loads(json_string))
|
||||||
|
|
||||||
def to_dict(self) -> dict:
|
def to_dict(self) -> Dict:
|
||||||
"""Dump the class to a dictionary.
|
"""Dump the class to a dictionary.
|
||||||
This method automatically removes the timestamp recursively in every object
|
This method automatically removes the timestamp recursively in every object
|
||||||
that has been edited is order to let MISP update the event accordingly."""
|
that has been edited is order to let MISP update the event accordingly."""
|
||||||
|
@ -206,11 +206,11 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
|
||||||
to_return = _int_to_str(to_return)
|
to_return = _int_to_str(to_return)
|
||||||
return to_return
|
return to_return
|
||||||
|
|
||||||
def jsonable(self) -> dict:
|
def jsonable(self) -> Dict:
|
||||||
"""This method is used by the JSON encoder"""
|
"""This method is used by the JSON encoder"""
|
||||||
return self.to_dict()
|
return self.to_dict()
|
||||||
|
|
||||||
def _to_feed(self) -> dict:
|
def _to_feed(self) -> Dict:
|
||||||
if not hasattr(self, '_fields_for_feed') or not self._fields_for_feed:
|
if not hasattr(self, '_fields_for_feed') or not self._fields_for_feed:
|
||||||
raise PyMISPError('Unable to export in the feed format, _fields_for_feed is missing.')
|
raise PyMISPError('Unable to export in the feed format, _fields_for_feed is missing.')
|
||||||
if hasattr(self, '_set_default') and callable(self._set_default): # type: ignore
|
if hasattr(self, '_set_default') and callable(self._set_default): # type: ignore
|
||||||
|
@ -281,7 +281,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
|
||||||
else:
|
else:
|
||||||
raise PyMISPError('edited can only be True or False')
|
raise PyMISPError('edited can only be True or False')
|
||||||
|
|
||||||
def __setattr__(self, name: str, value):
|
def __setattr__(self, name: str, value: Any):
|
||||||
if name[0] != '_' and not self.__edited and name in self:
|
if name[0] != '_' and not self.__edited and name in self:
|
||||||
# The private members don't matter
|
# The private members don't matter
|
||||||
# If we already have a key with that name, we're modifying it.
|
# If we already have a key with that name, we're modifying it.
|
||||||
|
@ -315,7 +315,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
|
||||||
self.edited = True
|
self.edited = True
|
||||||
return misp_tag
|
return misp_tag
|
||||||
|
|
||||||
def _set_tags(self, tags):
|
def _set_tags(self, tags: Iterable['MISPTag']):
|
||||||
"""Set a list of prepared MISPTag."""
|
"""Set a list of prepared MISPTag."""
|
||||||
if all(isinstance(x, MISPTag) for x in tags):
|
if all(isinstance(x, MISPTag) for x in tags):
|
||||||
self.Tag = tags
|
self.Tag = tags
|
||||||
|
@ -341,6 +341,7 @@ class MISPTag(AbstractMISP):
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self.name: str
|
self.name: str
|
||||||
|
self.exportable: bool
|
||||||
|
|
||||||
def from_dict(self, **kwargs):
|
def from_dict(self, **kwargs):
|
||||||
if kwargs.get('Tag'):
|
if kwargs.get('Tag'):
|
||||||
|
@ -351,14 +352,14 @@ class MISPTag(AbstractMISP):
|
||||||
if not hasattr(self, 'colour'):
|
if not hasattr(self, 'colour'):
|
||||||
self.colour = '#ffffff'
|
self.colour = '#ffffff'
|
||||||
|
|
||||||
def _to_feed(self):
|
def _to_feed(self) -> Dict:
|
||||||
if hasattr(self, 'exportable') and not self.exportable:
|
if hasattr(self, 'exportable') and not self.exportable:
|
||||||
return False
|
return {}
|
||||||
return super()._to_feed()
|
return super()._to_feed()
|
||||||
|
|
||||||
|
|
||||||
if HAS_RAPIDJSON:
|
if HAS_RAPIDJSON:
|
||||||
def pymisp_json_default(obj: Union[AbstractMISP, datetime, date, Enum, UUID]) -> Union[dict, str]:
|
def pymisp_json_default(obj: Union[AbstractMISP, datetime, date, Enum, UUID]) -> Union[Dict, str]:
|
||||||
if isinstance(obj, AbstractMISP):
|
if isinstance(obj, AbstractMISP):
|
||||||
return obj.jsonable()
|
return obj.jsonable()
|
||||||
elif isinstance(obj, (datetime, date)):
|
elif isinstance(obj, (datetime, date)):
|
||||||
|
@ -368,7 +369,7 @@ if HAS_RAPIDJSON:
|
||||||
elif isinstance(obj, UUID):
|
elif isinstance(obj, UUID):
|
||||||
return str(obj)
|
return str(obj)
|
||||||
else:
|
else:
|
||||||
def pymisp_json_default(obj: Union[AbstractMISP, datetime, date, Enum, UUID]) -> Union[dict, str]:
|
def pymisp_json_default(obj: Union[AbstractMISP, datetime, date, Enum, UUID]) -> Union[Dict, str]:
|
||||||
if isinstance(obj, AbstractMISP):
|
if isinstance(obj, AbstractMISP):
|
||||||
return obj.jsonable()
|
return obj.jsonable()
|
||||||
elif isinstance(obj, (datetime, date)):
|
elif isinstance(obj, (datetime, date)):
|
||||||
|
|
652
pymisp/api.py
652
pymisp/api.py
File diff suppressed because it is too large
Load Diff
|
@ -12,7 +12,7 @@ from collections import defaultdict
|
||||||
import logging
|
import logging
|
||||||
import hashlib
|
import hashlib
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import List, Optional, Union, IO
|
from typing import List, Optional, Union, IO, Dict, Any
|
||||||
|
|
||||||
from .abstract import AbstractMISP, MISPTag
|
from .abstract import AbstractMISP, MISPTag
|
||||||
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError
|
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError
|
||||||
|
@ -82,7 +82,7 @@ def _make_datetime(value) -> datetime:
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
def make_bool(value: Union[bool, int, str, dict, list, None]) -> bool:
|
def make_bool(value: Optional[Union[bool, int, str, dict, list]]) -> bool:
|
||||||
if isinstance(value, bool):
|
if isinstance(value, bool):
|
||||||
return value
|
return value
|
||||||
if isinstance(value, int):
|
if isinstance(value, int):
|
||||||
|
@ -102,6 +102,10 @@ class MISPOrganisation(AbstractMISP):
|
||||||
|
|
||||||
_fields_for_feed: set = {'name', 'uuid'}
|
_fields_for_feed: set = {'name', 'uuid'}
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
self.id: int
|
||||||
|
|
||||||
def from_dict(self, **kwargs):
|
def from_dict(self, **kwargs):
|
||||||
if 'Organisation' in kwargs:
|
if 'Organisation' in kwargs:
|
||||||
kwargs = kwargs['Organisation']
|
kwargs = kwargs['Organisation']
|
||||||
|
@ -169,17 +173,17 @@ class MISPAttribute(AbstractMISP):
|
||||||
_fields_for_feed: set = {'uuid', 'value', 'category', 'type', 'comment', 'data',
|
_fields_for_feed: set = {'uuid', 'value', 'category', 'type', 'comment', 'data',
|
||||||
'timestamp', 'to_ids', 'disable_correlation', 'first_seen', 'last_seen'}
|
'timestamp', 'to_ids', 'disable_correlation', 'first_seen', 'last_seen'}
|
||||||
|
|
||||||
def __init__(self, describe_types: Optional[dict]=None, strict: bool=False):
|
def __init__(self, describe_types: Optional[Dict]=None, strict: bool=False):
|
||||||
"""Represents an Attribute
|
"""Represents an Attribute
|
||||||
:describe_type: Use it is you want to overwrite the defualt describeTypes.json file (you don't)
|
:describe_type: Use it is you want to overwrite the defualt describeTypes.json file (you don't)
|
||||||
:strict: If false, fallback to sane defaults for the attribute type if the ones passed by the user are incorrect
|
:strict: If false, fallback to sane defaults for the attribute type if the ones passed by the user are incorrect
|
||||||
"""
|
"""
|
||||||
super().__init__()
|
super().__init__()
|
||||||
if describe_types:
|
if describe_types:
|
||||||
self.describe_types: dict = describe_types
|
self.describe_types: Dict[str, Any] = describe_types
|
||||||
self.__categories: List[str] = self.describe_types['categories']
|
self.__categories: List[str] = self.describe_types['categories']
|
||||||
self.__category_type_mapping: dict = self.describe_types['category_type_mappings']
|
self.__category_type_mapping: Dict[str, List[str]] = self.describe_types['category_type_mappings']
|
||||||
self.__sane_default: dict = self.describe_types['sane_defaults']
|
self.__sane_default: Dict[str, Dict[str, Union[str, int]]] = self.describe_types['sane_defaults']
|
||||||
self.__strict: bool = strict
|
self.__strict: bool = strict
|
||||||
self.data: Optional[BytesIO] = None
|
self.data: Optional[BytesIO] = None
|
||||||
self.first_seen: datetime
|
self.first_seen: datetime
|
||||||
|
@ -194,7 +198,7 @@ class MISPAttribute(AbstractMISP):
|
||||||
self.Event: MISPEvent
|
self.Event: MISPEvent
|
||||||
self.RelatedAttribute: List[MISPAttribute]
|
self.RelatedAttribute: List[MISPAttribute]
|
||||||
|
|
||||||
def add_tag(self, tag: Optional[Union[str, MISPTag, dict]]=None, **kwargs) -> MISPTag:
|
def add_tag(self, tag: Optional[Union[str, MISPTag, Dict]]=None, **kwargs) -> MISPTag:
|
||||||
return super()._add_tag(tag, **kwargs)
|
return super()._add_tag(tag, **kwargs)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -207,7 +211,7 @@ class MISPAttribute(AbstractMISP):
|
||||||
"""Set a list of prepared MISPTag."""
|
"""Set a list of prepared MISPTag."""
|
||||||
super()._set_tags(tags)
|
super()._set_tags(tags)
|
||||||
|
|
||||||
def _prepare_data(self, data: Union[Path, str, bytes, BytesIO, None]):
|
def _prepare_data(self, data: Optional[Union[Path, str, bytes, BytesIO]]):
|
||||||
if not data:
|
if not data:
|
||||||
super().__setattr__('data', None)
|
super().__setattr__('data', None)
|
||||||
return
|
return
|
||||||
|
@ -239,15 +243,15 @@ class MISPAttribute(AbstractMISP):
|
||||||
# not a encrypted zip file, assuming it is a new malware sample
|
# not a encrypted zip file, assuming it is a new malware sample
|
||||||
self._prepare_new_malware_sample()
|
self._prepare_new_malware_sample()
|
||||||
|
|
||||||
def __setattr__(self, name, value):
|
def __setattr__(self, name: str, value: Any):
|
||||||
if name in ['first_seen', 'last_seen']:
|
if name in ['first_seen', 'last_seen']:
|
||||||
value = _make_datetime(value)
|
_datetime = _make_datetime(value)
|
||||||
|
|
||||||
if name == 'last_seen' and hasattr(self, 'first_seen') and self.first_seen > value:
|
if name == 'last_seen' and hasattr(self, 'first_seen') and self.first_seen > _datetime:
|
||||||
raise PyMISPError('last_seen ({value}) has to be after first_seen ({self.first_seen})')
|
raise PyMISPError('last_seen ({value}) has to be after first_seen ({self.first_seen})')
|
||||||
if name == 'first_seen' and hasattr(self, 'last_seen') and self.last_seen < value:
|
if name == 'first_seen' and hasattr(self, 'last_seen') and self.last_seen < _datetime:
|
||||||
raise PyMISPError('first_seen ({value}) has to be before last_seen ({self.last_seen})')
|
raise PyMISPError('first_seen ({value}) has to be before last_seen ({self.last_seen})')
|
||||||
super().__setattr__(name, value)
|
super().__setattr__(name, _datetime)
|
||||||
elif name == 'data':
|
elif name == 'data':
|
||||||
self._prepare_data(value)
|
self._prepare_data(value)
|
||||||
else:
|
else:
|
||||||
|
@ -278,7 +282,7 @@ class MISPAttribute(AbstractMISP):
|
||||||
if not hasattr(self, 'timestamp'):
|
if not hasattr(self, 'timestamp'):
|
||||||
self.timestamp = datetime.timestamp(datetime.now())
|
self.timestamp = datetime.timestamp(datetime.now())
|
||||||
|
|
||||||
def _to_feed(self) -> dict:
|
def _to_feed(self) -> Dict:
|
||||||
to_return = super()._to_feed()
|
to_return = super()._to_feed()
|
||||||
if self.data:
|
if self.data:
|
||||||
to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
|
to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
|
||||||
|
@ -292,7 +296,7 @@ class MISPAttribute(AbstractMISP):
|
||||||
return self.describe_types['types']
|
return self.describe_types['types']
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def malware_binary(self) -> Union[BytesIO, None]:
|
def malware_binary(self) -> Optional[BytesIO]:
|
||||||
"""Returns a BytesIO of the malware (if the attribute has one, obvs)."""
|
"""Returns a BytesIO of the malware (if the attribute has one, obvs)."""
|
||||||
if hasattr(self, '_malware_binary'):
|
if hasattr(self, '_malware_binary'):
|
||||||
return self._malware_binary
|
return self._malware_binary
|
||||||
|
@ -330,7 +334,7 @@ class MISPAttribute(AbstractMISP):
|
||||||
"""Alias for add_shadow_attribute"""
|
"""Alias for add_shadow_attribute"""
|
||||||
return self.add_shadow_attribute(shadow_attribute, **kwargs)
|
return self.add_shadow_attribute(shadow_attribute, **kwargs)
|
||||||
|
|
||||||
def add_shadow_attribute(self, shadow_attribute: Union[MISPShadowAttribute, dict, None]=None, **kwargs) -> MISPShadowAttribute:
|
def add_shadow_attribute(self, shadow_attribute: Optional[Union[MISPShadowAttribute, Dict]]=None, **kwargs) -> MISPShadowAttribute:
|
||||||
"""Add a shadow attribute to the attribute (by name or a MISPShadowAttribute object)"""
|
"""Add a shadow attribute to the attribute (by name or a MISPShadowAttribute object)"""
|
||||||
if isinstance(shadow_attribute, MISPShadowAttribute):
|
if isinstance(shadow_attribute, MISPShadowAttribute):
|
||||||
misp_shadow_attribute = shadow_attribute
|
misp_shadow_attribute = shadow_attribute
|
||||||
|
@ -346,7 +350,7 @@ class MISPAttribute(AbstractMISP):
|
||||||
self.edited = True
|
self.edited = True
|
||||||
return misp_shadow_attribute
|
return misp_shadow_attribute
|
||||||
|
|
||||||
def add_sighting(self, sighting: Union[MISPSighting, dict, None]=None, **kwargs) -> MISPSighting:
|
def add_sighting(self, sighting: Optional[Union[MISPSighting, dict]]=None, **kwargs) -> MISPSighting:
|
||||||
"""Add a sighting to the attribute (by name or a MISPSighting object)"""
|
"""Add a sighting to the attribute (by name or a MISPSighting object)"""
|
||||||
if isinstance(sighting, MISPSighting):
|
if isinstance(sighting, MISPSighting):
|
||||||
misp_sighting = sighting
|
misp_sighting = sighting
|
||||||
|
@ -488,7 +492,7 @@ class MISPAttribute(AbstractMISP):
|
||||||
|
|
||||||
super().from_dict(**kwargs)
|
super().from_dict(**kwargs)
|
||||||
|
|
||||||
def to_dict(self) -> dict:
|
def to_dict(self) -> Dict:
|
||||||
to_return = super().to_dict()
|
to_return = super().to_dict()
|
||||||
if self.data:
|
if self.data:
|
||||||
to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
|
to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
|
||||||
|
@ -623,7 +627,7 @@ class MISPObject(AbstractMISP):
|
||||||
self.last_seen: datetime
|
self.last_seen: datetime
|
||||||
self.__fast_attribute_access: dict = defaultdict(list) # Hashtable object_relation: [attributes]
|
self.__fast_attribute_access: dict = defaultdict(list) # Hashtable object_relation: [attributes]
|
||||||
self.ObjectReference: List[MISPObjectReference] = []
|
self.ObjectReference: List[MISPObjectReference] = []
|
||||||
self.Attribute: List[MISPAttribute] = []
|
self.Attribute: List[MISPObjectAttribute] = []
|
||||||
self.SharingGroup: MISPSharingGroup
|
self.SharingGroup: MISPSharingGroup
|
||||||
self._default_attributes_parameters: dict
|
self._default_attributes_parameters: dict
|
||||||
if isinstance(default_attributes_parameters, MISPAttribute):
|
if isinstance(default_attributes_parameters, MISPAttribute):
|
||||||
|
@ -645,8 +649,8 @@ class MISPObject(AbstractMISP):
|
||||||
self._default_attributes_parameters.pop('data', None) # in case the original in a sample or an attachment
|
self._default_attributes_parameters.pop('data', None) # in case the original in a sample or an attachment
|
||||||
|
|
||||||
# Those values are set for the current object, if they exist, but not pop'd because they are still useful for the attributes
|
# Those values are set for the current object, if they exist, but not pop'd because they are still useful for the attributes
|
||||||
self.distribution = self._default_attributes_parameters.get('distribution', 5)
|
self.distribution: int = self._default_attributes_parameters.get('distribution', 5)
|
||||||
self.sharing_group_id = self._default_attributes_parameters.get('sharing_group_id', 0)
|
self.sharing_group_id: int = self._default_attributes_parameters.get('sharing_group_id', 0)
|
||||||
else:
|
else:
|
||||||
self.distribution = 5 # Default to inherit
|
self.distribution = 5 # Default to inherit
|
||||||
self.sharing_group_id = 0
|
self.sharing_group_id = 0
|
||||||
|
@ -656,7 +660,7 @@ class MISPObject(AbstractMISP):
|
||||||
self.update_not_jsonable('ObjectReference')
|
self.update_not_jsonable('ObjectReference')
|
||||||
|
|
||||||
def _load_template_path(self, template_path: Union[Path, str]) -> bool:
|
def _load_template_path(self, template_path: Union[Path, str]) -> bool:
|
||||||
self._definition: Union[dict, None] = self._load_json(template_path)
|
self._definition: Optional[Dict] = self._load_json(template_path)
|
||||||
if not self._definition:
|
if not self._definition:
|
||||||
return False
|
return False
|
||||||
setattr(self, 'meta-category', self._definition['meta-category'])
|
setattr(self, 'meta-category', self._definition['meta-category'])
|
||||||
|
@ -671,7 +675,7 @@ class MISPObject(AbstractMISP):
|
||||||
if not hasattr(self, 'timestamp'):
|
if not hasattr(self, 'timestamp'):
|
||||||
self.timestamp = datetime.timestamp(datetime.now())
|
self.timestamp = datetime.timestamp(datetime.now())
|
||||||
|
|
||||||
def _to_feed(self) -> dict:
|
def _to_feed(self) -> Dict:
|
||||||
to_return = super(MISPObject, self)._to_feed()
|
to_return = super(MISPObject, self)._to_feed()
|
||||||
if self.references:
|
if self.references:
|
||||||
to_return['ObjectReference'] = [reference._to_feed() for reference in self.references]
|
to_return['ObjectReference'] = [reference._to_feed() for reference in self.references]
|
||||||
|
@ -714,11 +718,11 @@ class MISPObject(AbstractMISP):
|
||||||
self._strict = False
|
self._strict = False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def attributes(self) -> List[MISPAttribute]:
|
def attributes(self) -> List['MISPObjectAttribute']:
|
||||||
return self.Attribute
|
return self.Attribute
|
||||||
|
|
||||||
@attributes.setter
|
@attributes.setter
|
||||||
def attributes(self, attributes: List[MISPAttribute]):
|
def attributes(self, attributes: List['MISPObjectAttribute']):
|
||||||
if all(isinstance(x, MISPObjectAttribute) for x in attributes):
|
if all(isinstance(x, MISPObjectAttribute) for x in attributes):
|
||||||
self.Attribute = attributes
|
self.Attribute = attributes
|
||||||
self.__fast_attribute_access = defaultdict(list)
|
self.__fast_attribute_access = defaultdict(list)
|
||||||
|
@ -826,17 +830,17 @@ class MISPObject(AbstractMISP):
|
||||||
return self._fast_attribute_access.get(object_relation, [])
|
return self._fast_attribute_access.get(object_relation, [])
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def _fast_attribute_access(self):
|
def _fast_attribute_access(self) -> Dict:
|
||||||
if not self.__fast_attribute_access:
|
if not self.__fast_attribute_access:
|
||||||
for a in self.attributes:
|
for a in self.attributes:
|
||||||
self.__fast_attribute_access[a.object_relation].append(a)
|
self.__fast_attribute_access[a.object_relation].append(a)
|
||||||
return self.__fast_attribute_access
|
return self.__fast_attribute_access
|
||||||
|
|
||||||
def has_attributes_by_relation(self, list_of_relations: List[str]):
|
def has_attributes_by_relation(self, list_of_relations: List[str]) -> bool:
|
||||||
'''True if all the relations in the list are defined in the object'''
|
'''True if all the relations in the list are defined in the object'''
|
||||||
return all(relation in self._fast_attribute_access for relation in list_of_relations)
|
return all(relation in self._fast_attribute_access for relation in list_of_relations)
|
||||||
|
|
||||||
def add_attribute(self, object_relation: str, simple_value: Union[str, int, float]=None, **value) -> Union[MISPAttribute, None]:
|
def add_attribute(self, object_relation: str, simple_value: Optional[Union[str, int, float]]=None, **value) -> Optional[MISPAttribute]:
|
||||||
"""Add an attribute. object_relation is required and the value key is a
|
"""Add an attribute. object_relation is required and the value key is a
|
||||||
dictionary with all the keys supported by MISPAttribute"""
|
dictionary with all the keys supported by MISPAttribute"""
|
||||||
if simple_value is not None: # /!\ The value *can* be 0
|
if simple_value is not None: # /!\ The value *can* be 0
|
||||||
|
@ -876,7 +880,7 @@ class MISPObject(AbstractMISP):
|
||||||
to_return.append(a)
|
to_return.append(a)
|
||||||
return to_return
|
return to_return
|
||||||
|
|
||||||
def to_dict(self, strict: bool=False) -> dict:
|
def to_dict(self, strict: bool=False) -> Dict:
|
||||||
if strict or self._strict and self._known_template:
|
if strict or self._strict and self._known_template:
|
||||||
self._validate()
|
self._validate()
|
||||||
return super(MISPObject, self).to_dict()
|
return super(MISPObject, self).to_dict()
|
||||||
|
@ -886,10 +890,12 @@ class MISPObject(AbstractMISP):
|
||||||
self._validate()
|
self._validate()
|
||||||
return super(MISPObject, self).to_json(sort_keys=sort_keys, indent=indent)
|
return super(MISPObject, self).to_json(sort_keys=sort_keys, indent=indent)
|
||||||
|
|
||||||
def _validate(self):
|
def _validate(self) -> bool:
|
||||||
|
if not self._definition:
|
||||||
|
raise PyMISPError('No object definition available, unable to validate.')
|
||||||
"""Make sure the object we're creating has the required fields"""
|
"""Make sure the object we're creating has the required fields"""
|
||||||
if self._definition.get('required'):
|
if self._definition.get('required'):
|
||||||
required_missing = set(self._definition.get('required')) - set(self._fast_attribute_access.keys())
|
required_missing = set(self._definition['required']) - set(self._fast_attribute_access.keys())
|
||||||
if required_missing:
|
if required_missing:
|
||||||
raise InvalidMISPObject('{} are required.'.format(required_missing))
|
raise InvalidMISPObject('{} are required.'.format(required_missing))
|
||||||
if self._definition.get('requiredOneOf'):
|
if self._definition.get('requiredOneOf'):
|
||||||
|
@ -916,7 +922,7 @@ class MISPEvent(AbstractMISP):
|
||||||
_fields_for_feed: set = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
|
_fields_for_feed: set = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
|
||||||
'publish_timestamp', 'published', 'date', 'extends_uuid'}
|
'publish_timestamp', 'published', 'date', 'extends_uuid'}
|
||||||
|
|
||||||
def __init__(self, describe_types: dict=None, strict_validation: bool=False, **kwargs):
|
def __init__(self, describe_types: Optional[Dict]=None, strict_validation: bool=False, **kwargs):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
if strict_validation:
|
if strict_validation:
|
||||||
schema_file = 'schema.json'
|
schema_file = 'schema.json'
|
||||||
|
@ -971,7 +977,7 @@ class MISPEvent(AbstractMISP):
|
||||||
self.threat_level_id = 4
|
self.threat_level_id = 4
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def manifest(self) -> dict:
|
def manifest(self) -> Dict:
|
||||||
required = ['info', 'Orgc']
|
required = ['info', 'Orgc']
|
||||||
for r in required:
|
for r in required:
|
||||||
if not hasattr(self, r):
|
if not hasattr(self, r):
|
||||||
|
@ -1000,7 +1006,7 @@ class MISPEvent(AbstractMISP):
|
||||||
to_return += attribute.hash_values(algorithm)
|
to_return += attribute.hash_values(algorithm)
|
||||||
return to_return
|
return to_return
|
||||||
|
|
||||||
def to_feed(self, valid_distributions: List[int]=[0, 1, 2, 3, 4, 5], with_meta: bool=False) -> dict:
|
def to_feed(self, valid_distributions: List[int]=[0, 1, 2, 3, 4, 5], with_meta: bool=False) -> Dict:
|
||||||
""" Generate a json output for MISP Feed.
|
""" Generate a json output for MISP Feed.
|
||||||
Notes:
|
Notes:
|
||||||
* valid_distributions only makes sense if the distribution key is set (i.e. the event is exported from a MISP instance)
|
* valid_distributions only makes sense if the distribution key is set (i.e. the event is exported from a MISP instance)
|
||||||
|
@ -1090,7 +1096,7 @@ class MISPEvent(AbstractMISP):
|
||||||
raise PyMISPError('All the attributes have to be of type MISPShadowAttribute.')
|
raise PyMISPError('All the attributes have to be of type MISPShadowAttribute.')
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def related_events(self): # -> List[MISPEvent]:
|
def related_events(self) -> List['MISPEvent']:
|
||||||
return self.RelatedEvent
|
return self.RelatedEvent
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -1233,7 +1239,7 @@ class MISPEvent(AbstractMISP):
|
||||||
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
|
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
|
||||||
super(MISPEvent, self).from_dict(**kwargs)
|
super(MISPEvent, self).from_dict(**kwargs)
|
||||||
|
|
||||||
def to_dict(self) -> dict:
|
def to_dict(self) -> Dict:
|
||||||
to_return = super().to_dict()
|
to_return = super().to_dict()
|
||||||
|
|
||||||
if to_return.get('date'):
|
if to_return.get('date'):
|
||||||
|
@ -1652,3 +1658,18 @@ class MISPUserSetting(AbstractMISP):
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return f'<{self.__class__.__name__}(name={self.setting}'
|
return f'<{self.__class__.__name__}(name={self.setting}'
|
||||||
|
|
||||||
|
|
||||||
|
class MISPInbox(AbstractMISP):
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
self.data: Dict
|
||||||
|
|
||||||
|
def from_dict(self, **kwargs):
|
||||||
|
if 'Inbox' in kwargs:
|
||||||
|
kwargs = kwargs['Inbox']
|
||||||
|
super().from_dict(**kwargs)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f'<{self.__class__.__name__}(name={self.type})>'
|
||||||
|
|
|
@ -26,7 +26,7 @@ logger = logging.getLogger('pymisp')
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting
|
from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting
|
||||||
from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator
|
from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator
|
||||||
from pymisp.exceptions import MISPServerError
|
from pymisp.exceptions import MISPServerError
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
@ -57,7 +57,8 @@ class TestComprehensive(unittest.TestCase):
|
||||||
def setUpClass(cls):
|
def setUpClass(cls):
|
||||||
cls.maxDiff = None
|
cls.maxDiff = None
|
||||||
# Connect as admin
|
# Connect as admin
|
||||||
cls.admin_misp_connector = ExpandedPyMISP(url, key, verifycert, debug=False)
|
cls.admin_misp_connector = PyMISP(url, key, verifycert, debug=False)
|
||||||
|
cls.admin_misp_connector.set_server_setting('Security.allow_self_registration', True, force=True)
|
||||||
if not fast_mode:
|
if not fast_mode:
|
||||||
r = cls.admin_misp_connector.update_misp()
|
r = cls.admin_misp_connector.update_misp()
|
||||||
print(r)
|
print(r)
|
||||||
|
@ -76,7 +77,7 @@ class TestComprehensive(unittest.TestCase):
|
||||||
user.email = 'testusr@user.local'
|
user.email = 'testusr@user.local'
|
||||||
user.org_id = cls.test_org.id
|
user.org_id = cls.test_org.id
|
||||||
cls.test_usr = cls.admin_misp_connector.add_user(user, pythonify=True)
|
cls.test_usr = cls.admin_misp_connector.add_user(user, pythonify=True)
|
||||||
cls.user_misp_connector = ExpandedPyMISP(url, cls.test_usr.authkey, verifycert, debug=True)
|
cls.user_misp_connector = PyMISP(url, cls.test_usr.authkey, verifycert, debug=True)
|
||||||
cls.user_misp_connector.toggle_global_pythonify()
|
cls.user_misp_connector.toggle_global_pythonify()
|
||||||
# Creates a publisher
|
# Creates a publisher
|
||||||
user = MISPUser()
|
user = MISPUser()
|
||||||
|
@ -84,14 +85,14 @@ class TestComprehensive(unittest.TestCase):
|
||||||
user.org_id = cls.test_org.id
|
user.org_id = cls.test_org.id
|
||||||
user.role_id = 4
|
user.role_id = 4
|
||||||
cls.test_pub = cls.admin_misp_connector.add_user(user, pythonify=True)
|
cls.test_pub = cls.admin_misp_connector.add_user(user, pythonify=True)
|
||||||
cls.pub_misp_connector = ExpandedPyMISP(url, cls.test_pub.authkey, verifycert)
|
cls.pub_misp_connector = PyMISP(url, cls.test_pub.authkey, verifycert)
|
||||||
# Creates a user that can accept a delegation request
|
# Creates a user that can accept a delegation request
|
||||||
user = MISPUser()
|
user = MISPUser()
|
||||||
user.email = 'testusr@delegate.recipient.local'
|
user.email = 'testusr@delegate.recipient.local'
|
||||||
user.org_id = cls.test_org_delegate.id
|
user.org_id = cls.test_org_delegate.id
|
||||||
user.role_id = 2
|
user.role_id = 2
|
||||||
cls.test_usr_delegate = cls.admin_misp_connector.add_user(user, pythonify=True)
|
cls.test_usr_delegate = cls.admin_misp_connector.add_user(user, pythonify=True)
|
||||||
cls.delegate_user_misp_connector = ExpandedPyMISP(url, cls.test_usr_delegate.authkey, verifycert, debug=False)
|
cls.delegate_user_misp_connector = PyMISP(url, cls.test_usr_delegate.authkey, verifycert, debug=False)
|
||||||
cls.delegate_user_misp_connector.toggle_global_pythonify()
|
cls.delegate_user_misp_connector.toggle_global_pythonify()
|
||||||
if not fast_mode:
|
if not fast_mode:
|
||||||
# Update all json stuff
|
# Update all json stuff
|
||||||
|
@ -1907,7 +1908,7 @@ class TestComprehensive(unittest.TestCase):
|
||||||
try:
|
try:
|
||||||
test_roles_user = self.admin_misp_connector.add_user(user, pythonify=True)
|
test_roles_user = self.admin_misp_connector.add_user(user, pythonify=True)
|
||||||
test_tag = self.admin_misp_connector.add_tag(tag, pythonify=True)
|
test_tag = self.admin_misp_connector.add_tag(tag, pythonify=True)
|
||||||
test_roles_user_connector = ExpandedPyMISP(url, test_roles_user.authkey, verifycert, debug=False)
|
test_roles_user_connector = PyMISP(url, test_roles_user.authkey, verifycert, debug=False)
|
||||||
test_roles_user_connector.toggle_global_pythonify()
|
test_roles_user_connector.toggle_global_pythonify()
|
||||||
# ===== Read Only
|
# ===== Read Only
|
||||||
self.admin_misp_connector.update_user({'role_id': 6}, test_roles_user)
|
self.admin_misp_connector.update_user({'role_id': 6}, test_roles_user)
|
||||||
|
@ -2150,6 +2151,36 @@ class TestComprehensive(unittest.TestCase):
|
||||||
finally:
|
finally:
|
||||||
self.admin_misp_connector.delete_event(first)
|
self.admin_misp_connector.delete_event(first)
|
||||||
|
|
||||||
|
def test_registrations(self):
|
||||||
|
r = register_user(url, 'self_register@user.local', organisation=self.test_org,
|
||||||
|
org_name=self.test_org.name, verify=verifycert)
|
||||||
|
self.assertTrue(r['saved'])
|
||||||
|
|
||||||
|
r = register_user(url, 'discard@tesst.de', verify=verifycert)
|
||||||
|
self.assertTrue(r['saved'])
|
||||||
|
|
||||||
|
registrations = self.admin_misp_connector.user_registrations(pythonify=True)
|
||||||
|
self.assertTrue(len(registrations), 2)
|
||||||
|
self.assertEqual(registrations[0].data['email'], 'self_register@user.local')
|
||||||
|
self.assertEqual(registrations[0].data['org_name'], 'Test Org')
|
||||||
|
self.assertEqual(registrations[1].data['email'], 'discard@tesst.de')
|
||||||
|
|
||||||
|
m = self.admin_misp_connector.accept_user_registration(registrations[0], unsafe_fallback=True)
|
||||||
|
self.assertTrue(m['saved'])
|
||||||
|
|
||||||
|
# delete new user
|
||||||
|
for user in self.admin_misp_connector.users(pythonify=True):
|
||||||
|
if user.email == registrations[0].data['email']:
|
||||||
|
self.admin_misp_connector.delete_user(user)
|
||||||
|
break
|
||||||
|
|
||||||
|
# Expected: accept registration fails because the orgname is missing
|
||||||
|
m = self.admin_misp_connector.accept_user_registration(registrations[1], unsafe_fallback=True)
|
||||||
|
self.assertEqual(m['errors'][1]['message'], 'No organisation selected. Supply an Organisation ID')
|
||||||
|
|
||||||
|
m = self.admin_misp_connector.discard_user_registration(registrations[1].id)
|
||||||
|
self.assertEqual(m['name'], '1 registration(s) discarded.')
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
Loading…
Reference in New Issue