add: Added the `Galaxy` field to MISPAttribute using the MISPGalaxy class

- Including an `add_galaxy` method similar to the
  one used for events
- `attribute.galaxies` gives the list of attached
  galaxy clusters
pull/887/head
Christian Studer 2022-12-01 10:05:38 +01:00
parent 24c5281387
commit 75a100a485
No known key found for this signature in database
GPG Key ID: 6BBED1B63A6D639F
3 changed files with 370 additions and 329 deletions

View File

@ -26,14 +26,15 @@ Response (if any):
try:
warning_2022()
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
from .exceptions import (PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, # noqa
InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse)
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
from .mispevent import (MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, # noqa
MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy,
MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed,
MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist,
MISPEventReport, MISPGalaxyCluster, MISPGalaxyClusterElement, MISPGalaxyClusterRelation,
MISPCorrelationExclusion, MISPGalaxy, MISPDecayingModel)
MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster,
MISPGalaxyClusterElement, MISPGalaxyClusterRelation)
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa
from .tools import stix # noqa

View File

@ -65,6 +65,10 @@ class UnknownMISPObjectTemplate(MISPObjectException):
pass
class InvalidMISPGalaxy(PyMISPError):
pass
class PyMISPInvalidFormat(PyMISPError):
pass

View File

@ -17,7 +17,9 @@ from pathlib import Path
from typing import List, Optional, Union, IO, Dict, Any
from .abstract import AbstractMISP, MISPTag
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewEventReportError, NewGalaxyClusterError, NewGalaxyClusterRelationError
from .exceptions import (UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject,
PyMISPError, NewEventError, NewAttributeError, NewEventReportError,
NewGalaxyClusterError, NewGalaxyClusterRelationError)
logger = logging.getLogger('pymisp')
@ -251,6 +253,324 @@ class MISPSighting(AbstractMISP):
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPGalaxyClusterElement(AbstractMISP):
"""A MISP Galaxy cluster element, providing further info on a cluster
Creating a new galaxy cluster element can take the following parameters
:param key: The key/identifier of the element
:type key: str
:param value: The value of the element
:type value: str
"""
key: str
value: str
def __repr__(self) -> str:
if hasattr(self, 'key') and hasattr(self, 'value'):
return '<{self.__class__.__name__}(key={self.key}, value={self.value})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
def __setattr__(self, key, value):
if key == "value" and isinstance(value, list):
raise PyMISPError("You tried to set a list to a cluster element's value. "
"Instead, create seperate elements for each value")
super().__setattr__(key, value)
def from_dict(self, **kwargs):
if kwargs.get('id'):
self.id = int(kwargs.pop('id'))
if kwargs.get('galaxy_cluster_id'):
self.galaxy_cluster_id = int(kwargs.pop('galaxy_cluster_id'))
super().from_dict(**kwargs)
class MISPGalaxyClusterRelation(AbstractMISP):
"""A MISP Galaxy cluster relation, linking one cluster to another
Creating a new galaxy cluster can take the following parameters
:param galaxy_cluster_uuid: The UUID of the galaxy the relation links to
:param referenced_galaxy_cluster_type: The relation type, e.g. dropped-by
:param referenced_galaxy_cluster_uuid: The UUID of the related galaxy
:param distribution: The distribution of the relation, one of 0, 1, 2, 3, 4, default 0
:param sharing_group_id: The sharing group of the relation, only when distribution is 4
"""
def __repr__(self) -> str:
if hasattr(self, "referenced_galaxy_cluster_type"):
return '<{self.__class__.__name__}(referenced_galaxy_cluster_type={self.referenced_galaxy_cluster_type})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
def __init__(self) -> None:
super().__init__()
self.galaxy_cluster_uuid: str
self.referenced_galaxy_cluster_uuid: str
self.distribution: int = 0
self.referenced_galaxy_cluster_type: str
self.Tag: List[MISPTag] = []
def from_dict(self, **kwargs):
# Default values for a valid event to send to a MISP instance
self.distribution = int(kwargs.pop('distribution', 0))
if self.distribution not in [0, 1, 2, 3, 4, 5]:
raise NewGalaxyClusterRelationError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4')
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if self.distribution == 4:
# The distribution is set to sharing group, a sharing_group_id is required.
if not hasattr(self, 'sharing_group_id'):
raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required.')
elif not self.sharing_group_id:
# Cannot be None or 0 either.
raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
if kwargs.get('id'):
self.id = int(kwargs.pop('id'))
if kwargs.get('orgc_id'):
self.orgc_id = int(kwargs.pop('orgc_id'))
if kwargs.get('org_id'):
self.org_id = int(kwargs.pop('org_id'))
if kwargs.get('galaxy_id'):
self.galaxy_id = int(kwargs.pop('galaxy_id'))
if kwargs.get('tag_id'):
self.tag_id = int(kwargs.pop('tag_id'))
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if kwargs.get('Tag'):
[self.add_tag(**t) for t in kwargs.pop('Tag')]
if kwargs.get('SharingGroup'):
self.SharingGroup = MISPSharingGroup()
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
super().from_dict(**kwargs)
def add_tag(self, tag: Optional[Union[str, MISPTag, Dict]] = None, **kwargs) -> MISPTag:
return super()._add_tag(tag, **kwargs)
@property
def tags(self) -> List[MISPTag]:
"""Returns a list of tags associated to this Attribute"""
return self.Tag
@tags.setter
def tags(self, tags: List[MISPTag]):
"""Set a list of prepared MISPTag."""
super()._set_tags(tags)
class MISPGalaxyCluster(AbstractMISP):
"""A MISP galaxy cluster, storing respective galaxy elements and relations.
Used to view default galaxy clusters and add/edit/update/delete Galaxy 2.0 clusters
Creating a new galaxy cluster can take the following parameters
:param value: The value of the galaxy cluster
:type value: str
:param description: The description of the galaxy cluster
:type description: str
:param distribution: The distribution type, one of 0, 1, 2, 3, 4
:type distribution: int
:param sharing_group_id: The sharing group ID, if distribution is set to 4
:type sharing_group_id: int, optional
:param authors: A list of authors of the galaxy cluster
:type authors: list[str], optional
:param cluster_elements: List of MISPGalaxyClusterElement
:type cluster_elements: list[MISPGalaxyClusterElement], optional
:param cluster_relations: List of MISPGalaxyClusterRelation
:type cluster_relations: list[MISPGalaxyClusterRelation], optional
"""
def __init__(self) -> None:
super().__init__()
self.Galaxy: MISPGalaxy
self.GalaxyElement: List[MISPGalaxyClusterElement] = []
self.meta: Dict = {}
self.GalaxyClusterRelation: List[MISPGalaxyClusterRelation] = []
self.Org: MISPOrganisation
self.Orgc: MISPOrganisation
self.SharingGroup: MISPSharingGroup
self.value: str
# Set any inititialized cluster to be False
self.default = False
@property
def cluster_elements(self) -> List[MISPGalaxyClusterElement]:
return self.GalaxyElement
@cluster_elements.setter
def cluster_elements(self, cluster_elements: List[MISPGalaxyClusterElement]):
self.GalaxyElement = cluster_elements
@property
def cluster_relations(self) -> List[MISPGalaxyClusterRelation]:
return self.GalaxyClusterRelation
@cluster_relations.setter
def cluster_relations(self, cluster_relations: List[MISPGalaxyClusterRelation]):
self.GalaxyClusterRelation = cluster_relations
def parse_meta_as_elements(self):
"""Function to parse the meta field into GalaxyClusterElements"""
# Parse the cluster elements from the kwargs meta fields
for key, value in self.meta.items():
# The meta will merge fields together, i.e. Two 'countries' will be a list, so split these up
if not isinstance(value, list):
value = [value]
for v in value:
self.add_cluster_element(key=key, value=v)
@property
def elements_meta(self) -> Dict:
"""Function to return the galaxy cluster elements as a dictionary structure of lists
that comes from a MISPGalaxy within a MISPEvent. Lossy, you lose the element ID
"""
response = defaultdict(list)
for element in self.cluster_elements:
response[element.key].append(element.value)
return dict(response)
def from_dict(self, **kwargs):
if 'GalaxyCluster' in kwargs:
kwargs = kwargs['GalaxyCluster']
self.default = kwargs.pop('default', False)
# If the default field is set, we shouldn't have distribution or sharing group ID set
if self.default:
blocked_fields = ["distribution" "sharing_group_id"]
for field in blocked_fields:
if kwargs.get(field, None):
raise NewGalaxyClusterError(
f"The field '{field}' cannot be set on a default galaxy cluster"
)
self.distribution = int(kwargs.pop('distribution', 0))
if self.distribution not in [0, 1, 2, 3, 4]:
raise NewGalaxyClusterError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4')
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if self.distribution == 4:
# The distribution is set to sharing group, a sharing_group_id is required.
if not hasattr(self, 'sharing_group_id'):
raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required.')
elif not self.sharing_group_id:
# Cannot be None or 0 either.
raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
if 'uuid' in kwargs:
self.uuid = kwargs.pop('uuid')
if 'meta' in kwargs:
self.meta = kwargs.pop('meta')
if 'Galaxy' in kwargs:
self.Galaxy = MISPGalaxy()
self.Galaxy.from_dict(**kwargs.pop('Galaxy'))
if 'GalaxyElement' in kwargs:
[self.add_cluster_element(**e) for e in kwargs.pop('GalaxyElement')]
if 'Org' in kwargs:
self.Org = MISPOrganisation()
self.Org.from_dict(**kwargs.pop('Org'))
if 'Orgc' in kwargs:
self.Orgc = MISPOrganisation()
self.Orgc.from_dict(**kwargs.pop('Orgc'))
if 'GalaxyClusterRelation' in kwargs:
[self.add_cluster_relation(**r) for r in kwargs.pop('GalaxyClusterRelation')]
if 'SharingGroup' in kwargs:
self.SharingGroup = MISPSharingGroup()
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
super().from_dict(**kwargs)
def add_cluster_element(self, key: str, value: str, **kwargs) -> MISPGalaxyClusterElement:
"""Add a cluster relation to a MISPGalaxyCluster, key and value are required
:param key: The key name of the element
:type key: str
:param value: The value of the element
:type value: str
"""
cluster_element = MISPGalaxyClusterElement()
cluster_element.from_dict(key=key, value=value, **kwargs)
self.cluster_elements.append(cluster_element)
return cluster_element
def add_cluster_relation(self, referenced_galaxy_cluster_uuid: Union["MISPGalaxyCluster", str, UUID], referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: Optional[str] = None, **kwargs: Dict) -> MISPGalaxyClusterRelation:
"""Add a cluster relation to a MISPGalaxyCluster.
:param referenced_galaxy_cluster_uuid: UUID of the related cluster
:type referenced_galaxy_cluster_uuid: uuid
:param referenced_galaxy_cluster_type: Relation type
:type referenced_galaxy_cluster_type: uuid
:param galaxy_cluster_uuid: UUID of this cluster, leave blank to use the stored UUID
:param galaxy_cluster_uuid: uuid, Optional
"""
if not getattr(self, "uuid", None):
raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster")
cluster_relation = MISPGalaxyClusterRelation()
if isinstance(referenced_galaxy_cluster_uuid, MISPGalaxyCluster):
referenced_galaxy_cluster_uuid = referenced_galaxy_cluster_uuid.uuid
cluster_relation.from_dict(
referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid,
referenced_galaxy_cluster_type=referenced_galaxy_cluster_type,
galaxy_cluster_uuid=galaxy_cluster_uuid or self.uuid,
**kwargs
)
self.cluster_relations.append(cluster_relation)
return cluster_relation
def __repr__(self) -> str:
if hasattr(self, 'value'):
return '<{self.__class__.__name__}(value={self.value})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPGalaxy(AbstractMISP):
"""Galaxy class, used to view a galaxy and respective clusters"""
def __init__(self) -> None:
super().__init__()
self.GalaxyCluster: List[MISPGalaxyCluster] = []
self.name: str
def from_dict(self, **kwargs):
"""Galaxy could be in one of the following formats:
{'Galaxy': {}, 'GalaxyCluster': []}
{'Galaxy': {'GalaxyCluster': []}}
"""
if 'GalaxyCluster' in kwargs and kwargs.get("withCluster", True):
# Parse the cluster from the kwargs
[self.add_galaxy_cluster(**e) for e in kwargs.pop('GalaxyCluster')]
if 'Galaxy' in kwargs:
kwargs = kwargs['Galaxy']
super().from_dict(**kwargs)
@property
def clusters(self) -> List[MISPGalaxyCluster]:
return self.GalaxyCluster
def add_galaxy_cluster(self, **kwargs) -> MISPGalaxyCluster:
"""Add a MISP galaxy cluster into a MISPGalaxy.
Supports all other parameters supported by MISPGalaxyCluster"""
galaxy_cluster = MISPGalaxyCluster()
galaxy_cluster.from_dict(**kwargs)
self.clusters.append(galaxy_cluster)
return galaxy_cluster
def __repr__(self) -> str:
if hasattr(self, 'name'):
return '<{self.__class__.__name__}(name={self.name})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPAttribute(AbstractMISP):
_fields_for_feed: set = {'uuid', 'value', 'category', 'type', 'comment', 'data',
'deleted', 'timestamp', 'to_ids', 'disable_correlation',
@ -277,6 +597,7 @@ class MISPAttribute(AbstractMISP):
self.SharingGroup: MISPSharingGroup
self.Sighting: List[MISPSighting] = []
self.Tag: List[MISPTag] = []
self.Galaxy: List[MISPGalaxy] = []
# For search
self.Event: MISPEvent
@ -298,6 +619,27 @@ class MISPAttribute(AbstractMISP):
"""Set a list of prepared MISPTag."""
super()._set_tags(tags)
def add_galaxy(self, galaxy: Union[MISPGalaxy, dict, None] = None, **kwargs) -> MISPGalaxy:
"""Add a galaxy to the Attribute, either by passing a MISPGalaxy or a dictionary"""
if isinstance(galaxy, MISPGalaxy):
self.galaxies.append(galaxy)
return galaxy
if isinstance(galaxy, dict):
misp_galaxy = MISPGalaxy()
misp_galaxy.from_dict(**galaxy)
elif kwargs:
misp_galaxy = MISPGalaxy()
misp_galaxy.from_dict(**kwargs)
else:
raise InvalidMISPGalaxy("A Galaxy to add to an existing Attribute needs to be either a MISPGalaxy or a plain python dictionary")
self.galaxies.append(misp_galaxy)
return misp_galaxy
@property
def galaxies(self) -> List[MISPGalaxy]:
"""Returns a list of galaxies associated to this Attribute"""
return self.Galaxy
def _prepare_data(self, data: Optional[Union[Path, str, bytes, BytesIO]]):
if not data:
super().__setattr__('data', None)
@ -588,6 +930,8 @@ class MISPAttribute(AbstractMISP):
if kwargs.get('Tag'):
[self.add_tag(tag) for tag in kwargs.pop('Tag')]
if kwargs.get('Galaxy'):
[self.add_galaxy(galaxy) for galaxy in kwargs.pop('Galaxy')]
if kwargs.get('Sighting'):
[self.add_sighting(sighting) for sighting in kwargs.pop('Sighting')]
if kwargs.get('ShadowAttribute'):
@ -1162,324 +1506,6 @@ class MISPEventReport(AbstractMISP):
self.content = ''
class MISPGalaxyClusterElement(AbstractMISP):
"""A MISP Galaxy cluster element, providing further info on a cluster
Creating a new galaxy cluster element can take the following parameters
:param key: The key/identifier of the element
:type key: str
:param value: The value of the element
:type value: str
"""
key: str
value: str
def __repr__(self) -> str:
if hasattr(self, 'key') and hasattr(self, 'value'):
return '<{self.__class__.__name__}(key={self.key}, value={self.value})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
def __setattr__(self, key, value):
if key == "value" and isinstance(value, list):
raise PyMISPError("You tried to set a list to a cluster element's value. "
"Instead, create seperate elements for each value")
super().__setattr__(key, value)
def from_dict(self, **kwargs):
if kwargs.get('id'):
self.id = int(kwargs.pop('id'))
if kwargs.get('galaxy_cluster_id'):
self.galaxy_cluster_id = int(kwargs.pop('galaxy_cluster_id'))
super().from_dict(**kwargs)
class MISPGalaxyClusterRelation(AbstractMISP):
"""A MISP Galaxy cluster relation, linking one cluster to another
Creating a new galaxy cluster can take the following parameters
:param galaxy_cluster_uuid: The UUID of the galaxy the relation links to
:param referenced_galaxy_cluster_type: The relation type, e.g. dropped-by
:param referenced_galaxy_cluster_uuid: The UUID of the related galaxy
:param distribution: The distribution of the relation, one of 0, 1, 2, 3, 4, default 0
:param sharing_group_id: The sharing group of the relation, only when distribution is 4
"""
def __repr__(self) -> str:
if hasattr(self, "referenced_galaxy_cluster_type"):
return '<{self.__class__.__name__}(referenced_galaxy_cluster_type={self.referenced_galaxy_cluster_type})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
def __init__(self) -> None:
super().__init__()
self.galaxy_cluster_uuid: str
self.referenced_galaxy_cluster_uuid: str
self.distribution: int = 0
self.referenced_galaxy_cluster_type: str
self.Tag: List[MISPTag] = []
def from_dict(self, **kwargs):
# Default values for a valid event to send to a MISP instance
self.distribution = int(kwargs.pop('distribution', 0))
if self.distribution not in [0, 1, 2, 3, 4, 5]:
raise NewGalaxyClusterRelationError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4')
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if self.distribution == 4:
# The distribution is set to sharing group, a sharing_group_id is required.
if not hasattr(self, 'sharing_group_id'):
raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required.')
elif not self.sharing_group_id:
# Cannot be None or 0 either.
raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
if kwargs.get('id'):
self.id = int(kwargs.pop('id'))
if kwargs.get('orgc_id'):
self.orgc_id = int(kwargs.pop('orgc_id'))
if kwargs.get('org_id'):
self.org_id = int(kwargs.pop('org_id'))
if kwargs.get('galaxy_id'):
self.galaxy_id = int(kwargs.pop('galaxy_id'))
if kwargs.get('tag_id'):
self.tag_id = int(kwargs.pop('tag_id'))
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if kwargs.get('Tag'):
[self.add_tag(**t) for t in kwargs.pop('Tag')]
if kwargs.get('SharingGroup'):
self.SharingGroup = MISPSharingGroup()
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
super().from_dict(**kwargs)
def add_tag(self, tag: Optional[Union[str, MISPTag, Dict]] = None, **kwargs) -> MISPTag:
return super()._add_tag(tag, **kwargs)
@property
def tags(self) -> List[MISPTag]:
"""Returns a list of tags associated to this Attribute"""
return self.Tag
@tags.setter
def tags(self, tags: List[MISPTag]):
"""Set a list of prepared MISPTag."""
super()._set_tags(tags)
class MISPGalaxyCluster(AbstractMISP):
"""A MISP galaxy cluster, storing respective galaxy elements and relations.
Used to view default galaxy clusters and add/edit/update/delete Galaxy 2.0 clusters
Creating a new galaxy cluster can take the following parameters
:param value: The value of the galaxy cluster
:type value: str
:param description: The description of the galaxy cluster
:type description: str
:param distribution: The distribution type, one of 0, 1, 2, 3, 4
:type distribution: int
:param sharing_group_id: The sharing group ID, if distribution is set to 4
:type sharing_group_id: int, optional
:param authors: A list of authors of the galaxy cluster
:type authors: list[str], optional
:param cluster_elements: List of MISPGalaxyClusterElement
:type cluster_elements: list[MISPGalaxyClusterElement], optional
:param cluster_relations: List of MISPGalaxyClusterRelation
:type cluster_relations: list[MISPGalaxyClusterRelation], optional
"""
def __init__(self) -> None:
super().__init__()
self.Galaxy: MISPGalaxy
self.GalaxyElement: List[MISPGalaxyClusterElement] = []
self.meta: Dict = {}
self.GalaxyClusterRelation: List[MISPGalaxyClusterRelation] = []
self.Org: MISPOrganisation
self.Orgc: MISPOrganisation
self.SharingGroup: MISPSharingGroup
self.value: str
# Set any inititialized cluster to be False
self.default = False
@property
def cluster_elements(self) -> List[MISPGalaxyClusterElement]:
return self.GalaxyElement
@cluster_elements.setter
def cluster_elements(self, cluster_elements: List[MISPGalaxyClusterElement]):
self.GalaxyElement = cluster_elements
@property
def cluster_relations(self) -> List[MISPGalaxyClusterRelation]:
return self.GalaxyClusterRelation
@cluster_relations.setter
def cluster_relations(self, cluster_relations: List[MISPGalaxyClusterRelation]):
self.GalaxyClusterRelation = cluster_relations
def parse_meta_as_elements(self):
"""Function to parse the meta field into GalaxyClusterElements"""
# Parse the cluster elements from the kwargs meta fields
for key, value in self.meta.items():
# The meta will merge fields together, i.e. Two 'countries' will be a list, so split these up
if not isinstance(value, list):
value = [value]
for v in value:
self.add_cluster_element(key=key, value=v)
@property
def elements_meta(self) -> Dict:
"""Function to return the galaxy cluster elements as a dictionary structure of lists
that comes from a MISPGalaxy within a MISPEvent. Lossy, you lose the element ID
"""
response = defaultdict(list)
for element in self.cluster_elements:
response[element.key].append(element.value)
return dict(response)
def from_dict(self, **kwargs):
if 'GalaxyCluster' in kwargs:
kwargs = kwargs['GalaxyCluster']
self.default = kwargs.pop('default', False)
# If the default field is set, we shouldn't have distribution or sharing group ID set
if self.default:
blocked_fields = ["distribution" "sharing_group_id"]
for field in blocked_fields:
if kwargs.get(field, None):
raise NewGalaxyClusterError(
f"The field '{field}' cannot be set on a default galaxy cluster"
)
self.distribution = int(kwargs.pop('distribution', 0))
if self.distribution not in [0, 1, 2, 3, 4]:
raise NewGalaxyClusterError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4')
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if self.distribution == 4:
# The distribution is set to sharing group, a sharing_group_id is required.
if not hasattr(self, 'sharing_group_id'):
raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required.')
elif not self.sharing_group_id:
# Cannot be None or 0 either.
raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
if 'uuid' in kwargs:
self.uuid = kwargs.pop('uuid')
if 'meta' in kwargs:
self.meta = kwargs.pop('meta')
if 'Galaxy' in kwargs:
self.Galaxy = MISPGalaxy()
self.Galaxy.from_dict(**kwargs.pop('Galaxy'))
if 'GalaxyElement' in kwargs:
[self.add_cluster_element(**e) for e in kwargs.pop('GalaxyElement')]
if 'Org' in kwargs:
self.Org = MISPOrganisation()
self.Org.from_dict(**kwargs.pop('Org'))
if 'Orgc' in kwargs:
self.Orgc = MISPOrganisation()
self.Orgc.from_dict(**kwargs.pop('Orgc'))
if 'GalaxyClusterRelation' in kwargs:
[self.add_cluster_relation(**r) for r in kwargs.pop('GalaxyClusterRelation')]
if 'SharingGroup' in kwargs:
self.SharingGroup = MISPSharingGroup()
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
super().from_dict(**kwargs)
def add_cluster_element(self, key: str, value: str, **kwargs) -> MISPGalaxyClusterElement:
"""Add a cluster relation to a MISPGalaxyCluster, key and value are required
:param key: The key name of the element
:type key: str
:param value: The value of the element
:type value: str
"""
cluster_element = MISPGalaxyClusterElement()
cluster_element.from_dict(key=key, value=value, **kwargs)
self.cluster_elements.append(cluster_element)
return cluster_element
def add_cluster_relation(self, referenced_galaxy_cluster_uuid: Union["MISPGalaxyCluster", str, UUID], referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: Optional[str] = None, **kwargs: Dict) -> MISPGalaxyClusterRelation:
"""Add a cluster relation to a MISPGalaxyCluster.
:param referenced_galaxy_cluster_uuid: UUID of the related cluster
:type referenced_galaxy_cluster_uuid: uuid
:param referenced_galaxy_cluster_type: Relation type
:type referenced_galaxy_cluster_type: uuid
:param galaxy_cluster_uuid: UUID of this cluster, leave blank to use the stored UUID
:param galaxy_cluster_uuid: uuid, Optional
"""
if not getattr(self, "uuid", None):
raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster")
cluster_relation = MISPGalaxyClusterRelation()
if isinstance(referenced_galaxy_cluster_uuid, MISPGalaxyCluster):
referenced_galaxy_cluster_uuid = referenced_galaxy_cluster_uuid.uuid
cluster_relation.from_dict(
referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid,
referenced_galaxy_cluster_type=referenced_galaxy_cluster_type,
galaxy_cluster_uuid=galaxy_cluster_uuid or self.uuid,
**kwargs
)
self.cluster_relations.append(cluster_relation)
return cluster_relation
def __repr__(self) -> str:
if hasattr(self, 'value'):
return '<{self.__class__.__name__}(value={self.value})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPGalaxy(AbstractMISP):
"""Galaxy class, used to view a galaxy and respective clusters"""
def __init__(self) -> None:
super().__init__()
self.GalaxyCluster: List[MISPGalaxyCluster] = []
self.name: str
def from_dict(self, **kwargs):
"""Galaxy could be in one of the following formats:
{'Galaxy': {}, 'GalaxyCluster': []}
{'Galaxy': {'GalaxyCluster': []}}
"""
if 'GalaxyCluster' in kwargs and kwargs.get("withCluster", True):
# Parse the cluster from the kwargs
[self.add_galaxy_cluster(**e) for e in kwargs.pop('GalaxyCluster')]
if 'Galaxy' in kwargs:
kwargs = kwargs['Galaxy']
super().from_dict(**kwargs)
@property
def clusters(self) -> List[MISPGalaxyCluster]:
return self.GalaxyCluster
def add_galaxy_cluster(self, **kwargs) -> MISPGalaxyCluster:
"""Add a MISP galaxy cluster into a MISPGalaxy.
Supports all other parameters supported by MISPGalaxyCluster"""
galaxy_cluster = MISPGalaxyCluster()
galaxy_cluster.from_dict(**kwargs)
self.clusters.append(galaxy_cluster)
return galaxy_cluster
def __repr__(self) -> str:
if hasattr(self, 'name'):
return '<{self.__class__.__name__}(name={self.name})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPEvent(AbstractMISP):
_fields_for_feed: set = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
@ -1945,13 +1971,23 @@ class MISPEvent(AbstractMISP):
self.edited = True
return event_report
def add_galaxy(self, **kwargs) -> MISPGalaxy:
"""Add a MISP galaxy and sub-clusters into an event.
def add_galaxy(self, galaxy: Union[MISPGalaxy, dict, None] = None, **kwargs) -> MISPGalaxy:
"""Add a galaxy and sub-clusters into an event, either by passing
a MISPGalaxy or a dictionary.
Supports all other parameters supported by MISPGalaxy"""
galaxy = MISPGalaxy()
galaxy.from_dict(**kwargs)
self.galaxies.append(galaxy)
return galaxy
if isinstance(galaxy, MISPGalaxy):
self.galaxies.append(galaxy)
return galaxy
if isinstance(galaxy, dict):
misp_galaxy = MISPGalaxy()
misp_galaxy.from_dict(**galaxy)
elif kwargs:
misp_galaxy = MISPGalaxy()
misp_galaxy.from_dict(**kwargs)
else:
raise InvalidMISPGalaxy("A Galaxy to add to an existing Event needs to be either a MISPGalaxy or a plain python dictionary")
self.galaxies.append(misp_galaxy)
return misp_galaxy
def get_object_by_id(self, object_id: Union[str, int]) -> MISPObject:
"""Get an object by ID