new: Blacklist methods

pull/612/head
Raphaël Vinot 2020-08-03 15:59:54 +02:00
parent 83273b6ce8
commit 2bbf888ca7
4 changed files with 160 additions and 7 deletions


@ -24,7 +24,7 @@ Response (if any):
try:
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlacklist, MISPOrganisationBlacklist # noqa
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa
from .tools import stix # noqa


@ -21,7 +21,8 @@ from .exceptions import MISPServerError, PyMISPUnexpectedResponse, PyMISPError,
from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObject, \
MISPUser, MISPOrganisation, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, \
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, MISPInbox
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
MISPInbox, MISPEventBlacklist, MISPOrganisationBlacklist
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
SearchType = TypeVar('SearchType', str, int)
@ -2176,6 +2177,91 @@ class PyMISP:
# ## END User Settings ###
# ## BEGIN Blacklists ###
def event_blacklists(self, pythonify: bool = False) -> Union[Dict, List[MISPEventBlacklist]]:
"""Get all the blacklisted events"""
r = self._prepare_request('GET', 'eventBlacklists/index')
event_blacklists = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in event_blacklists:
return event_blacklists
to_return = []
for event_blacklist in event_blacklists:
ebl = MISPEventBlacklist()
ebl.from_dict(**event_blacklist)
to_return.append(ebl)
return to_return
def organisation_blacklists(self, pythonify: bool = False) -> Union[Dict, List[MISPOrganisationBlacklist]]:
"""Get all the blacklisted organisations"""
r = self._prepare_request('GET', 'orgBlacklists/index')
organisation_blacklists = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in organisation_blacklists:
return organisation_blacklists
to_return = []
for organisation_blacklist in organisation_blacklists:
obl = MISPOrganisationBlacklist()
obl.from_dict(**organisation_blacklist)
to_return.append(obl)
return to_return
def _add_entries_to_blacklist(self, blacklist_type: str, uuids: List[str], **kwargs) -> Dict:
if blacklist_type == 'event':
url = 'eventBlacklists/add'
elif blacklist_type == 'organisation':
url = 'orgBlacklists/add'
else:
raise PyMISPError('blacklist_type can only be "event" or "organisation"')
data = {'uuids': uuids}
if kwargs:
data.update({k: v for k, v in kwargs.items() if v})
r = self._prepare_request('POST', url, data=data)
return self._check_json_response(r)
def add_event_blacklist(self, uuids: List[str], comment: Optional[str] = None,
event_info: Optional[str] = None, event_orgc: Optional[str] = None) -> Dict:
'''Add a new event in the blacklist'''
return self._add_entries_to_blacklist('event', uuids=uuids, comment=comment, event_info=event_info, event_orgc=event_orgc)
def add_organisation_blacklist(self, uuids: List[str], comment: Optional[str] = None,
org_name: Optional[str] = None) -> Dict:
'''Add a new organisation in the blacklist'''
return self._add_entries_to_blacklist('organisation', uuids=uuids, comment=comment, org_name=org_name)
"""
# Not working yet
def update_event_blacklist(self, event_blacklist: MISPEventBlacklist, event_blacklist_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPEventBlacklist]:
'''Update an event in the blacklist'''
if event_blacklist_id is None:
eblid = get_uuid_or_id_from_abstract_misp(event_blacklist)
else:
eblid = get_uuid_or_id_from_abstract_misp(event_blacklist_id)
url = f'eventBlacklists/edit/{eblid}'
# event_blacklist.uuids = [event_blacklist.pop('event_uuid')]
print(event_blacklist.to_json(indent=2))
r = self._prepare_request('POST', url, data={'EventBlacklist': event_blacklist})
updated_event_blacklist = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in updated_event_blacklist:
return updated_event_blacklist
e = MISPEventBlacklist()
e.from_dict(**updated_event_blacklist)
return e
"""
def delete_event_blacklist(self, event_blacklist: Union[MISPEventBlacklist, int, str, UUID]) -> Dict:
'''Delete a blacklisted event'''
event_blacklist_id = get_uuid_or_id_from_abstract_misp(event_blacklist)
response = self._prepare_request('POST', f'eventBlacklists/delete/{event_blacklist_id}')
return self._check_json_response(response)
def delete_organisation_blacklist(self, organisation_blacklist: Union[MISPOrganisationBlacklist, int, str, UUID]) -> Dict:
'''Delete a blacklisted organisation'''
org_blacklist_id = get_uuid_or_id_from_abstract_misp(organisation_blacklist)
response = self._prepare_request('POST', f'orgBlacklists/delete/{org_blacklist_id}')
return self._check_json_response(response)
# ## END Blacklists ###
# ## BEGIN Global helpers ###
def change_sharing_group_on_entity(self, misp_entity: Union[MISPEvent, MISPAttribute, MISPObject], sharing_group_id, pythonify: bool = False) -> Union[Dict, MISPEvent, MISPObject, MISPAttribute, MISPShadowAttribute]:
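Review bot: `_add_entries_to_blacklist` has no docstring, and nothing in `add_event_blacklist` or `add_organisation_blacklist` tells the caller that the return value is the raw server response rather than a success flag. The test in this commit reads the outcome from `['result']['successes']`, so a UUID the server rejects presumably just ends up missing from that list without an exception; that is worth stating, because a silently failed add leaves the event or organisation unblocked. I would add a docstring such as `"""POST the UUIDs to the event or organisation blacklist; optional fields with falsy values are dropped. Returns the server response; check result['successes'] for each UUID."""`. The disabled `update_event_blacklist` is kept as a bare string in the class body and still contains a debugging `print(event_blacklist.to_json(indent=2))`, which should be dropped before the method is re-enabled.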


@ -1697,3 +1697,25 @@ class MISPInbox(AbstractMISP):
def __repr__(self):
return f'<{self.__class__.__name__}(name={self.type})>'
class MISPEventBlacklist(AbstractMISP):
def from_dict(self, **kwargs):
if 'EventBlacklist' in kwargs:
kwargs = kwargs['EventBlacklist']
super().from_dict(**kwargs)
def __repr__(self):
return f'<{self.__class__.__name__}(event_uuid={self.event_uuid}'
class MISPOrganisationBlacklist(AbstractMISP):
def from_dict(self, **kwargs):
if 'OrgBlacklist' in kwargs:
kwargs = kwargs['OrgBlacklist']
super().from_dict(**kwargs)
def __repr__(self):
return f'<{self.__class__.__name__}(org_uuid={self.org_uuid}'
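Review bot: Both new `__repr__` methods return an unterminated string: the f-strings end right after the UUID and lack the closing `)>` that `MISPInbox.__repr__` just above them uses. They should read `return f'<{self.__class__.__name__}(event_uuid={self.event_uuid})>'` and `return f'<{self.__class__.__name__}(org_uuid={self.org_uuid})>'`. Both also read the attribute directly, so calling repr() on an instance that has not yet been filled by `from_dict` will presumably raise AttributeError; a `hasattr` guard would avoid that.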


@ -26,7 +26,7 @@ logger = logging.getLogger('pymisp')
try:
from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting
from pymisp import register_user, PyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventBlacklist
from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator
from pymisp.exceptions import MISPServerError
except ImportError:
@ -2371,6 +2371,49 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_tag(tag)
def test_blacklists(self):
first = self.create_simple_event()
second = self.create_simple_event()
second.Orgc = self.test_org
to_delete = {'bl_events': [], 'bl_organisations': []}
try:
# test events BL
ebl = self.admin_misp_connector.add_event_blacklist(uuids=[first.uuid])
self.assertEqual(ebl['result']['successes'][0], first.uuid, ebl)
bl_events = self.admin_misp_connector.event_blacklists(pythonify=True)
for ble in bl_events:
if ble.event_uuid == first.uuid:
to_delete['bl_events'].append(ble)
break
else:
raise Exception('Unable to find UUID in Events blacklist')
first = self.user_misp_connector.add_event(first, pythonify=True)
self.assertEqual(first['errors'][1]['message'], 'Could not add Event', first)
# ble.comment = 'This is a test'
# ble.event_info = 'foo'
# ble.event_orgc = 'bar'
# ble = self.admin_misp_connector.update_event_blacklist(ble)
# print(ble.to_json(indent=2))
# self.assertEqual(ble.comment, 'This is a test')
# test Org BL
obl = self.admin_misp_connector.add_organisation_blacklist(uuids=[self.test_org.uuid])
self.assertEqual(ebl['result']['successes'][0], self.test_org.uuid, obl)
bl_orgs = self.admin_misp_connector.organisation_blacklists(pythonify=True)
for blo in bl_orgs:
if blo.org_uuid == self.test_org.uuid:
to_delete['bl_organisations'].append(blo)
break
else:
raise Exception('Unable to find UUID in Orgs blacklist')
first = self.user_misp_connector.add_event(first, pythonify=True)
self.assertEqual(first['errors'][1]['message'], 'Could not add Event', first)
finally:
for ble in to_delete['bl_events']:
self.admin_misp_connector.delete_event_blacklist(ble)
for blo in to_delete['bl_organisations']:
self.admin_misp_connector.delete_organisation_blacklist(blo)
@unittest.skip("Internal use only")
def missing_methods(self):
skip = [
@ -2392,6 +2435,7 @@ class TestComprehensive(unittest.TestCase):
"attributes/bro",
"attributes/reportValidationIssuesAttributes",
"attributes/generateCorrelation",
"attributes/getMassEditForm",
"attributes/fetchViewValue",
"attributes/fetchEditForm",
"attributes/attributeReplace",
@ -2402,13 +2446,14 @@ class TestComprehensive(unittest.TestCase):
"attributes/hoverEnrichment",
"attributes/addTag",
"attributes/removeTag",
"attributes/toggleCorrelation", # TODO
"attributes/toggleToIDS", # TODO
"attributes/toggleCorrelation", # Use update attribute
"attributes/toggleToIDS", # Use update attribute
"attributes/checkAttachments",
"attributes/exportSearch",
'dashboards',
'decayingModel',
'eventBlacklists', # TODO
"eventBlacklists/edit",
"eventBlacklists/massDelete",
"eventDelegations/view",
"eventDelegations/index",
"eventGraph/view",
@ -2534,13 +2579,13 @@ class TestComprehensive(unittest.TestCase):
"objects/orphanedObjectDiagnostics",
"objects/proposeObjectsFromAttributes",
"objects/groupAttributesIntoObject",
'orgBlacklists', # TODO
"admin/organisations/generateuuid",
"organisations/landingpage",
"organisations/fetchOrgsForSG",
"organisations/fetchSGOrgRow",
"organisations/getUUIDs",
"admin/organisations/merge",
'orgBlacklists/edit',
"pages/display",
"posts/pushMessageToZMQ",
"posts/add",
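Review bot: The organisation half of `test_blacklists` does not actually exercise the organisation blacklist. The assertion after `add_organisation_blacklist` indexes `ebl` (the event response) instead of `obl`, and the following `add_event` call re-submits `first`, which by then holds the error response from the earlier blocked add, while `second` (whose `Orgc` is set to `self.test_org`) is never submitted. I would change these to `self.assertEqual(obl['result']['successes'][0], self.test_org.uuid, obl)` and `second = self.user_misp_connector.add_event(second, pythonify=True)`, then assert on `second['errors']`. As written, the test cannot show that an event from a blacklisted organisation is rejected.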