new: MISP Galaxy 2.0 capability

pull/682/head
Tom King 2021-01-16 15:56:30 +00:00
parent de6125a623
commit 164791e980
5 changed files with 395 additions and 14 deletions

View File

@ -24,7 +24,7 @@ Response (if any):
try:
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPGalaxyCluster # noqa
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa
from .tools import stix # noqa

View File

@ -23,7 +23,8 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje
MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, \
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPGalaxyCluster, \
MISPGalaxyClusterRelation
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
SearchType = TypeVar('SearchType', str, int)
@ -1191,10 +1192,11 @@ class PyMISP:
to_return.append(g)
return to_return
def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxy]:
def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], withCluster: bool = False, pythonify: bool = False) -> Union[Dict, MISPGalaxy]:
"""Get a galaxy by id
:param galaxy: galaxy to get
:param withCluster: Include the clusters associated with the galaxy
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy)
@ -1203,7 +1205,7 @@ class PyMISP:
if not (self.global_pythonify or pythonify) or 'errors' in galaxy_j:
return galaxy_j
g = MISPGalaxy()
g.from_dict(**galaxy_j)
g.from_dict(**galaxy_j, withCluster=withCluster)
return g
def update_galaxies(self) -> Dict:
@ -1211,6 +1213,67 @@ class PyMISP:
response = self._prepare_request('POST', 'galaxies/update')
return self._check_json_response(response)
def get_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyCluster, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
r = self._prepare_request('GET', f'galaxy_clusters/view/{cluster_id}')
cluster_j = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in cluster_j:
return cluster_j
gc = MISPGalaxyCluster()
gc.from_dict(**cluster_j)
return gc
def update_galaxy_cluster(self, galaxy_cluster: MISPGalaxyCluster, pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
"""Update a custom galaxy cluster."""
if galaxy_cluster.default:
# We can't edit default galaxies
raise PyMISPError('You are not able to update a default galaxy cluster')
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
r = self._prepare_request('POST', f'galaxy_clusters/edit/{cluster_id}', data=galaxy_cluster)
cluster_j = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in cluster_j:
return cluster_j
gc = MISPGalaxyCluster()
gc.from_dict(**cluster_j)
return gc
def publish_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyClusterRelation, int, str, UUID]) -> Dict:
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
r = self._prepare_request('POST', f'galaxy_clusters/publish/{cluster_id}')
response = self._check_json_response(r)
return response
def fork_galaxy_cluster(self, galaxy: Union[MISPGalaxyClusterRelation, int, str, UUID], galaxy_cluster: Union[MISPGalaxyClusterRelation, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy)
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
# Create a duplicate cluster from the cluster to fork
forked_galaxy_cluster = MISPGalaxyCluster()
forked_galaxy_cluster.from_dict(**galaxy_cluster)
# Set the UUID and version it extends from the existing galaxy cluster
forked_galaxy_cluster.extends_uuid = forked_galaxy_cluster.pop('uuid')
forked_galaxy_cluster.extends_version = forked_galaxy_cluster.pop('version')
r = self._prepare_request('POST', f'galaxy_clusters/add/{galaxy_id}/forkUUID:{cluster_id}', data=galaxy_cluster)
cluster_j = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in cluster_j:
return cluster_j
gc = MISPGalaxyCluster()
gc.from_dict(**cluster_j)
return gc
def update_galaxy_cluster_relation(self, galaxy_cluster_relation: MISPGalaxyClusterRelation) -> Dict:
"""Update a galaxy cluster relation."""
cluster_relation_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster_relation)
r = self._prepare_request('POST', f'galaxy_cluster_relations/edit/{cluster_relation_id}', data=galaxy_cluster_relation)
cluster_rel_j = self._check_json_response(r)
return cluster_rel_j
def delete_galaxy_cluster_relation(self, galaxy_cluster_relation: Union[MISPGalaxyClusterRelation, int, str, UUID]) -> Dict:
"""Delete a galaxy cluster relation"""
cluster_relation_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster_relation)
r = self._prepare_request('POST', f'galaxy_cluster_relations/delete/{cluster_relation_id}')
cluster_rel_j = self._check_json_response(r)
return cluster_rel_j
# ## END Galaxy ###
# ## BEGIN Feed ###

@ -1 +1 @@
Subproject commit 8921a0c8a26f1e5a7ce77fca7e96d8523ad5fffe
Subproject commit 27a554ab12acbc1242f801b5682364b2047cf9e0

View File

@ -23,6 +23,18 @@ class UpdateAttributeError(PyMISPError):
pass
class NewGalaxyError(PyMISPError):
pass
class NewGalaxyClusterError(PyMISPError):
pass
class NewGalaxyClusterRelationError(PyMISPError):
pass
class SearchError(PyMISPError):
pass

View File

@ -16,7 +16,7 @@ from pathlib import Path
from typing import List, Optional, Union, IO, Dict, Any
from .abstract import AbstractMISP, MISPTag
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewGalaxyClusterError, NewGalaxyClusterRelationError
logger = logging.getLogger('pymisp')
@ -982,6 +982,305 @@ class MISPObject(AbstractMISP):
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPGalaxyClusterElement(AbstractMISP):
def __repr__(self) -> str:
if hasattr(self, 'key') and hasattr(self, 'value'):
return '<{self.__class__.__name__}(key={self.key}, value={self.value})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
def __setattr__(self, key, value):
if key == "value" and isinstance(value, list):
raise PyMISPError("You tried to set a list to a cluster element's value. "
"Instead, create seperate elements for each value")
super().__setattr__(key, value)
class MISPGalaxyClusterRelation(AbstractMISP):
"""A MISP Galaxy cluster relation, linking one cluster to another
:param distribution: The distribution of the relation, one of 0, 1, 2, 3, default 0
:type distribution: int
:param galaxy_cluster_uuid: The UUID of the galaxy the relation links to
:type galaxy_cluster_uuid: uuid
:param referenced_galaxy_cluster_type: The relation type, e.g. dropped-by
:type referenced_galaxy_cluster_type: str
:param referenced_galaxy_cluster_uuid: The UUID of the related galaxy
:type referenced_galaxy_cluster_uuid: uuid
:param referenced_galaxy_cluster_id: The ID of the related galaxy
:type referenced_galaxy_cluster_id: int, optional
:param galaxy_cluster_id: The ID of the galaxy cluster
:type galaxy_cluster_id: id, optional
:param id: The ID of the cluster relation
:type id: int, optional
:param default: Whether the relation is a default
:type default: bool, optional
"""
def __repr__(self) -> str:
if hasattr(self, "referenced_galaxy_cluster_type"):
return '<{self.__class__.__name__}(referenced_galaxy_cluster_type={self.referenced_galaxy_cluster_type})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
def __init__(self):
super().__init__()
self.galaxy_cluster_uuid: uuid
self.referenced_galaxy_cluster_uuid: uuid
self.distribution: int = 0
self.referenced_galaxy_cluster_type: str
self.Tag: MISPTag = []
def from_dict(self, **kwargs):
# Default values for a valid event to send to a MISP instance
self.distribution = kwargs.pop('distribution', 0)
self.distribution = int(self.distribution)
if self.distribution not in [0, 1, 2, 3, 4, 5]:
raise NewGalaxyClusterRelationError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4')
if kwargs.get('id'):
self.id = int(kwargs.pop('id'))
if kwargs.get('orgc_id'):
self.orgc_id = int(kwargs.pop('orgc_id'))
if kwargs.get('org_id'):
self.org_id = int(kwargs.pop('org_id'))
if kwargs.get('galaxy_id'):
self.galaxy_id = int(kwargs.pop('galaxy_id'))
if kwargs.get('tag_id'):
self.tag_id = int(kwargs.pop('tag_id'))
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if kwargs.get('Tag'):
[self.add_tag(**t) for t in kwargs.pop('Tag')]
if kwargs.get('SharingGroup'):
self.SharingGroup = MISPSharingGroup()
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
super().from_dict(**kwargs)
def add_tag(self, tag: Optional[Union[str, MISPTag, Dict]] = None, **kwargs) -> MISPTag:
return super()._add_tag(tag, **kwargs)
@property
def tags(self) -> List[MISPTag]:
"""Returns a list of tags associated to this Attribute"""
return self.Tag
@tags.setter
def tags(self, tags: List[MISPTag]):
"""Set a list of prepared MISPTag."""
super()._set_tags(tags)
class MISPGalaxyCluster(AbstractMISP):
"""A MISP galaxy cluster, storing respective galaxy elements and relations.
Used to view default galaxy clusters and add/edit/update/delete Galaxy 2.0 clusters
:param Org: The organisation as a MISPOrganisation
:type Org: MISPOrganisation
:param Orgc: The creator organisation as a MISPOrganisation
:type Orgc: MISPOrganisation
:param SharingGroup: The SharingGroup applied to the cluster, if any
:type SharingGroup: MISPSharingGroup, optional
:param default: Whether the galaxy cluster is a default or custom cluster, default clusters cannot be edited
:type default: bool
:param deleted: Whether the galaxy cluster is deleted or not
:type deleted: bool
:param description: The description of the galaxy cluster
:type description: str
:param distribution: The distribution type, one of 1, 2, 3, 4, 5
:type distribution: int
:param sharing_group_id: The sharing group ID, if distribution is set to 4
:type sharing_group_id: int, optional
:param extends_uuid: The UUID of the galaxy cluster it extends
:type extends_uuid: uuid, optional
:param galaxy_id: The ID of the galaxy
:type galaxy_id: int
:param id: The ID of the galaxy cluster
:type id: int
:param org_id: The org's ID
:type org_id: int
:param orgc_id: The creating org's ID
:type orgc_id: int
:param published: Whether the cluster is published or not
:type published: bool
:param source: The source of the galaxy cluster
:type source: str
:param tag_count: The count of events using this galaxy cluster
:type tag_count: int
:param tag_id: The tag ID
:type tag_id: int
:param tag_name: The galaxy cluster's tag
:type tag_name: str
:param type: The type of the galaxy cluster, must match the housing galaxies type
:type type: str
:param value: The value of the galaxy cluster
:type value: str
:param authors: A list of authors of the galaxy cluster
:type authors: list, optional
:param cluster_elements: List of MISPGalaxyClusterElement
:type cluster_elements: list, optional
:param cluster_relations: List of MISPGalaxyClusterRelation, changes must be made through PyMISP instance
:type cluster_relations: list, optional
"""
def __init__(self):
super().__init__()
self.Galaxy: MISPGalaxy
self.GalaxyElement: List[MISPGalaxyClusterElement] = []
self.GalaxyClusterRelation: List[MISPGalaxyClusterRelation] = []
self.Org: MISPOrganisation
self.Orgc: MISPOrganisation
self.SharingGroup: MISPSharingGroup
@property
def cluster_elements(self) -> List[MISPGalaxyClusterElement]:
return self.GalaxyElement
@cluster_elements.setter
def cluster_elements(self, cluster_elements: List[MISPGalaxyClusterElement]):
self.GalaxyElement = cluster_elements
@property
def cluster_relations(self) -> MISPGalaxyClusterRelation:
return self.GalaxyClusterRelation
@cluster_relations.setter
def cluster_relations(self, cluster_relations: List[MISPGalaxyClusterRelation]):
self.GalaxyClusterRelation = cluster_relations
def from_dict(self, **kwargs):
# If the default field is set, we shouldn't have distribution or sharing group ID set
if 'GalaxyCluster' in kwargs:
kwargs = kwargs['GalaxyCluster']
if kwargs.get('default', False):
blocked_fields = ["distribution" "sharing_group_id"]
for field in blocked_fields:
if kwargs.get(field, None):
raise NewGalaxyClusterError(
f"One of the following fields are set for a default galaxy cluster: {', '.join(blocked_fields)}"
)
if 'uuid' in kwargs:
self.uuid = kwargs.pop('uuid')
if 'meta' in kwargs:
# Parse the cluster elements from the kwargs meta fields
for key, value in kwargs.pop('meta').items():
# The meta will merge fields together, i.e. Two 'countries' will be a list, so split these up
if not isinstance(value, list):
value = [value]
for v in value:
self.add_cluster_element(key=key, value=v)
if 'Galaxy' in kwargs:
self.Galaxy = MISPGalaxy()
self.Galaxy.from_dict(**kwargs.pop('Galaxy'))
if 'GalaxyElement' in kwargs:
[self.add_cluster_element(**e) for e in kwargs.pop('GalaxyElement')]
if 'Org' in kwargs:
self.Org = MISPOrganisation()
self.Org.from_dict(**kwargs.pop('Org'))
if 'Orgc' in kwargs:
self.Orgc = MISPOrganisation()
self.Orgc.from_dict(**kwargs.pop('Orgc'))
if 'GalaxyClusterRelation' in kwargs:
[self.add_cluster_relation(**r) for r in kwargs.pop('GalaxyClusterRelation')]
if 'SharingGroup' in kwargs:
self.SharingGroup = MISPSharingGroup()
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
super().from_dict(**kwargs)
def add_cluster_element(self, key: str, value: str, **kwargs):
"""Add a cluster relation to a MISPGalaxyCluster, key and value are required
:param key: The key name of the element
:param value: The value of the element
"""
cluster_element = MISPGalaxyClusterElement()
cluster_element.from_dict(key=key, value=value, **kwargs)
self.cluster_elements.append(cluster_element)
return cluster_element
def add_cluster_relation(self, referenced_galaxy_cluster_uuid: uuid, referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: str = None, **kwargs):
"""Add a cluster relation to a MISPGalaxyCluster
:param referenced_galaxy_cluster_uuid: UUID of the related cluster
:param referenced_galaxy_cluster_type: Relation type
"""
if not getattr(self, "uuid", None):
raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster")
cluster_relation = MISPGalaxyClusterRelation()
cluster_relation.from_dict(
galaxy_cluster_uuid=self.uuid,
referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid,
referenced_galaxy_cluster_type=referenced_galaxy_cluster_type,
**kwargs
)
self.cluster_relations.append(cluster_relation)
return cluster_relation
def __repr__(self) -> str:
if hasattr(self, 'value'):
return '<{self.__class__.__name__}(value={self.value})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPGalaxy(AbstractMISP):
"""Galaxy class, used to view a galaxy and respective clusters, supports the following fields
:param id: Galaxy ID
:type id: int
:param uuid: Galaxy UUID
:type uuuid: uuid, str
:param name: Galaxy name
:type name: str
:param type: Galaxy type
:type type: str
:param description: Galaxy description
:type description: str
:param version: Galaxy version number
:type version: int
:param icon: Galaxy icon
:type icon: str
:param namespace: Galaxy namespace
:type namespace: str
:param clusters: List of MISPGalaxyCluster
:type clusters: list
"""
def __init__(self):
super().__init__()
self.GalaxyCluster: List[MISPGalaxyCluster] = []
def from_dict(self, **kwargs):
"""Galaxy could be in one of the following formats:
{'Galaxy': {}, 'GalaxyCluster': []}
{'Galaxy': {'GalaxyCluster': []}}
"""
if 'GalaxyCluster' in kwargs and kwargs.get("withCluster", True):
# Parse the cluster from the kwargs
[self.add_galaxy_cluster(**e) for e in kwargs.pop('GalaxyCluster')]
if 'Galaxy' in kwargs:
kwargs = kwargs['Galaxy']
super().from_dict(**kwargs)
@property
def clusters(self) -> List[MISPGalaxyCluster]:
return self.GalaxyCluster
def add_galaxy_cluster(self, **kwargs) -> MISPGalaxyCluster:
"""Add a MISP galaxy and sub-clusters into an event.
Supports all other parameters supported by MISPGalaxy"""
galaxy_cluster = MISPGalaxyCluster()
galaxy_cluster.from_dict(**kwargs)
self.clusters.append(galaxy_cluster)
return galaxy_cluster
def __repr__(self) -> str:
if hasattr(self, 'name'):
return '<{self.__class__.__name__}(name={self.name})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPEvent(AbstractMISP):
_fields_for_feed: set = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
@ -1006,6 +1305,7 @@ class MISPEvent(AbstractMISP):
self.ShadowAttribute: List[MISPShadowAttribute] = []
self.SharingGroup: MISPSharingGroup
self.Tag: List[MISPTag] = []
self.Galaxy: List[MISPGalaxy] = []
def add_tag(self, tag: Optional[Union[str, MISPTag, dict]] = None, **kwargs) -> MISPTag:
return super()._add_tag(tag, **kwargs)
@ -1168,6 +1468,10 @@ class MISPEvent(AbstractMISP):
def objects(self) -> List[MISPObject]:
return self.Object
@property
def galaxies(self) -> List[MISPGalaxy]:
return self.Galaxy
@objects.setter
def objects(self, objects: List[MISPObject]):
if all(isinstance(x, MISPObject) for x in objects):
@ -1272,6 +1576,8 @@ class MISPEvent(AbstractMISP):
self.set_date(kwargs.pop('date'))
if kwargs.get('Attribute'):
[self.add_attribute(**a) for a in kwargs.pop('Attribute')]
if kwargs.get('Galaxy'):
[self.add_galaxy(**e) for e in kwargs.pop('Galaxy')]
# All other keys
if kwargs.get('id'):
@ -1412,6 +1718,14 @@ class MISPEvent(AbstractMISP):
return attr_list
return attribute
def add_galaxy(self, **kwargs) -> MISPGalaxy:
"""Add a MISP galaxy and sub-clusters into an event.
Supports all other parameters supported by MISPGalaxy"""
galaxy = MISPGalaxy()
galaxy.from_dict(**kwargs)
self.galaxies.append(galaxy)
return galaxy
def get_object_by_id(self, object_id: Union[str, int]) -> MISPObject:
"""Get an object by ID
@ -1610,14 +1924,6 @@ class MISPTaxonomy(AbstractMISP):
super().from_dict(**kwargs)
class MISPGalaxy(AbstractMISP):
def from_dict(self, **kwargs):
if 'Galaxy' in kwargs:
kwargs = kwargs['Galaxy']
super().from_dict(**kwargs)
class MISPNoticelist(AbstractMISP):
def from_dict(self, **kwargs):