Merge branch 'tomking2-feature/misp-galaxy-2' into main

pull/749/head
Raphaël Vinot 2021-03-02 11:50:43 +01:00
commit 3067b818ff
8 changed files with 679 additions and 17 deletions

View File

@ -35,7 +35,7 @@ try:
warning_2022()
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, MISPGalaxyCluster, MISPGalaxyClusterElement, MISPGalaxyClusterRelation # noqa
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa
from .tools import openioc # noqa

View File

@ -24,7 +24,8 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje
MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, \
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \
MISPGalaxyCluster, MISPGalaxyClusterRelation
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
SearchType = TypeVar('SearchType', str, int)
@ -499,9 +500,10 @@ class PyMISP:
"""
event_report_id = get_uuid_or_id_from_abstract_misp(event_report)
request_url = f'eventReports/delete/{event_report_id}'
data = {}
if hard:
request_url += "/1"
r = self._prepare_request('POST', request_url)
data['hard'] = 1
r = self._prepare_request('POST', request_url, data=data)
return self._check_json_response(r)
# ## END Event Report ###
@ -1314,10 +1316,11 @@ class PyMISP:
to_return.append(g)
return to_return
def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxy]:
def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], withCluster: bool = False, pythonify: bool = False) -> Union[Dict, MISPGalaxy]:
"""Get a galaxy by id
:param galaxy: galaxy to get
:param withCluster: Include the clusters associated with the galaxy
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy)
@ -1326,14 +1329,179 @@ class PyMISP:
if not (self.global_pythonify or pythonify) or 'errors' in galaxy_j:
return galaxy_j
g = MISPGalaxy()
g.from_dict(**galaxy_j)
g.from_dict(**galaxy_j, withCluster=withCluster)
return g
def search_galaxy_clusters(self, galaxy: Union[MISPGalaxy, int, str, UUID], context: str = "all", searchall: str = None, pythonify: bool = False) -> Union[List[Dict], List[MISPGalaxyCluster]]:
"""Searches the galaxy clusters within a specific galaxy
:param galaxy: The MISPGalaxy you wish to search in
:param context: The context of how you want to search within the galaxy_
:param searchall: The search you want to make against the galaxy and context
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy)
allowed_context_types = ["all", "default", "custom", "org", "deleted"]
if context not in allowed_context_types:
raise PyMISPError(f"The context must be one of {allowed_context_types.join(', ')}")
kw_params = {"context": context}
if searchall:
kw_params["searchall"] = searchall
r = self._prepare_request('GET', f"galaxy_clusters/index/{galaxy_id}", kw_params=kw_params)
clusters_j = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in clusters_j:
return clusters_j
response = []
for cluster in clusters_j:
c = MISPGalaxyCluster()
c.from_dict(**cluster)
response.append(c)
return response
def update_galaxies(self) -> Dict:
"""Update all the galaxies."""
response = self._prepare_request('POST', 'galaxies/update')
return self._check_json_response(response)
def get_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyCluster, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
"""Gets a specific galaxy cluster
:param galaxy_cluster: The MISPGalaxyCluster you want to get
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
r = self._prepare_request('GET', f'galaxy_clusters/view/{cluster_id}')
cluster_j = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in cluster_j:
return cluster_j
gc = MISPGalaxyCluster()
gc.from_dict(**cluster_j)
return gc
def add_galaxy_cluster(self, galaxy: Union[MISPGalaxy, str, UUID], galaxy_cluster: MISPGalaxyCluster, pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
"""Add a new galaxy cluster to a MISP Galaxy
:param galaxy: A MISPGalaxy (or UUID) where you wish to add the galaxy cluster
:param galaxy_cluster: A MISPGalaxyCluster you wish to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
if getattr(galaxy_cluster, "default", False):
# We can't add default galaxies
raise PyMISPError('You are not able add a default galaxy cluster')
galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy)
r = self._prepare_request('POST', f'galaxy_clusters/add/{galaxy_id}', data=galaxy_cluster)
cluster_j = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in cluster_j:
return cluster_j
gc = MISPGalaxyCluster()
gc.from_dict(**cluster_j)
return gc
def update_galaxy_cluster(self, galaxy_cluster: MISPGalaxyCluster, pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
"""Update a custom galaxy cluster.
;param galaxy_cluster: The MISPGalaxyCluster you wish to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
if getattr(galaxy_cluster, "default", False):
# We can't edit default galaxies
raise PyMISPError('You are not able to update a default galaxy cluster')
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
print(cluster_id)
r = self._prepare_request('POST', f'galaxy_clusters/edit/{cluster_id}', data=galaxy_cluster)
cluster_j = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in cluster_j:
return cluster_j
gc = MISPGalaxyCluster()
gc.from_dict(**cluster_j)
return gc
def publish_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyCluster, int, str, UUID]) -> Dict:
"""Publishes a galaxy cluster
:param galaxy_cluster: The galaxy cluster you wish to publish
"""
if isinstance(galaxy_cluster, MISPGalaxyCluster) and getattr(galaxy_cluster, "default", False):
raise PyMISPError('You are not able to publish a default galaxy cluster')
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
r = self._prepare_request('POST', f'galaxy_clusters/publish/{cluster_id}')
response = self._check_json_response(r)
return response
def fork_galaxy_cluster(self, galaxy: Union[MISPGalaxy, int, str, UUID], galaxy_cluster: Union[MISPGalaxyClusterRelation, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
"""Forks an existing galaxy cluster, creating a new one with matching attributes
:param galaxy: The galaxy (or galaxy ID) where the cluster you want to fork resides
:param galaxy_cluster: The galaxy cluster you wish to fork
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
galaxy_id = get_uuid_or_id_from_abstract_misp(galaxy)
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
# Create a duplicate cluster from the cluster to fork
forked_galaxy_cluster = MISPGalaxyCluster()
forked_galaxy_cluster.from_dict(**galaxy_cluster)
# Set the UUID and version it extends from the existing galaxy cluster
forked_galaxy_cluster.extends_uuid = forked_galaxy_cluster.pop('uuid')
forked_galaxy_cluster.extends_version = forked_galaxy_cluster.pop('version')
r = self._prepare_request('POST', f'galaxy_clusters/add/{galaxy_id}/forkUUID:{cluster_id}', data=galaxy_cluster)
cluster_j = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in cluster_j:
return cluster_j
gc = MISPGalaxyCluster()
gc.from_dict(**cluster_j)
return gc
def delete_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyCluster, id, str, UUID], hard=False) -> Dict:
"""Deletes a galaxy cluster from MISP
:param galaxy_cluster: The MISPGalaxyCluster you wish to delete from MISP
:param hard: flag for hard delete
"""
if isinstance(galaxy_cluster, MISPGalaxyCluster) and getattr(galaxy_cluster, "default", False):
raise PyMISPError('You are not able to delete a default galaxy cluster')
data = {}
if hard:
data['hard'] = 1
cluster_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster)
r = self._prepare_request('POST', f'galaxy_clusters/delete/{cluster_id}', data=data)
return self._check_json_response(r)
def add_galaxy_cluster_relation(self, galaxy_cluster_relation: MISPGalaxyClusterRelation) -> Dict:
"""Add a galaxy cluster relation, cluster relation must include
cluster UUIDs in both directions
:param galaxy_cluster_relation: The MISPGalaxyClusterRelation to add
"""
r = self._prepare_request('POST', 'galaxy_cluster_relations/add/', data=galaxy_cluster_relation)
cluster_rel_j = self._check_json_response(r)
return cluster_rel_j
def update_galaxy_cluster_relation(self, galaxy_cluster_relation: MISPGalaxyClusterRelation) -> Dict:
"""Update a galaxy cluster relation
:param galaxy_cluster_relation: The MISPGalaxyClusterRelation to update
"""
cluster_relation_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster_relation)
r = self._prepare_request('POST', f'galaxy_cluster_relations/edit/{cluster_relation_id}', data=galaxy_cluster_relation)
cluster_rel_j = self._check_json_response(r)
return cluster_rel_j
def delete_galaxy_cluster_relation(self, galaxy_cluster_relation: Union[MISPGalaxyClusterRelation, int, str, UUID]) -> Dict:
"""Delete a galaxy cluster relation
:param galaxy_cluster_relation: The MISPGalaxyClusterRelation to delete
"""
cluster_relation_id = get_uuid_or_id_from_abstract_misp(galaxy_cluster_relation)
r = self._prepare_request('POST', f'galaxy_cluster_relations/delete/{cluster_relation_id}')
cluster_rel_j = self._check_json_response(r)
return cluster_rel_j
# ## END Galaxy ###
# ## BEGIN Feed ###

@ -1 +1 @@
Subproject commit 2b1c3532dccad651f960ff71defdbc422c40ef0c
Subproject commit 27a554ab12acbc1242f801b5682364b2047cf9e0

View File

@ -27,6 +27,14 @@ class UpdateAttributeError(PyMISPError):
pass
class NewGalaxyClusterError(PyMISPError):
pass
class NewGalaxyClusterRelationError(PyMISPError):
pass
class SearchError(PyMISPError):
pass

View File

@ -9,6 +9,7 @@ import sys
from io import BytesIO, BufferedIOBase, TextIOBase
from zipfile import ZipFile
import uuid
from uuid import UUID
from collections import defaultdict
import logging
import hashlib
@ -16,7 +17,7 @@ from pathlib import Path
from typing import List, Optional, Union, IO, Dict, Any
from .abstract import AbstractMISP, MISPTag
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewEventReportError
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewEventReportError, NewGalaxyClusterError, NewGalaxyClusterRelationError
logger = logging.getLogger('pymisp')
@ -1057,6 +1058,324 @@ class MISPEventReport(AbstractMISP):
self.content = ''
class MISPGalaxyClusterElement(AbstractMISP):
"""A MISP Galaxy cluster element, providing further info on a cluster
Creating a new galaxy cluster element can take the following parameters
:param key: The key/identifier of the element
:type key: str
:param value: The value of the element
:type value: str
"""
def __repr__(self) -> str:
if hasattr(self, 'key') and hasattr(self, 'value'):
return '<{self.__class__.__name__}(key={self.key}, value={self.value})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
def __setattr__(self, key, value):
if key == "value" and isinstance(value, list):
raise PyMISPError("You tried to set a list to a cluster element's value. "
"Instead, create seperate elements for each value")
super().__setattr__(key, value)
def from_dict(self, **kwargs):
if kwargs.get('id'):
self.id = int(kwargs.pop('id'))
if kwargs.get('galaxy_cluster_id'):
self.galaxy_cluster_id = int(kwargs.pop('galaxy_cluster_id'))
super().from_dict(**kwargs)
class MISPGalaxyClusterRelation(AbstractMISP):
"""A MISP Galaxy cluster relation, linking one cluster to another
Creating a new galaxy cluster can take the following parameters
:param galaxy_cluster_uuid: The UUID of the galaxy the relation links to
:type galaxy_cluster_uuid: uuid
:param referenced_galaxy_cluster_type: The relation type, e.g. dropped-by
:type referenced_galaxy_cluster_type: str
:param referenced_galaxy_cluster_uuid: The UUID of the related galaxy
:type referenced_galaxy_cluster_uuid: uuid
:param distribution: The distribution of the relation, one of 0, 1, 2, 3, 4, default 0
:type distribution: int
:param sharing_group_id: The sharing group of the relation, only when distribution is 4
:type sharing_group_id: int, optional
"""
def __repr__(self) -> str:
if hasattr(self, "referenced_galaxy_cluster_type"):
return '<{self.__class__.__name__}(referenced_galaxy_cluster_type={self.referenced_galaxy_cluster_type})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
def __init__(self):
super().__init__()
self.galaxy_cluster_uuid: uuid
self.referenced_galaxy_cluster_uuid: uuid
self.distribution: int = 0
self.referenced_galaxy_cluster_type: str
self.Tag: MISPTag = []
def from_dict(self, **kwargs):
# Default values for a valid event to send to a MISP instance
self.distribution = int(kwargs.pop('distribution', 0))
if self.distribution not in [0, 1, 2, 3, 4, 5]:
raise NewGalaxyClusterRelationError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4')
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if self.distribution == 4:
# The distribution is set to sharing group, a sharing_group_id is required.
if not hasattr(self, 'sharing_group_id'):
raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required.')
elif not self.sharing_group_id:
# Cannot be None or 0 either.
raise NewGalaxyClusterRelationError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
if kwargs.get('id'):
self.id = int(kwargs.pop('id'))
if kwargs.get('orgc_id'):
self.orgc_id = int(kwargs.pop('orgc_id'))
if kwargs.get('org_id'):
self.org_id = int(kwargs.pop('org_id'))
if kwargs.get('galaxy_id'):
self.galaxy_id = int(kwargs.pop('galaxy_id'))
if kwargs.get('tag_id'):
self.tag_id = int(kwargs.pop('tag_id'))
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if kwargs.get('Tag'):
[self.add_tag(**t) for t in kwargs.pop('Tag')]
if kwargs.get('SharingGroup'):
self.SharingGroup = MISPSharingGroup()
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
super().from_dict(**kwargs)
def add_tag(self, tag: Optional[Union[str, MISPTag, Dict]] = None, **kwargs) -> MISPTag:
return super()._add_tag(tag, **kwargs)
@property
def tags(self) -> List[MISPTag]:
"""Returns a list of tags associated to this Attribute"""
return self.Tag
@tags.setter
def tags(self, tags: List[MISPTag]):
"""Set a list of prepared MISPTag."""
super()._set_tags(tags)
class MISPGalaxyCluster(AbstractMISP):
"""A MISP galaxy cluster, storing respective galaxy elements and relations.
Used to view default galaxy clusters and add/edit/update/delete Galaxy 2.0 clusters
Creating a new galaxy cluster can take the following parameters
:param value: The value of the galaxy cluster
:type value: str
:param description: The description of the galaxy cluster
:type description: str
:param distribution: The distribution type, one of 0, 1, 2, 3, 4
:type distribution: int
:param sharing_group_id: The sharing group ID, if distribution is set to 4
:type sharing_group_id: int, optional
:param authors: A list of authors of the galaxy cluster
:type authors: list[str], optional
:param cluster_elements: List of MISPGalaxyClusterElement
:type cluster_elements: list[MISPGalaxyClusterElement], optional
:param cluster_relations: List of MISPGalaxyClusterRelation
:type cluster_relations: list[MISPGalaxyClusterRelation], optional
"""
def __init__(self):
super().__init__()
self.Galaxy: MISPGalaxy
self.GalaxyElement: List[MISPGalaxyClusterElement] = []
self.meta = {}
self.GalaxyClusterRelation: List[MISPGalaxyClusterRelation] = []
self.Org: MISPOrganisation
self.Orgc: MISPOrganisation
self.SharingGroup: MISPSharingGroup
# Set any inititialized cluster to be False
self.default = False
@property
def cluster_elements(self) -> List[MISPGalaxyClusterElement]:
return self.GalaxyElement
@cluster_elements.setter
def cluster_elements(self, cluster_elements: List[MISPGalaxyClusterElement]):
self.GalaxyElement = cluster_elements
@property
def cluster_relations(self) -> MISPGalaxyClusterRelation:
return self.GalaxyClusterRelation
@cluster_relations.setter
def cluster_relations(self, cluster_relations: List[MISPGalaxyClusterRelation]):
self.GalaxyClusterRelation = cluster_relations
def parse_meta_as_elements(self):
"""Function to parse the meta field into GalaxyClusterElements"""
# Parse the cluster elements from the kwargs meta fields
for key, value in self.meta.items():
# The meta will merge fields together, i.e. Two 'countries' will be a list, so split these up
if not isinstance(value, list):
value = [value]
for v in value:
self.add_cluster_element(key=key, value=v)
@property
def elements_meta(self) -> Dict:
"""Function to return the galaxy cluster elements as a dictionary structure of lists
that comes from a MISPGalaxy within a MISPEvent. Lossy, you lose the element ID
"""
response = defaultdict(list)
for element in self.cluster_elements:
response[element.key].append(element.value)
return dict(response)
def from_dict(self, **kwargs):
if 'GalaxyCluster' in kwargs:
kwargs = kwargs['GalaxyCluster']
self.default = kwargs.pop('default', False)
# If the default field is set, we shouldn't have distribution or sharing group ID set
if self.default:
blocked_fields = ["distribution" "sharing_group_id"]
for field in blocked_fields:
if kwargs.get(field, None):
raise NewGalaxyClusterError(
f"The field '{field}' cannot be set on a default galaxy cluster"
)
self.distribution = int(kwargs.pop('distribution', 0))
if self.distribution not in [0, 1, 2, 3, 4]:
raise NewGalaxyClusterError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4')
if kwargs.get('sharing_group_id'):
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
if self.distribution == 4:
# The distribution is set to sharing group, a sharing_group_id is required.
if not hasattr(self, 'sharing_group_id'):
raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required.')
elif not self.sharing_group_id:
# Cannot be None or 0 either.
raise NewGalaxyClusterError('If the distribution is set to sharing group, a sharing group ID is required (cannot be {}).'.format(self.sharing_group_id))
if 'uuid' in kwargs:
self.uuid = kwargs.pop('uuid')
if 'meta' in kwargs:
self.meta = kwargs.pop('meta')
if 'Galaxy' in kwargs:
self.Galaxy = MISPGalaxy()
self.Galaxy.from_dict(**kwargs.pop('Galaxy'))
if 'GalaxyElement' in kwargs:
[self.add_cluster_element(**e) for e in kwargs.pop('GalaxyElement')]
if 'Org' in kwargs:
self.Org = MISPOrganisation()
self.Org.from_dict(**kwargs.pop('Org'))
if 'Orgc' in kwargs:
self.Orgc = MISPOrganisation()
self.Orgc.from_dict(**kwargs.pop('Orgc'))
if 'GalaxyClusterRelation' in kwargs:
[self.add_cluster_relation(**r) for r in kwargs.pop('GalaxyClusterRelation')]
if 'SharingGroup' in kwargs:
self.SharingGroup = MISPSharingGroup()
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
super().from_dict(**kwargs)
def add_cluster_element(self, key: str, value: str, **kwargs) -> MISPGalaxyClusterElement:
"""Add a cluster relation to a MISPGalaxyCluster, key and value are required
:param key: The key name of the element
:type key: str
:param value: The value of the element
:type value: str
"""
cluster_element = MISPGalaxyClusterElement()
cluster_element.from_dict(key=key, value=value, **kwargs)
self.cluster_elements.append(cluster_element)
return cluster_element
def add_cluster_relation(self, referenced_galaxy_cluster_uuid: Union["MISPGalaxyCluster", str, UUID], referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: str = None, **kwargs) -> MISPGalaxyClusterRelation:
"""Add a cluster relation to a MISPGalaxyCluster.
:param referenced_galaxy_cluster_uuid: UUID of the related cluster
:type referenced_galaxy_cluster_uuid: uuid
:param referenced_galaxy_cluster_type: Relation type
:type referenced_galaxy_cluster_type: uuid
:param galaxy_cluster_uuid: UUID of this cluster, leave blank to use the stored UUID
:param galaxy_cluster_uuid: uuid, Optional
"""
if not getattr(self, "uuid", None):
raise PyMISPError("The cluster does not have a UUID, make sure it is a valid galaxy cluster")
cluster_relation = MISPGalaxyClusterRelation()
if isinstance(referenced_galaxy_cluster_uuid, MISPGalaxyCluster):
referenced_galaxy_cluster_uuid = referenced_galaxy_cluster_uuid.uuid
cluster_relation.from_dict(
referenced_galaxy_cluster_uuid=referenced_galaxy_cluster_uuid,
referenced_galaxy_cluster_type=referenced_galaxy_cluster_type,
galaxy_cluster_uuid=galaxy_cluster_uuid or self.uuid,
**kwargs
)
self.cluster_relations.append(cluster_relation)
return cluster_relation
def __repr__(self) -> str:
if hasattr(self, 'value'):
return '<{self.__class__.__name__}(value={self.value})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPGalaxy(AbstractMISP):
"""Galaxy class, used to view a galaxy and respective clusters"""
def __init__(self):
super().__init__()
self.GalaxyCluster: List[MISPGalaxyCluster] = []
def from_dict(self, **kwargs):
"""Galaxy could be in one of the following formats:
{'Galaxy': {}, 'GalaxyCluster': []}
{'Galaxy': {'GalaxyCluster': []}}
"""
if 'GalaxyCluster' in kwargs and kwargs.get("withCluster", True):
# Parse the cluster from the kwargs
[self.add_galaxy_cluster(**e) for e in kwargs.pop('GalaxyCluster')]
if 'Galaxy' in kwargs:
kwargs = kwargs['Galaxy']
super().from_dict(**kwargs)
@property
def clusters(self) -> List[MISPGalaxyCluster]:
return self.GalaxyCluster
def add_galaxy_cluster(self, **kwargs) -> MISPGalaxyCluster:
"""Add a MISP galaxy cluster into a MISPGalaxy.
Supports all other parameters supported by MISPGalaxyCluster"""
galaxy_cluster = MISPGalaxyCluster()
galaxy_cluster.from_dict(**kwargs)
self.clusters.append(galaxy_cluster)
return galaxy_cluster
def __repr__(self) -> str:
if hasattr(self, 'name'):
return '<{self.__class__.__name__}(name={self.name})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPEvent(AbstractMISP):
_fields_for_feed: set = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
@ -1082,6 +1401,7 @@ class MISPEvent(AbstractMISP):
self.SharingGroup: MISPSharingGroup
self.EventReport: List[MISPEventReport] = []
self.Tag: List[MISPTag] = []
self.Galaxy: List[MISPGalaxy] = []
def add_tag(self, tag: Optional[Union[str, MISPTag, dict]] = None, **kwargs) -> MISPTag:
return super()._add_tag(tag, **kwargs)
@ -1248,6 +1568,10 @@ class MISPEvent(AbstractMISP):
def objects(self) -> List[MISPObject]:
return self.Object
@property
def galaxies(self) -> List[MISPGalaxy]:
return self.Galaxy
@objects.setter
def objects(self, objects: List[MISPObject]):
if all(isinstance(x, MISPObject) for x in objects):
@ -1352,6 +1676,8 @@ class MISPEvent(AbstractMISP):
self.set_date(kwargs.pop('date'))
if kwargs.get('Attribute'):
[self.add_attribute(**a) for a in kwargs.pop('Attribute')]
if kwargs.get('Galaxy'):
[self.add_galaxy(**e) for e in kwargs.pop('Galaxy')]
if kwargs.get('EventReport'):
[self.add_event_report(**e) for e in kwargs.pop('EventReport')]
@ -1504,6 +1830,14 @@ class MISPEvent(AbstractMISP):
self.edited = True
return event_report
def add_galaxy(self, **kwargs) -> MISPGalaxy:
"""Add a MISP galaxy and sub-clusters into an event.
Supports all other parameters supported by MISPGalaxy"""
galaxy = MISPGalaxy()
galaxy.from_dict(**kwargs)
self.galaxies.append(galaxy)
return galaxy
def get_object_by_id(self, object_id: Union[str, int]) -> MISPObject:
"""Get an object by ID
@ -1715,14 +2049,6 @@ class MISPTaxonomy(AbstractMISP):
super().from_dict(**kwargs)
class MISPGalaxy(AbstractMISP):
def from_dict(self, **kwargs):
if 'Galaxy' in kwargs:
kwargs = kwargs['Galaxy']
super().from_dict(**kwargs)
class MISPNoticelist(AbstractMISP):
def from_dict(self, **kwargs):

View File

@ -192,7 +192,9 @@
"Timo Steffens",
"Various"
],
"default": false,
"description": "The Sofacy Group (also known as APT28, Pawn Storm, Fancy Bear and Sednit) is a cyber espionage group believed to have ties to the Russian government. Likely operating since 2007, the group is known to target government, military, and security organizations. It has been characterized as an advanced persistent threat.",
"distribution": "0",
"galaxy_id": "366",
"id": "45563",
"meta": {
@ -248,7 +250,9 @@
"Will Metcalf",
"KahuSecurity"
],
"default": false,
"description": "Sednit EK is the exploit kit used by APT28",
"distribution": "0",
"galaxy_id": "370",
"id": "38813",
"meta": {
@ -274,7 +278,9 @@
"Will Metcalf",
"KahuSecurity"
],
"default": false,
"description": "DealersChoice is a Flash Player Exploit platform triggered by RTF",
"distribution": "0",
"galaxy_id": "370",
"id": "38805",
"meta": {
@ -315,7 +321,9 @@
"Timo Steffens",
"Christophe Vandeplas"
],
"default": false,
"description": "backdoor",
"distribution": "0",
"galaxy_id": "367",
"id": "46592",
"meta": {
@ -347,7 +355,9 @@
"Timo Steffens",
"Christophe Vandeplas"
],
"default": false,
"description": "",
"distribution": "0",
"galaxy_id": "367",
"id": "46670",
"meta": {
@ -370,7 +380,9 @@
"Timo Steffens",
"Christophe Vandeplas"
],
"default": false,
"description": "backdoor used by apt28\n\nSedreco serves as a spying backdoor; its functionalities can be extended with dynamically loaded plugins. It is made up of two distinct components: a dropper and the persistent payload installed by this dropper. We have not seen this component since April 2016.",
"distribution": "0",
"galaxy_id": "367",
"id": "46591",
"meta": {
@ -405,7 +417,9 @@
"Timo Steffens",
"Christophe Vandeplas"
],
"default": false,
"description": "This backdoor component is known to have a modular structure featuring various espionage functionalities, such as key-logging, screen grabbing and file exfiltration. This component is available for Osx, Windows, Linux and iOS operating systems.\n\nXagent is a modular backdoor with spying functionalities such as keystroke logging and file exfiltration. Xagent is the groups flagship backdoor and heavily used in their operations. Early versions for Linux and Windows were seen years ago, then in 2015 an iOS version came out. One year later, an Android version was discovered and finally, in the beginning of 2017, an Xagent sample for OS X was described.",
"distribution": "0",
"galaxy_id": "367",
"id": "46669",
"meta": {
@ -444,7 +458,9 @@
"authors": [
"MITRE"
],
"default": false,
"description": "JHUHUGIT is malware used by APT28. It is based on Carberp source code and serves as reconnaissance malware.[[Citation: Kaspersky Sofacy]][[Citation: F-Secure Sofacy 2015]][[Citation: ESET Sednit Part 1]][[Citation: FireEye APT28 January 2017]]\n\nAliases: JHUHUGIT, Seduploader, JKEYSKW, Sednit, GAMEFISH",
"distribution": "0",
"galaxy_id": "365",
"id": "41618",
"meta": {
@ -478,7 +494,9 @@
"authors": [
"MITRE"
],
"default": false,
"description": "XTunnel a VPN-like network proxy tool that can relay traffic between a C2 server and a victim. It was first seen in May 2013 and reportedly used by APT28 during the compromise of the Democratic National Committee.[[Citation: Crowdstrike DNC June 2016]][[Citation: Invincea XTunnel]][[Citation: ESET Sednit Part 2]]\n\nAliases: XTunnel, X-Tunnel, XAPS",
"distribution": "0",
"galaxy_id": "365",
"id": "41543",
"meta": {
@ -509,7 +527,9 @@
"authors": [
"MITRE"
],
"default": false,
"description": "ADVSTORESHELL is a spying backdoor that has been used by APT28 from at least 2012 to 2016. It is generally used for long-term espionage and is deployed on targets deemed interesting after a reconnaissance phase.[[Citation: Kaspersky Sofacy]][[Citation: ESET Sednit Part 2]]\n\nAliases: ADVSTORESHELL, NETUI, EVILTOSS, AZZY, Sedreco",
"distribution": "0",
"galaxy_id": "365",
"id": "41582",
"meta": {
@ -541,7 +561,9 @@
"authors": [
"MITRE"
],
"default": false,
"description": "USBStealer is malware that has used by APT28 since at least 2005 to extract information from air-gapped networks. It does not have the capability to communicate over the Internet and has been used in conjunction with ADVSTORESHELL.[[Citation: ESET Sednit USBStealer 2014]][[Citation: Kaspersky Sofacy]]\n\nAliases: USBStealer, USB Stealer, Win32/USBStealer",
"distribution": "0",
"galaxy_id": "365",
"id": "41549",
"meta": {
@ -571,7 +593,9 @@
"authors": [
"MITRE"
],
"default": false,
"description": "is a trojan that has been used by APT28 on OS X and appears to be a port of their standard CHOPSTICK or XAgent trojan.[[Citation: XAgentOSX]]",
"distribution": "0",
"galaxy_id": "365",
"id": "41551",
"meta": {
@ -595,7 +619,9 @@
"authors": [
"MITRE"
],
"default": false,
"description": "CHOPSTICK is malware family of modular backdoors used by APT28. It has been used from at least November 2012 to August 2016 and is usually dropped on victims as second-stage malware, though it has been used as first-stage malware in several cases.[[Citation: FireEye APT28]][[Citation: ESET Sednit Part 2]][[Citation: FireEye APT28 January 2017]]\n\nAliases: CHOPSTICK, SPLM, Xagent, X-Agent, webhp",
"distribution": "0",
"galaxy_id": "365",
"id": "41559",
"meta": {
@ -628,7 +654,9 @@
"authors": [
"MITRE"
],
"default": false,
"description": "Downdelph is a first-stage downloader written in Delphi that has been used by APT28 in rare instances between 2013 and 2015.[[Citation: ESET Sednit Part 3]]\n\nAliases: Downdelph, Delphacy",
"distribution": "0",
"galaxy_id": "365",
"id": "41504",
"meta": {

View File

@ -192,7 +192,9 @@
"Timo Steffens",
"Various"
],
"default": false,
"description": "The Sofacy Group (also known as APT28, Pawn Storm, Fancy Bear and Sednit) is a cyber espionage group believed to have ties to the Russian government. Likely operating since 2007, the group is known to target government, military, and security organizations. It has been characterized as an advanced persistent threat.",
"distribution": "0",
"galaxy_id": "366",
"id": "45563",
"meta": {
@ -248,7 +250,9 @@
"Will Metcalf",
"KahuSecurity"
],
"default": false,
"description": "Sednit EK is the exploit kit used by APT28",
"distribution": "0",
"galaxy_id": "370",
"id": "38813",
"meta": {
@ -274,7 +278,9 @@
"Will Metcalf",
"KahuSecurity"
],
"default": false,
"description": "DealersChoice is a Flash Player Exploit platform triggered by RTF",
"distribution": "0",
"galaxy_id": "370",
"id": "38805",
"meta": {
@ -315,7 +321,9 @@
"Timo Steffens",
"Christophe Vandeplas"
],
"default": false,
"description": "backdoor",
"distribution": "0",
"galaxy_id": "367",
"id": "46592",
"meta": {
@ -347,7 +355,9 @@
"Timo Steffens",
"Christophe Vandeplas"
],
"default": false,
"description": "",
"distribution": "0",
"galaxy_id": "367",
"id": "46670",
"meta": {
@ -370,7 +380,9 @@
"Timo Steffens",
"Christophe Vandeplas"
],
"default": false,
"description": "backdoor used by apt28\n\nSedreco serves as a spying backdoor; its functionalities can be extended with dynamically loaded plugins. It is made up of two distinct components: a dropper and the persistent payload installed by this dropper. We have not seen this component since April 2016.",
"distribution": "0",
"galaxy_id": "367",
"id": "46591",
"meta": {
@ -405,7 +417,9 @@
"Timo Steffens",
"Christophe Vandeplas"
],
"default": false,
"description": "This backdoor component is known to have a modular structure featuring various espionage functionalities, such as key-logging, screen grabbing and file exfiltration. This component is available for Osx, Windows, Linux and iOS operating systems.\n\nXagent is a modular backdoor with spying functionalities such as keystroke logging and file exfiltration. Xagent is the groups flagship backdoor and heavily used in their operations. Early versions for Linux and Windows were seen years ago, then in 2015 an iOS version came out. One year later, an Android version was discovered and finally, in the beginning of 2017, an Xagent sample for OS X was described.",
"distribution": "0",
"galaxy_id": "367",
"id": "46669",
"meta": {
@ -444,7 +458,9 @@
"authors": [
"MITRE"
],
"default": false,
"description": "JHUHUGIT is malware used by APT28. It is based on Carberp source code and serves as reconnaissance malware.[[Citation: Kaspersky Sofacy]][[Citation: F-Secure Sofacy 2015]][[Citation: ESET Sednit Part 1]][[Citation: FireEye APT28 January 2017]]\n\nAliases: JHUHUGIT, Seduploader, JKEYSKW, Sednit, GAMEFISH",
"distribution": "0",
"galaxy_id": "365",
"id": "41618",
"meta": {
@ -478,7 +494,9 @@
"authors": [
"MITRE"
],
"default": false,
"description": "XTunnel a VPN-like network proxy tool that can relay traffic between a C2 server and a victim. It was first seen in May 2013 and reportedly used by APT28 during the compromise of the Democratic National Committee.[[Citation: Crowdstrike DNC June 2016]][[Citation: Invincea XTunnel]][[Citation: ESET Sednit Part 2]]\n\nAliases: XTunnel, X-Tunnel, XAPS",
"distribution": "0",
"galaxy_id": "365",
"id": "41543",
"meta": {
@ -509,7 +527,9 @@
"authors": [
"MITRE"
],
"default": false,
"description": "ADVSTORESHELL is a spying backdoor that has been used by APT28 from at least 2012 to 2016. It is generally used for long-term espionage and is deployed on targets deemed interesting after a reconnaissance phase.[[Citation: Kaspersky Sofacy]][[Citation: ESET Sednit Part 2]]\n\nAliases: ADVSTORESHELL, NETUI, EVILTOSS, AZZY, Sedreco",
"distribution": "0",
"galaxy_id": "365",
"id": "41582",
"meta": {
@ -541,7 +561,9 @@
"authors": [
"MITRE"
],
"default": false,
"description": "USBStealer is malware that has used by APT28 since at least 2005 to extract information from air-gapped networks. It does not have the capability to communicate over the Internet and has been used in conjunction with ADVSTORESHELL.[[Citation: ESET Sednit USBStealer 2014]][[Citation: Kaspersky Sofacy]]\n\nAliases: USBStealer, USB Stealer, Win32/USBStealer",
"distribution": "0",
"galaxy_id": "365",
"id": "41549",
"meta": {
@ -571,7 +593,9 @@
"authors": [
"MITRE"
],
"default": false,
"description": "is a trojan that has been used by APT28 on OS X and appears to be a port of their standard CHOPSTICK or XAgent trojan.[[Citation: XAgentOSX]]",
"distribution": "0",
"galaxy_id": "365",
"id": "41551",
"meta": {
@ -595,7 +619,9 @@
"authors": [
"MITRE"
],
"default": false,
"description": "CHOPSTICK is malware family of modular backdoors used by APT28. It has been used from at least November 2012 to August 2016 and is usually dropped on victims as second-stage malware, though it has been used as first-stage malware in several cases.[[Citation: FireEye APT28]][[Citation: ESET Sednit Part 2]][[Citation: FireEye APT28 January 2017]]\n\nAliases: CHOPSTICK, SPLM, Xagent, X-Agent, webhp",
"distribution": "0",
"galaxy_id": "365",
"id": "41559",
"meta": {
@ -628,7 +654,9 @@
"authors": [
"MITRE"
],
"default": false,
"description": "Downdelph is a first-stage downloader written in Delphi that has been used by APT28 in rare instances between 2013 and 2015.[[Citation: ESET Sednit Part 3]]\n\nAliases: Downdelph, Delphacy",
"distribution": "0",
"galaxy_id": "365",
"id": "41504",
"meta": {

View File

@ -27,7 +27,7 @@ logger = logging.getLogger('pymisp')
try:
from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventBlocklist, MISPEventReport
from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventBlocklist, MISPEventReport, MISPGalaxyCluster
from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator
from pymisp.exceptions import MISPServerError
except ImportError:
@ -2711,6 +2711,110 @@ class TestComprehensive(unittest.TestCase):
self.user_misp_connector.delete_event(event)
self.user_misp_connector.delete_event_report(new_event_report)
def test_galaxy_cluster(self):
self.admin_misp_connector.toggle_global_pythonify()
galaxy = self.admin_misp_connector.galaxies()[0]
new_galaxy_cluster = MISPGalaxyCluster()
new_galaxy_cluster.value = "Test Cluster"
new_galaxy_cluster.authors = ["MISP"]
new_galaxy_cluster.distribution = 1
new_galaxy_cluster.description = "Example test cluster"
try:
galaxy = self.admin_misp_connector.get_galaxy(galaxy.id, withCluster=True)
existing_galaxy_cluster = galaxy.clusters[0]
new_galaxy_cluster = self.admin_misp_connector.add_galaxy_cluster(galaxy.id, new_galaxy_cluster)
# The new galaxy cluster should be under the selected galaxy
self.assertEqual(galaxy.id, new_galaxy_cluster.galaxy_id)
# The cluster should have the right value
self.assertEqual(new_galaxy_cluster.value, "Test Cluster")
new_galaxy_cluster.add_cluster_element("synonyms", "Test2")
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster)
# The cluster should have one element that is a synonym
self.assertEqual(len(new_galaxy_cluster.cluster_elements), 1)
element = new_galaxy_cluster.cluster_elements[0]
self.assertEqual(element.key, "synonyms")
self.assertEqual(element.value, "Test2")
# The cluster should have the old meta as a prop
self.assertEqual(new_galaxy_cluster.elements_meta, {'synonyms': ['Test2']})
# The cluster element should be updatable
element.value = "Test3"
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster)
element = new_galaxy_cluster.cluster_elements[0]
self.assertEqual(element.value, "Test3")
new_galaxy_cluster.add_cluster_element("synonyms", "ToDelete")
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster)
# The cluster should have two elements
self.assertEqual(len(new_galaxy_cluster.cluster_elements), 2)
new_galaxy_cluster.cluster_elements = [e for e in new_galaxy_cluster.cluster_elements if e.value != "ToDelete"]
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster)
# The cluster elements should be deletable
self.assertEqual(len(new_galaxy_cluster.cluster_elements), 1)
new_galaxy_cluster.add_cluster_relation(existing_galaxy_cluster, "is-tested-by")
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster)
# The cluster should have a relationship
self.assertEqual(len(new_galaxy_cluster.cluster_relations), 1)
relation = new_galaxy_cluster.cluster_relations[0]
self.assertEqual(relation.referenced_galaxy_cluster_type, "is-tested-by")
self.assertEqual(relation.referenced_galaxy_cluster_uuid, existing_galaxy_cluster.uuid)
relation.add_tag("tlp:amber")
new_galaxy_cluster = self.admin_misp_connector.update_galaxy_cluster(new_galaxy_cluster)
relation = new_galaxy_cluster.cluster_relations[0]
# The relationship should have a tag of tlp:amber
self.assertEqual(len(relation.tags), 1)
self.assertEqual(relation.tags[0].name, "tlp:amber")
# The cluster relations should be deletable
resp = self.admin_misp_connector.delete_galaxy_cluster_relation(relation)
self.assertTrue(resp['success'])
# The cluster relation should no longer be present
new_galaxy_cluster = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster)
self.assertEqual(len(new_galaxy_cluster.cluster_relations), 0)
resp = self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster)
# Galaxy clusters should be soft deletable
self.assertTrue(resp['success'])
new_galaxy_cluster = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster)
self.assertTrue(isinstance(new_galaxy_cluster, MISPGalaxyCluster))
resp = self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster, hard=True)
# Galaxy clusters should be hard deletable
self.assertTrue(resp['success'])
resp = self.admin_misp_connector.get_galaxy_cluster(new_galaxy_cluster)
self.assertTrue("errors" in resp)
finally:
self.admin_misp_connector.delete_galaxy_cluster_relation(relation)
self.admin_misp_connector.delete_galaxy_cluster(new_galaxy_cluster, hard=True)
self.admin_misp_connector.toggle_global_pythonify()
def test_event_galaxy(self):
self.admin_misp_connector.toggle_global_pythonify()
event = self.create_simple_event()
try:
galaxy = self.admin_misp_connector.galaxies()[0]
galaxy = self.admin_misp_connector.get_galaxy(galaxy.id, withCluster=True)
galaxy_cluster = galaxy.clusters[0]
event.add_tag(galaxy_cluster.tag_name)
event = self.admin_misp_connector.add_event(event)
# The event should have a galaxy attached
self.assertEqual(len(event.galaxies), 1)
event_galaxy = event.galaxies[0]
# The galaxy ID should equal the galaxy from which the cluster came from
self.assertEqual(event_galaxy.id, galaxy.id)
# The galaxy cluster should equal the cluster added
self.assertEqual(event_galaxy.clusters[0].id, galaxy_cluster.id)
finally:
self.admin_misp_connector.delete_event(event)
self.admin_misp_connector.toggle_global_pythonify()
@unittest.skip("Internal use only")
def missing_methods(self):
skip = [