From 96636639c4d688bd064a24537fe078e99f428145 Mon Sep 17 00:00:00 2001 From: Tom King Date: Sat, 30 Jan 2021 13:56:40 +0000 Subject: [PATCH] chg: Add in more Galaxy 2.0 functions and code cleanup --- pymisp/__init__.py | 2 +- pymisp/api.py | 89 +++++++++++++++++++--- pymisp/exceptions.py | 4 - pymisp/mispevent.py | 173 +++++++++++++++++++++++-------------------- 4 files changed, 172 insertions(+), 96 deletions(-) diff --git a/pymisp/__init__.py b/pymisp/__init__.py index e23bc30..11b36cc 100644 --- a/pymisp/__init__.py +++ b/pymisp/__init__.py @@ -24,7 +24,7 @@ Response (if any): try: from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa - from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, MISPGalaxyCluster # noqa + from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, MISPGalaxyCluster, MISPGalaxyClusterElement, MISPGalaxyClusterRelation # noqa from .tools import AbstractMISPObjectGenerator # noqa from .tools import Neo4j # noqa from .tools import stix # noqa diff --git a/pymisp/api.py b/pymisp/api.py index 48191d9..6f39a24 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -471,9 +471,10 @@ class PyMISP: """ event_report_id = get_uuid_or_id_from_abstract_misp(event_report) request_url = f'eventReports/delete/{event_report_id}' + data = {} if hard: - request_url += "/1" - r = self._prepare_request('POST', request_url) + data['hard'] = 1 + r = self._prepare_request('POST', request_url, data=data) return self._check_json_response(r) # ## END Event Report ### @@ -1302,7 +1303,15 @@ class PyMISP: g.from_dict(**galaxy_j, withCluster=withCluster) return g - def search_galaxy_clusters(self, galaxy: Union[MISPGalaxy, int, str, UUID], context: str = "all", searchall: str = None, pythonify: bool = False) -> Union[Dict, List[MISPGalaxyCluster]]: + def search_galaxy_clusters(self, galaxy: Union[MISPGalaxy, int, str, UUID], context: str = "all", searchall: str = None, pythonify: bool = False) -> Union[List[Dict], List[MISPGalaxyCluster]]: + """Searches the galaxy clusters within a specific galaxy + + :param galaxy: The MISPGalaxy you wish to search in + :param context: The context of how you want to search within the galaxy_ + :param searchall: The search you want to make against the galaxy and context + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy) allowed_context_types = ["all", "default", "custom", "org", "deleted"] if context not in allowed_context_types: @@ -1327,6 +1336,12 @@ class PyMISP: return self._check_json_response(response) def get_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyCluster, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]: + """Gets a specific galaxy cluster + + :param galaxy_cluster: The MISPGalaxyCluster you want to get + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster) r = self._prepare_request('GET', f'galaxy_clusters/view/{cluster_id}') cluster_j = self._check_json_response(r) @@ -1336,7 +1351,17 @@ class PyMISP: gc.from_dict(**cluster_j) return gc - def add_galaxy_cluster(self, galaxy: MISPGalaxy, galaxy_cluster: MISPGalaxyCluster, pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]: + def add_galaxy_cluster(self, galaxy: Union[MISPGalaxy, str, UUID], galaxy_cluster: MISPGalaxyCluster, pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]: + """Add a new galaxy cluster to a MISP Galaxy + + :param galaxy: A MISPGalaxy (or UUID) where you wish to add the galaxy cluster + :param galaxy_cluster: A MISPGalaxyCluster you wish to add + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + + if galaxy_cluster.default: + # We can't add default galaxies + raise PyMISPError('You are not able add a default galaxy cluster') galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy) r = self._prepare_request('POST', f'galaxy_clusters/add/{galaxy_id}', data=galaxy_cluster) cluster_j = self._check_json_response(r) @@ -1347,7 +1372,12 @@ class PyMISP: return gc def update_galaxy_cluster(self, galaxy_cluster: MISPGalaxyCluster, pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]: - """Update a custom galaxy cluster.""" + """Update a custom galaxy cluster. + + ;param galaxy_cluster: The MISPGalaxyCluster you wish to update + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + if galaxy_cluster.default: # We can't edit default galaxies raise PyMISPError('You are not able to update a default galaxy cluster') @@ -1360,13 +1390,26 @@ class PyMISP: gc.from_dict(**cluster_j) return gc - def publish_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyClusterRelation, int, str, UUID]) -> Dict: + def publish_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyCluster, int, str, UUID]) -> Dict: + """Publishes a galaxy cluster + + :param galaxy_cluster: The galaxy cluster you wish to publish + """ + if isinstance(galaxy_cluster, MISPGalaxyCluster) and galaxy_cluster.default: + raise PyMISPError('You are not able to publish a default galaxy cluster') cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster) r = self._prepare_request('POST', f'galaxy_clusters/publish/{cluster_id}') response = self._check_json_response(r) return response - def fork_galaxy_cluster(self, galaxy: Union[MISPGalaxyClusterRelation, int, str, UUID], galaxy_cluster: Union[MISPGalaxyClusterRelation, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]: + def fork_galaxy_cluster(self, galaxy: Union[MISPGalaxy, int, str, UUID], galaxy_cluster: Union[MISPGalaxyClusterRelation, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]: + """Forks an existing galaxy cluster, creating a new one with matching attributes + + :param galaxy: The galaxy (or galaxy ID) where the cluster you want to fork resides + :param galaxy_cluster: The galaxy cluster you wish to fork + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy) cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster) # Create a duplicate cluster from the cluster to fork @@ -1383,21 +1426,47 @@ class PyMISP: gc.from_dict(**cluster_j) return gc + def delete_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyCluster, id, str, UUID], hard=False) -> Dict: + """Deletes a galaxy cluster from MISP + + :param galaxy_cluster: The MISPGalaxyCluster you wish to delete from MISP + :param hard: flag for hard delete + """ + + if isinstance(galaxy_cluster, MISPGalaxyCluster) and galaxy_cluster.default: + raise PyMISPError('You are not able to delete a default galaxy cluster') + data = {} + if hard: + data['hard'] = 1 + cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster) + r = self._prepare_request('POST', f'galaxy_clusters/delete/{cluster_id}', data=data) + return self._check_json_response(r) + def add_galaxy_cluster_relation(self, galaxy_cluster_relation: MISPGalaxyClusterRelation) -> Dict: - """Add a galaxy cluster relation""" + """Add a galaxy cluster relation, cluster relation must include + cluster UUIDs in both directions + + :param galaxy_cluster_relation: The MISPGalaxyClusterRelation to add + """ r = self._prepare_request('POST', 'galaxy_cluster_relations/add/', data=galaxy_cluster_relation) cluster_rel_j = self._check_json_response(r) return cluster_rel_j def update_galaxy_cluster_relation(self, galaxy_cluster_relation: MISPGalaxyClusterRelation) -> Dict: - """Update a galaxy cluster relation.""" + """Update a galaxy cluster relation + + :param galaxy_cluster_relation: The MISPGalaxyClusterRelation to update + """ cluster_relation_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster_relation) r = self._prepare_request('POST', f'galaxy_cluster_relations/edit/{cluster_relation_id}', data=galaxy_cluster_relation) cluster_rel_j = self._check_json_response(r) return cluster_rel_j def delete_galaxy_cluster_relation(self, galaxy_cluster_relation: Union[MISPGalaxyClusterRelation, int, str, UUID]) -> Dict: - """Delete a galaxy cluster relation""" + """Delete a galaxy cluster relation + + :param galaxy_cluster_relation: The MISPGalaxyClusterRelation to delete + """ cluster_relation_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster_relation) r = self._prepare_request('POST', f'galaxy_cluster_relations/delete/{cluster_relation_id}') cluster_rel_j = self._check_json_response(r) diff --git a/pymisp/exceptions.py b/pymisp/exceptions.py index a9857ce..58d3a52 100644 --- a/pymisp/exceptions.py +++ b/pymisp/exceptions.py @@ -27,10 +27,6 @@ class UpdateAttributeError(PyMISPError): pass -class NewGalaxyError(PyMISPError): - pass - - class NewGalaxyClusterError(PyMISPError): pass diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index e7905e8..a3a2437 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -1054,6 +1054,16 @@ class MISPEventReport(AbstractMISP): class MISPGalaxyClusterElement(AbstractMISP): + """A MISP Galaxy cluster element, providing further info on a cluster + + Creating a new galaxy cluster element can take the following parameters + + :param key: The key/identifier of the element + :type key: str + :param value: The value of the element + :type value: str + """ + def __repr__(self) -> str: if hasattr(self, 'key') and hasattr(self, 'value'): return '<{self.__class__.__name__}(key={self.key}, value={self.value})'.format(self=self) @@ -1065,26 +1075,30 @@ class MISPGalaxyClusterElement(AbstractMISP): "Instead, create seperate elements for each value") super().__setattr__(key, value) + def from_dict(self, **kwargs): + if kwargs.get('id'): + self.id = int(kwargs.pop('id')) + if kwargs.get('galaxy_cluster_id'): + self.galaxy_cluster_id = int(kwargs.pop('galaxy_cluster_id')) + + super().from_dict(**kwargs) + class MISPGalaxyClusterRelation(AbstractMISP): """A MISP Galaxy cluster relation, linking one cluster to another - :param distribution: The distribution of the relation, one of 0, 1, 2, 3, default 0 - :type distribution: int + Creating a new galaxy cluster can take the following parameters + :param galaxy_cluster_uuid: The UUID of the galaxy the relation links to :type galaxy_cluster_uuid: uuid :param referenced_galaxy_cluster_type: The relation type, e.g. dropped-by :type referenced_galaxy_cluster_type: str :param referenced_galaxy_cluster_uuid: The UUID of the related galaxy :type referenced_galaxy_cluster_uuid: uuid - :param referenced_galaxy_cluster_id: The ID of the related galaxy - :type referenced_galaxy_cluster_id: int, optional - :param galaxy_cluster_id: The ID of the galaxy cluster - :type galaxy_cluster_id: id, optional - :param id: The ID of the cluster relation - :type id: int, optional - :param default: Whether the relation is a default - :type default: bool, optional + :param distribution: The distribution of the relation, one of 0, 1, 2, 3, 4, default 0 + :type distribution: int + :param sharing_group_id: The sharing group of the relation, only when distribution is 4 + :type sharing_group_id: int, optional """ def __repr__(self) -> str: @@ -1102,11 +1116,21 @@ class MISPGalaxyClusterRelation(AbstractMISP): def from_dict(self, **kwargs): # Default values for a valid event to send to a MISP instance - self.distribution = kwargs.pop('distribution', 0) - self.distribution = int(self.distribution) + self.distribution = int(kwargs.pop('distribution', 0)) if self.distribution not in [0, 1, 2, 3, 4, 5]: raise NewGalaxyClusterRelationError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4') + if kwargs.get('sharing_group_id'): + self.sharing_group_id = int(kwargs.pop('sharing_group_id')) + + if self.distribution == 4: + # The distribution is set to sharing group, a sharing_group_id is required. + if not hasattr(self, 'sharing_group_id'): + raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required.') + elif not self.sharing_group_id: + # Cannot be None or 0 either. + raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id)) + if kwargs.get('id'): self.id = int(kwargs.pop('id')) if kwargs.get('orgc_id'): @@ -1144,52 +1168,22 @@ class MISPGalaxyCluster(AbstractMISP): """A MISP galaxy cluster, storing respective galaxy elements and relations. Used to view default galaxy clusters and add/edit/update/delete Galaxy 2.0 clusters - :param Org: The organisation as a MISPOrganisation - :type Org: MISPOrganisation - :param Orgc: The creator organisation as a MISPOrganisation - :type Orgc: MISPOrganisation - :param SharingGroup: The SharingGroup applied to the cluster, if any - :type SharingGroup: MISPSharingGroup, optional - :param default: Whether the galaxy cluster is a default or custom cluster, default clusters cannot be edited - :type default: bool - :param deleted: Whether the galaxy cluster is deleted or not - :type deleted: bool + Creating a new galaxy cluster can take the following parameters + + :param value: The value of the galaxy cluster + :type value: str :param description: The description of the galaxy cluster :type description: str - :param distribution: The distribution type, one of 1, 2, 3, 4, 5 + :param distribution: The distribution type, one of 0, 1, 2, 3, 4 :type distribution: int :param sharing_group_id: The sharing group ID, if distribution is set to 4 :type sharing_group_id: int, optional - :param extends_uuid: The UUID of the galaxy cluster it extends - :type extends_uuid: uuid, optional - :param galaxy_id: The ID of the galaxy - :type galaxy_id: int - :param id: The ID of the galaxy cluster - :type id: int - :param org_id: The org's ID - :type org_id: int - :param orgc_id: The creating org's ID - :type orgc_id: int - :param published: Whether the cluster is published or not - :type published: bool - :param source: The source of the galaxy cluster - :type source: str - :param tag_count: The count of events using this galaxy cluster - :type tag_count: int - :param tag_id: The tag ID - :type tag_id: int - :param tag_name: The galaxy cluster's tag - :type tag_name: str - :param type: The type of the galaxy cluster, must match the housing galaxies type - :type type: str - :param value: The value of the galaxy cluster - :type value: str :param authors: A list of authors of the galaxy cluster - :type authors: list, optional + :type authors: list[str], optional :param cluster_elements: List of MISPGalaxyClusterElement - :type cluster_elements: list, optional - :param cluster_relations: List of MISPGalaxyClusterRelation, changes must be made through PyMISP instance - :type cluster_relations: list, optional + :type cluster_elements: list[MISPGalaxyClusterElement], optional + :param cluster_relations: List of MISPGalaxyClusterRelation + :type cluster_relations: list[MISPGalaxyClusterRelation], optional """ def __init__(self): @@ -1200,6 +1194,8 @@ class MISPGalaxyCluster(AbstractMISP): self.Org: MISPOrganisation self.Orgc: MISPOrganisation self.SharingGroup: MISPSharingGroup + # Set any inititialized cluster to be False + self.default = False @property def cluster_elements(self) -> List[MISPGalaxyClusterElement]: @@ -1217,18 +1213,45 @@ class MISPGalaxyCluster(AbstractMISP): def cluster_relations(self, cluster_relations: List[MISPGalaxyClusterRelation]): self.GalaxyClusterRelation = cluster_relations + @property + def meta(self) -> Dict: + """Function to return the galaxy cluster elements as a dictionary structure of lists + that comes from a MISPGalaxy within a MISPEvent. Lossy, you lose the element ID + """ + response = defaultdict(list) + for element in self.cluster_elements: + response[element.key].append(element.value) + return dict(response) + def from_dict(self, **kwargs): - # If the default field is set, we shouldn't have distribution or sharing group ID set if 'GalaxyCluster' in kwargs: kwargs = kwargs['GalaxyCluster'] - if kwargs.get('default', False): + self.default = kwargs.pop('default', False) + # If the default field is set, we shouldn't have distribution or sharing group ID set + if self.default: blocked_fields = ["distribution" "sharing_group_id"] for field in blocked_fields: if kwargs.get(field, None): raise NewGalaxyClusterError( - f"One of the following fields are set for a default galaxy cluster: {', '.join(blocked_fields)}" + f"The field '{field}' cannot be set on a default galaxy cluster" ) + + self.distribution = int(kwargs.pop('distribution', 0)) + if self.distribution not in [0, 1, 2, 3, 4]: + raise NewGalaxyClusterError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4') + + if kwargs.get('sharing_group_id'): + self.sharing_group_id = int(kwargs.pop('sharing_group_id')) + + if self.distribution == 4: + # The distribution is set to sharing group, a sharing_group_id is required. + if not hasattr(self, 'sharing_group_id'): + raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required.') + elif not self.sharing_group_id: + # Cannot be None or 0 either. + raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id)) + if 'uuid' in kwargs: self.uuid = kwargs.pop('uuid') if 'meta' in kwargs: @@ -1257,30 +1280,38 @@ class MISPGalaxyCluster(AbstractMISP): self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) super().from_dict(**kwargs) - def add_cluster_element(self, key: str, value: str, **kwargs): + def add_cluster_element(self, key: str, value: str, **kwargs) -> MISPGalaxyClusterElement: """Add a cluster relation to a MISPGalaxyCluster, key and value are required :param key: The key name of the element + :type key: str :param value: The value of the element + :type value: str """ + cluster_element = MISPGalaxyClusterElement() cluster_element.from_dict(key=key, value=value, **kwargs) self.cluster_elements.append(cluster_element) return cluster_element - def add_cluster_relation(self, referenced_galaxy_cluster_uuid: uuid, referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: str = None, **kwargs): - """Add a cluster relation to a MISPGalaxyCluster + def add_cluster_relation(self, referenced_galaxy_cluster_uuid: uuid, referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: str = None, **kwargs) -> MISPGalaxyClusterRelation: + """Add a cluster relation to a MISPGalaxyCluster. :param referenced_galaxy_cluster_uuid: UUID of the related cluster + :type referenced_galaxy_cluster_uuid: uuid :param referenced_galaxy_cluster_type: Relation type + :type referenced_galaxy_cluster_type: uuid + :param galaxy_cluster_uuid: UUID of this cluster, leave blank to use the stored UUID + :param galaxy_cluster_uuid: uuid, Optional """ + if not getattr(self, "uuid", None): raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster") cluster_relation = MISPGalaxyClusterRelation() cluster_relation.from_dict( - galaxy_cluster_uuid=self.uuid, referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid, referenced_galaxy_cluster_type=referenced_galaxy_cluster_type, + galaxy_cluster_uuid=galaxy_cluster_uuid or self.uuid, **kwargs ) self.cluster_relations.append(cluster_relation) @@ -1293,27 +1324,7 @@ class MISPGalaxyCluster(AbstractMISP): class MISPGalaxy(AbstractMISP): - """Galaxy class, used to view a galaxy and respective clusters, supports the following fields - - :param id: Galaxy ID - :type id: int - :param uuid: Galaxy UUID - :type uuuid: uuid, str - :param name: Galaxy name - :type name: str - :param type: Galaxy type - :type type: str - :param description: Galaxy description - :type description: str - :param version: Galaxy version number - :type version: int - :param icon: Galaxy icon - :type icon: str - :param namespace: Galaxy namespace - :type namespace: str - :param clusters: List of MISPGalaxyCluster - :type clusters: list - """ + """Galaxy class, used to view a galaxy and respective clusters""" def __init__(self): super().__init__() @@ -1338,8 +1349,8 @@ class MISPGalaxy(AbstractMISP): return self.GalaxyCluster def add_galaxy_cluster(self, **kwargs) -> MISPGalaxyCluster: - """Add a MISP galaxy and sub-clusters into an event. - Supports all other parameters supported by MISPGalaxy""" + """Add a MISP galaxy cluster into a MISPGalaxy. + Supports all other parameters supported by MISPGalaxyCluster""" galaxy_cluster = MISPGalaxyCluster() galaxy_cluster.from_dict(**kwargs)