From 75a100a4851598c11af7753c4a0a4c5b5b01eb1d Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Thu, 1 Dec 2022 10:05:38 +0100 Subject: [PATCH 1/4] add: Added the `Galaxy` field to MISPAttribute using the MISPGalaxy class - Including an `add_galaxy` method similar to the one used for events - `attribute.galaxies` gives the list of attached galaxy clusters --- pymisp/__init__.py | 9 +- pymisp/exceptions.py | 4 + pymisp/mispevent.py | 686 +++++++++++++++++++++++-------------------- 3 files changed, 370 insertions(+), 329 deletions(-) diff --git a/pymisp/__init__.py b/pymisp/__init__.py index 188c523..7bd23ae 100644 --- a/pymisp/__init__.py +++ b/pymisp/__init__.py @@ -26,14 +26,15 @@ Response (if any): try: warning_2022() - from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa - from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa + from .exceptions import (PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, # noqa + InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse) + from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa from .mispevent import (MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, # noqa MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, - MISPEventReport, MISPGalaxyCluster, MISPGalaxyClusterElement, MISPGalaxyClusterRelation, - MISPCorrelationExclusion, MISPGalaxy, MISPDecayingModel) + MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster, + MISPGalaxyClusterElement, MISPGalaxyClusterRelation) from .tools import AbstractMISPObjectGenerator # noqa from .tools import Neo4j # noqa from .tools import stix # noqa diff --git a/pymisp/exceptions.py b/pymisp/exceptions.py index 58d3a52..96f3544 100644 --- a/pymisp/exceptions.py +++ b/pymisp/exceptions.py @@ -65,6 +65,10 @@ class UnknownMISPObjectTemplate(MISPObjectException): pass +class InvalidMISPGalaxy(PyMISPError): + pass + + class PyMISPInvalidFormat(PyMISPError): pass diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 3bc9d13..65a8b0d 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -17,7 +17,9 @@ from pathlib import Path from typing import List, Optional, Union, IO, Dict, Any from .abstract import AbstractMISP, MISPTag -from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewEventReportError, NewGalaxyClusterError, NewGalaxyClusterRelationError +from .exceptions import (UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject, + PyMISPError, NewEventError, NewAttributeError, NewEventReportError, + NewGalaxyClusterError, NewGalaxyClusterRelationError) logger = logging.getLogger('pymisp') @@ -251,6 +253,324 @@ class MISPSighting(AbstractMISP): return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) +class MISPGalaxyClusterElement(AbstractMISP): + """A MISP Galaxy cluster element, providing further info on a cluster + + Creating a new galaxy cluster element can take the following parameters + + :param key: The key/identifier of the element + :type key: str + :param value: The value of the element + :type value: str + """ + + key: str + value: str + + def __repr__(self) -> str: + if hasattr(self, 'key') and hasattr(self, 'value'): + return '<{self.__class__.__name__}(key={self.key}, value={self.value})'.format(self=self) + return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) + + def __setattr__(self, key, value): + if key == "value" and isinstance(value, list): + raise PyMISPError("You tried to set a list to a cluster element's value. " + "Instead, create seperate elements for each value") + super().__setattr__(key, value) + + def from_dict(self, **kwargs): + if kwargs.get('id'): + self.id = int(kwargs.pop('id')) + if kwargs.get('galaxy_cluster_id'): + self.galaxy_cluster_id = int(kwargs.pop('galaxy_cluster_id')) + + super().from_dict(**kwargs) + + +class MISPGalaxyClusterRelation(AbstractMISP): + """A MISP Galaxy cluster relation, linking one cluster to another + + Creating a new galaxy cluster can take the following parameters + + :param galaxy_cluster_uuid: The UUID of the galaxy the relation links to + :param referenced_galaxy_cluster_type: The relation type, e.g. dropped-by + :param referenced_galaxy_cluster_uuid: The UUID of the related galaxy + :param distribution: The distribution of the relation, one of 0, 1, 2, 3, 4, default 0 + :param sharing_group_id: The sharing group of the relation, only when distribution is 4 + """ + + def __repr__(self) -> str: + if hasattr(self, "referenced_galaxy_cluster_type"): + return '<{self.__class__.__name__}(referenced_galaxy_cluster_type={self.referenced_galaxy_cluster_type})'.format(self=self) + return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) + + def __init__(self) -> None: + super().__init__() + self.galaxy_cluster_uuid: str + self.referenced_galaxy_cluster_uuid: str + self.distribution: int = 0 + self.referenced_galaxy_cluster_type: str + self.Tag: List[MISPTag] = [] + + def from_dict(self, **kwargs): + # Default values for a valid event to send to a MISP instance + self.distribution = int(kwargs.pop('distribution', 0)) + if self.distribution not in [0, 1, 2, 3, 4, 5]: + raise NewGalaxyClusterRelationError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4') + + if kwargs.get('sharing_group_id'): + self.sharing_group_id = int(kwargs.pop('sharing_group_id')) + + if self.distribution == 4: + # The distribution is set to sharing group, a sharing_group_id is required. + if not hasattr(self, 'sharing_group_id'): + raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required.') + elif not self.sharing_group_id: + # Cannot be None or 0 either. + raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id)) + + if kwargs.get('id'): + self.id = int(kwargs.pop('id')) + if kwargs.get('orgc_id'): + self.orgc_id = int(kwargs.pop('orgc_id')) + if kwargs.get('org_id'): + self.org_id = int(kwargs.pop('org_id')) + if kwargs.get('galaxy_id'): + self.galaxy_id = int(kwargs.pop('galaxy_id')) + if kwargs.get('tag_id'): + self.tag_id = int(kwargs.pop('tag_id')) + if kwargs.get('sharing_group_id'): + self.sharing_group_id = int(kwargs.pop('sharing_group_id')) + if kwargs.get('Tag'): + [self.add_tag(**t) for t in kwargs.pop('Tag')] + if kwargs.get('SharingGroup'): + self.SharingGroup = MISPSharingGroup() + self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) + super().from_dict(**kwargs) + + def add_tag(self, tag: Optional[Union[str, MISPTag, Dict]] = None, **kwargs) -> MISPTag: + return super()._add_tag(tag, **kwargs) + + @property + def tags(self) -> List[MISPTag]: + """Returns a list of tags associated to this Attribute""" + return self.Tag + + @tags.setter + def tags(self, tags: List[MISPTag]): + """Set a list of prepared MISPTag.""" + super()._set_tags(tags) + + +class MISPGalaxyCluster(AbstractMISP): + """A MISP galaxy cluster, storing respective galaxy elements and relations. + Used to view default galaxy clusters and add/edit/update/delete Galaxy 2.0 clusters + + Creating a new galaxy cluster can take the following parameters + + :param value: The value of the galaxy cluster + :type value: str + :param description: The description of the galaxy cluster + :type description: str + :param distribution: The distribution type, one of 0, 1, 2, 3, 4 + :type distribution: int + :param sharing_group_id: The sharing group ID, if distribution is set to 4 + :type sharing_group_id: int, optional + :param authors: A list of authors of the galaxy cluster + :type authors: list[str], optional + :param cluster_elements: List of MISPGalaxyClusterElement + :type cluster_elements: list[MISPGalaxyClusterElement], optional + :param cluster_relations: List of MISPGalaxyClusterRelation + :type cluster_relations: list[MISPGalaxyClusterRelation], optional + """ + + def __init__(self) -> None: + super().__init__() + self.Galaxy: MISPGalaxy + self.GalaxyElement: List[MISPGalaxyClusterElement] = [] + self.meta: Dict = {} + self.GalaxyClusterRelation: List[MISPGalaxyClusterRelation] = [] + self.Org: MISPOrganisation + self.Orgc: MISPOrganisation + self.SharingGroup: MISPSharingGroup + self.value: str + # Set any inititialized cluster to be False + self.default = False + + @property + def cluster_elements(self) -> List[MISPGalaxyClusterElement]: + return self.GalaxyElement + + @cluster_elements.setter + def cluster_elements(self, cluster_elements: List[MISPGalaxyClusterElement]): + self.GalaxyElement = cluster_elements + + @property + def cluster_relations(self) -> List[MISPGalaxyClusterRelation]: + return self.GalaxyClusterRelation + + @cluster_relations.setter + def cluster_relations(self, cluster_relations: List[MISPGalaxyClusterRelation]): + self.GalaxyClusterRelation = cluster_relations + + def parse_meta_as_elements(self): + """Function to parse the meta field into GalaxyClusterElements""" + # Parse the cluster elements from the kwargs meta fields + for key, value in self.meta.items(): + # The meta will merge fields together, i.e. Two 'countries' will be a list, so split these up + if not isinstance(value, list): + value = [value] + for v in value: + self.add_cluster_element(key=key, value=v) + + @property + def elements_meta(self) -> Dict: + """Function to return the galaxy cluster elements as a dictionary structure of lists + that comes from a MISPGalaxy within a MISPEvent. Lossy, you lose the element ID + """ + response = defaultdict(list) + for element in self.cluster_elements: + response[element.key].append(element.value) + return dict(response) + + def from_dict(self, **kwargs): + if 'GalaxyCluster' in kwargs: + kwargs = kwargs['GalaxyCluster'] + self.default = kwargs.pop('default', False) + # If the default field is set, we shouldn't have distribution or sharing group ID set + if self.default: + blocked_fields = ["distribution" "sharing_group_id"] + for field in blocked_fields: + if kwargs.get(field, None): + raise NewGalaxyClusterError( + f"The field '{field}' cannot be set on a default galaxy cluster" + ) + + self.distribution = int(kwargs.pop('distribution', 0)) + if self.distribution not in [0, 1, 2, 3, 4]: + raise NewGalaxyClusterError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4') + + if kwargs.get('sharing_group_id'): + self.sharing_group_id = int(kwargs.pop('sharing_group_id')) + + if self.distribution == 4: + # The distribution is set to sharing group, a sharing_group_id is required. + if not hasattr(self, 'sharing_group_id'): + raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required.') + elif not self.sharing_group_id: + # Cannot be None or 0 either. + raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id)) + + if 'uuid' in kwargs: + self.uuid = kwargs.pop('uuid') + if 'meta' in kwargs: + self.meta = kwargs.pop('meta') + if 'Galaxy' in kwargs: + self.Galaxy = MISPGalaxy() + self.Galaxy.from_dict(**kwargs.pop('Galaxy')) + if 'GalaxyElement' in kwargs: + [self.add_cluster_element(**e) for e in kwargs.pop('GalaxyElement')] + if 'Org' in kwargs: + self.Org = MISPOrganisation() + self.Org.from_dict(**kwargs.pop('Org')) + if 'Orgc' in kwargs: + self.Orgc = MISPOrganisation() + self.Orgc.from_dict(**kwargs.pop('Orgc')) + if 'GalaxyClusterRelation' in kwargs: + [self.add_cluster_relation(**r) for r in kwargs.pop('GalaxyClusterRelation')] + if 'SharingGroup' in kwargs: + self.SharingGroup = MISPSharingGroup() + self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) + super().from_dict(**kwargs) + + def add_cluster_element(self, key: str, value: str, **kwargs) -> MISPGalaxyClusterElement: + """Add a cluster relation to a MISPGalaxyCluster, key and value are required + + :param key: The key name of the element + :type key: str + :param value: The value of the element + :type value: str + """ + + cluster_element = MISPGalaxyClusterElement() + cluster_element.from_dict(key=key, value=value, **kwargs) + self.cluster_elements.append(cluster_element) + return cluster_element + + def add_cluster_relation(self, referenced_galaxy_cluster_uuid: Union["MISPGalaxyCluster", str, UUID], referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: Optional[str] = None, **kwargs: Dict) -> MISPGalaxyClusterRelation: + """Add a cluster relation to a MISPGalaxyCluster. + + :param referenced_galaxy_cluster_uuid: UUID of the related cluster + :type referenced_galaxy_cluster_uuid: uuid + :param referenced_galaxy_cluster_type: Relation type + :type referenced_galaxy_cluster_type: uuid + :param galaxy_cluster_uuid: UUID of this cluster, leave blank to use the stored UUID + :param galaxy_cluster_uuid: uuid, Optional + """ + + if not getattr(self, "uuid", None): + raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster") + cluster_relation = MISPGalaxyClusterRelation() + + if isinstance(referenced_galaxy_cluster_uuid, MISPGalaxyCluster): + referenced_galaxy_cluster_uuid = referenced_galaxy_cluster_uuid.uuid + + cluster_relation.from_dict( + referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid, + referenced_galaxy_cluster_type=referenced_galaxy_cluster_type, + galaxy_cluster_uuid=galaxy_cluster_uuid or self.uuid, + **kwargs + ) + self.cluster_relations.append(cluster_relation) + return cluster_relation + + def __repr__(self) -> str: + if hasattr(self, 'value'): + return '<{self.__class__.__name__}(value={self.value})'.format(self=self) + return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) + + +class MISPGalaxy(AbstractMISP): + """Galaxy class, used to view a galaxy and respective clusters""" + + def __init__(self) -> None: + super().__init__() + self.GalaxyCluster: List[MISPGalaxyCluster] = [] + self.name: str + + def from_dict(self, **kwargs): + """Galaxy could be in one of the following formats: + {'Galaxy': {}, 'GalaxyCluster': []} + {'Galaxy': {'GalaxyCluster': []}} + """ + + if 'GalaxyCluster' in kwargs and kwargs.get("withCluster", True): + # Parse the cluster from the kwargs + [self.add_galaxy_cluster(**e) for e in kwargs.pop('GalaxyCluster')] + + if 'Galaxy' in kwargs: + kwargs = kwargs['Galaxy'] + super().from_dict(**kwargs) + + @property + def clusters(self) -> List[MISPGalaxyCluster]: + return self.GalaxyCluster + + def add_galaxy_cluster(self, **kwargs) -> MISPGalaxyCluster: + """Add a MISP galaxy cluster into a MISPGalaxy. + Supports all other parameters supported by MISPGalaxyCluster""" + + galaxy_cluster = MISPGalaxyCluster() + galaxy_cluster.from_dict(**kwargs) + self.clusters.append(galaxy_cluster) + return galaxy_cluster + + def __repr__(self) -> str: + if hasattr(self, 'name'): + return '<{self.__class__.__name__}(name={self.name})'.format(self=self) + return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) + + class MISPAttribute(AbstractMISP): _fields_for_feed: set = {'uuid', 'value', 'category', 'type', 'comment', 'data', 'deleted', 'timestamp', 'to_ids', 'disable_correlation', @@ -277,6 +597,7 @@ class MISPAttribute(AbstractMISP): self.SharingGroup: MISPSharingGroup self.Sighting: List[MISPSighting] = [] self.Tag: List[MISPTag] = [] + self.Galaxy: List[MISPGalaxy] = [] # For search self.Event: MISPEvent @@ -298,6 +619,27 @@ class MISPAttribute(AbstractMISP): """Set a list of prepared MISPTag.""" super()._set_tags(tags) + def add_galaxy(self, galaxy: Union[MISPGalaxy, dict, None] = None, **kwargs) -> MISPGalaxy: + """Add a galaxy to the Attribute, either by passing a MISPGalaxy or a dictionary""" + if isinstance(galaxy, MISPGalaxy): + self.galaxies.append(galaxy) + return galaxy + if isinstance(galaxy, dict): + misp_galaxy = MISPGalaxy() + misp_galaxy.from_dict(**galaxy) + elif kwargs: + misp_galaxy = MISPGalaxy() + misp_galaxy.from_dict(**kwargs) + else: + raise InvalidMISPGalaxy("A Galaxy to add to an existing Attribute needs to be either a MISPGalaxy or a plain python dictionary") + self.galaxies.append(misp_galaxy) + return misp_galaxy + + @property + def galaxies(self) -> List[MISPGalaxy]: + """Returns a list of galaxies associated to this Attribute""" + return self.Galaxy + def _prepare_data(self, data: Optional[Union[Path, str, bytes, BytesIO]]): if not data: super().__setattr__('data', None) @@ -588,6 +930,8 @@ class MISPAttribute(AbstractMISP): if kwargs.get('Tag'): [self.add_tag(tag) for tag in kwargs.pop('Tag')] + if kwargs.get('Galaxy'): + [self.add_galaxy(galaxy) for galaxy in kwargs.pop('Galaxy')] if kwargs.get('Sighting'): [self.add_sighting(sighting) for sighting in kwargs.pop('Sighting')] if kwargs.get('ShadowAttribute'): @@ -1162,324 +1506,6 @@ class MISPEventReport(AbstractMISP): self.content = '' -class MISPGalaxyClusterElement(AbstractMISP): - """A MISP Galaxy cluster element, providing further info on a cluster - - Creating a new galaxy cluster element can take the following parameters - - :param key: The key/identifier of the element - :type key: str - :param value: The value of the element - :type value: str - """ - - key: str - value: str - - def __repr__(self) -> str: - if hasattr(self, 'key') and hasattr(self, 'value'): - return '<{self.__class__.__name__}(key={self.key}, value={self.value})'.format(self=self) - return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) - - def __setattr__(self, key, value): - if key == "value" and isinstance(value, list): - raise PyMISPError("You tried to set a list to a cluster element's value. " - "Instead, create seperate elements for each value") - super().__setattr__(key, value) - - def from_dict(self, **kwargs): - if kwargs.get('id'): - self.id = int(kwargs.pop('id')) - if kwargs.get('galaxy_cluster_id'): - self.galaxy_cluster_id = int(kwargs.pop('galaxy_cluster_id')) - - super().from_dict(**kwargs) - - -class MISPGalaxyClusterRelation(AbstractMISP): - """A MISP Galaxy cluster relation, linking one cluster to another - - Creating a new galaxy cluster can take the following parameters - - :param galaxy_cluster_uuid: The UUID of the galaxy the relation links to - :param referenced_galaxy_cluster_type: The relation type, e.g. dropped-by - :param referenced_galaxy_cluster_uuid: The UUID of the related galaxy - :param distribution: The distribution of the relation, one of 0, 1, 2, 3, 4, default 0 - :param sharing_group_id: The sharing group of the relation, only when distribution is 4 - """ - - def __repr__(self) -> str: - if hasattr(self, "referenced_galaxy_cluster_type"): - return '<{self.__class__.__name__}(referenced_galaxy_cluster_type={self.referenced_galaxy_cluster_type})'.format(self=self) - return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) - - def __init__(self) -> None: - super().__init__() - self.galaxy_cluster_uuid: str - self.referenced_galaxy_cluster_uuid: str - self.distribution: int = 0 - self.referenced_galaxy_cluster_type: str - self.Tag: List[MISPTag] = [] - - def from_dict(self, **kwargs): - # Default values for a valid event to send to a MISP instance - self.distribution = int(kwargs.pop('distribution', 0)) - if self.distribution not in [0, 1, 2, 3, 4, 5]: - raise NewGalaxyClusterRelationError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4') - - if kwargs.get('sharing_group_id'): - self.sharing_group_id = int(kwargs.pop('sharing_group_id')) - - if self.distribution == 4: - # The distribution is set to sharing group, a sharing_group_id is required. - if not hasattr(self, 'sharing_group_id'): - raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required.') - elif not self.sharing_group_id: - # Cannot be None or 0 either. - raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id)) - - if kwargs.get('id'): - self.id = int(kwargs.pop('id')) - if kwargs.get('orgc_id'): - self.orgc_id = int(kwargs.pop('orgc_id')) - if kwargs.get('org_id'): - self.org_id = int(kwargs.pop('org_id')) - if kwargs.get('galaxy_id'): - self.galaxy_id = int(kwargs.pop('galaxy_id')) - if kwargs.get('tag_id'): - self.tag_id = int(kwargs.pop('tag_id')) - if kwargs.get('sharing_group_id'): - self.sharing_group_id = int(kwargs.pop('sharing_group_id')) - if kwargs.get('Tag'): - [self.add_tag(**t) for t in kwargs.pop('Tag')] - if kwargs.get('SharingGroup'): - self.SharingGroup = MISPSharingGroup() - self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) - super().from_dict(**kwargs) - - def add_tag(self, tag: Optional[Union[str, MISPTag, Dict]] = None, **kwargs) -> MISPTag: - return super()._add_tag(tag, **kwargs) - - @property - def tags(self) -> List[MISPTag]: - """Returns a list of tags associated to this Attribute""" - return self.Tag - - @tags.setter - def tags(self, tags: List[MISPTag]): - """Set a list of prepared MISPTag.""" - super()._set_tags(tags) - - -class MISPGalaxyCluster(AbstractMISP): - """A MISP galaxy cluster, storing respective galaxy elements and relations. - Used to view default galaxy clusters and add/edit/update/delete Galaxy 2.0 clusters - - Creating a new galaxy cluster can take the following parameters - - :param value: The value of the galaxy cluster - :type value: str - :param description: The description of the galaxy cluster - :type description: str - :param distribution: The distribution type, one of 0, 1, 2, 3, 4 - :type distribution: int - :param sharing_group_id: The sharing group ID, if distribution is set to 4 - :type sharing_group_id: int, optional - :param authors: A list of authors of the galaxy cluster - :type authors: list[str], optional - :param cluster_elements: List of MISPGalaxyClusterElement - :type cluster_elements: list[MISPGalaxyClusterElement], optional - :param cluster_relations: List of MISPGalaxyClusterRelation - :type cluster_relations: list[MISPGalaxyClusterRelation], optional - """ - - def __init__(self) -> None: - super().__init__() - self.Galaxy: MISPGalaxy - self.GalaxyElement: List[MISPGalaxyClusterElement] = [] - self.meta: Dict = {} - self.GalaxyClusterRelation: List[MISPGalaxyClusterRelation] = [] - self.Org: MISPOrganisation - self.Orgc: MISPOrganisation - self.SharingGroup: MISPSharingGroup - self.value: str - # Set any inititialized cluster to be False - self.default = False - - @property - def cluster_elements(self) -> List[MISPGalaxyClusterElement]: - return self.GalaxyElement - - @cluster_elements.setter - def cluster_elements(self, cluster_elements: List[MISPGalaxyClusterElement]): - self.GalaxyElement = cluster_elements - - @property - def cluster_relations(self) -> List[MISPGalaxyClusterRelation]: - return self.GalaxyClusterRelation - - @cluster_relations.setter - def cluster_relations(self, cluster_relations: List[MISPGalaxyClusterRelation]): - self.GalaxyClusterRelation = cluster_relations - - def parse_meta_as_elements(self): - """Function to parse the meta field into GalaxyClusterElements""" - # Parse the cluster elements from the kwargs meta fields - for key, value in self.meta.items(): - # The meta will merge fields together, i.e. Two 'countries' will be a list, so split these up - if not isinstance(value, list): - value = [value] - for v in value: - self.add_cluster_element(key=key, value=v) - - @property - def elements_meta(self) -> Dict: - """Function to return the galaxy cluster elements as a dictionary structure of lists - that comes from a MISPGalaxy within a MISPEvent. Lossy, you lose the element ID - """ - response = defaultdict(list) - for element in self.cluster_elements: - response[element.key].append(element.value) - return dict(response) - - def from_dict(self, **kwargs): - if 'GalaxyCluster' in kwargs: - kwargs = kwargs['GalaxyCluster'] - self.default = kwargs.pop('default', False) - # If the default field is set, we shouldn't have distribution or sharing group ID set - if self.default: - blocked_fields = ["distribution" "sharing_group_id"] - for field in blocked_fields: - if kwargs.get(field, None): - raise NewGalaxyClusterError( - f"The field '{field}' cannot be set on a default galaxy cluster" - ) - - self.distribution = int(kwargs.pop('distribution', 0)) - if self.distribution not in [0, 1, 2, 3, 4]: - raise NewGalaxyClusterError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4') - - if kwargs.get('sharing_group_id'): - self.sharing_group_id = int(kwargs.pop('sharing_group_id')) - - if self.distribution == 4: - # The distribution is set to sharing group, a sharing_group_id is required. - if not hasattr(self, 'sharing_group_id'): - raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required.') - elif not self.sharing_group_id: - # Cannot be None or 0 either. - raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id)) - - if 'uuid' in kwargs: - self.uuid = kwargs.pop('uuid') - if 'meta' in kwargs: - self.meta = kwargs.pop('meta') - if 'Galaxy' in kwargs: - self.Galaxy = MISPGalaxy() - self.Galaxy.from_dict(**kwargs.pop('Galaxy')) - if 'GalaxyElement' in kwargs: - [self.add_cluster_element(**e) for e in kwargs.pop('GalaxyElement')] - if 'Org' in kwargs: - self.Org = MISPOrganisation() - self.Org.from_dict(**kwargs.pop('Org')) - if 'Orgc' in kwargs: - self.Orgc = MISPOrganisation() - self.Orgc.from_dict(**kwargs.pop('Orgc')) - if 'GalaxyClusterRelation' in kwargs: - [self.add_cluster_relation(**r) for r in kwargs.pop('GalaxyClusterRelation')] - if 'SharingGroup' in kwargs: - self.SharingGroup = MISPSharingGroup() - self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) - super().from_dict(**kwargs) - - def add_cluster_element(self, key: str, value: str, **kwargs) -> MISPGalaxyClusterElement: - """Add a cluster relation to a MISPGalaxyCluster, key and value are required - - :param key: The key name of the element - :type key: str - :param value: The value of the element - :type value: str - """ - - cluster_element = MISPGalaxyClusterElement() - cluster_element.from_dict(key=key, value=value, **kwargs) - self.cluster_elements.append(cluster_element) - return cluster_element - - def add_cluster_relation(self, referenced_galaxy_cluster_uuid: Union["MISPGalaxyCluster", str, UUID], referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: Optional[str] = None, **kwargs: Dict) -> MISPGalaxyClusterRelation: - """Add a cluster relation to a MISPGalaxyCluster. - - :param referenced_galaxy_cluster_uuid: UUID of the related cluster - :type referenced_galaxy_cluster_uuid: uuid - :param referenced_galaxy_cluster_type: Relation type - :type referenced_galaxy_cluster_type: uuid - :param galaxy_cluster_uuid: UUID of this cluster, leave blank to use the stored UUID - :param galaxy_cluster_uuid: uuid, Optional - """ - - if not getattr(self, "uuid", None): - raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster") - cluster_relation = MISPGalaxyClusterRelation() - - if isinstance(referenced_galaxy_cluster_uuid, MISPGalaxyCluster): - referenced_galaxy_cluster_uuid = referenced_galaxy_cluster_uuid.uuid - - cluster_relation.from_dict( - referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid, - referenced_galaxy_cluster_type=referenced_galaxy_cluster_type, - galaxy_cluster_uuid=galaxy_cluster_uuid or self.uuid, - **kwargs - ) - self.cluster_relations.append(cluster_relation) - return cluster_relation - - def __repr__(self) -> str: - if hasattr(self, 'value'): - return '<{self.__class__.__name__}(value={self.value})'.format(self=self) - return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) - - -class MISPGalaxy(AbstractMISP): - """Galaxy class, used to view a galaxy and respective clusters""" - - def __init__(self) -> None: - super().__init__() - self.GalaxyCluster: List[MISPGalaxyCluster] = [] - self.name: str - - def from_dict(self, **kwargs): - """Galaxy could be in one of the following formats: - {'Galaxy': {}, 'GalaxyCluster': []} - {'Galaxy': {'GalaxyCluster': []}} - """ - - if 'GalaxyCluster' in kwargs and kwargs.get("withCluster", True): - # Parse the cluster from the kwargs - [self.add_galaxy_cluster(**e) for e in kwargs.pop('GalaxyCluster')] - - if 'Galaxy' in kwargs: - kwargs = kwargs['Galaxy'] - super().from_dict(**kwargs) - - @property - def clusters(self) -> List[MISPGalaxyCluster]: - return self.GalaxyCluster - - def add_galaxy_cluster(self, **kwargs) -> MISPGalaxyCluster: - """Add a MISP galaxy cluster into a MISPGalaxy. - Supports all other parameters supported by MISPGalaxyCluster""" - - galaxy_cluster = MISPGalaxyCluster() - galaxy_cluster.from_dict(**kwargs) - self.clusters.append(galaxy_cluster) - return galaxy_cluster - - def __repr__(self) -> str: - if hasattr(self, 'name'): - return '<{self.__class__.__name__}(name={self.name})'.format(self=self) - return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) - - class MISPEvent(AbstractMISP): _fields_for_feed: set = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp', @@ -1945,13 +1971,23 @@ class MISPEvent(AbstractMISP): self.edited = True return event_report - def add_galaxy(self, **kwargs) -> MISPGalaxy: - """Add a MISP galaxy and sub-clusters into an event. + def add_galaxy(self, galaxy: Union[MISPGalaxy, dict, None] = None, **kwargs) -> MISPGalaxy: + """Add a galaxy and sub-clusters into an event, either by passing + a MISPGalaxy or a dictionary. Supports all other parameters supported by MISPGalaxy""" - galaxy = MISPGalaxy() - galaxy.from_dict(**kwargs) - self.galaxies.append(galaxy) - return galaxy + if isinstance(galaxy, MISPGalaxy): + self.galaxies.append(galaxy) + return galaxy + if isinstance(galaxy, dict): + misp_galaxy = MISPGalaxy() + misp_galaxy.from_dict(**galaxy) + elif kwargs: + misp_galaxy = MISPGalaxy() + misp_galaxy.from_dict(**kwargs) + else: + raise InvalidMISPGalaxy("A Galaxy to add to an existing Event needs to be either a MISPGalaxy or a plain python dictionary") + self.galaxies.append(misp_galaxy) + return misp_galaxy def get_object_by_id(self, object_id: Union[str, int]) -> MISPObject: """Get an object by ID From 55a4b2e5c19703600f2aa08bc33bc76b16c60366 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Thu, 1 Dec 2022 10:09:39 +0100 Subject: [PATCH 2/4] add: Added very straight forward tests to make sure the galaxy clusters are properly defined --- tests/test_mispevent.py | 44 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/tests/test_mispevent.py b/tests/test_mispevent.py index 0c757e9..7d27a18 100644 --- a/tests/test_mispevent.py +++ b/tests/test_mispevent.py @@ -8,8 +8,8 @@ import glob import hashlib from datetime import date, datetime -from pymisp import (MISPEvent, MISPSighting, MISPTag, MISPOrganisation, - MISPObject) +from pymisp import (MISPAttribute, MISPEvent, MISPGalaxy, MISPObject, MISPOrganisation, + MISPSighting, MISPTag) from pymisp.exceptions import InvalidMISPObject from pymisp.tools import GitVulnFinderObject @@ -68,6 +68,15 @@ class TestMISPEvent(unittest.TestCase): del self.mispevent.uuid self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) + def test_event_galaxy(self): + self.init_event() + with open('tests/mispevent_testfiles/galaxy.json', 'r') as f: + galaxy = json.load(f) + misp_galaxy = MISPGalaxy() + misp_galaxy.from_dict(**galaxy) + self.mispevent.add_galaxy(misp_galaxy) + self.assertEqual(self.mispevent.galaxies[0].to_json(sort_keys=True, indent=2), json.dumps(galaxy, sort_keys=True, indent=2)) + def test_attribute(self): self.init_event() a = self.mispevent.add_attribute('filename', 'bar.exe') @@ -87,6 +96,21 @@ class TestMISPEvent(unittest.TestCase): ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) + def test_attribute_galaxy(self): + self.init_event() + with open('tests/mispevent_testfiles/galaxy.json', 'r') as f: + galaxy = json.load(f) + misp_galaxy = MISPGalaxy() + misp_galaxy.from_dict(**galaxy) + attribute = MISPAttribute() + attribute.from_dict(**{'type': 'github-username', 'value': 'adulau'}) + attribute.add_galaxy(misp_galaxy) + self.mispevent.add_attribute(**attribute) + self.assertEqual( + self.mispevent.attributes[0].galaxies[0].to_json(sort_keys=True, indent=2), + json.dumps(galaxy, sort_keys=True, indent=2) + ) + def test_to_dict_json_format(self): misp_event = MISPEvent() av_signature_object = MISPObject("av-signature") @@ -130,6 +154,22 @@ class TestMISPEvent(unittest.TestCase): ref_json = json.load(f) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) + def test_object_galaxy(self): + self.init_event() + misp_object = MISPObject('github-user') + misp_object.add_attribute('username', 'adulau') + misp_object.add_attribute('repository', 'cve-search') + self.mispevent.add_object(misp_object) + with open('tests/mispevent_testfiles/galaxy.json', 'r') as f: + galaxy = json.load(f) + misp_galaxy = MISPGalaxy() + misp_galaxy.from_dict(**galaxy) + self.mispevent.objects[0].attributes[0].add_galaxy(misp_galaxy) + self.assertEqual( + self.mispevent.objects[0].attributes[0].galaxies[0].to_json(sort_keys=True, indent=2), + json.dumps(galaxy, sort_keys=True, indent=2) + ) + def test_malware(self): with open('tests/mispevent_testfiles/simple.json', 'rb') as f: pseudofile = BytesIO(f.read()) From 77b0b3ac3e51ddea95a314730a8a34ec36497a88 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Thu, 1 Dec 2022 10:49:09 +0100 Subject: [PATCH 3/4] add: Galaxy test sample --- tests/mispevent_testfiles/galaxy.json | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 tests/mispevent_testfiles/galaxy.json diff --git a/tests/mispevent_testfiles/galaxy.json b/tests/mispevent_testfiles/galaxy.json new file mode 100644 index 0000000..a2d9107 --- /dev/null +++ b/tests/mispevent_testfiles/galaxy.json @@ -0,0 +1,25 @@ +{ + "uuid": "c5f2dfb4-21a1-42d8-a452-1d3c36a204ff", + "name": "Tea Matrix", + "type": "tea-matrix", + "description": "Tea Matrix", + "namespace": "tea-matrix", + "GalaxyCluster": [ + { + "collection_uuid": "7eacd736-b093-4cc0-a56c-5f84de725dfb", + "type": "tea-matrix", + "value": "Milk in tea", + "tag_name": "misp-galaxy:tea-matrix=\"Milk in tea\"", + "description": "Milk in tea", + "uuid": "24430dc6-9c27-4b3c-a5e7-6dda478fffa0", + "distribution": "3", + "default": true, + "meta": { + "kill_chain": [ + "tea:black" + ] + }, + "relationship_type": "ennemy-of" + } + ] +} From 1fb274821f686b2d4db93a888eea73cc59eb0999 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Thu, 1 Dec 2022 12:06:57 +0100 Subject: [PATCH 4/4] chg: Re-order classes --- pymisp/mispevent.py | 640 ++++++++++++++++++++++---------------------- 1 file changed, 320 insertions(+), 320 deletions(-) diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 65a8b0d..63ebb3a 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -253,324 +253,6 @@ class MISPSighting(AbstractMISP): return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) -class MISPGalaxyClusterElement(AbstractMISP): - """A MISP Galaxy cluster element, providing further info on a cluster - - Creating a new galaxy cluster element can take the following parameters - - :param key: The key/identifier of the element - :type key: str - :param value: The value of the element - :type value: str - """ - - key: str - value: str - - def __repr__(self) -> str: - if hasattr(self, 'key') and hasattr(self, 'value'): - return '<{self.__class__.__name__}(key={self.key}, value={self.value})'.format(self=self) - return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) - - def __setattr__(self, key, value): - if key == "value" and isinstance(value, list): - raise PyMISPError("You tried to set a list to a cluster element's value. " - "Instead, create seperate elements for each value") - super().__setattr__(key, value) - - def from_dict(self, **kwargs): - if kwargs.get('id'): - self.id = int(kwargs.pop('id')) - if kwargs.get('galaxy_cluster_id'): - self.galaxy_cluster_id = int(kwargs.pop('galaxy_cluster_id')) - - super().from_dict(**kwargs) - - -class MISPGalaxyClusterRelation(AbstractMISP): - """A MISP Galaxy cluster relation, linking one cluster to another - - Creating a new galaxy cluster can take the following parameters - - :param galaxy_cluster_uuid: The UUID of the galaxy the relation links to - :param referenced_galaxy_cluster_type: The relation type, e.g. dropped-by - :param referenced_galaxy_cluster_uuid: The UUID of the related galaxy - :param distribution: The distribution of the relation, one of 0, 1, 2, 3, 4, default 0 - :param sharing_group_id: The sharing group of the relation, only when distribution is 4 - """ - - def __repr__(self) -> str: - if hasattr(self, "referenced_galaxy_cluster_type"): - return '<{self.__class__.__name__}(referenced_galaxy_cluster_type={self.referenced_galaxy_cluster_type})'.format(self=self) - return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) - - def __init__(self) -> None: - super().__init__() - self.galaxy_cluster_uuid: str - self.referenced_galaxy_cluster_uuid: str - self.distribution: int = 0 - self.referenced_galaxy_cluster_type: str - self.Tag: List[MISPTag] = [] - - def from_dict(self, **kwargs): - # Default values for a valid event to send to a MISP instance - self.distribution = int(kwargs.pop('distribution', 0)) - if self.distribution not in [0, 1, 2, 3, 4, 5]: - raise NewGalaxyClusterRelationError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4') - - if kwargs.get('sharing_group_id'): - self.sharing_group_id = int(kwargs.pop('sharing_group_id')) - - if self.distribution == 4: - # The distribution is set to sharing group, a sharing_group_id is required. - if not hasattr(self, 'sharing_group_id'): - raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required.') - elif not self.sharing_group_id: - # Cannot be None or 0 either. - raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id)) - - if kwargs.get('id'): - self.id = int(kwargs.pop('id')) - if kwargs.get('orgc_id'): - self.orgc_id = int(kwargs.pop('orgc_id')) - if kwargs.get('org_id'): - self.org_id = int(kwargs.pop('org_id')) - if kwargs.get('galaxy_id'): - self.galaxy_id = int(kwargs.pop('galaxy_id')) - if kwargs.get('tag_id'): - self.tag_id = int(kwargs.pop('tag_id')) - if kwargs.get('sharing_group_id'): - self.sharing_group_id = int(kwargs.pop('sharing_group_id')) - if kwargs.get('Tag'): - [self.add_tag(**t) for t in kwargs.pop('Tag')] - if kwargs.get('SharingGroup'): - self.SharingGroup = MISPSharingGroup() - self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) - super().from_dict(**kwargs) - - def add_tag(self, tag: Optional[Union[str, MISPTag, Dict]] = None, **kwargs) -> MISPTag: - return super()._add_tag(tag, **kwargs) - - @property - def tags(self) -> List[MISPTag]: - """Returns a list of tags associated to this Attribute""" - return self.Tag - - @tags.setter - def tags(self, tags: List[MISPTag]): - """Set a list of prepared MISPTag.""" - super()._set_tags(tags) - - -class MISPGalaxyCluster(AbstractMISP): - """A MISP galaxy cluster, storing respective galaxy elements and relations. - Used to view default galaxy clusters and add/edit/update/delete Galaxy 2.0 clusters - - Creating a new galaxy cluster can take the following parameters - - :param value: The value of the galaxy cluster - :type value: str - :param description: The description of the galaxy cluster - :type description: str - :param distribution: The distribution type, one of 0, 1, 2, 3, 4 - :type distribution: int - :param sharing_group_id: The sharing group ID, if distribution is set to 4 - :type sharing_group_id: int, optional - :param authors: A list of authors of the galaxy cluster - :type authors: list[str], optional - :param cluster_elements: List of MISPGalaxyClusterElement - :type cluster_elements: list[MISPGalaxyClusterElement], optional - :param cluster_relations: List of MISPGalaxyClusterRelation - :type cluster_relations: list[MISPGalaxyClusterRelation], optional - """ - - def __init__(self) -> None: - super().__init__() - self.Galaxy: MISPGalaxy - self.GalaxyElement: List[MISPGalaxyClusterElement] = [] - self.meta: Dict = {} - self.GalaxyClusterRelation: List[MISPGalaxyClusterRelation] = [] - self.Org: MISPOrganisation - self.Orgc: MISPOrganisation - self.SharingGroup: MISPSharingGroup - self.value: str - # Set any inititialized cluster to be False - self.default = False - - @property - def cluster_elements(self) -> List[MISPGalaxyClusterElement]: - return self.GalaxyElement - - @cluster_elements.setter - def cluster_elements(self, cluster_elements: List[MISPGalaxyClusterElement]): - self.GalaxyElement = cluster_elements - - @property - def cluster_relations(self) -> List[MISPGalaxyClusterRelation]: - return self.GalaxyClusterRelation - - @cluster_relations.setter - def cluster_relations(self, cluster_relations: List[MISPGalaxyClusterRelation]): - self.GalaxyClusterRelation = cluster_relations - - def parse_meta_as_elements(self): - """Function to parse the meta field into GalaxyClusterElements""" - # Parse the cluster elements from the kwargs meta fields - for key, value in self.meta.items(): - # The meta will merge fields together, i.e. Two 'countries' will be a list, so split these up - if not isinstance(value, list): - value = [value] - for v in value: - self.add_cluster_element(key=key, value=v) - - @property - def elements_meta(self) -> Dict: - """Function to return the galaxy cluster elements as a dictionary structure of lists - that comes from a MISPGalaxy within a MISPEvent. Lossy, you lose the element ID - """ - response = defaultdict(list) - for element in self.cluster_elements: - response[element.key].append(element.value) - return dict(response) - - def from_dict(self, **kwargs): - if 'GalaxyCluster' in kwargs: - kwargs = kwargs['GalaxyCluster'] - self.default = kwargs.pop('default', False) - # If the default field is set, we shouldn't have distribution or sharing group ID set - if self.default: - blocked_fields = ["distribution" "sharing_group_id"] - for field in blocked_fields: - if kwargs.get(field, None): - raise NewGalaxyClusterError( - f"The field '{field}' cannot be set on a default galaxy cluster" - ) - - self.distribution = int(kwargs.pop('distribution', 0)) - if self.distribution not in [0, 1, 2, 3, 4]: - raise NewGalaxyClusterError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4') - - if kwargs.get('sharing_group_id'): - self.sharing_group_id = int(kwargs.pop('sharing_group_id')) - - if self.distribution == 4: - # The distribution is set to sharing group, a sharing_group_id is required. - if not hasattr(self, 'sharing_group_id'): - raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required.') - elif not self.sharing_group_id: - # Cannot be None or 0 either. - raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id)) - - if 'uuid' in kwargs: - self.uuid = kwargs.pop('uuid') - if 'meta' in kwargs: - self.meta = kwargs.pop('meta') - if 'Galaxy' in kwargs: - self.Galaxy = MISPGalaxy() - self.Galaxy.from_dict(**kwargs.pop('Galaxy')) - if 'GalaxyElement' in kwargs: - [self.add_cluster_element(**e) for e in kwargs.pop('GalaxyElement')] - if 'Org' in kwargs: - self.Org = MISPOrganisation() - self.Org.from_dict(**kwargs.pop('Org')) - if 'Orgc' in kwargs: - self.Orgc = MISPOrganisation() - self.Orgc.from_dict(**kwargs.pop('Orgc')) - if 'GalaxyClusterRelation' in kwargs: - [self.add_cluster_relation(**r) for r in kwargs.pop('GalaxyClusterRelation')] - if 'SharingGroup' in kwargs: - self.SharingGroup = MISPSharingGroup() - self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) - super().from_dict(**kwargs) - - def add_cluster_element(self, key: str, value: str, **kwargs) -> MISPGalaxyClusterElement: - """Add a cluster relation to a MISPGalaxyCluster, key and value are required - - :param key: The key name of the element - :type key: str - :param value: The value of the element - :type value: str - """ - - cluster_element = MISPGalaxyClusterElement() - cluster_element.from_dict(key=key, value=value, **kwargs) - self.cluster_elements.append(cluster_element) - return cluster_element - - def add_cluster_relation(self, referenced_galaxy_cluster_uuid: Union["MISPGalaxyCluster", str, UUID], referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: Optional[str] = None, **kwargs: Dict) -> MISPGalaxyClusterRelation: - """Add a cluster relation to a MISPGalaxyCluster. - - :param referenced_galaxy_cluster_uuid: UUID of the related cluster - :type referenced_galaxy_cluster_uuid: uuid - :param referenced_galaxy_cluster_type: Relation type - :type referenced_galaxy_cluster_type: uuid - :param galaxy_cluster_uuid: UUID of this cluster, leave blank to use the stored UUID - :param galaxy_cluster_uuid: uuid, Optional - """ - - if not getattr(self, "uuid", None): - raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster") - cluster_relation = MISPGalaxyClusterRelation() - - if isinstance(referenced_galaxy_cluster_uuid, MISPGalaxyCluster): - referenced_galaxy_cluster_uuid = referenced_galaxy_cluster_uuid.uuid - - cluster_relation.from_dict( - referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid, - referenced_galaxy_cluster_type=referenced_galaxy_cluster_type, - galaxy_cluster_uuid=galaxy_cluster_uuid or self.uuid, - **kwargs - ) - self.cluster_relations.append(cluster_relation) - return cluster_relation - - def __repr__(self) -> str: - if hasattr(self, 'value'): - return '<{self.__class__.__name__}(value={self.value})'.format(self=self) - return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) - - -class MISPGalaxy(AbstractMISP): - """Galaxy class, used to view a galaxy and respective clusters""" - - def __init__(self) -> None: - super().__init__() - self.GalaxyCluster: List[MISPGalaxyCluster] = [] - self.name: str - - def from_dict(self, **kwargs): - """Galaxy could be in one of the following formats: - {'Galaxy': {}, 'GalaxyCluster': []} - {'Galaxy': {'GalaxyCluster': []}} - """ - - if 'GalaxyCluster' in kwargs and kwargs.get("withCluster", True): - # Parse the cluster from the kwargs - [self.add_galaxy_cluster(**e) for e in kwargs.pop('GalaxyCluster')] - - if 'Galaxy' in kwargs: - kwargs = kwargs['Galaxy'] - super().from_dict(**kwargs) - - @property - def clusters(self) -> List[MISPGalaxyCluster]: - return self.GalaxyCluster - - def add_galaxy_cluster(self, **kwargs) -> MISPGalaxyCluster: - """Add a MISP galaxy cluster into a MISPGalaxy. - Supports all other parameters supported by MISPGalaxyCluster""" - - galaxy_cluster = MISPGalaxyCluster() - galaxy_cluster.from_dict(**kwargs) - self.clusters.append(galaxy_cluster) - return galaxy_cluster - - def __repr__(self) -> str: - if hasattr(self, 'name'): - return '<{self.__class__.__name__}(name={self.name})'.format(self=self) - return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) - - class MISPAttribute(AbstractMISP): _fields_for_feed: set = {'uuid', 'value', 'category', 'type', 'comment', 'data', 'deleted', 'timestamp', 'to_ids', 'disable_correlation', @@ -619,7 +301,7 @@ class MISPAttribute(AbstractMISP): """Set a list of prepared MISPTag.""" super()._set_tags(tags) - def add_galaxy(self, galaxy: Union[MISPGalaxy, dict, None] = None, **kwargs) -> MISPGalaxy: + def add_galaxy(self, galaxy: Union['MISPGalaxy', dict, None] = None, **kwargs) -> 'MISPGalaxy': """Add a galaxy to the Attribute, either by passing a MISPGalaxy or a dictionary""" if isinstance(galaxy, MISPGalaxy): self.galaxies.append(galaxy) @@ -636,7 +318,7 @@ class MISPAttribute(AbstractMISP): return misp_galaxy @property - def galaxies(self) -> List[MISPGalaxy]: + def galaxies(self) -> List['MISPGalaxy']: """Returns a list of galaxies associated to this Attribute""" return self.Galaxy @@ -1506,6 +1188,324 @@ class MISPEventReport(AbstractMISP): self.content = '' +class MISPGalaxyClusterElement(AbstractMISP): + """A MISP Galaxy cluster element, providing further info on a cluster + + Creating a new galaxy cluster element can take the following parameters + + :param key: The key/identifier of the element + :type key: str + :param value: The value of the element + :type value: str + """ + + key: str + value: str + + def __repr__(self) -> str: + if hasattr(self, 'key') and hasattr(self, 'value'): + return '<{self.__class__.__name__}(key={self.key}, value={self.value})'.format(self=self) + return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) + + def __setattr__(self, key, value): + if key == "value" and isinstance(value, list): + raise PyMISPError("You tried to set a list to a cluster element's value. " + "Instead, create seperate elements for each value") + super().__setattr__(key, value) + + def from_dict(self, **kwargs): + if kwargs.get('id'): + self.id = int(kwargs.pop('id')) + if kwargs.get('galaxy_cluster_id'): + self.galaxy_cluster_id = int(kwargs.pop('galaxy_cluster_id')) + + super().from_dict(**kwargs) + + +class MISPGalaxyClusterRelation(AbstractMISP): + """A MISP Galaxy cluster relation, linking one cluster to another + + Creating a new galaxy cluster can take the following parameters + + :param galaxy_cluster_uuid: The UUID of the galaxy the relation links to + :param referenced_galaxy_cluster_type: The relation type, e.g. dropped-by + :param referenced_galaxy_cluster_uuid: The UUID of the related galaxy + :param distribution: The distribution of the relation, one of 0, 1, 2, 3, 4, default 0 + :param sharing_group_id: The sharing group of the relation, only when distribution is 4 + """ + + def __repr__(self) -> str: + if hasattr(self, "referenced_galaxy_cluster_type"): + return '<{self.__class__.__name__}(referenced_galaxy_cluster_type={self.referenced_galaxy_cluster_type})'.format(self=self) + return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) + + def __init__(self) -> None: + super().__init__() + self.galaxy_cluster_uuid: str + self.referenced_galaxy_cluster_uuid: str + self.distribution: int = 0 + self.referenced_galaxy_cluster_type: str + self.Tag: List[MISPTag] = [] + + def from_dict(self, **kwargs): + # Default values for a valid event to send to a MISP instance + self.distribution = int(kwargs.pop('distribution', 0)) + if self.distribution not in [0, 1, 2, 3, 4, 5]: + raise NewGalaxyClusterRelationError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4') + + if kwargs.get('sharing_group_id'): + self.sharing_group_id = int(kwargs.pop('sharing_group_id')) + + if self.distribution == 4: + # The distribution is set to sharing group, a sharing_group_id is required. + if not hasattr(self, 'sharing_group_id'): + raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required.') + elif not self.sharing_group_id: + # Cannot be None or 0 either. + raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id)) + + if kwargs.get('id'): + self.id = int(kwargs.pop('id')) + if kwargs.get('orgc_id'): + self.orgc_id = int(kwargs.pop('orgc_id')) + if kwargs.get('org_id'): + self.org_id = int(kwargs.pop('org_id')) + if kwargs.get('galaxy_id'): + self.galaxy_id = int(kwargs.pop('galaxy_id')) + if kwargs.get('tag_id'): + self.tag_id = int(kwargs.pop('tag_id')) + if kwargs.get('sharing_group_id'): + self.sharing_group_id = int(kwargs.pop('sharing_group_id')) + if kwargs.get('Tag'): + [self.add_tag(**t) for t in kwargs.pop('Tag')] + if kwargs.get('SharingGroup'): + self.SharingGroup = MISPSharingGroup() + self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) + super().from_dict(**kwargs) + + def add_tag(self, tag: Optional[Union[str, MISPTag, Dict]] = None, **kwargs) -> MISPTag: + return super()._add_tag(tag, **kwargs) + + @property + def tags(self) -> List[MISPTag]: + """Returns a list of tags associated to this Attribute""" + return self.Tag + + @tags.setter + def tags(self, tags: List[MISPTag]): + """Set a list of prepared MISPTag.""" + super()._set_tags(tags) + + +class MISPGalaxyCluster(AbstractMISP): + """A MISP galaxy cluster, storing respective galaxy elements and relations. + Used to view default galaxy clusters and add/edit/update/delete Galaxy 2.0 clusters + + Creating a new galaxy cluster can take the following parameters + + :param value: The value of the galaxy cluster + :type value: str + :param description: The description of the galaxy cluster + :type description: str + :param distribution: The distribution type, one of 0, 1, 2, 3, 4 + :type distribution: int + :param sharing_group_id: The sharing group ID, if distribution is set to 4 + :type sharing_group_id: int, optional + :param authors: A list of authors of the galaxy cluster + :type authors: list[str], optional + :param cluster_elements: List of MISPGalaxyClusterElement + :type cluster_elements: list[MISPGalaxyClusterElement], optional + :param cluster_relations: List of MISPGalaxyClusterRelation + :type cluster_relations: list[MISPGalaxyClusterRelation], optional + """ + + def __init__(self) -> None: + super().__init__() + self.Galaxy: MISPGalaxy + self.GalaxyElement: List[MISPGalaxyClusterElement] = [] + self.meta: Dict = {} + self.GalaxyClusterRelation: List[MISPGalaxyClusterRelation] = [] + self.Org: MISPOrganisation + self.Orgc: MISPOrganisation + self.SharingGroup: MISPSharingGroup + self.value: str + # Set any inititialized cluster to be False + self.default = False + + @property + def cluster_elements(self) -> List[MISPGalaxyClusterElement]: + return self.GalaxyElement + + @cluster_elements.setter + def cluster_elements(self, cluster_elements: List[MISPGalaxyClusterElement]): + self.GalaxyElement = cluster_elements + + @property + def cluster_relations(self) -> List[MISPGalaxyClusterRelation]: + return self.GalaxyClusterRelation + + @cluster_relations.setter + def cluster_relations(self, cluster_relations: List[MISPGalaxyClusterRelation]): + self.GalaxyClusterRelation = cluster_relations + + def parse_meta_as_elements(self): + """Function to parse the meta field into GalaxyClusterElements""" + # Parse the cluster elements from the kwargs meta fields + for key, value in self.meta.items(): + # The meta will merge fields together, i.e. Two 'countries' will be a list, so split these up + if not isinstance(value, list): + value = [value] + for v in value: + self.add_cluster_element(key=key, value=v) + + @property + def elements_meta(self) -> Dict: + """Function to return the galaxy cluster elements as a dictionary structure of lists + that comes from a MISPGalaxy within a MISPEvent. Lossy, you lose the element ID + """ + response = defaultdict(list) + for element in self.cluster_elements: + response[element.key].append(element.value) + return dict(response) + + def from_dict(self, **kwargs): + if 'GalaxyCluster' in kwargs: + kwargs = kwargs['GalaxyCluster'] + self.default = kwargs.pop('default', False) + # If the default field is set, we shouldn't have distribution or sharing group ID set + if self.default: + blocked_fields = ["distribution" "sharing_group_id"] + for field in blocked_fields: + if kwargs.get(field, None): + raise NewGalaxyClusterError( + f"The field '{field}' cannot be set on a default galaxy cluster" + ) + + self.distribution = int(kwargs.pop('distribution', 0)) + if self.distribution not in [0, 1, 2, 3, 4]: + raise NewGalaxyClusterError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4') + + if kwargs.get('sharing_group_id'): + self.sharing_group_id = int(kwargs.pop('sharing_group_id')) + + if self.distribution == 4: + # The distribution is set to sharing group, a sharing_group_id is required. + if not hasattr(self, 'sharing_group_id'): + raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required.') + elif not self.sharing_group_id: + # Cannot be None or 0 either. + raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id)) + + if 'uuid' in kwargs: + self.uuid = kwargs.pop('uuid') + if 'meta' in kwargs: + self.meta = kwargs.pop('meta') + if 'Galaxy' in kwargs: + self.Galaxy = MISPGalaxy() + self.Galaxy.from_dict(**kwargs.pop('Galaxy')) + if 'GalaxyElement' in kwargs: + [self.add_cluster_element(**e) for e in kwargs.pop('GalaxyElement')] + if 'Org' in kwargs: + self.Org = MISPOrganisation() + self.Org.from_dict(**kwargs.pop('Org')) + if 'Orgc' in kwargs: + self.Orgc = MISPOrganisation() + self.Orgc.from_dict(**kwargs.pop('Orgc')) + if 'GalaxyClusterRelation' in kwargs: + [self.add_cluster_relation(**r) for r in kwargs.pop('GalaxyClusterRelation')] + if 'SharingGroup' in kwargs: + self.SharingGroup = MISPSharingGroup() + self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) + super().from_dict(**kwargs) + + def add_cluster_element(self, key: str, value: str, **kwargs) -> MISPGalaxyClusterElement: + """Add a cluster relation to a MISPGalaxyCluster, key and value are required + + :param key: The key name of the element + :type key: str + :param value: The value of the element + :type value: str + """ + + cluster_element = MISPGalaxyClusterElement() + cluster_element.from_dict(key=key, value=value, **kwargs) + self.cluster_elements.append(cluster_element) + return cluster_element + + def add_cluster_relation(self, referenced_galaxy_cluster_uuid: Union["MISPGalaxyCluster", str, UUID], referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: Optional[str] = None, **kwargs: Dict) -> MISPGalaxyClusterRelation: + """Add a cluster relation to a MISPGalaxyCluster. + + :param referenced_galaxy_cluster_uuid: UUID of the related cluster + :type referenced_galaxy_cluster_uuid: uuid + :param referenced_galaxy_cluster_type: Relation type + :type referenced_galaxy_cluster_type: uuid + :param galaxy_cluster_uuid: UUID of this cluster, leave blank to use the stored UUID + :param galaxy_cluster_uuid: uuid, Optional + """ + + if not getattr(self, "uuid", None): + raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster") + cluster_relation = MISPGalaxyClusterRelation() + + if isinstance(referenced_galaxy_cluster_uuid, MISPGalaxyCluster): + referenced_galaxy_cluster_uuid = referenced_galaxy_cluster_uuid.uuid + + cluster_relation.from_dict( + referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid, + referenced_galaxy_cluster_type=referenced_galaxy_cluster_type, + galaxy_cluster_uuid=galaxy_cluster_uuid or self.uuid, + **kwargs + ) + self.cluster_relations.append(cluster_relation) + return cluster_relation + + def __repr__(self) -> str: + if hasattr(self, 'value'): + return '<{self.__class__.__name__}(value={self.value})'.format(self=self) + return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) + + +class MISPGalaxy(AbstractMISP): + """Galaxy class, used to view a galaxy and respective clusters""" + + def __init__(self) -> None: + super().__init__() + self.GalaxyCluster: List[MISPGalaxyCluster] = [] + self.name: str + + def from_dict(self, **kwargs): + """Galaxy could be in one of the following formats: + {'Galaxy': {}, 'GalaxyCluster': []} + {'Galaxy': {'GalaxyCluster': []}} + """ + + if 'GalaxyCluster' in kwargs and kwargs.get("withCluster", True): + # Parse the cluster from the kwargs + [self.add_galaxy_cluster(**e) for e in kwargs.pop('GalaxyCluster')] + + if 'Galaxy' in kwargs: + kwargs = kwargs['Galaxy'] + super().from_dict(**kwargs) + + @property + def clusters(self) -> List[MISPGalaxyCluster]: + return self.GalaxyCluster + + def add_galaxy_cluster(self, **kwargs) -> MISPGalaxyCluster: + """Add a MISP galaxy cluster into a MISPGalaxy. + Supports all other parameters supported by MISPGalaxyCluster""" + + galaxy_cluster = MISPGalaxyCluster() + galaxy_cluster.from_dict(**kwargs) + self.clusters.append(galaxy_cluster) + return galaxy_cluster + + def __repr__(self) -> str: + if hasattr(self, 'name'): + return '<{self.__class__.__name__}(name={self.name})'.format(self=self) + return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) + + class MISPEvent(AbstractMISP): _fields_for_feed: set = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',