From 164791e980a8bac9ecceaef021ac64198facb25a Mon Sep 17 00:00:00 2001 From: Tom King Date: Sat, 16 Jan 2021 15:56:30 +0000 Subject: [PATCH] new: MISP Galaxy 2.0 capability --- pymisp/__init__.py | 2 +- pymisp/api.py | 69 ++++++++- pymisp/data/misp-objects | 2 +- pymisp/exceptions.py | 12 ++ pymisp/mispevent.py | 324 +++++++++++++++++++++++++++++++++++++-- 5 files changed, 395 insertions(+), 14 deletions(-) diff --git a/pymisp/__init__.py b/pymisp/__init__.py index dfdf831..c2e3050 100644 --- a/pymisp/__init__.py +++ b/pymisp/__init__.py @@ -24,7 +24,7 @@ Response (if any): try: from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa - from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist # noqa + from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPGalaxyCluster # noqa from .tools import AbstractMISPObjectGenerator # noqa from .tools import Neo4j # noqa from .tools import stix # noqa diff --git a/pymisp/api.py b/pymisp/api.py index a7811a4..ab823b1 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -23,7 +23,8 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, \ MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \ MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \ - MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist + MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPGalaxyCluster, \ + MISPGalaxyClusterRelation from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types SearchType = TypeVar('SearchType', str, int) @@ -1191,10 +1192,11 @@ class PyMISP: to_return.append(g) return to_return - def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxy]: + def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], withCluster: bool = False, pythonify: bool = False) -> Union[Dict, MISPGalaxy]: """Get a galaxy by id :param galaxy: galaxy to get + :param withCluster: Include the clusters associated with the galaxy :param pythonify: Returns a PyMISP Object instead of the plain json output """ galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy) @@ -1203,7 +1205,7 @@ class PyMISP: if not (self.global_pythonify or pythonify) or 'errors' in galaxy_j: return galaxy_j g = MISPGalaxy() - g.from_dict(**galaxy_j) + g.from_dict(**galaxy_j, withCluster=withCluster) return g def update_galaxies(self) -> Dict: @@ -1211,6 +1213,67 @@ class PyMISP: response = self._prepare_request('POST', 'galaxies/update') return self._check_json_response(response) + def get_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyCluster, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]: + cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster) + r = self._prepare_request('GET', f'galaxy_clusters/view/{cluster_id}') + cluster_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in cluster_j: + return cluster_j + gc = MISPGalaxyCluster() + gc.from_dict(**cluster_j) + return gc + + def update_galaxy_cluster(self, galaxy_cluster: MISPGalaxyCluster, pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]: + """Update a custom galaxy cluster.""" + if galaxy_cluster.default: + # We can't edit default galaxies + raise PyMISPError('You are not able to update a default galaxy cluster') + cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster) + r = self._prepare_request('POST', f'galaxy_clusters/edit/{cluster_id}', data=galaxy_cluster) + cluster_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in cluster_j: + return cluster_j + gc = MISPGalaxyCluster() + gc.from_dict(**cluster_j) + return gc + + def publish_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyClusterRelation, int, str, UUID]) -> Dict: + cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster) + r = self._prepare_request('POST', f'galaxy_clusters/publish/{cluster_id}') + response = self._check_json_response(r) + return response + + def fork_galaxy_cluster(self, galaxy: Union[MISPGalaxyClusterRelation, int, str, UUID], galaxy_cluster: Union[MISPGalaxyClusterRelation, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]: + galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy) + cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster) + # Create a duplicate cluster from the cluster to fork + forked_galaxy_cluster = MISPGalaxyCluster() + forked_galaxy_cluster.from_dict(**galaxy_cluster) + # Set the UUID and version it extends from the existing galaxy cluster + forked_galaxy_cluster.extends_uuid = forked_galaxy_cluster.pop('uuid') + forked_galaxy_cluster.extends_version = forked_galaxy_cluster.pop('version') + r = self._prepare_request('POST', f'galaxy_clusters/add/{galaxy_id}/forkUUID:{cluster_id}', data=galaxy_cluster) + cluster_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in cluster_j: + return cluster_j + gc = MISPGalaxyCluster() + gc.from_dict(**cluster_j) + return gc + + def update_galaxy_cluster_relation(self, galaxy_cluster_relation: MISPGalaxyClusterRelation) -> Dict: + """Update a galaxy cluster relation.""" + cluster_relation_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster_relation) + r = self._prepare_request('POST', f'galaxy_cluster_relations/edit/{cluster_relation_id}', data=galaxy_cluster_relation) + cluster_rel_j = self._check_json_response(r) + return cluster_rel_j + + def delete_galaxy_cluster_relation(self, galaxy_cluster_relation: Union[MISPGalaxyClusterRelation, int, str, UUID]) -> Dict: + """Delete a galaxy cluster relation""" + cluster_relation_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster_relation) + r = self._prepare_request('POST', f'galaxy_cluster_relations/delete/{cluster_relation_id}') + cluster_rel_j = self._check_json_response(r) + return cluster_rel_j + # ## END Galaxy ### # ## BEGIN Feed ### diff --git a/pymisp/data/misp-objects b/pymisp/data/misp-objects index 8921a0c..27a554a 160000 --- a/pymisp/data/misp-objects +++ b/pymisp/data/misp-objects @@ -1 +1 @@ -Subproject commit 8921a0c8a26f1e5a7ce77fca7e96d8523ad5fffe +Subproject commit 27a554ab12acbc1242f801b5682364b2047cf9e0 diff --git a/pymisp/exceptions.py b/pymisp/exceptions.py index 8a809cc..7a1afd4 100644 --- a/pymisp/exceptions.py +++ b/pymisp/exceptions.py @@ -23,6 +23,18 @@ class UpdateAttributeError(PyMISPError): pass +class NewGalaxyError(PyMISPError): + pass + + +class NewGalaxyClusterError(PyMISPError): + pass + + +class NewGalaxyClusterRelationError(PyMISPError): + pass + + class SearchError(PyMISPError): pass diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 04c33f9..1c61134 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -16,7 +16,7 @@ from pathlib import Path from typing import List, Optional, Union, IO, Dict, Any from .abstract import AbstractMISP, MISPTag -from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError +from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewGalaxyClusterError, NewGalaxyClusterRelationError logger = logging.getLogger('pymisp') @@ -982,6 +982,305 @@ class MISPObject(AbstractMISP): return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) +class MISPGalaxyClusterElement(AbstractMISP): + def __repr__(self) -> str: + if hasattr(self, 'key') and hasattr(self, 'value'): + return '<{self.__class__.__name__}(key={self.key}, value={self.value})'.format(self=self) + return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) + + def __setattr__(self, key, value): + if key == "value" and isinstance(value, list): + raise PyMISPError("You tried to set a list to a cluster element's value. " + "Instead, create seperate elements for each value") + super().__setattr__(key, value) + + +class MISPGalaxyClusterRelation(AbstractMISP): + """A MISP Galaxy cluster relation, linking one cluster to another + + :param distribution: The distribution of the relation, one of 0, 1, 2, 3, default 0 + :type distribution: int + :param galaxy_cluster_uuid: The UUID of the galaxy the relation links to + :type galaxy_cluster_uuid: uuid + :param referenced_galaxy_cluster_type: The relation type, e.g. dropped-by + :type referenced_galaxy_cluster_type: str + :param referenced_galaxy_cluster_uuid: The UUID of the related galaxy + :type referenced_galaxy_cluster_uuid: uuid + :param referenced_galaxy_cluster_id: The ID of the related galaxy + :type referenced_galaxy_cluster_id: int, optional + :param galaxy_cluster_id: The ID of the galaxy cluster + :type galaxy_cluster_id: id, optional + :param id: The ID of the cluster relation + :type id: int, optional + :param default: Whether the relation is a default + :type default: bool, optional + """ + + def __repr__(self) -> str: + if hasattr(self, "referenced_galaxy_cluster_type"): + return '<{self.__class__.__name__}(referenced_galaxy_cluster_type={self.referenced_galaxy_cluster_type})'.format(self=self) + return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) + + def __init__(self): + super().__init__() + self.galaxy_cluster_uuid: uuid + self.referenced_galaxy_cluster_uuid: uuid + self.distribution: int = 0 + self.referenced_galaxy_cluster_type: str + self.Tag: MISPTag = [] + + def from_dict(self, **kwargs): + # Default values for a valid event to send to a MISP instance + self.distribution = kwargs.pop('distribution', 0) + self.distribution = int(self.distribution) + if self.distribution not in [0, 1, 2, 3, 4, 5]: + raise NewGalaxyClusterRelationError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4') + + if kwargs.get('id'): + self.id = int(kwargs.pop('id')) + if kwargs.get('orgc_id'): + self.orgc_id = int(kwargs.pop('orgc_id')) + if kwargs.get('org_id'): + self.org_id = int(kwargs.pop('org_id')) + if kwargs.get('galaxy_id'): + self.galaxy_id = int(kwargs.pop('galaxy_id')) + if kwargs.get('tag_id'): + self.tag_id = int(kwargs.pop('tag_id')) + if kwargs.get('sharing_group_id'): + self.sharing_group_id = int(kwargs.pop('sharing_group_id')) + if kwargs.get('Tag'): + [self.add_tag(**t) for t in kwargs.pop('Tag')] + if kwargs.get('SharingGroup'): + self.SharingGroup = MISPSharingGroup() + self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) + super().from_dict(**kwargs) + + def add_tag(self, tag: Optional[Union[str, MISPTag, Dict]] = None, **kwargs) -> MISPTag: + return super()._add_tag(tag, **kwargs) + + @property + def tags(self) -> List[MISPTag]: + """Returns a list of tags associated to this Attribute""" + return self.Tag + + @tags.setter + def tags(self, tags: List[MISPTag]): + """Set a list of prepared MISPTag.""" + super()._set_tags(tags) + + +class MISPGalaxyCluster(AbstractMISP): + """A MISP galaxy cluster, storing respective galaxy elements and relations. + Used to view default galaxy clusters and add/edit/update/delete Galaxy 2.0 clusters + + :param Org: The organisation as a MISPOrganisation + :type Org: MISPOrganisation + :param Orgc: The creator organisation as a MISPOrganisation + :type Orgc: MISPOrganisation + :param SharingGroup: The SharingGroup applied to the cluster, if any + :type SharingGroup: MISPSharingGroup, optional + :param default: Whether the galaxy cluster is a default or custom cluster, default clusters cannot be edited + :type default: bool + :param deleted: Whether the galaxy cluster is deleted or not + :type deleted: bool + :param description: The description of the galaxy cluster + :type description: str + :param distribution: The distribution type, one of 1, 2, 3, 4, 5 + :type distribution: int + :param sharing_group_id: The sharing group ID, if distribution is set to 4 + :type sharing_group_id: int, optional + :param extends_uuid: The UUID of the galaxy cluster it extends + :type extends_uuid: uuid, optional + :param galaxy_id: The ID of the galaxy + :type galaxy_id: int + :param id: The ID of the galaxy cluster + :type id: int + :param org_id: The org's ID + :type org_id: int + :param orgc_id: The creating org's ID + :type orgc_id: int + :param published: Whether the cluster is published or not + :type published: bool + :param source: The source of the galaxy cluster + :type source: str + :param tag_count: The count of events using this galaxy cluster + :type tag_count: int + :param tag_id: The tag ID + :type tag_id: int + :param tag_name: The galaxy cluster's tag + :type tag_name: str + :param type: The type of the galaxy cluster, must match the housing galaxies type + :type type: str + :param value: The value of the galaxy cluster + :type value: str + :param authors: A list of authors of the galaxy cluster + :type authors: list, optional + :param cluster_elements: List of MISPGalaxyClusterElement + :type cluster_elements: list, optional + :param cluster_relations: List of MISPGalaxyClusterRelation, changes must be made through PyMISP instance + :type cluster_relations: list, optional + """ + + def __init__(self): + super().__init__() + self.Galaxy: MISPGalaxy + self.GalaxyElement: List[MISPGalaxyClusterElement] = [] + self.GalaxyClusterRelation: List[MISPGalaxyClusterRelation] = [] + self.Org: MISPOrganisation + self.Orgc: MISPOrganisation + self.SharingGroup: MISPSharingGroup + + @property + def cluster_elements(self) -> List[MISPGalaxyClusterElement]: + return self.GalaxyElement + + @cluster_elements.setter + def cluster_elements(self, cluster_elements: List[MISPGalaxyClusterElement]): + self.GalaxyElement = cluster_elements + + @property + def cluster_relations(self) -> MISPGalaxyClusterRelation: + return self.GalaxyClusterRelation + + @cluster_relations.setter + def cluster_relations(self, cluster_relations: List[MISPGalaxyClusterRelation]): + self.GalaxyClusterRelation = cluster_relations + + def from_dict(self, **kwargs): + # If the default field is set, we shouldn't have distribution or sharing group ID set + if 'GalaxyCluster' in kwargs: + kwargs = kwargs['GalaxyCluster'] + + if kwargs.get('default', False): + blocked_fields = ["distribution" "sharing_group_id"] + for field in blocked_fields: + if kwargs.get(field, None): + raise NewGalaxyClusterError( + f"One of the following fields are set for a default galaxy cluster: {', '.join(blocked_fields)}" + ) + if 'uuid' in kwargs: + self.uuid = kwargs.pop('uuid') + if 'meta' in kwargs: + # Parse the cluster elements from the kwargs meta fields + for key, value in kwargs.pop('meta').items(): + # The meta will merge fields together, i.e. Two 'countries' will be a list, so split these up + if not isinstance(value, list): + value = [value] + for v in value: + self.add_cluster_element(key=key, value=v) + if 'Galaxy' in kwargs: + self.Galaxy = MISPGalaxy() + self.Galaxy.from_dict(**kwargs.pop('Galaxy')) + if 'GalaxyElement' in kwargs: + [self.add_cluster_element(**e) for e in kwargs.pop('GalaxyElement')] + if 'Org' in kwargs: + self.Org = MISPOrganisation() + self.Org.from_dict(**kwargs.pop('Org')) + if 'Orgc' in kwargs: + self.Orgc = MISPOrganisation() + self.Orgc.from_dict(**kwargs.pop('Orgc')) + if 'GalaxyClusterRelation' in kwargs: + [self.add_cluster_relation(**r) for r in kwargs.pop('GalaxyClusterRelation')] + if 'SharingGroup' in kwargs: + self.SharingGroup = MISPSharingGroup() + self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) + super().from_dict(**kwargs) + + def add_cluster_element(self, key: str, value: str, **kwargs): + """Add a cluster relation to a MISPGalaxyCluster, key and value are required + + :param key: The key name of the element + :param value: The value of the element + """ + cluster_element = MISPGalaxyClusterElement() + cluster_element.from_dict(key=key, value=value, **kwargs) + self.cluster_elements.append(cluster_element) + return cluster_element + + def add_cluster_relation(self, referenced_galaxy_cluster_uuid: uuid, referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: str = None, **kwargs): + """Add a cluster relation to a MISPGalaxyCluster + + :param referenced_galaxy_cluster_uuid: UUID of the related cluster + :param referenced_galaxy_cluster_type: Relation type + """ + if not getattr(self, "uuid", None): + raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster") + cluster_relation = MISPGalaxyClusterRelation() + cluster_relation.from_dict( + galaxy_cluster_uuid=self.uuid, + referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid, + referenced_galaxy_cluster_type=referenced_galaxy_cluster_type, + **kwargs + ) + self.cluster_relations.append(cluster_relation) + return cluster_relation + + def __repr__(self) -> str: + if hasattr(self, 'value'): + return '<{self.__class__.__name__}(value={self.value})'.format(self=self) + return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) + + +class MISPGalaxy(AbstractMISP): + """Galaxy class, used to view a galaxy and respective clusters, supports the following fields + + :param id: Galaxy ID + :type id: int + :param uuid: Galaxy UUID + :type uuuid: uuid, str + :param name: Galaxy name + :type name: str + :param type: Galaxy type + :type type: str + :param description: Galaxy description + :type description: str + :param version: Galaxy version number + :type version: int + :param icon: Galaxy icon + :type icon: str + :param namespace: Galaxy namespace + :type namespace: str + :param clusters: List of MISPGalaxyCluster + :type clusters: list + """ + + def __init__(self): + super().__init__() + self.GalaxyCluster: List[MISPGalaxyCluster] = [] + + def from_dict(self, **kwargs): + """Galaxy could be in one of the following formats: + {'Galaxy': {}, 'GalaxyCluster': []} + {'Galaxy': {'GalaxyCluster': []}} + """ + + if 'GalaxyCluster' in kwargs and kwargs.get("withCluster", True): + # Parse the cluster from the kwargs + [self.add_galaxy_cluster(**e) for e in kwargs.pop('GalaxyCluster')] + + if 'Galaxy' in kwargs: + kwargs = kwargs['Galaxy'] + super().from_dict(**kwargs) + + @property + def clusters(self) -> List[MISPGalaxyCluster]: + return self.GalaxyCluster + + def add_galaxy_cluster(self, **kwargs) -> MISPGalaxyCluster: + """Add a MISP galaxy and sub-clusters into an event. + Supports all other parameters supported by MISPGalaxy""" + + galaxy_cluster = MISPGalaxyCluster() + galaxy_cluster.from_dict(**kwargs) + self.clusters.append(galaxy_cluster) + return galaxy_cluster + + def __repr__(self) -> str: + if hasattr(self, 'name'): + return '<{self.__class__.__name__}(name={self.name})'.format(self=self) + return '<{self.__class__.__name__}(NotInitialized)'.format(self=self) + + class MISPEvent(AbstractMISP): _fields_for_feed: set = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp', @@ -1006,6 +1305,7 @@ class MISPEvent(AbstractMISP): self.ShadowAttribute: List[MISPShadowAttribute] = [] self.SharingGroup: MISPSharingGroup self.Tag: List[MISPTag] = [] + self.Galaxy: List[MISPGalaxy] = [] def add_tag(self, tag: Optional[Union[str, MISPTag, dict]] = None, **kwargs) -> MISPTag: return super()._add_tag(tag, **kwargs) @@ -1168,6 +1468,10 @@ class MISPEvent(AbstractMISP): def objects(self) -> List[MISPObject]: return self.Object + @property + def galaxies(self) -> List[MISPGalaxy]: + return self.Galaxy + @objects.setter def objects(self, objects: List[MISPObject]): if all(isinstance(x, MISPObject) for x in objects): @@ -1272,6 +1576,8 @@ class MISPEvent(AbstractMISP): self.set_date(kwargs.pop('date')) if kwargs.get('Attribute'): [self.add_attribute(**a) for a in kwargs.pop('Attribute')] + if kwargs.get('Galaxy'): + [self.add_galaxy(**e) for e in kwargs.pop('Galaxy')] # All other keys if kwargs.get('id'): @@ -1412,6 +1718,14 @@ class MISPEvent(AbstractMISP): return attr_list return attribute + def add_galaxy(self, **kwargs) -> MISPGalaxy: + """Add a MISP galaxy and sub-clusters into an event. + Supports all other parameters supported by MISPGalaxy""" + galaxy = MISPGalaxy() + galaxy.from_dict(**kwargs) + self.galaxies.append(galaxy) + return galaxy + def get_object_by_id(self, object_id: Union[str, int]) -> MISPObject: """Get an object by ID @@ -1610,14 +1924,6 @@ class MISPTaxonomy(AbstractMISP): super().from_dict(**kwargs) -class MISPGalaxy(AbstractMISP): - - def from_dict(self, **kwargs): - if 'Galaxy' in kwargs: - kwargs = kwargs['Galaxy'] - super().from_dict(**kwargs) - - class MISPNoticelist(AbstractMISP): def from_dict(self, **kwargs):