chg: Add in more Galaxy 2.0 functions and code cleanup

pull/682/head
Tom King 2021-01-30 13:56:40 +00:00
parent eb28f01f01
commit 96636639c4
4 changed files with 172 additions and 96 deletions

View File

@ -24,7 +24,7 @@ Response (if any):
try:
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, MISPGalaxyCluster # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, MISPGalaxyCluster, MISPGalaxyClusterElement, MISPGalaxyClusterRelation # noqa
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa
from .tools import stix # noqa

View File

@ -471,9 +471,10 @@ class PyMISP:
"""
event_report_id = get_uuid_or_id_from_abstract_misp(event_report)
request_url = f'eventReports/delete/{event_report_id}'
data = {}
if hard:
request_url += "/1"
r = self._prepare_request('POST', request_url)
data['hard'] = 1
r = self._prepare_request('POST', request_url, data=data)
return self._check_json_response(r)
# ## END Event Report ###
@ -1302,7 +1303,15 @@ class PyMISP:
g.from_dict(**galaxy_j, withCluster=withCluster)
return g
def search_galaxy_clusters(self, galaxy: Union[MISPGalaxy, int, str, UUID], context: str = "all", searchall: str = None, pythonify: bool = False) -> Union[Dict, List[MISPGalaxyCluster]]:
def search_galaxy_clusters(self, galaxy: Union[MISPGalaxy, int, str, UUID], context: str = "all", searchall: str = None, pythonify: bool = False) -> Union[List[Dict], List[MISPGalaxyCluster]]:
"""Searches the galaxy clusters within a specific galaxy
:param galaxy: The MISPGalaxy you wish to search in
:param context: The context of how you want to search within the galaxy_
:param searchall: The search you want to make against the galaxy and context
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy)
allowed_context_types = ["all", "default", "custom", "org", "deleted"]
if context not in allowed_context_types:
@ -1327,6 +1336,12 @@ class PyMISP:
return self._check_json_response(response)
def get_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyCluster, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
"""Gets a specific galaxy cluster
:param galaxy_cluster: The MISPGalaxyCluster you want to get
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
r = self._prepare_request('GET', f'galaxy_clusters/view/{cluster_id}')
cluster_j = self._check_json_response(r)
@ -1336,7 +1351,17 @@ class PyMISP:
gc.from_dict(**cluster_j)
return gc
def add_galaxy_cluster(self, galaxy: MISPGalaxy, galaxy_cluster: MISPGalaxyCluster, pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
def add_galaxy_cluster(self, galaxy: Union[MISPGalaxy, str, UUID], galaxy_cluster: MISPGalaxyCluster, pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
"""Add a new galaxy cluster to a MISP Galaxy
:param galaxy: A MISPGalaxy (or UUID) where you wish to add the galaxy cluster
:param galaxy_cluster: A MISPGalaxyCluster you wish to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
if galaxy_cluster.default:
# We can't add default galaxies
raise PyMISPError('You are not able add a default galaxy cluster')
galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy)
r = self._prepare_request('POST', f'galaxy_clusters/add/{galaxy_id}', data=galaxy_cluster)
cluster_j = self._check_json_response(r)
@ -1347,7 +1372,12 @@ class PyMISP:
return gc
def update_galaxy_cluster(self, galaxy_cluster: MISPGalaxyCluster, pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
"""Update a custom galaxy cluster."""
"""Update a custom galaxy cluster.
;param galaxy_cluster: The MISPGalaxyCluster you wish to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
if galaxy_cluster.default:
# We can't edit default galaxies
raise PyMISPError('You are not able to update a default galaxy cluster')
@ -1360,13 +1390,26 @@ class PyMISP:
gc.from_dict(**cluster_j)
return gc
def publish_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyClusterRelation, int, str, UUID]) -> Dict:
def publish_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyCluster, int, str, UUID]) -> Dict:
"""Publishes a galaxy cluster
:param galaxy_cluster: The galaxy cluster you wish to publish
"""
if isinstance(galaxy_cluster, MISPGalaxyCluster) and galaxy_cluster.default:
raise PyMISPError('You are not able to publish a default galaxy cluster')
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
r = self._prepare_request('POST', f'galaxy_clusters/publish/{cluster_id}')
response = self._check_json_response(r)
return response
def fork_galaxy_cluster(self, galaxy: Union[MISPGalaxyClusterRelation, int, str, UUID], galaxy_cluster: Union[MISPGalaxyClusterRelation, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
def fork_galaxy_cluster(self, galaxy: Union[MISPGalaxy, int, str, UUID], galaxy_cluster: Union[MISPGalaxyClusterRelation, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
"""Forks an existing galaxy cluster, creating a new one with matching attributes
:param galaxy: The galaxy (or galaxy ID) where the cluster you want to fork resides
:param galaxy_cluster: The galaxy cluster you wish to fork
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy)
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
# Create a duplicate cluster from the cluster to fork
@ -1383,21 +1426,47 @@ class PyMISP:
gc.from_dict(**cluster_j)
return gc
def delete_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyCluster, id, str, UUID], hard=False) -> Dict:
"""Deletes a galaxy cluster from MISP
:param galaxy_cluster: The MISPGalaxyCluster you wish to delete from MISP
:param hard: flag for hard delete
"""
if isinstance(galaxy_cluster, MISPGalaxyCluster) and galaxy_cluster.default:
raise PyMISPError('You are not able to delete a default galaxy cluster')
data = {}
if hard:
data['hard'] = 1
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
r = self._prepare_request('POST', f'galaxy_clusters/delete/{cluster_id}', data=data)
return self._check_json_response(r)
def add_galaxy_cluster_relation(self, galaxy_cluster_relation: MISPGalaxyClusterRelation) -> Dict:
"""Add a galaxy cluster relation"""
"""Add a galaxy cluster relation, cluster relation must include
cluster UUIDs in both directions
:param galaxy_cluster_relation: The MISPGalaxyClusterRelation to add
"""
r = self._prepare_request('POST', 'galaxy_cluster_relations/add/', data=galaxy_cluster_relation)
cluster_rel_j = self._check_json_response(r)
return cluster_rel_j
def update_galaxy_cluster_relation(self, galaxy_cluster_relation: MISPGalaxyClusterRelation) -> Dict:
"""Update a galaxy cluster relation."""
"""Update a galaxy cluster relation
:param galaxy_cluster_relation: The MISPGalaxyClusterRelation to update
"""
cluster_relation_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster_relation)
r = self._prepare_request('POST', f'galaxy_cluster_relations/edit/{cluster_relation_id}', data=galaxy_cluster_relation)
cluster_rel_j = self._check_json_response(r)
return cluster_rel_j
def delete_galaxy_cluster_relation(self, galaxy_cluster_relation: Union[MISPGalaxyClusterRelation, int, str, UUID]) -> Dict:
"""Delete a galaxy cluster relation"""
"""Delete a galaxy cluster relation
:param galaxy_cluster_relation: The MISPGalaxyClusterRelation to delete
"""
cluster_relation_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster_relation)
r = self._prepare_request('POST', f'galaxy_cluster_relations/delete/{cluster_relation_id}')
cluster_rel_j = self._check_json_response(r)

View File

@ -27,10 +27,6 @@ class UpdateAttributeError(PyMISPError):
pass
class NewGalaxyError(PyMISPError):
pass
class NewGalaxyClusterError(PyMISPError):
pass

View File

@ -1054,6 +1054,16 @@ class MISPEventReport(AbstractMISP):
class MISPGalaxyClusterElement(AbstractMISP):
"""A MISP Galaxy cluster element, providing further info on a cluster
Creating a new galaxy cluster element can take the following parameters
:param key: The key/identifier of the element
:type key: str
:param value: The value of the element
:type value: str
"""
def __repr__(self) -> str:
if hasattr(self, 'key') and hasattr(self, 'value'):
return '<{self.__class__.__name__}(key={self.key}, value={self.value})'.format(self=self)
@ -1065,26 +1075,30 @@ class MISPGalaxyClusterElement(AbstractMISP):
"Instead, create seperate elements for each value")
super().__setattr__(key, value)
def from_dict(self, **kwargs):
if kwargs.get('id'):
self.id = int(kwargs.pop('id'))
if kwargs.get('galaxy_cluster_id'):
self.galaxy_cluster_id = int(kwargs.pop('galaxy_cluster_id'))
super().from_dict(**kwargs)
class MISPGalaxyClusterRelation(AbstractMISP):
"""A MISP Galaxy cluster relation, linking one cluster to another
:param distribution: The distribution of the relation, one of 0, 1, 2, 3, default 0
:type distribution: int
Creating a new galaxy cluster can take the following parameters
:param galaxy_cluster_uuid: The UUID of the galaxy the relation links to
:type galaxy_cluster_uuid: uuid
:param referenced_galaxy_cluster_type: The relation type, e.g. dropped-by
:type referenced_galaxy_cluster_type: str
:param referenced_galaxy_cluster_uuid: The UUID of the related galaxy
:type referenced_galaxy_cluster_uuid: uuid
:param referenced_galaxy_cluster_id: The ID of the related galaxy
:type referenced_galaxy_cluster_id: int, optional
:param galaxy_cluster_id: The ID of the galaxy cluster
:type galaxy_cluster_id: id, optional
:param id: The ID of the cluster relation
:type id: int, optional
:param default: Whether the relation is a default
:type default: bool, optional
:param distribution: The distribution of the relation, one of 0, 1, 2, 3, 4, default 0
:type distribution: int
:param sharing_group_id: The sharing group of the relation, only when distribution is 4
:type sharing_group_id: int, optional
"""
def __repr__(self) -> str:
@ -1102,11 +1116,21 @@ class MISPGalaxyClusterRelation(AbstractMISP):
def from_dict(self, **kwargs):
# Default values for a valid event to send to a MISP instance
self.distribution = kwargs.pop('distribution', 0)
self.distribution = int(self.distribution)
self.distribution = int(kwargs.pop('distribution', 0))
if self.distribution not in [0, 1, 2, 3, 4, 5]:
raise NewGalaxyClusterRelationError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4')
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if self.distribution == 4:
# The distribution is set to sharing group, a sharing_group_id is required.
if not hasattr(self, 'sharing_group_id'):
raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required.')
elif not self.sharing_group_id:
# Cannot be None or 0 either.
raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
if kwargs.get('id'):
self.id = int(kwargs.pop('id'))
if kwargs.get('orgc_id'):
@ -1144,52 +1168,22 @@ class MISPGalaxyCluster(AbstractMISP):
"""A MISP galaxy cluster, storing respective galaxy elements and relations.
Used to view default galaxy clusters and add/edit/update/delete Galaxy 2.0 clusters
:param Org: The organisation as a MISPOrganisation
:type Org: MISPOrganisation
:param Orgc: The creator organisation as a MISPOrganisation
:type Orgc: MISPOrganisation
:param SharingGroup: The SharingGroup applied to the cluster, if any
:type SharingGroup: MISPSharingGroup, optional
:param default: Whether the galaxy cluster is a default or custom cluster, default clusters cannot be edited
:type default: bool
:param deleted: Whether the galaxy cluster is deleted or not
:type deleted: bool
Creating a new galaxy cluster can take the following parameters
:param value: The value of the galaxy cluster
:type value: str
:param description: The description of the galaxy cluster
:type description: str
:param distribution: The distribution type, one of 1, 2, 3, 4, 5
:param distribution: The distribution type, one of 0, 1, 2, 3, 4
:type distribution: int
:param sharing_group_id: The sharing group ID, if distribution is set to 4
:type sharing_group_id: int, optional
:param extends_uuid: The UUID of the galaxy cluster it extends
:type extends_uuid: uuid, optional
:param galaxy_id: The ID of the galaxy
:type galaxy_id: int
:param id: The ID of the galaxy cluster
:type id: int
:param org_id: The org's ID
:type org_id: int
:param orgc_id: The creating org's ID
:type orgc_id: int
:param published: Whether the cluster is published or not
:type published: bool
:param source: The source of the galaxy cluster
:type source: str
:param tag_count: The count of events using this galaxy cluster
:type tag_count: int
:param tag_id: The tag ID
:type tag_id: int
:param tag_name: The galaxy cluster's tag
:type tag_name: str
:param type: The type of the galaxy cluster, must match the housing galaxies type
:type type: str
:param value: The value of the galaxy cluster
:type value: str
:param authors: A list of authors of the galaxy cluster
:type authors: list, optional
:type authors: list[str], optional
:param cluster_elements: List of MISPGalaxyClusterElement
:type cluster_elements: list, optional
:param cluster_relations: List of MISPGalaxyClusterRelation, changes must be made through PyMISP instance
:type cluster_relations: list, optional
:type cluster_elements: list[MISPGalaxyClusterElement], optional
:param cluster_relations: List of MISPGalaxyClusterRelation
:type cluster_relations: list[MISPGalaxyClusterRelation], optional
"""
def __init__(self):
@ -1200,6 +1194,8 @@ class MISPGalaxyCluster(AbstractMISP):
self.Org: MISPOrganisation
self.Orgc: MISPOrganisation
self.SharingGroup: MISPSharingGroup
# Set any inititialized cluster to be False
self.default = False
@property
def cluster_elements(self) -> List[MISPGalaxyClusterElement]:
@ -1217,18 +1213,45 @@ class MISPGalaxyCluster(AbstractMISP):
def cluster_relations(self, cluster_relations: List[MISPGalaxyClusterRelation]):
self.GalaxyClusterRelation = cluster_relations
@property
def meta(self) -> Dict:
"""Function to return the galaxy cluster elements as a dictionary structure of lists
that comes from a MISPGalaxy within a MISPEvent. Lossy, you lose the element ID
"""
response = defaultdict(list)
for element in self.cluster_elements:
response[element.key].append(element.value)
return dict(response)
def from_dict(self, **kwargs):
# If the default field is set, we shouldn't have distribution or sharing group ID set
if 'GalaxyCluster' in kwargs:
kwargs = kwargs['GalaxyCluster']
if kwargs.get('default', False):
self.default = kwargs.pop('default', False)
# If the default field is set, we shouldn't have distribution or sharing group ID set
if self.default:
blocked_fields = ["distribution" "sharing_group_id"]
for field in blocked_fields:
if kwargs.get(field, None):
raise NewGalaxyClusterError(
f"One of the following fields are set for a default galaxy cluster: {', '.join(blocked_fields)}"
f"The field '{field}' cannot be set on a default galaxy cluster"
)
self.distribution = int(kwargs.pop('distribution', 0))
if self.distribution not in [0, 1, 2, 3, 4]:
raise NewGalaxyClusterError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4')
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if self.distribution == 4:
# The distribution is set to sharing group, a sharing_group_id is required.
if not hasattr(self, 'sharing_group_id'):
raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required.')
elif not self.sharing_group_id:
# Cannot be None or 0 either.
raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
if 'uuid' in kwargs:
self.uuid = kwargs.pop('uuid')
if 'meta' in kwargs:
@ -1257,30 +1280,38 @@ class MISPGalaxyCluster(AbstractMISP):
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
super().from_dict(**kwargs)
def add_cluster_element(self, key: str, value: str, **kwargs):
def add_cluster_element(self, key: str, value: str, **kwargs) -> MISPGalaxyClusterElement:
"""Add a cluster relation to a MISPGalaxyCluster, key and value are required
:param key: The key name of the element
:type key: str
:param value: The value of the element
:type value: str
"""
cluster_element = MISPGalaxyClusterElement()
cluster_element.from_dict(key=key, value=value, **kwargs)
self.cluster_elements.append(cluster_element)
return cluster_element
def add_cluster_relation(self, referenced_galaxy_cluster_uuid: uuid, referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: str = None, **kwargs):
"""Add a cluster relation to a MISPGalaxyCluster
def add_cluster_relation(self, referenced_galaxy_cluster_uuid: uuid, referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: str = None, **kwargs) -> MISPGalaxyClusterRelation:
"""Add a cluster relation to a MISPGalaxyCluster.
:param referenced_galaxy_cluster_uuid: UUID of the related cluster
:type referenced_galaxy_cluster_uuid: uuid
:param referenced_galaxy_cluster_type: Relation type
:type referenced_galaxy_cluster_type: uuid
:param galaxy_cluster_uuid: UUID of this cluster, leave blank to use the stored UUID
:param galaxy_cluster_uuid: uuid, Optional
"""
if not getattr(self, "uuid", None):
raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster")
cluster_relation = MISPGalaxyClusterRelation()
cluster_relation.from_dict(
galaxy_cluster_uuid=self.uuid,
referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid,
referenced_galaxy_cluster_type=referenced_galaxy_cluster_type,
galaxy_cluster_uuid=galaxy_cluster_uuid or self.uuid,
**kwargs
)
self.cluster_relations.append(cluster_relation)
@ -1293,27 +1324,7 @@ class MISPGalaxyCluster(AbstractMISP):
class MISPGalaxy(AbstractMISP):
"""Galaxy class, used to view a galaxy and respective clusters, supports the following fields
:param id: Galaxy ID
:type id: int
:param uuid: Galaxy UUID
:type uuuid: uuid, str
:param name: Galaxy name
:type name: str
:param type: Galaxy type
:type type: str
:param description: Galaxy description
:type description: str
:param version: Galaxy version number
:type version: int
:param icon: Galaxy icon
:type icon: str
:param namespace: Galaxy namespace
:type namespace: str
:param clusters: List of MISPGalaxyCluster
:type clusters: list
"""
"""Galaxy class, used to view a galaxy and respective clusters"""
def __init__(self):
super().__init__()
@ -1338,8 +1349,8 @@ class MISPGalaxy(AbstractMISP):
return self.GalaxyCluster
def add_galaxy_cluster(self, **kwargs) -> MISPGalaxyCluster:
"""Add a MISP galaxy and sub-clusters into an event.
Supports all other parameters supported by MISPGalaxy"""
"""Add a MISP galaxy cluster into a MISPGalaxy.
Supports all other parameters supported by MISPGalaxyCluster"""
galaxy_cluster = MISPGalaxyCluster()
galaxy_cluster.from_dict(**kwargs)