chg: Make pythonify=False default everywhere

Add a method to toggle pythonify globally
pull/426/head
Raphaël Vinot 2019-07-22 11:28:24 +02:00
parent 08da648dfb
commit a6a0fcd4fb
2 changed files with 192 additions and 163 deletions

View File

@ -58,6 +58,8 @@ class ExpandedPyMISP(PyMISP):
self.auth = auth
self.tool = tool
self.global_pythonify = False
self.resources_path = Path(__file__).parent / 'data'
if debug:
logger.setLevel(logging.DEBUG)
@ -144,36 +146,39 @@ class ExpandedPyMISP(PyMISP):
return {'version': '{}.{}.{}'.format(master_version['major'], master_version['minor'], master_version['hotfix'])}
return {'error': 'Impossible to retrieve the version of the master branch.'}
def toggle_global_pythonify(self):
self.global_pythonify = not self.global_pythonify
# ## BEGIN Event ##
def get_event(self, event: Union[MISPEvent, int, str, UUID], pythonify: bool=True):
def get_event(self, event: Union[MISPEvent, int, str, UUID], pythonify: bool=False):
'''Get an event from a MISP instance'''
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
event = self._prepare_request('GET', f'events/{event_id}')
event = self._check_response(event, expect_json=True)
if not pythonify or 'errors' in event:
if not (self.global_pythonify or pythonify) or 'errors' in event:
return event
e = MISPEvent()
e.load(event)
return e
def add_event(self, event: MISPEvent, pythonify: bool=True):
def add_event(self, event: MISPEvent, pythonify: bool=False):
'''Add a new event on a MISP instance'''
new_event = self._prepare_request('POST', 'events', data=event)
new_event = self._check_response(new_event, expect_json=True)
if not pythonify or 'errors' in new_event:
if not (self.global_pythonify or pythonify) or 'errors' in new_event:
return new_event
e = MISPEvent()
e.load(new_event)
return e
def update_event(self, event: MISPEvent, event_id: int=None, pythonify: bool=True):
def update_event(self, event: MISPEvent, event_id: int=None, pythonify: bool=False):
'''Update an event on a MISP instance'''
if event_id is None:
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
updated_event = self._prepare_request('POST', f'events/{event_id}', data=event)
updated_event = self._check_response(updated_event, expect_json=True)
if not pythonify or 'errors' in updated_event:
if not (self.global_pythonify or pythonify) or 'errors' in updated_event:
return updated_event
e = MISPEvent()
e.load(updated_event)
@ -199,35 +204,35 @@ class ExpandedPyMISP(PyMISP):
# ## BEGIN Object ###
def get_object(self, misp_object: Union[MISPObject, int, str, UUID], pythonify: bool=True):
def get_object(self, misp_object: Union[MISPObject, int, str, UUID], pythonify: bool=False):
'''Get an object from the remote MISP instance'''
object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object)
misp_object = self._prepare_request('GET', f'objects/view/{object_id}')
misp_object = self._check_response(misp_object, expect_json=True)
if not pythonify or 'errors' in misp_object:
if not (self.global_pythonify or pythonify) or 'errors' in misp_object:
return misp_object
o = MISPObject(misp_object['Object']['name'])
o.from_dict(**misp_object)
return o
def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool=True):
def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool=False):
'''Add a MISP Object to an existing MISP event'''
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
new_object = self._prepare_request('POST', f'objects/add/{event_id}', data=misp_object)
new_object = self._check_response(new_object, expect_json=True)
if not pythonify or 'errors' in new_object:
if not (self.global_pythonify or pythonify) or 'errors' in new_object:
return new_object
o = MISPObject(new_object['Object']['name'])
o.from_dict(**new_object)
return o
def update_object(self, misp_object: MISPObject, object_id: int=None, pythonify: bool=True):
def update_object(self, misp_object: MISPObject, object_id: int=None, pythonify: bool=False):
'''Update an object on a MISP instance'''
if object_id is None:
object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object)
updated_object = self._prepare_request('POST', f'objects/edit/{object_id}', data=misp_object)
updated_object = self._check_response(updated_object, expect_json=True)
if not pythonify or 'errors' in updated_object:
if not (self.global_pythonify or pythonify) or 'errors' in updated_object:
return updated_object
o = MISPObject(updated_object['Object']['name'])
o.from_dict(**updated_object)
@ -244,7 +249,7 @@ class ExpandedPyMISP(PyMISP):
"""Add a reference to an object"""
object_reference = self._prepare_request('POST', 'object_references/add', misp_object_reference)
object_reference = self._check_response(object_reference, expect_json=True)
if not pythonify or 'errors' in object_reference:
if not (self.global_pythonify or pythonify) or 'errors' in object_reference:
return object_reference
r = MISPObjectReference()
r.from_dict(**object_reference)
@ -258,11 +263,11 @@ class ExpandedPyMISP(PyMISP):
# Object templates
def object_templates(self, pythonify=False):
def object_templates(self, pythonify: bool=False):
"""Get all the object templates."""
object_templates = self._prepare_request('GET', 'objectTemplates')
object_templates = self._check_response(object_templates, expect_json=True)
if not pythonify or 'errors' in object_templates:
if not (self.global_pythonify or pythonify) or 'errors' in object_templates:
return object_templates
to_return = []
for object_template in object_templates:
@ -271,12 +276,12 @@ class ExpandedPyMISP(PyMISP):
to_return.append(o)
return to_return
def get_object_template(self, object_template: Union[MISPObjectTemplate, int, str, UUID], pythonify=False):
def get_object_template(self, object_template: Union[MISPObjectTemplate, int, str, UUID], pythonify: bool=False):
"""Gets the full object template corresponting the UUID passed as parameter"""
object_template_id = self.__get_uuid_or_id_from_abstract_misp(object_template)
object_template = self._prepare_request('GET', f'objectTemplates/view/{object_template_id}')
object_template = self._check_response(object_template, expect_json=True)
if not pythonify or 'errors' in object_template:
if not (self.global_pythonify or pythonify) or 'errors' in object_template:
return object_template
t = MISPObjectTemplate()
t.from_dict(**object_template)
@ -291,18 +296,18 @@ class ExpandedPyMISP(PyMISP):
# ## BEGIN Attribute ###
def get_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], pythonify: bool=True):
def get_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], pythonify: bool=False):
'''Get an attribute from a MISP instance'''
attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute)
attribute = self._prepare_request('GET', f'attributes/view/{attribute_id}')
attribute = self._check_response(attribute, expect_json=True)
if not pythonify or 'errors' in attribute:
if not (self.global_pythonify or pythonify) or 'errors' in attribute:
return attribute
a = MISPAttribute()
a.from_dict(**attribute)
return a
def add_attribute(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=True):
def add_attribute(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False):
'''Add an attribute to an existing MISP event'''
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
new_attribute = self._prepare_request('POST', f'attributes/add/{event_id}', data=attribute)
@ -312,13 +317,13 @@ class ExpandedPyMISP(PyMISP):
# At this point, we assume the user tried to add an attribute on an event they don't own
# Re-try with a proposal
return self.add_attribute_proposal(event_id, attribute, pythonify)
if not pythonify or 'errors' in new_attribute:
if not (self.global_pythonify or pythonify) or 'errors' in new_attribute:
return new_attribute
a = MISPAttribute()
a.from_dict(**new_attribute)
return a
def update_attribute(self, attribute: MISPAttribute, attribute_id: int=None, pythonify: bool=True):
def update_attribute(self, attribute: MISPAttribute, attribute_id: int=None, pythonify: bool=False):
'''Update an attribute on a MISP instance'''
if attribute_id is None:
attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute)
@ -330,7 +335,7 @@ class ExpandedPyMISP(PyMISP):
# At this point, we assume the user tried to update an attribute on an event they don't own
# Re-try with a proposal
return self.update_attribute_proposal(attribute_id, attribute, pythonify)
if not pythonify or 'errors' in updated_attribute:
if not (self.global_pythonify or pythonify) or 'errors' in updated_attribute:
return updated_attribute
a = MISPAttribute()
a.from_dict(**updated_attribute)
@ -353,11 +358,11 @@ class ExpandedPyMISP(PyMISP):
# ## BEGIN Attribute Proposal ###
def get_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID], pythonify: bool=True):
def get_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID], pythonify: bool=False):
proposal_id = self.__get_uuid_or_id_from_abstract_misp(proposal)
attribute_proposal = self._prepare_request('GET', f'shadow_attributes/view/{proposal_id}')
attribute_proposal = self._check_response(attribute_proposal, expect_json=True)
if not pythonify or 'errors' in attribute_proposal:
if not (self.global_pythonify or pythonify) or 'errors' in attribute_proposal:
return attribute_proposal
a = MISPShadowAttribute()
a.from_dict(**attribute_proposal)
@ -365,26 +370,26 @@ class ExpandedPyMISP(PyMISP):
# NOTE: the tree following method have a very specific meaning, look at the comments
def add_attribute_proposal(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=True):
def add_attribute_proposal(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False):
'''Propose a new attribute in an event'''
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
# FIXME: attribute needs to be a complete MISPAttribute: https://github.com/MISP/MISP/issues/4868
new_attribute_proposal = self._prepare_request('POST', f'shadow_attributes/add/{event_id}', data=attribute)
new_attribute_proposal = self._check_response(new_attribute_proposal, expect_json=True)
if not pythonify or 'errors' in new_attribute_proposal:
if not (self.global_pythonify or pythonify) or 'errors' in new_attribute_proposal:
return new_attribute_proposal
a = MISPShadowAttribute()
a.from_dict(**new_attribute_proposal)
return a
def update_attribute_proposal(self, initial_attribute: Union[MISPAttribute, int, str, UUID], attribute: MISPAttribute, pythonify: bool=True):
def update_attribute_proposal(self, initial_attribute: Union[MISPAttribute, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False):
'''Propose a change for an attribute'''
# FIXME: inconsistency in MISP: https://github.com/MISP/MISP/issues/4857
initial_attribute_id = self.__get_uuid_or_id_from_abstract_misp(initial_attribute)
attribute = {'ShadowAttribute': attribute}
update_attribute_proposal = self._prepare_request('POST', f'shadow_attributes/edit/{initial_attribute_id}', data=attribute)
update_attribute_proposal = self._check_response(update_attribute_proposal, expect_json=True)
if not pythonify or 'errors' in update_attribute_proposal:
if not (self.global_pythonify or pythonify) or 'errors' in update_attribute_proposal:
return update_attribute_proposal
a = MISPShadowAttribute()
a.from_dict(**update_attribute_proposal)
@ -414,7 +419,7 @@ class ExpandedPyMISP(PyMISP):
# ## BEGIN Sighting ###
def sightings(self, misp_entity: AbstractMISP, org: Union[MISPOrganisation, int, str, UUID]=None, pythonify=False):
def sightings(self, misp_entity: AbstractMISP, org: Union[MISPOrganisation, int, str, UUID]=None, pythonify: bool=False):
"""Get the list of sighting related to a MISPEvent or a MISPAttribute (depending on type of misp_entity)"""
# FIXME: https://github.com/MISP/MISP/issues/4875
if isinstance(misp_entity, MISPEvent):
@ -430,7 +435,7 @@ class ExpandedPyMISP(PyMISP):
url = f'sightings/listSightings/{misp_entity.id}/{scope}'
sightings = self._prepare_request('POST', url)
sightings = self._check_response(sightings, expect_json=True)
if not pythonify or 'errors' in sightings:
if not (self.global_pythonify or pythonify) or 'errors' in sightings:
return sightings
to_return = []
for sighting in sightings:
@ -450,7 +455,7 @@ class ExpandedPyMISP(PyMISP):
# Either the ID/UUID is in the sighting, or we want to add a sighting on all the attributes with the given value
new_sighting = self._prepare_request('POST', f'sightings/add', data=sighting)
new_sighting = self._check_response(new_sighting, expect_json=True)
if not pythonify or 'errors' in new_sighting:
if not (self.global_pythonify or pythonify) or 'errors' in new_sighting:
return new_sighting
s = MISPSighting()
s.from_dict(**new_sighting)
@ -470,7 +475,7 @@ class ExpandedPyMISP(PyMISP):
"""Get the list of existing tags."""
tags = self._prepare_request('GET', 'tags')
tags = self._check_response(tags, expect_json=True)
if not pythonify or 'errors' in tags:
if not (self.global_pythonify or pythonify) or 'errors' in tags:
return tags['Tag']
to_return = []
for tag in tags['Tag']:
@ -484,17 +489,17 @@ class ExpandedPyMISP(PyMISP):
tag_id = self.__get_uuid_or_id_from_abstract_misp(tag)
tag = self._prepare_request('GET', f'tags/view/{tag_id}')
tag = self._check_response(tag, expect_json=True)
if not pythonify or 'errors' in tag:
if not (self.global_pythonify or pythonify) or 'errors' in tag:
return tag
t = MISPTag()
t.from_dict(**tag)
return t
def add_tag(self, tag: MISPTag, pythonify: bool=True):
def add_tag(self, tag: MISPTag, pythonify: bool=False):
'''Add a new tag on a MISP instance'''
new_tag = self._prepare_request('POST', 'tags/add', data=tag)
new_tag = self._check_response(new_tag, expect_json=True)
if not pythonify or 'errors' in new_tag:
if not (self.global_pythonify or pythonify) or 'errors' in new_tag:
return new_tag
t = MISPTag()
t.from_dict(**new_tag)
@ -518,7 +523,7 @@ class ExpandedPyMISP(PyMISP):
tag = {'Tag': tag}
updated_tag = self._prepare_request('POST', f'tags/edit/{tag_id}', data=tag)
updated_tag = self._check_response(updated_tag, expect_json=True)
if not pythonify or 'errors' in updated_tag:
if not (self.global_pythonify or pythonify) or 'errors' in updated_tag:
return updated_tag
t = MISPTag()
t.from_dict(**updated_tag)
@ -543,7 +548,7 @@ class ExpandedPyMISP(PyMISP):
"""Get all the taxonomies."""
taxonomies = self._prepare_request('GET', 'taxonomies')
taxonomies = self._check_response(taxonomies, expect_json=True)
if not pythonify or 'errors' in taxonomies:
if not (self.global_pythonify or pythonify) or 'errors' in taxonomies:
return taxonomies
to_return = []
for taxonomy in taxonomies:
@ -557,7 +562,7 @@ class ExpandedPyMISP(PyMISP):
taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy)
taxonomy = self._prepare_request('GET', f'taxonomies/view/{taxonomy_id}')
taxonomy = self._check_response(taxonomy, expect_json=True)
if not pythonify or 'errors' in taxonomy:
if not (self.global_pythonify or pythonify) or 'errors' in taxonomy:
return taxonomy
t = MISPTaxonomy()
t.from_dict(**taxonomy)
@ -601,7 +606,7 @@ class ExpandedPyMISP(PyMISP):
"""Get all the warninglists."""
warninglists = self._prepare_request('GET', 'warninglists')
warninglists = self._check_response(warninglists, expect_json=True)
if not pythonify or 'errors' in warninglists:
if not (self.global_pythonify or pythonify) or 'errors' in warninglists:
return warninglists['Warninglists']
to_return = []
for warninglist in warninglists['Warninglists']:
@ -615,7 +620,7 @@ class ExpandedPyMISP(PyMISP):
warninglist_id = self.__get_uuid_or_id_from_abstract_misp(warninglist)
warninglist = self._prepare_request('GET', f'warninglists/view/{warninglist_id}')
warninglist = self._check_response(warninglist, expect_json=True)
if not pythonify or 'errors' in warninglist:
if not (self.global_pythonify or pythonify) or 'errors' in warninglist:
return warninglist
w = MISPWarninglist()
w.from_dict(**warninglist)
@ -667,11 +672,11 @@ class ExpandedPyMISP(PyMISP):
# ## BEGIN Noticelist ###
def noticelists(self, pythonify=False):
def noticelists(self, pythonify: bool=False):
"""Get all the noticelists."""
noticelists = self._prepare_request('GET', 'noticelists')
noticelists = self._check_response(noticelists, expect_json=True)
if not pythonify or 'errors' in noticelists:
if not (self.global_pythonify or pythonify) or 'errors' in noticelists:
return noticelists
to_return = []
for noticelist in noticelists:
@ -680,12 +685,12 @@ class ExpandedPyMISP(PyMISP):
to_return.append(n)
return to_return
def get_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID], pythonify=False):
def get_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID], pythonify: bool=False):
"""Get a noticelist by id."""
noticelist_id = self.__get_uuid_or_id_from_abstract_misp(noticelist)
noticelist = self._prepare_request('GET', f'noticelists/view/{noticelist_id}')
noticelist = self._check_response(noticelist, expect_json=True)
if not pythonify or 'errors' in noticelist:
if not (self.global_pythonify or pythonify) or 'errors' in noticelist:
return noticelist
n = MISPNoticelist()
n.from_dict(**noticelist)
@ -716,11 +721,11 @@ class ExpandedPyMISP(PyMISP):
# ## BEGIN Galaxy ###
def galaxies(self, pythonify=False):
def galaxies(self, pythonify: bool=False):
"""Get all the galaxies."""
galaxies = self._prepare_request('GET', 'galaxies')
galaxies = self._check_response(galaxies, expect_json=True)
if not pythonify or 'errors' in galaxies:
if not (self.global_pythonify or pythonify) or 'errors' in galaxies:
return galaxies
to_return = []
for galaxy in galaxies:
@ -729,12 +734,12 @@ class ExpandedPyMISP(PyMISP):
to_return.append(g)
return to_return
def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], pythonify=False):
def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], pythonify: bool=False):
"""Get a galaxy by id."""
galaxy_id = self.__get_uuid_or_id_from_abstract_misp(galaxy)
galaxy = self._prepare_request('GET', f'galaxies/view/{galaxy_id}')
galaxy = self._check_response(galaxy, expect_json=True)
if not pythonify or 'errors' in galaxy:
if not (self.global_pythonify or pythonify) or 'errors' in galaxy:
return galaxy
g = MISPGalaxy()
g.from_dict(**galaxy)
@ -753,7 +758,7 @@ class ExpandedPyMISP(PyMISP):
"""Get the list of existing feeds."""
feeds = self._prepare_request('GET', 'feeds')
feeds = self._check_response(feeds, expect_json=True)
if not pythonify or 'errors' in feeds:
if not (self.global_pythonify or pythonify) or 'errors' in feeds:
return feeds
to_return = []
for feed in feeds:
@ -767,7 +772,7 @@ class ExpandedPyMISP(PyMISP):
feed_id = self.__get_uuid_or_id_from_abstract_misp(feed)
feed = self._prepare_request('GET', f'feeds/view/{feed_id}')
feed = self._check_response(feed, expect_json=True)
if not pythonify or 'errors' in feed:
if not (self.global_pythonify or pythonify) or 'errors' in feed:
return feed
f = MISPFeed()
f.from_dict(**feed)
@ -779,7 +784,7 @@ class ExpandedPyMISP(PyMISP):
feed = {'Feed': feed}
new_feed = self._prepare_request('POST', 'feeds/add', data=feed)
new_feed = self._check_response(new_feed, expect_json=True)
if not pythonify or 'errors' in new_feed:
if not (self.global_pythonify or pythonify) or 'errors' in new_feed:
return new_feed
f = MISPFeed()
f.from_dict(**new_feed)
@ -829,7 +834,7 @@ class ExpandedPyMISP(PyMISP):
feed = {'Feed': feed}
updated_feed = self._prepare_request('POST', f'feeds/edit/{feed_id}', data=feed)
updated_feed = self._check_response(updated_feed, expect_json=True)
if not pythonify or 'errors' in updated_feed:
if not (self.global_pythonify or pythonify) or 'errors' in updated_feed:
return updated_feed
f = MISPFeed()
f.from_dict(**updated_feed)
@ -877,11 +882,11 @@ class ExpandedPyMISP(PyMISP):
# ## BEGIN Server ###
def servers(self, pythonify=False):
def servers(self, pythonify: bool=False):
"""Get the existing servers the MISP instance can synchronise with"""
servers = self._prepare_request('GET', 'servers')
servers = self._check_response(servers, expect_json=True)
if not pythonify or 'errors' in servers:
if not (self.global_pythonify or pythonify) or 'errors' in servers:
return servers
to_return = []
for server in servers:
@ -890,23 +895,23 @@ class ExpandedPyMISP(PyMISP):
to_return.append(s)
return to_return
def add_server(self, server: MISPServer, pythonify: bool=True):
def add_server(self, server: MISPServer, pythonify: bool=False):
"""Add a server to synchronise with"""
server = self._prepare_request('POST', f'servers/add', data=server)
server = self._check_response(server, expect_json=True)
if not pythonify or 'errors' in server:
if not (self.global_pythonify or pythonify) or 'errors' in server:
return server
s = MISPServer()
s.from_dict(**server)
return s
def update_server(self, server: MISPServer, server_id: int=None, pythonify: bool=True):
def update_server(self, server: MISPServer, server_id: int=None, pythonify: bool=False):
'''Update a server to synchronise with'''
if server_id is None:
server_id = self.__get_uuid_or_id_from_abstract_misp(server)
updated_server = self._prepare_request('POST', f'servers/edit/{server_id}', data=server)
updated_server = self._check_response(updated_server, expect_json=True)
if not pythonify or 'errors' in updated_server:
if not (self.global_pythonify or pythonify) or 'errors' in updated_server:
return updated_server
s = MISPServer()
s.from_dict(**updated_server)
@ -952,7 +957,7 @@ class ExpandedPyMISP(PyMISP):
"""Get the existing sharing groups"""
sharing_groups = self._prepare_request('GET', 'sharing_groups')
sharing_groups = self._check_response(sharing_groups, expect_json=True)
if not pythonify or 'errors' in sharing_groups:
if not (self.global_pythonify or pythonify) or 'errors' in sharing_groups:
return sharing_groups
to_return = []
for sharing_group in sharing_groups:
@ -961,13 +966,13 @@ class ExpandedPyMISP(PyMISP):
to_return.append(s)
return to_return
def add_sharing_group(self, sharing_group: MISPSharingGroup, pythonify: bool=True):
def add_sharing_group(self, sharing_group: MISPSharingGroup, pythonify: bool=False):
"""Add a new sharing group"""
sharing_group = self._prepare_request('POST', f'sharing_groups/add', data=sharing_group)
sharing_group = self._check_response(sharing_group, expect_json=True)
# FIXME: https://github.com/MISP/MISP/issues/4882
sharing_group = sharing_group[0]
if not pythonify or 'errors' in sharing_group:
if not (self.global_pythonify or pythonify) or 'errors' in sharing_group:
return sharing_group
s = MISPSharingGroup()
s.from_dict(**sharing_group)
@ -1033,11 +1038,11 @@ class ExpandedPyMISP(PyMISP):
# ## BEGIN Organisation ###
def organisations(self, scope="local", pythonify=False):
def organisations(self, scope="local", pythonify: bool=False):
"""Get all the organisations."""
organisations = self._prepare_request('GET', f'organisations/index/scope:{scope}')
organisations = self._check_response(organisations, expect_json=True)
if not pythonify or 'errors' in organisations:
if not (self.global_pythonify or pythonify) or 'errors' in organisations:
return organisations
to_return = []
for organisation in organisations:
@ -1046,34 +1051,34 @@ class ExpandedPyMISP(PyMISP):
to_return.append(o)
return to_return
def get_organisation(self, organisation: Union[MISPOrganisation, int, str, UUID], pythonify: bool=True):
def get_organisation(self, organisation: Union[MISPOrganisation, int, str, UUID], pythonify: bool=False):
'''Get an organisation.'''
organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation)
organisation = self._prepare_request('GET', f'organisations/view/{organisation_id}')
organisation = self._check_response(organisation, expect_json=True)
if not pythonify or 'errors' in organisation:
if not (self.global_pythonify or pythonify) or 'errors' in organisation:
return organisation
o = MISPOrganisation()
o.from_dict(**organisation)
return o
def add_organisation(self, organisation: MISPOrganisation, pythonify: bool=True):
def add_organisation(self, organisation: MISPOrganisation, pythonify: bool=False):
'''Add an organisation'''
new_organisation = self._prepare_request('POST', f'admin/organisations/add', data=organisation)
new_organisation = self._check_response(new_organisation, expect_json=True)
if not pythonify or 'errors' in new_organisation:
if not (self.global_pythonify or pythonify) or 'errors' in new_organisation:
return new_organisation
o = MISPOrganisation()
o.from_dict(**new_organisation)
return o
def update_organisation(self, organisation: MISPOrganisation, organisation_id: int=None, pythonify: bool=True):
def update_organisation(self, organisation: MISPOrganisation, organisation_id: int=None, pythonify: bool=False):
'''Update an organisation'''
if organisation_id is None:
organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation)
updated_organisation = self._prepare_request('POST', f'admin/organisations/edit/{organisation_id}', data=organisation)
updated_organisation = self._check_response(updated_organisation, expect_json=True)
if not pythonify or 'errors' in updated_organisation:
if not (self.global_pythonify or pythonify) or 'errors' in updated_organisation:
return updated_organisation
o = MISPOrganisation()
o.from_dict(**organisation)
@ -1090,11 +1095,11 @@ class ExpandedPyMISP(PyMISP):
# ## BEGIN User ###
def users(self, pythonify=False):
def users(self, pythonify: bool=False):
"""Get all the users."""
users = self._prepare_request('GET', 'admin/users')
users = self._check_response(users, expect_json=True)
if not pythonify or 'errors' in users:
if not (self.global_pythonify or pythonify) or 'errors' in users:
return users
to_return = []
for user in users:
@ -1108,7 +1113,7 @@ class ExpandedPyMISP(PyMISP):
user_id = self.__get_uuid_or_id_from_abstract_misp(user)
user = self._prepare_request('GET', f'users/view/{user_id}')
user = self._check_response(user, expect_json=True)
if not pythonify or 'errors' in user:
if not (self.global_pythonify or pythonify) or 'errors' in user:
return user
u = MISPUser()
u.from_dict(**user)
@ -1118,7 +1123,7 @@ class ExpandedPyMISP(PyMISP):
'''Add a new user'''
user = self._prepare_request('POST', f'admin/users/add', data=user)
user = self._check_response(user, expect_json=True)
if not pythonify or 'errors' in user:
if not (self.global_pythonify or pythonify) or 'errors' in user:
return user
u = MISPUser()
u.from_dict(**user)
@ -1130,7 +1135,7 @@ class ExpandedPyMISP(PyMISP):
user_id = self.__get_uuid_or_id_from_abstract_misp(user)
updated_user = self._prepare_request('POST', f'admin/users/edit/{user_id}', data=user)
updated_user = self._check_response(updated_user, expect_json=True)
if not pythonify or 'errors' in updated_user:
if not (self.global_pythonify or pythonify) or 'errors' in updated_user:
return updated_user
e = MISPUser()
e.from_dict(**updated_user)
@ -1151,7 +1156,7 @@ class ExpandedPyMISP(PyMISP):
"""Get the existing roles"""
roles = self._prepare_request('GET', 'roles')
roles = self._check_response(roles, expect_json=True)
if not pythonify or 'errors' in roles:
if not (self.global_pythonify or pythonify) or 'errors' in roles:
return roles
to_return = []
for role in roles:
@ -1321,11 +1326,11 @@ class ExpandedPyMISP(PyMISP):
else:
normalized_response = self._check_response(response)
if return_format == 'csv' and pythonify and not headerless:
if return_format == 'csv' and (self.global_pythonify or pythonify) and not headerless:
return self._csv_to_dict(normalized_response)
elif 'errors' in normalized_response:
return normalized_response
elif return_format == 'json' and pythonify:
elif return_format == 'json' and self.global_pythonify or pythonify:
# The response is in json, we can convert it to a list of pythonic MISP objects
to_return = []
if controller == 'events':
@ -1389,7 +1394,7 @@ class ExpandedPyMISP(PyMISP):
response = self._prepare_request('POST', url, data=query)
normalized_response = self._check_response(response, expect_json=True)
if not pythonify:
if not (self.global_pythonify or pythonify):
return normalized_response
to_return = []
for e_meta in normalized_response:
@ -1455,9 +1460,9 @@ class ExpandedPyMISP(PyMISP):
url = urljoin(self.root_url, url_path)
response = self._prepare_request('POST', url, data=query)
normalized_response = self._check_response(response, expect_json=True)
if not pythonify or 'errors' in normalized_response:
if not (self.global_pythonify or pythonify) or 'errors' in normalized_response:
return normalized_response
elif pythonify:
elif self.global_pythonify or pythonify:
to_return = []
for s in normalized_response:
entries = {}
@ -1514,7 +1519,7 @@ class ExpandedPyMISP(PyMISP):
response = self._prepare_request('POST', 'admin/logs/index', data=query)
normalized_response = self._check_response(response, expect_json=True)
if not pythonify or 'errors' in normalized_response:
if not (self.global_pythonify or pythonify) or 'errors' in normalized_response:
return normalized_response
to_return = []
@ -1543,7 +1548,7 @@ class ExpandedPyMISP(PyMISP):
return self._check_response(response, lenient_response_type=True)
def freetext(self, event: Union[MISPEvent, int, str, UUID], string: str, adhereToWarninglists: Union[bool, str]=False,
distribution: int=None, returnMetaAttributes: bool=False, pythonify=False):
distribution: int=None, returnMetaAttributes: bool=False, pythonify: bool=False):
"""Pass a text to the freetext importer"""
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
query = {"value": string}
@ -1558,7 +1563,7 @@ class ExpandedPyMISP(PyMISP):
query['returnMetaAttributes'] = returnMetaAttributes
attributes = self._prepare_request('POST', f'events/freeTextImport/{event_id}', data=query)
attributes = self._check_response(attributes, expect_json=True)
if returnMetaAttributes or not pythonify or 'errors' in attributes:
if returnMetaAttributes or not (self.global_pythonify or pythonify) or 'errors' in attributes:
return attributes
to_return = []
for attribute in attributes:
@ -1634,18 +1639,18 @@ class ExpandedPyMISP(PyMISP):
# ## BEGIN Global helpers ###
def change_sharing_group_on_entity(self, misp_entity: AbstractMISP, sharing_group_id):
def change_sharing_group_on_entity(self, misp_entity: AbstractMISP, sharing_group_id, pythonify: bool=False):
"""Change the sharing group of an event, an attribute, or an object"""
misp_entity.distribution = 4 # Needs to be 'Sharing group'
if 'SharingGroup' in misp_entity: # Delete former SharingGroup information
del misp_entity.SharingGroup
misp_entity.sharing_group_id = sharing_group_id # Set new sharing group id
if isinstance(misp_entity, MISPEvent):
return self.update_event(misp_entity)
return self.update_event(misp_entity, pythonify=pythonify)
elif isinstance(misp_entity, MISPObject):
return self.update_object(misp_entity)
return self.update_object(misp_entity, pythonify=pythonify)
elif isinstance(misp_entity, MISPAttribute):
return self.update_attribute(misp_entity)
return self.update_attribute(misp_entity, pythonify=pythonify)
else:
raise PyMISPError('The misp_entity must be MISPEvent, MISPObject or MISPAttribute')

View File

@ -53,7 +53,7 @@ class TestComprehensive(unittest.TestCase):
# Creates an org
organisation = MISPOrganisation()
organisation.name = 'Test Org'
cls.test_org = cls.admin_misp_connector.add_organisation(organisation)
cls.test_org = cls.admin_misp_connector.add_organisation(organisation, pythonify=True)
# Set the refault role (id 3 on the VM)
cls.admin_misp_connector.set_default_role(3)
# Creates a user
@ -62,6 +62,7 @@ class TestComprehensive(unittest.TestCase):
user.org_id = cls.test_org.id
cls.test_usr = cls.admin_misp_connector.add_user(user, pythonify=True)
cls.user_misp_connector = ExpandedPyMISP(url, cls.test_usr.authkey, verifycert, debug=False)
cls.user_misp_connector.toggle_global_pythonify()
# Creates a publisher
user = MISPUser()
user.email = 'testpub@user.local'
@ -136,8 +137,8 @@ class TestComprehensive(unittest.TestCase):
# Create first and third event as admin
# usr won't be able to see the first one
first = self.admin_misp_connector.add_event(first_event)
third = self.admin_misp_connector.add_event(third_event)
first = self.admin_misp_connector.add_event(first_event, pythonify=True)
third = self.admin_misp_connector.add_event(third_event, pythonify=True)
# Create second event as user
second = self.user_misp_connector.add_event(second_event)
return first, second, third
@ -155,12 +156,12 @@ class TestComprehensive(unittest.TestCase):
for e in events:
self.assertIn(e.id, [first.id, second.id])
# Search as user
events = self.user_misp_connector.search(value=first.attributes[0].value, pythonify=True)
events = self.user_misp_connector.search(value=first.attributes[0].value)
self.assertEqual(len(events), 1)
for e in events:
self.assertIn(e.id, [second.id])
# Non-existing value
events = self.user_misp_connector.search(value=str(uuid4()), pythonify=True)
events = self.user_misp_connector.search(value=str(uuid4()))
self.assertEqual(events, [])
finally:
# Delete events
@ -178,12 +179,12 @@ class TestComprehensive(unittest.TestCase):
for a in attributes:
self.assertIn(a.event_id, [first.id, second.id])
# Search as user
attributes = self.user_misp_connector.search(controller='attributes', value=first.attributes[0].value, pythonify=True)
attributes = self.user_misp_connector.search(controller='attributes', value=first.attributes[0].value)
self.assertEqual(len(attributes), 1)
for a in attributes:
self.assertIn(a.event_id, [second.id])
# Non-existing value
attributes = self.user_misp_connector.search(controller='attributes', value=str(uuid4()), pythonify=True)
attributes = self.user_misp_connector.search(controller='attributes', value=str(uuid4()))
self.assertEqual(attributes, [])
finally:
# Delete event
@ -254,15 +255,15 @@ class TestComprehensive(unittest.TestCase):
for e in events:
self.assertIn(e.id, [first.id])
# Search as user
events = self.user_misp_connector.search(tags='tlp:white___test', pythonify=True)
events = self.user_misp_connector.search(tags='tlp:white___test')
self.assertEqual(len(events), 2)
for e in events:
self.assertIn(e.id, [second.id, third.id])
events = self.user_misp_connector.search(tags='tlp:amber___test', pythonify=True)
events = self.user_misp_connector.search(tags='tlp:amber___test')
self.assertEqual(len(events), 2)
for e in events:
self.assertIn(e.id, [second.id, third.id])
events = self.user_misp_connector.search(tags='admin_only', pythonify=True)
events = self.user_misp_connector.search(tags='admin_only')
self.assertEqual(events, [])
finally:
# Delete event
@ -282,14 +283,14 @@ class TestComprehensive(unittest.TestCase):
attributes = self.admin_misp_connector.search(tags='admin_only', pythonify=True)
self.assertEqual(len(attributes), 1)
# Search as user
attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:white___test', pythonify=True)
attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:white___test')
self.assertEqual(len(attributes), 4)
attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:amber___test', pythonify=True)
attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:amber___test')
self.assertEqual(len(attributes), 3)
attributes = self.user_misp_connector.search(tags='admin_only', pythonify=True)
attributes = self.user_misp_connector.search(tags='admin_only')
self.assertEqual(attributes, [])
attributes_tags_search = self.admin_misp_connector.build_complex_query(or_parameters=['tlp:amber___test'], not_parameters=['tlp:white___test'])
attributes = self.user_misp_connector.search(controller='attributes', tags=attributes_tags_search, pythonify=True)
attributes = self.user_misp_connector.search(controller='attributes', tags=attributes_tags_search)
self.assertEqual(len(attributes), 1)
finally:
# Delete event
@ -361,19 +362,19 @@ class TestComprehensive(unittest.TestCase):
second = self.user_misp_connector.add_event(second)
# Search as user
# # Test - last 4 min
events = self.user_misp_connector.search(timestamp='4m', pythonify=True)
events = self.user_misp_connector.search(timestamp='4m')
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test timestamp of 2nd event
events = self.user_misp_connector.search(timestamp=event_creation_timestamp_second.timestamp(), pythonify=True)
events = self.user_misp_connector.search(timestamp=event_creation_timestamp_second.timestamp())
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test interval -6 min -> -4 min
events = self.user_misp_connector.search(timestamp=['6m', '4m'], pythonify=True)
events = self.user_misp_connector.search(timestamp=['6m', '4m'])
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_first.timestamp()))
@ -399,19 +400,19 @@ class TestComprehensive(unittest.TestCase):
second = self.user_misp_connector.add_event(second)
# Search as user
# # Test - last 4 min
attributes = self.user_misp_connector.search(controller='attributes', timestamp='4m', pythonify=True)
attributes = self.user_misp_connector.search(controller='attributes', timestamp='4m')
self.assertEqual(len(attributes), 1)
self.assertEqual(attributes[0].event_id, second.id)
self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test timestamp of 2nd event
attributes = self.user_misp_connector.search(controller='attributes', timestamp=event_creation_timestamp_second.timestamp(), pythonify=True)
attributes = self.user_misp_connector.search(controller='attributes', timestamp=event_creation_timestamp_second.timestamp())
self.assertEqual(len(attributes), 1)
self.assertEqual(attributes[0].event_id, second.id)
self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp()))
# # Test interval -6 min -> -4 min
attributes = self.user_misp_connector.search(controller='attributes', timestamp=['6m', '4m'], pythonify=True)
attributes = self.user_misp_connector.search(controller='attributes', timestamp=['6m', '4m'])
self.assertEqual(len(attributes), 1)
self.assertEqual(attributes[0].event_id, first.id)
self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_first.timestamp()))
@ -430,7 +431,7 @@ class TestComprehensive(unittest.TestCase):
self.assertFalse(first.published)
# Add event as publisher
first.publish()
first = self.pub_misp_connector.update_event(first)
first = self.pub_misp_connector.update_event(first, pythonify=True)
self.assertTrue(first.published)
finally:
# Delete event
@ -445,9 +446,9 @@ class TestComprehensive(unittest.TestCase):
second = self.create_simple_event()
second.publish()
try:
first = self.pub_misp_connector.add_event(first)
first = self.pub_misp_connector.add_event(first, pythonify=True)
time.sleep(10)
second = self.pub_misp_connector.add_event(second)
second = self.pub_misp_connector.add_event(second, pythonify=True)
# Test invalid query
events = self.pub_misp_connector.search(publish_timestamp='5x', pythonify=True)
self.assertEqual(events, [])
@ -497,7 +498,7 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(first.objects[1].distribution, Distribution.inherit.value)
self.assertEqual(first.objects[1].attributes[0].distribution, Distribution.inherit.value)
# Attribute create
attribute = self.user_misp_connector.add_attribute(first.id, {'type': 'comment', 'value': 'bar'}, pythonify=True)
attribute = self.user_misp_connector.add_attribute(first.id, {'type': 'comment', 'value': 'bar'})
self.assertEqual(attribute.value, 'bar', attribute.to_json())
self.assertEqual(attribute.distribution, Distribution.inherit.value, attribute.to_json())
# Object - add
@ -547,13 +548,13 @@ class TestComprehensive(unittest.TestCase):
second = self.user_misp_connector.add_event(second)
timeframe = [first.timestamp.timestamp() - 5, first.timestamp.timestamp() + 5]
# Search event we just created in multiple ways. Make sure it doesn't catch it when it shouldn't
events = self.user_misp_connector.search(timestamp=timeframe, pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe)
self.assertEqual(len(events), 2)
self.assertEqual(events[0].id, first.id)
self.assertEqual(events[1].id, second.id)
events = self.user_misp_connector.search(timestamp=timeframe, value='nothere', pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe, value='nothere')
self.assertEqual(events, [])
events = self.user_misp_connector.search(timestamp=timeframe, value=first.attributes[0].value, pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe, value=first.attributes[0].value)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(timestamp=[first.timestamp.timestamp() - 50,
@ -562,34 +563,34 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(events, [])
# Test return content
events = self.user_misp_connector.search(timestamp=timeframe, metadata=False, pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe, metadata=False)
self.assertEqual(len(events), 2)
self.assertEqual(len(events[0].attributes), 1)
self.assertEqual(len(events[1].attributes), 2)
events = self.user_misp_connector.search(timestamp=timeframe, metadata=True, pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe, metadata=True)
self.assertEqual(len(events), 2)
self.assertEqual(len(events[0].attributes), 0)
self.assertEqual(len(events[1].attributes), 0)
# other things
events = self.user_misp_connector.search(timestamp=timeframe, published=True, pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe, published=True)
self.assertEqual(events, [])
events = self.user_misp_connector.search(timestamp=timeframe, published=False, pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe, published=False)
self.assertEqual(len(events), 2)
events = self.user_misp_connector.search(eventid=first.id, pythonify=True)
events = self.user_misp_connector.search(eventid=first.id)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(uuid=first.uuid, pythonify=True)
events = self.user_misp_connector.search(uuid=first.uuid)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(org=first.orgc_id, pythonify=True)
events = self.user_misp_connector.search(org=first.orgc_id)
self.assertEqual(len(events), 2)
# test like search
events = self.user_misp_connector.search(timestamp=timeframe, value='%{}%'.format(first.attributes[0].value.split('-')[2]), pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe, value='%{}%'.format(first.attributes[0].value.split('-')[2]))
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(timestamp=timeframe, eventinfo='%bar blah%', pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe, eventinfo='%bar blah%')
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
@ -602,24 +603,24 @@ class TestComprehensive(unittest.TestCase):
# self.assertEqual(events[0].id, second.id)
# date_from / date_to
events = self.user_misp_connector.search(timestamp=timeframe, date_from=date.today().isoformat(), pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe, date_from=date.today().isoformat())
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, first.id)
events = self.user_misp_connector.search(timestamp=timeframe, date_from='2018-09-01', pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe, date_from='2018-09-01')
self.assertEqual(len(events), 2)
events = self.user_misp_connector.search(timestamp=timeframe, date_from='2018-09-01', date_to='2018-09-02', pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe, date_from='2018-09-01', date_to='2018-09-02')
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
# Category
events = self.user_misp_connector.search(timestamp=timeframe, category='Network activity', pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe, category='Network activity')
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
# toids
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='0', pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='0')
self.assertEqual(len(events), 2)
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='1', pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe, to_ids='1')
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(len(events[0].attributes), 1)
@ -627,26 +628,26 @@ class TestComprehensive(unittest.TestCase):
# deleted
second.attributes[1].delete()
self.user_misp_connector.update_event(second)
events = self.user_misp_connector.search(eventid=second.id, pythonify=True)
events = self.user_misp_connector.search(eventid=second.id)
self.assertEqual(len(events[0].attributes), 1)
events = self.user_misp_connector.search(eventid=second.id, deleted=True, pythonify=True)
events = self.user_misp_connector.search(eventid=second.id, deleted=True)
self.assertEqual(len(events[0].attributes), 1)
# include_event_uuid
attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, include_event_uuid=True, pythonify=True)
attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, include_event_uuid=True)
self.assertEqual(attributes[0].event_uuid, second.uuid)
# event_timestamp
time.sleep(1)
second.add_attribute('ip-src', '8.8.8.9')
second = self.user_misp_connector.update_event(second)
events = self.user_misp_connector.search(event_timestamp=second.timestamp.timestamp(), pythonify=True)
events = self.user_misp_connector.search(event_timestamp=second.timestamp.timestamp())
self.assertEqual(len(events), 1)
# searchall
second.add_attribute('text', 'This is a test for the full text search', comment='Test stuff comment')
second = self.user_misp_connector.update_event(second)
events = self.user_misp_connector.search(value='%for the full text%', searchall=True, pythonify=True)
events = self.user_misp_connector.search(value='%for the full text%', searchall=True)
self.assertEqual(len(events), 1)
# warninglist
@ -656,17 +657,17 @@ class TestComprehensive(unittest.TestCase):
second.add_attribute('ip-src', '9.9.9.9')
second = self.user_misp_connector.update_event(second)
events = self.user_misp_connector.search(eventid=second.id, pythonify=True)
events = self.user_misp_connector.search(eventid=second.id)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(len(events[0].attributes), 5)
events = self.user_misp_connector.search(eventid=second.id, enforce_warninglist=False, pythonify=True)
events = self.user_misp_connector.search(eventid=second.id, enforce_warninglist=False)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(len(events[0].attributes), 5)
events = self.user_misp_connector.search(eventid=second.id, enforce_warninglist=True, pythonify=True)
events = self.user_misp_connector.search(eventid=second.id, enforce_warninglist=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(len(events[0].attributes), 3)
@ -674,10 +675,10 @@ class TestComprehensive(unittest.TestCase):
self.assertDictEqual(response, {'saved': True, 'success': '3 warninglist(s) toggled'})
# Page / limit
attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=1, limit=3, pythonify=True)
attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=1, limit=3)
self.assertEqual(len(attributes), 3)
attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=2, limit=3, pythonify=True)
attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=2, limit=3)
self.assertEqual(len(attributes), 2)
time.sleep(1) # make sure the next attribute is added one at least one second later
@ -734,6 +735,8 @@ class TestComprehensive(unittest.TestCase):
second = self.user_misp_connector.add_event(second)
current_ts = int(time.time())
# NOTE: no pythonify available yet
# r = self.user_misp_connector.add_sighting({'value': first.attributes[0].value})
r = self.user_misp_connector.add_sighting({'value': first.attributes[0].value})
self.assertEqual(r['message'], 'Sighting added')
@ -741,6 +744,8 @@ class TestComprehensive(unittest.TestCase):
s.value = second.attributes[0].value
s.source = 'Testcases'
s.type = '1'
# NOTE: no pythonify available yet
# r = self.user_misp_connector.add_sighting(s, second.attributes[0].id)
r = self.user_misp_connector.add_sighting(s, second.attributes[0].id)
self.assertEqual(r['message'], 'Sighting added')
@ -781,15 +786,17 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(s[0]['sighting'].attribute_id, str(second.attributes[0].id))
# Get sightings from event/attribute / org
s = self.user_misp_connector.sightings(first, pythonify=True)
s = self.user_misp_connector.sightings(first)
self.assertTrue(isinstance(s, list))
self.assertEqual(int(s[0].attribute_id), first.attributes[0].id)
# NOTE: no pythonify available yet
# r = self.admin_misp_connector.add_sighting(s, second.attributes[0].id, pythonify=True)
r = self.admin_misp_connector.add_sighting(s, second.attributes[0].id)
self.assertEqual(r['message'], 'Sighting added')
s = self.user_misp_connector.sightings(second.attributes[0], pythonify=True)
s = self.user_misp_connector.sightings(second.attributes[0])
self.assertEqual(len(s), 2)
s = self.user_misp_connector.sightings(second.attributes[0], self.test_org.id, pythonify=True)
s = self.user_misp_connector.sightings(second.attributes[0], self.test_org.id)
self.assertEqual(len(s), 1)
self.assertEqual(s[0].org_id, self.test_org.id)
# Delete sighting
@ -819,39 +826,39 @@ class TestComprehensive(unittest.TestCase):
first.attributes[0].to_ids = True
first = self.user_misp_connector.update_event(first)
self.admin_misp_connector.publish(first.id, alert=False)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp())
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
# eventid
csv = self.user_misp_connector.search(return_format='csv', eventid=first.id, pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', eventid=first.id)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
# category
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), category='Other', pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), category='Other')
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), category='Person', pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), category='Person')
self.assertEqual(len(csv), 0)
# type_attribute
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), type_attribute='text', pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), type_attribute='text')
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), type_attribute='ip-src', pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), type_attribute='ip-src')
self.assertEqual(len(csv), 0)
# context
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), include_context=True, pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), include_context=True)
self.assertEqual(len(csv), 1)
self.assertTrue('event_info' in csv[0])
# date_from date_to
csv = self.user_misp_connector.search(return_format='csv', date_from=date.today().isoformat(), pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', date_from=date.today().isoformat())
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02')
self.assertEqual(len(csv), 2)
# headerless
@ -862,14 +869,14 @@ class TestComprehensive(unittest.TestCase):
# self.assertEqual(len(csv.strip().split('\n')), 2)
# include_context
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', include_context=True, pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', include_context=True)
event_context_keys = ['event_info', 'event_member_org', 'event_source_org', 'event_distribution', 'event_threat_level_id', 'event_analysis', 'event_date', 'event_tag', 'event_timestamp']
for k in event_context_keys:
self.assertTrue(k in csv[0])
# requested_attributes
columns = ['value', 'event_id']
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', requested_attributes=columns, pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', requested_attributes=columns)
self.assertEqual(len(csv[0].keys()), 2)
for k in columns:
self.assertTrue(k in csv[0])
@ -1048,7 +1055,7 @@ class TestComprehensive(unittest.TestCase):
first = self.create_simple_event()
first.attributes[0].add_tag('non-exportable tag')
try:
first = self.user_misp_connector.add_event(first, pythonify=True)
first = self.user_misp_connector.add_event(first)
self.assertFalse(first.attributes[0].tags)
first = self.admin_misp_connector.get_event(first, pythonify=True)
# Reference: https://github.com/MISP/MISP/issues/1394
@ -1075,7 +1082,7 @@ class TestComprehensive(unittest.TestCase):
r = self.user_misp_connector.add_object(first.id, peo)
self.assertEqual(r.name, 'pe', r)
for ref in peo.ObjectReference:
r = self.user_misp_connector.add_object_reference(ref, pythonify=True)
r = self.user_misp_connector.add_object_reference(ref)
# FIXME: https://github.com/MISP/MISP/issues/4866
# self.assertEqual(r.object_uuid, peo.uuid, r.to_json())
@ -1083,7 +1090,7 @@ class TestComprehensive(unittest.TestCase):
obj_attrs = r.get_attributes_by_relation('ssdeep')
self.assertEqual(len(obj_attrs), 1, obj_attrs)
self.assertEqual(r.name, 'file', r)
r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0], pythonify=True)
r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0])
# FIXME: https://github.com/MISP/MISP/issues/4866
# self.assertEqual(r.object_uuid, fo.uuid, r.to_json())
self.assertEqual(r.referenced_uuid, peo.uuid, r.to_json())
@ -1258,7 +1265,7 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(organisation.name, 'Test Org')
# Update org
organisation.name = 'blah'
organisation = self.admin_misp_connector.update_organisation(organisation)
organisation = self.admin_misp_connector.update_organisation(organisation, pythonify=True)
self.assertEqual(organisation.name, 'blah', organisation)
def test_attribute(self):
@ -1289,7 +1296,7 @@ class TestComprehensive(unittest.TestCase):
new_attribute = self.user_misp_connector.update_attribute(new_attribute)
self.assertEqual(new_attribute.value, '5.6.3.4')
# Update attribute as proposal
new_proposal_update = self.user_misp_connector.update_attribute_proposal(new_attribute.id, {'to_ids': False}, pythonify=True)
new_proposal_update = self.user_misp_connector.update_attribute_proposal(new_attribute.id, {'to_ids': False})
self.assertEqual(new_proposal_update.to_ids, False)
# Delete attribute as proposal
proposal_delete = self.user_misp_connector.delete_attribute_proposal(new_attribute.id)
@ -1323,10 +1330,10 @@ class TestComprehensive(unittest.TestCase):
self.assertTrue(isinstance(attribute, MISPShadowAttribute))
# Add attribute with the same value as an existing proposal
prop_attr.uuid = str(uuid4())
attribute = self.admin_misp_connector.add_attribute(second.id, prop_attr)
attribute = self.admin_misp_connector.add_attribute(second.id, prop_attr, pythonify=True)
prop_attr.uuid = str(uuid4())
# Add a duplicate attribute (same value)
attribute = self.admin_misp_connector.add_attribute(second.id, prop_attr)
attribute = self.admin_misp_connector.add_attribute(second.id, prop_attr, pythonify=True)
self.assertTrue('errors' in attribute)
# Update attribute owned by someone else
attribute = self.user_misp_connector.update_attribute({'comment': 'blah'}, second.attributes[0].id)
@ -1424,6 +1431,8 @@ class TestComprehensive(unittest.TestCase):
# FIXME: https://github.com/MISP/MISP/issues/4880
# users_stats = self.admin_misp_connector.users_statistics(context='attributehistogram')
# NOTE Not supported yet
# self.user_misp_connector.add_sighting({'value': first.attributes[0].value})
self.user_misp_connector.add_sighting({'value': first.attributes[0].value})
users_stats = self.user_misp_connector.users_statistics(context='sightings')
self.assertEqual(list(users_stats.keys()), ['toplist', 'eventids'])
@ -1502,7 +1511,7 @@ class TestComprehensive(unittest.TestCase):
first = self.create_simple_event()
try:
first = self.user_misp_connector.add_event(first)
first = self.admin_misp_connector.change_sharing_group_on_entity(first, sharing_group.id)
first = self.admin_misp_connector.change_sharing_group_on_entity(first, sharing_group.id, pythonify=True)
self.assertEqual(first.SharingGroup['name'], 'Testcases SG')
# FIXME https://github.com/MISP/MISP/issues/4891
# first_attribute = self.admin_misp_connector.change_sharing_group_on_entity(first.attributes[0], sharing_group.id)
@ -1606,7 +1615,7 @@ class TestComprehensive(unittest.TestCase):
with open('tests/viper-test-files/test_files/whoami.exe', 'rb') as f:
first.add_attribute('malware-sample', value='whoami.exe', data=BytesIO(f.read()), expand='binary')
first.run_expansions()
first = self.admin_misp_connector.add_event(first)
first = self.admin_misp_connector.add_event(first, pythonify=True)
self.assertEqual(len(first.objects), 7)
finally:
# Delete event
@ -1616,6 +1625,21 @@ class TestComprehensive(unittest.TestCase):
# FIXME https://github.com/MISP/MISP/issues/4892
pass
def test_toggle_global_pythonify(self):
first = self.create_simple_event()
second = self.create_simple_event()
try:
self.admin_misp_connector.toggle_global_pythonify()
first = self.admin_misp_connector.add_event(first)
self.assertTrue(isinstance(first, MISPEvent))
self.admin_misp_connector.toggle_global_pythonify()
second = self.admin_misp_connector.add_event(second)
self.assertTrue(isinstance(second, dict))
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second['Event']['id'])
if __name__ == '__main__':
unittest.main()