mirror of https://github.com/MISP/PyMISP
parent
0aa0464ac0
commit
18049212a5
|
@ -28,7 +28,12 @@ try:
|
|||
warning_2022()
|
||||
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
|
||||
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
|
||||
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, MISPGalaxyCluster, MISPGalaxyClusterElement, MISPGalaxyClusterRelation # noqa
|
||||
from .mispevent import (MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, # noqa
|
||||
MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy,
|
||||
MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed,
|
||||
MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist,
|
||||
MISPEventReport, MISPGalaxyCluster, MISPGalaxyClusterElement, MISPGalaxyClusterRelation,
|
||||
MISPCorrelationExclusion)
|
||||
from .tools import AbstractMISPObjectGenerator # noqa
|
||||
from .tools import Neo4j # noqa
|
||||
from .tools import openioc # noqa
|
||||
|
|
|
@ -25,7 +25,7 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje
|
|||
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
|
||||
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
|
||||
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \
|
||||
MISPGalaxyCluster, MISPGalaxyClusterRelation
|
||||
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion
|
||||
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
|
||||
|
||||
SearchType = TypeVar('SearchType', str, int)
|
||||
|
@ -1301,6 +1301,69 @@ class PyMISP:
|
|||
|
||||
# ## END Noticelist ###
|
||||
|
||||
# ## BEGIN Correlation Exclusions ###
|
||||
|
||||
def correlation_exclusions(self, pythonify: bool = False) -> Union[Dict, List[MISPCorrelationExclusion]]:
|
||||
"""Get all the correlation exclusions
|
||||
|
||||
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
|
||||
"""
|
||||
r = self._prepare_request('GET', 'correlation_exclusions')
|
||||
correlation_exclusions = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in correlation_exclusions:
|
||||
return correlation_exclusions
|
||||
to_return = []
|
||||
for correlation_exclusion in correlation_exclusions:
|
||||
c = MISPCorrelationExclusion()
|
||||
c.from_dict(**correlation_exclusion)
|
||||
to_return.append(c)
|
||||
return to_return
|
||||
|
||||
def get_correlation_exclusion(self, correlation_exclusion: Union[MISPCorrelationExclusion, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPCorrelationExclusion]:
|
||||
"""Get a correlation exclusion by ID
|
||||
|
||||
:param correlation_exclusion: Correlation exclusion to get
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
exclusion_id = get_uuid_or_id_from_abstract_misp(correlation_exclusion)
|
||||
r = self._prepare_request('GET', f'correlation_exclusions/view/{exclusion_id}')
|
||||
correlation_exclusion_j = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in correlation_exclusion_j:
|
||||
return correlation_exclusion_j
|
||||
c = MISPCorrelationExclusion()
|
||||
c.from_dict(**correlation_exclusion_j)
|
||||
return c
|
||||
|
||||
def add_correlation_exclusion(self, correlation_exclusion: MISPCorrelationExclusion, pythonify: bool = False) -> Union[Dict, MISPCorrelationExclusion]:
|
||||
"""Add a new correlation exclusion
|
||||
|
||||
:param correlation_exclusion: correlation exclusion to add
|
||||
:param pythonify: Returns a PyMISP Object instead of the plain json output
|
||||
"""
|
||||
r = self._prepare_request('POST', 'correlation_exclusions/add', data=correlation_exclusion)
|
||||
new_correlation_exclusion = self._check_json_response(r)
|
||||
if not (self.global_pythonify or pythonify) or 'errors' in new_correlation_exclusion:
|
||||
return new_correlation_exclusion
|
||||
c = MISPCorrelationExclusion()
|
||||
c.from_dict(**new_correlation_exclusion)
|
||||
return c
|
||||
|
||||
def delete_correlation_exclusion(self, correlation_exclusion: Union[MISPCorrelationExclusion, int, str, UUID]) -> Dict:
|
||||
"""Delete a correlation exclusion
|
||||
|
||||
:param correlation_exclusion: The MISPCorrelationExclusion you wish to delete from MISP
|
||||
"""
|
||||
exclusion_id = get_uuid_or_id_from_abstract_misp(correlation_exclusion)
|
||||
r = self._prepare_request('POST', f'correlation_exclusions/delete/{exclusion_id}')
|
||||
return self._check_json_response(r)
|
||||
|
||||
def clean_correlation_exclusions(self):
|
||||
"""Initiate correlation exclusions cleanup"""
|
||||
r = self._prepare_request('POST', 'correlation_exclusions/clean')
|
||||
return self._check_json_response(r)
|
||||
|
||||
# ## END Correlation Exclusions ###
|
||||
|
||||
# ## BEGIN Galaxy ###
|
||||
|
||||
def galaxies(self, pythonify: bool = False) -> Union[Dict, List[MISPGalaxy]]:
|
||||
|
|
|
@ -2071,6 +2071,14 @@ class MISPNoticelist(AbstractMISP):
|
|||
super().from_dict(**kwargs)
|
||||
|
||||
|
||||
class MISPCorrelationExclusion(AbstractMISP):
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
if 'CorrelationExclusion' in kwargs:
|
||||
kwargs = kwargs['CorrelationExclusion']
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
|
||||
class MISPRole(AbstractMISP):
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
|
|
|
@ -27,7 +27,7 @@ logger = logging.getLogger('pymisp')
|
|||
|
||||
|
||||
try:
|
||||
from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventBlocklist, MISPEventReport, MISPGalaxyCluster
|
||||
from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventBlocklist, MISPEventReport, MISPCorrelationExclusion, MISPGalaxyCluster
|
||||
from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator
|
||||
from pymisp.exceptions import MISPServerError
|
||||
except ImportError:
|
||||
|
@ -1633,6 +1633,21 @@ class TestComprehensive(unittest.TestCase):
|
|||
r = self.admin_misp_connector.disable_noticelist(testnl)
|
||||
self.assertFalse(r['Noticelist']['enabled'], r)
|
||||
|
||||
def test_correlation_exclusions(self):
|
||||
newce = MISPCorrelationExclusion()
|
||||
newce.value = "test-correlation-exclusion"
|
||||
r = self.admin_misp_connector.add_correlation_exclusion(newce, pythonify=True)
|
||||
self.assertEqual(r.value, newce.value)
|
||||
correlation_exclusions = self.admin_misp_connector.correlation_exclusions(pythonify=True)
|
||||
self.assertTrue(isinstance(correlation_exclusions, list))
|
||||
testce = correlation_exclusions[0]
|
||||
r = self.admin_misp_connector.get_correlation_exclusion(testce, pythonify=True)
|
||||
self.assertEqual(r.value, testce.value)
|
||||
r = self.admin_misp_connector.delete_correlation_exclusion(r)
|
||||
self.assertTrue(r['success'])
|
||||
r = self.admin_misp_connector.clean_correlation_exclusions()
|
||||
self.assertTrue(r['success'])
|
||||
|
||||
def test_galaxies(self):
|
||||
# Make sure we're up-to-date
|
||||
r = self.admin_misp_connector.update_galaxies()
|
||||
|
|
Loading…
Reference in New Issue