mirror of https://github.com/MISP/PyMISP
chg: Re-order classes
parent
77b0b3ac3e
commit
1fb274821f
|
@ -253,324 +253,6 @@ class MISPSighting(AbstractMISP):
|
|||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||
|
||||
|
||||
class MISPGalaxyClusterElement(AbstractMISP):
|
||||
"""A MISP Galaxy cluster element, providing further info on a cluster
|
||||
|
||||
Creating a new galaxy cluster element can take the following parameters
|
||||
|
||||
:param key: The key/identifier of the element
|
||||
:type key: str
|
||||
:param value: The value of the element
|
||||
:type value: str
|
||||
"""
|
||||
|
||||
key: str
|
||||
value: str
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if hasattr(self, 'key') and hasattr(self, 'value'):
|
||||
return '<{self.__class__.__name__}(key={self.key}, value={self.value})'.format(self=self)
|
||||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
if key == "value" and isinstance(value, list):
|
||||
raise PyMISPError("You tried to set a list to a cluster element's value. "
|
||||
"Instead, create seperate elements for each value")
|
||||
super().__setattr__(key, value)
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
if kwargs.get('id'):
|
||||
self.id = int(kwargs.pop('id'))
|
||||
if kwargs.get('galaxy_cluster_id'):
|
||||
self.galaxy_cluster_id = int(kwargs.pop('galaxy_cluster_id'))
|
||||
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
|
||||
class MISPGalaxyClusterRelation(AbstractMISP):
|
||||
"""A MISP Galaxy cluster relation, linking one cluster to another
|
||||
|
||||
Creating a new galaxy cluster can take the following parameters
|
||||
|
||||
:param galaxy_cluster_uuid: The UUID of the galaxy the relation links to
|
||||
:param referenced_galaxy_cluster_type: The relation type, e.g. dropped-by
|
||||
:param referenced_galaxy_cluster_uuid: The UUID of the related galaxy
|
||||
:param distribution: The distribution of the relation, one of 0, 1, 2, 3, 4, default 0
|
||||
:param sharing_group_id: The sharing group of the relation, only when distribution is 4
|
||||
"""
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if hasattr(self, "referenced_galaxy_cluster_type"):
|
||||
return '<{self.__class__.__name__}(referenced_galaxy_cluster_type={self.referenced_galaxy_cluster_type})'.format(self=self)
|
||||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.galaxy_cluster_uuid: str
|
||||
self.referenced_galaxy_cluster_uuid: str
|
||||
self.distribution: int = 0
|
||||
self.referenced_galaxy_cluster_type: str
|
||||
self.Tag: List[MISPTag] = []
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
# Default values for a valid event to send to a MISP instance
|
||||
self.distribution = int(kwargs.pop('distribution', 0))
|
||||
if self.distribution not in [0, 1, 2, 3, 4, 5]:
|
||||
raise NewGalaxyClusterRelationError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4')
|
||||
|
||||
if kwargs.get('sharing_group_id'):
|
||||
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
||||
|
||||
if self.distribution == 4:
|
||||
# The distribution is set to sharing group, a sharing_group_id is required.
|
||||
if not hasattr(self, 'sharing_group_id'):
|
||||
raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required.')
|
||||
elif not self.sharing_group_id:
|
||||
# Cannot be None or 0 either.
|
||||
raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
|
||||
|
||||
if kwargs.get('id'):
|
||||
self.id = int(kwargs.pop('id'))
|
||||
if kwargs.get('orgc_id'):
|
||||
self.orgc_id = int(kwargs.pop('orgc_id'))
|
||||
if kwargs.get('org_id'):
|
||||
self.org_id = int(kwargs.pop('org_id'))
|
||||
if kwargs.get('galaxy_id'):
|
||||
self.galaxy_id = int(kwargs.pop('galaxy_id'))
|
||||
if kwargs.get('tag_id'):
|
||||
self.tag_id = int(kwargs.pop('tag_id'))
|
||||
if kwargs.get('sharing_group_id'):
|
||||
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
||||
if kwargs.get('Tag'):
|
||||
[self.add_tag(**t) for t in kwargs.pop('Tag')]
|
||||
if kwargs.get('SharingGroup'):
|
||||
self.SharingGroup = MISPSharingGroup()
|
||||
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
def add_tag(self, tag: Optional[Union[str, MISPTag, Dict]] = None, **kwargs) -> MISPTag:
|
||||
return super()._add_tag(tag, **kwargs)
|
||||
|
||||
@property
|
||||
def tags(self) -> List[MISPTag]:
|
||||
"""Returns a list of tags associated to this Attribute"""
|
||||
return self.Tag
|
||||
|
||||
@tags.setter
|
||||
def tags(self, tags: List[MISPTag]):
|
||||
"""Set a list of prepared MISPTag."""
|
||||
super()._set_tags(tags)
|
||||
|
||||
|
||||
class MISPGalaxyCluster(AbstractMISP):
|
||||
"""A MISP galaxy cluster, storing respective galaxy elements and relations.
|
||||
Used to view default galaxy clusters and add/edit/update/delete Galaxy 2.0 clusters
|
||||
|
||||
Creating a new galaxy cluster can take the following parameters
|
||||
|
||||
:param value: The value of the galaxy cluster
|
||||
:type value: str
|
||||
:param description: The description of the galaxy cluster
|
||||
:type description: str
|
||||
:param distribution: The distribution type, one of 0, 1, 2, 3, 4
|
||||
:type distribution: int
|
||||
:param sharing_group_id: The sharing group ID, if distribution is set to 4
|
||||
:type sharing_group_id: int, optional
|
||||
:param authors: A list of authors of the galaxy cluster
|
||||
:type authors: list[str], optional
|
||||
:param cluster_elements: List of MISPGalaxyClusterElement
|
||||
:type cluster_elements: list[MISPGalaxyClusterElement], optional
|
||||
:param cluster_relations: List of MISPGalaxyClusterRelation
|
||||
:type cluster_relations: list[MISPGalaxyClusterRelation], optional
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.Galaxy: MISPGalaxy
|
||||
self.GalaxyElement: List[MISPGalaxyClusterElement] = []
|
||||
self.meta: Dict = {}
|
||||
self.GalaxyClusterRelation: List[MISPGalaxyClusterRelation] = []
|
||||
self.Org: MISPOrganisation
|
||||
self.Orgc: MISPOrganisation
|
||||
self.SharingGroup: MISPSharingGroup
|
||||
self.value: str
|
||||
# Set any inititialized cluster to be False
|
||||
self.default = False
|
||||
|
||||
@property
|
||||
def cluster_elements(self) -> List[MISPGalaxyClusterElement]:
|
||||
return self.GalaxyElement
|
||||
|
||||
@cluster_elements.setter
|
||||
def cluster_elements(self, cluster_elements: List[MISPGalaxyClusterElement]):
|
||||
self.GalaxyElement = cluster_elements
|
||||
|
||||
@property
|
||||
def cluster_relations(self) -> List[MISPGalaxyClusterRelation]:
|
||||
return self.GalaxyClusterRelation
|
||||
|
||||
@cluster_relations.setter
|
||||
def cluster_relations(self, cluster_relations: List[MISPGalaxyClusterRelation]):
|
||||
self.GalaxyClusterRelation = cluster_relations
|
||||
|
||||
def parse_meta_as_elements(self):
|
||||
"""Function to parse the meta field into GalaxyClusterElements"""
|
||||
# Parse the cluster elements from the kwargs meta fields
|
||||
for key, value in self.meta.items():
|
||||
# The meta will merge fields together, i.e. Two 'countries' will be a list, so split these up
|
||||
if not isinstance(value, list):
|
||||
value = [value]
|
||||
for v in value:
|
||||
self.add_cluster_element(key=key, value=v)
|
||||
|
||||
@property
|
||||
def elements_meta(self) -> Dict:
|
||||
"""Function to return the galaxy cluster elements as a dictionary structure of lists
|
||||
that comes from a MISPGalaxy within a MISPEvent. Lossy, you lose the element ID
|
||||
"""
|
||||
response = defaultdict(list)
|
||||
for element in self.cluster_elements:
|
||||
response[element.key].append(element.value)
|
||||
return dict(response)
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
if 'GalaxyCluster' in kwargs:
|
||||
kwargs = kwargs['GalaxyCluster']
|
||||
self.default = kwargs.pop('default', False)
|
||||
# If the default field is set, we shouldn't have distribution or sharing group ID set
|
||||
if self.default:
|
||||
blocked_fields = ["distribution" "sharing_group_id"]
|
||||
for field in blocked_fields:
|
||||
if kwargs.get(field, None):
|
||||
raise NewGalaxyClusterError(
|
||||
f"The field '{field}' cannot be set on a default galaxy cluster"
|
||||
)
|
||||
|
||||
self.distribution = int(kwargs.pop('distribution', 0))
|
||||
if self.distribution not in [0, 1, 2, 3, 4]:
|
||||
raise NewGalaxyClusterError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4')
|
||||
|
||||
if kwargs.get('sharing_group_id'):
|
||||
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
||||
|
||||
if self.distribution == 4:
|
||||
# The distribution is set to sharing group, a sharing_group_id is required.
|
||||
if not hasattr(self, 'sharing_group_id'):
|
||||
raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required.')
|
||||
elif not self.sharing_group_id:
|
||||
# Cannot be None or 0 either.
|
||||
raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
|
||||
|
||||
if 'uuid' in kwargs:
|
||||
self.uuid = kwargs.pop('uuid')
|
||||
if 'meta' in kwargs:
|
||||
self.meta = kwargs.pop('meta')
|
||||
if 'Galaxy' in kwargs:
|
||||
self.Galaxy = MISPGalaxy()
|
||||
self.Galaxy.from_dict(**kwargs.pop('Galaxy'))
|
||||
if 'GalaxyElement' in kwargs:
|
||||
[self.add_cluster_element(**e) for e in kwargs.pop('GalaxyElement')]
|
||||
if 'Org' in kwargs:
|
||||
self.Org = MISPOrganisation()
|
||||
self.Org.from_dict(**kwargs.pop('Org'))
|
||||
if 'Orgc' in kwargs:
|
||||
self.Orgc = MISPOrganisation()
|
||||
self.Orgc.from_dict(**kwargs.pop('Orgc'))
|
||||
if 'GalaxyClusterRelation' in kwargs:
|
||||
[self.add_cluster_relation(**r) for r in kwargs.pop('GalaxyClusterRelation')]
|
||||
if 'SharingGroup' in kwargs:
|
||||
self.SharingGroup = MISPSharingGroup()
|
||||
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
def add_cluster_element(self, key: str, value: str, **kwargs) -> MISPGalaxyClusterElement:
|
||||
"""Add a cluster relation to a MISPGalaxyCluster, key and value are required
|
||||
|
||||
:param key: The key name of the element
|
||||
:type key: str
|
||||
:param value: The value of the element
|
||||
:type value: str
|
||||
"""
|
||||
|
||||
cluster_element = MISPGalaxyClusterElement()
|
||||
cluster_element.from_dict(key=key, value=value, **kwargs)
|
||||
self.cluster_elements.append(cluster_element)
|
||||
return cluster_element
|
||||
|
||||
def add_cluster_relation(self, referenced_galaxy_cluster_uuid: Union["MISPGalaxyCluster", str, UUID], referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: Optional[str] = None, **kwargs: Dict) -> MISPGalaxyClusterRelation:
|
||||
"""Add a cluster relation to a MISPGalaxyCluster.
|
||||
|
||||
:param referenced_galaxy_cluster_uuid: UUID of the related cluster
|
||||
:type referenced_galaxy_cluster_uuid: uuid
|
||||
:param referenced_galaxy_cluster_type: Relation type
|
||||
:type referenced_galaxy_cluster_type: uuid
|
||||
:param galaxy_cluster_uuid: UUID of this cluster, leave blank to use the stored UUID
|
||||
:param galaxy_cluster_uuid: uuid, Optional
|
||||
"""
|
||||
|
||||
if not getattr(self, "uuid", None):
|
||||
raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster")
|
||||
cluster_relation = MISPGalaxyClusterRelation()
|
||||
|
||||
if isinstance(referenced_galaxy_cluster_uuid, MISPGalaxyCluster):
|
||||
referenced_galaxy_cluster_uuid = referenced_galaxy_cluster_uuid.uuid
|
||||
|
||||
cluster_relation.from_dict(
|
||||
referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid,
|
||||
referenced_galaxy_cluster_type=referenced_galaxy_cluster_type,
|
||||
galaxy_cluster_uuid=galaxy_cluster_uuid or self.uuid,
|
||||
**kwargs
|
||||
)
|
||||
self.cluster_relations.append(cluster_relation)
|
||||
return cluster_relation
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if hasattr(self, 'value'):
|
||||
return '<{self.__class__.__name__}(value={self.value})'.format(self=self)
|
||||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||
|
||||
|
||||
class MISPGalaxy(AbstractMISP):
|
||||
"""Galaxy class, used to view a galaxy and respective clusters"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.GalaxyCluster: List[MISPGalaxyCluster] = []
|
||||
self.name: str
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
"""Galaxy could be in one of the following formats:
|
||||
{'Galaxy': {}, 'GalaxyCluster': []}
|
||||
{'Galaxy': {'GalaxyCluster': []}}
|
||||
"""
|
||||
|
||||
if 'GalaxyCluster' in kwargs and kwargs.get("withCluster", True):
|
||||
# Parse the cluster from the kwargs
|
||||
[self.add_galaxy_cluster(**e) for e in kwargs.pop('GalaxyCluster')]
|
||||
|
||||
if 'Galaxy' in kwargs:
|
||||
kwargs = kwargs['Galaxy']
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
@property
|
||||
def clusters(self) -> List[MISPGalaxyCluster]:
|
||||
return self.GalaxyCluster
|
||||
|
||||
def add_galaxy_cluster(self, **kwargs) -> MISPGalaxyCluster:
|
||||
"""Add a MISP galaxy cluster into a MISPGalaxy.
|
||||
Supports all other parameters supported by MISPGalaxyCluster"""
|
||||
|
||||
galaxy_cluster = MISPGalaxyCluster()
|
||||
galaxy_cluster.from_dict(**kwargs)
|
||||
self.clusters.append(galaxy_cluster)
|
||||
return galaxy_cluster
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if hasattr(self, 'name'):
|
||||
return '<{self.__class__.__name__}(name={self.name})'.format(self=self)
|
||||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||
|
||||
|
||||
class MISPAttribute(AbstractMISP):
|
||||
_fields_for_feed: set = {'uuid', 'value', 'category', 'type', 'comment', 'data',
|
||||
'deleted', 'timestamp', 'to_ids', 'disable_correlation',
|
||||
|
@ -619,7 +301,7 @@ class MISPAttribute(AbstractMISP):
|
|||
"""Set a list of prepared MISPTag."""
|
||||
super()._set_tags(tags)
|
||||
|
||||
def add_galaxy(self, galaxy: Union[MISPGalaxy, dict, None] = None, **kwargs) -> MISPGalaxy:
|
||||
def add_galaxy(self, galaxy: Union['MISPGalaxy', dict, None] = None, **kwargs) -> 'MISPGalaxy':
|
||||
"""Add a galaxy to the Attribute, either by passing a MISPGalaxy or a dictionary"""
|
||||
if isinstance(galaxy, MISPGalaxy):
|
||||
self.galaxies.append(galaxy)
|
||||
|
@ -636,7 +318,7 @@ class MISPAttribute(AbstractMISP):
|
|||
return misp_galaxy
|
||||
|
||||
@property
|
||||
def galaxies(self) -> List[MISPGalaxy]:
|
||||
def galaxies(self) -> List['MISPGalaxy']:
|
||||
"""Returns a list of galaxies associated to this Attribute"""
|
||||
return self.Galaxy
|
||||
|
||||
|
@ -1506,6 +1188,324 @@ class MISPEventReport(AbstractMISP):
|
|||
self.content = ''
|
||||
|
||||
|
||||
class MISPGalaxyClusterElement(AbstractMISP):
|
||||
"""A MISP Galaxy cluster element, providing further info on a cluster
|
||||
|
||||
Creating a new galaxy cluster element can take the following parameters
|
||||
|
||||
:param key: The key/identifier of the element
|
||||
:type key: str
|
||||
:param value: The value of the element
|
||||
:type value: str
|
||||
"""
|
||||
|
||||
key: str
|
||||
value: str
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if hasattr(self, 'key') and hasattr(self, 'value'):
|
||||
return '<{self.__class__.__name__}(key={self.key}, value={self.value})'.format(self=self)
|
||||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
if key == "value" and isinstance(value, list):
|
||||
raise PyMISPError("You tried to set a list to a cluster element's value. "
|
||||
"Instead, create seperate elements for each value")
|
||||
super().__setattr__(key, value)
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
if kwargs.get('id'):
|
||||
self.id = int(kwargs.pop('id'))
|
||||
if kwargs.get('galaxy_cluster_id'):
|
||||
self.galaxy_cluster_id = int(kwargs.pop('galaxy_cluster_id'))
|
||||
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
|
||||
class MISPGalaxyClusterRelation(AbstractMISP):
|
||||
"""A MISP Galaxy cluster relation, linking one cluster to another
|
||||
|
||||
Creating a new galaxy cluster can take the following parameters
|
||||
|
||||
:param galaxy_cluster_uuid: The UUID of the galaxy the relation links to
|
||||
:param referenced_galaxy_cluster_type: The relation type, e.g. dropped-by
|
||||
:param referenced_galaxy_cluster_uuid: The UUID of the related galaxy
|
||||
:param distribution: The distribution of the relation, one of 0, 1, 2, 3, 4, default 0
|
||||
:param sharing_group_id: The sharing group of the relation, only when distribution is 4
|
||||
"""
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if hasattr(self, "referenced_galaxy_cluster_type"):
|
||||
return '<{self.__class__.__name__}(referenced_galaxy_cluster_type={self.referenced_galaxy_cluster_type})'.format(self=self)
|
||||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.galaxy_cluster_uuid: str
|
||||
self.referenced_galaxy_cluster_uuid: str
|
||||
self.distribution: int = 0
|
||||
self.referenced_galaxy_cluster_type: str
|
||||
self.Tag: List[MISPTag] = []
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
# Default values for a valid event to send to a MISP instance
|
||||
self.distribution = int(kwargs.pop('distribution', 0))
|
||||
if self.distribution not in [0, 1, 2, 3, 4, 5]:
|
||||
raise NewGalaxyClusterRelationError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4')
|
||||
|
||||
if kwargs.get('sharing_group_id'):
|
||||
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
||||
|
||||
if self.distribution == 4:
|
||||
# The distribution is set to sharing group, a sharing_group_id is required.
|
||||
if not hasattr(self, 'sharing_group_id'):
|
||||
raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required.')
|
||||
elif not self.sharing_group_id:
|
||||
# Cannot be None or 0 either.
|
||||
raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
|
||||
|
||||
if kwargs.get('id'):
|
||||
self.id = int(kwargs.pop('id'))
|
||||
if kwargs.get('orgc_id'):
|
||||
self.orgc_id = int(kwargs.pop('orgc_id'))
|
||||
if kwargs.get('org_id'):
|
||||
self.org_id = int(kwargs.pop('org_id'))
|
||||
if kwargs.get('galaxy_id'):
|
||||
self.galaxy_id = int(kwargs.pop('galaxy_id'))
|
||||
if kwargs.get('tag_id'):
|
||||
self.tag_id = int(kwargs.pop('tag_id'))
|
||||
if kwargs.get('sharing_group_id'):
|
||||
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
||||
if kwargs.get('Tag'):
|
||||
[self.add_tag(**t) for t in kwargs.pop('Tag')]
|
||||
if kwargs.get('SharingGroup'):
|
||||
self.SharingGroup = MISPSharingGroup()
|
||||
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
def add_tag(self, tag: Optional[Union[str, MISPTag, Dict]] = None, **kwargs) -> MISPTag:
|
||||
return super()._add_tag(tag, **kwargs)
|
||||
|
||||
@property
|
||||
def tags(self) -> List[MISPTag]:
|
||||
"""Returns a list of tags associated to this Attribute"""
|
||||
return self.Tag
|
||||
|
||||
@tags.setter
|
||||
def tags(self, tags: List[MISPTag]):
|
||||
"""Set a list of prepared MISPTag."""
|
||||
super()._set_tags(tags)
|
||||
|
||||
|
||||
class MISPGalaxyCluster(AbstractMISP):
|
||||
"""A MISP galaxy cluster, storing respective galaxy elements and relations.
|
||||
Used to view default galaxy clusters and add/edit/update/delete Galaxy 2.0 clusters
|
||||
|
||||
Creating a new galaxy cluster can take the following parameters
|
||||
|
||||
:param value: The value of the galaxy cluster
|
||||
:type value: str
|
||||
:param description: The description of the galaxy cluster
|
||||
:type description: str
|
||||
:param distribution: The distribution type, one of 0, 1, 2, 3, 4
|
||||
:type distribution: int
|
||||
:param sharing_group_id: The sharing group ID, if distribution is set to 4
|
||||
:type sharing_group_id: int, optional
|
||||
:param authors: A list of authors of the galaxy cluster
|
||||
:type authors: list[str], optional
|
||||
:param cluster_elements: List of MISPGalaxyClusterElement
|
||||
:type cluster_elements: list[MISPGalaxyClusterElement], optional
|
||||
:param cluster_relations: List of MISPGalaxyClusterRelation
|
||||
:type cluster_relations: list[MISPGalaxyClusterRelation], optional
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.Galaxy: MISPGalaxy
|
||||
self.GalaxyElement: List[MISPGalaxyClusterElement] = []
|
||||
self.meta: Dict = {}
|
||||
self.GalaxyClusterRelation: List[MISPGalaxyClusterRelation] = []
|
||||
self.Org: MISPOrganisation
|
||||
self.Orgc: MISPOrganisation
|
||||
self.SharingGroup: MISPSharingGroup
|
||||
self.value: str
|
||||
# Set any inititialized cluster to be False
|
||||
self.default = False
|
||||
|
||||
@property
|
||||
def cluster_elements(self) -> List[MISPGalaxyClusterElement]:
|
||||
return self.GalaxyElement
|
||||
|
||||
@cluster_elements.setter
|
||||
def cluster_elements(self, cluster_elements: List[MISPGalaxyClusterElement]):
|
||||
self.GalaxyElement = cluster_elements
|
||||
|
||||
@property
|
||||
def cluster_relations(self) -> List[MISPGalaxyClusterRelation]:
|
||||
return self.GalaxyClusterRelation
|
||||
|
||||
@cluster_relations.setter
|
||||
def cluster_relations(self, cluster_relations: List[MISPGalaxyClusterRelation]):
|
||||
self.GalaxyClusterRelation = cluster_relations
|
||||
|
||||
def parse_meta_as_elements(self):
|
||||
"""Function to parse the meta field into GalaxyClusterElements"""
|
||||
# Parse the cluster elements from the kwargs meta fields
|
||||
for key, value in self.meta.items():
|
||||
# The meta will merge fields together, i.e. Two 'countries' will be a list, so split these up
|
||||
if not isinstance(value, list):
|
||||
value = [value]
|
||||
for v in value:
|
||||
self.add_cluster_element(key=key, value=v)
|
||||
|
||||
@property
|
||||
def elements_meta(self) -> Dict:
|
||||
"""Function to return the galaxy cluster elements as a dictionary structure of lists
|
||||
that comes from a MISPGalaxy within a MISPEvent. Lossy, you lose the element ID
|
||||
"""
|
||||
response = defaultdict(list)
|
||||
for element in self.cluster_elements:
|
||||
response[element.key].append(element.value)
|
||||
return dict(response)
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
if 'GalaxyCluster' in kwargs:
|
||||
kwargs = kwargs['GalaxyCluster']
|
||||
self.default = kwargs.pop('default', False)
|
||||
# If the default field is set, we shouldn't have distribution or sharing group ID set
|
||||
if self.default:
|
||||
blocked_fields = ["distribution" "sharing_group_id"]
|
||||
for field in blocked_fields:
|
||||
if kwargs.get(field, None):
|
||||
raise NewGalaxyClusterError(
|
||||
f"The field '{field}' cannot be set on a default galaxy cluster"
|
||||
)
|
||||
|
||||
self.distribution = int(kwargs.pop('distribution', 0))
|
||||
if self.distribution not in [0, 1, 2, 3, 4]:
|
||||
raise NewGalaxyClusterError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4')
|
||||
|
||||
if kwargs.get('sharing_group_id'):
|
||||
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
||||
|
||||
if self.distribution == 4:
|
||||
# The distribution is set to sharing group, a sharing_group_id is required.
|
||||
if not hasattr(self, 'sharing_group_id'):
|
||||
raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required.')
|
||||
elif not self.sharing_group_id:
|
||||
# Cannot be None or 0 either.
|
||||
raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
|
||||
|
||||
if 'uuid' in kwargs:
|
||||
self.uuid = kwargs.pop('uuid')
|
||||
if 'meta' in kwargs:
|
||||
self.meta = kwargs.pop('meta')
|
||||
if 'Galaxy' in kwargs:
|
||||
self.Galaxy = MISPGalaxy()
|
||||
self.Galaxy.from_dict(**kwargs.pop('Galaxy'))
|
||||
if 'GalaxyElement' in kwargs:
|
||||
[self.add_cluster_element(**e) for e in kwargs.pop('GalaxyElement')]
|
||||
if 'Org' in kwargs:
|
||||
self.Org = MISPOrganisation()
|
||||
self.Org.from_dict(**kwargs.pop('Org'))
|
||||
if 'Orgc' in kwargs:
|
||||
self.Orgc = MISPOrganisation()
|
||||
self.Orgc.from_dict(**kwargs.pop('Orgc'))
|
||||
if 'GalaxyClusterRelation' in kwargs:
|
||||
[self.add_cluster_relation(**r) for r in kwargs.pop('GalaxyClusterRelation')]
|
||||
if 'SharingGroup' in kwargs:
|
||||
self.SharingGroup = MISPSharingGroup()
|
||||
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
def add_cluster_element(self, key: str, value: str, **kwargs) -> MISPGalaxyClusterElement:
|
||||
"""Add a cluster relation to a MISPGalaxyCluster, key and value are required
|
||||
|
||||
:param key: The key name of the element
|
||||
:type key: str
|
||||
:param value: The value of the element
|
||||
:type value: str
|
||||
"""
|
||||
|
||||
cluster_element = MISPGalaxyClusterElement()
|
||||
cluster_element.from_dict(key=key, value=value, **kwargs)
|
||||
self.cluster_elements.append(cluster_element)
|
||||
return cluster_element
|
||||
|
||||
def add_cluster_relation(self, referenced_galaxy_cluster_uuid: Union["MISPGalaxyCluster", str, UUID], referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: Optional[str] = None, **kwargs: Dict) -> MISPGalaxyClusterRelation:
|
||||
"""Add a cluster relation to a MISPGalaxyCluster.
|
||||
|
||||
:param referenced_galaxy_cluster_uuid: UUID of the related cluster
|
||||
:type referenced_galaxy_cluster_uuid: uuid
|
||||
:param referenced_galaxy_cluster_type: Relation type
|
||||
:type referenced_galaxy_cluster_type: uuid
|
||||
:param galaxy_cluster_uuid: UUID of this cluster, leave blank to use the stored UUID
|
||||
:param galaxy_cluster_uuid: uuid, Optional
|
||||
"""
|
||||
|
||||
if not getattr(self, "uuid", None):
|
||||
raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster")
|
||||
cluster_relation = MISPGalaxyClusterRelation()
|
||||
|
||||
if isinstance(referenced_galaxy_cluster_uuid, MISPGalaxyCluster):
|
||||
referenced_galaxy_cluster_uuid = referenced_galaxy_cluster_uuid.uuid
|
||||
|
||||
cluster_relation.from_dict(
|
||||
referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid,
|
||||
referenced_galaxy_cluster_type=referenced_galaxy_cluster_type,
|
||||
galaxy_cluster_uuid=galaxy_cluster_uuid or self.uuid,
|
||||
**kwargs
|
||||
)
|
||||
self.cluster_relations.append(cluster_relation)
|
||||
return cluster_relation
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if hasattr(self, 'value'):
|
||||
return '<{self.__class__.__name__}(value={self.value})'.format(self=self)
|
||||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||
|
||||
|
||||
class MISPGalaxy(AbstractMISP):
|
||||
"""Galaxy class, used to view a galaxy and respective clusters"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.GalaxyCluster: List[MISPGalaxyCluster] = []
|
||||
self.name: str
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
"""Galaxy could be in one of the following formats:
|
||||
{'Galaxy': {}, 'GalaxyCluster': []}
|
||||
{'Galaxy': {'GalaxyCluster': []}}
|
||||
"""
|
||||
|
||||
if 'GalaxyCluster' in kwargs and kwargs.get("withCluster", True):
|
||||
# Parse the cluster from the kwargs
|
||||
[self.add_galaxy_cluster(**e) for e in kwargs.pop('GalaxyCluster')]
|
||||
|
||||
if 'Galaxy' in kwargs:
|
||||
kwargs = kwargs['Galaxy']
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
@property
|
||||
def clusters(self) -> List[MISPGalaxyCluster]:
|
||||
return self.GalaxyCluster
|
||||
|
||||
def add_galaxy_cluster(self, **kwargs) -> MISPGalaxyCluster:
|
||||
"""Add a MISP galaxy cluster into a MISPGalaxy.
|
||||
Supports all other parameters supported by MISPGalaxyCluster"""
|
||||
|
||||
galaxy_cluster = MISPGalaxyCluster()
|
||||
galaxy_cluster.from_dict(**kwargs)
|
||||
self.clusters.append(galaxy_cluster)
|
||||
return galaxy_cluster
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if hasattr(self, 'name'):
|
||||
return '<{self.__class__.__name__}(name={self.name})'.format(self=self)
|
||||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||
|
||||
|
||||
class MISPEvent(AbstractMISP):
|
||||
|
||||
_fields_for_feed: set = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
|
||||
|
|
Loading…
Reference in New Issue