diff --git a/pymisp/aping.py b/pymisp/aping.py index 38de3f8..329d235 100644 --- a/pymisp/aping.py +++ b/pymisp/aping.py @@ -58,6 +58,8 @@ class ExpandedPyMISP(PyMISP): self.auth = auth self.tool = tool + self.global_pythonify = False + self.resources_path = Path(__file__).parent / 'data' if debug: logger.setLevel(logging.DEBUG) @@ -144,36 +146,39 @@ class ExpandedPyMISP(PyMISP): return {'version': '{}.{}.{}'.format(master_version['major'], master_version['minor'], master_version['hotfix'])} return {'error': 'Impossible to retrieve the version of the master branch.'} + def toggle_global_pythonify(self): + self.global_pythonify = not self.global_pythonify + # ## BEGIN Event ## - def get_event(self, event: Union[MISPEvent, int, str, UUID], pythonify: bool=True): + def get_event(self, event: Union[MISPEvent, int, str, UUID], pythonify: bool=False): '''Get an event from a MISP instance''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) event = self._prepare_request('GET', f'events/{event_id}') event = self._check_response(event, expect_json=True) - if not pythonify or 'errors' in event: + if not (self.global_pythonify or pythonify) or 'errors' in event: return event e = MISPEvent() e.load(event) return e - def add_event(self, event: MISPEvent, pythonify: bool=True): + def add_event(self, event: MISPEvent, pythonify: bool=False): '''Add a new event on a MISP instance''' new_event = self._prepare_request('POST', 'events', data=event) new_event = self._check_response(new_event, expect_json=True) - if not pythonify or 'errors' in new_event: + if not (self.global_pythonify or pythonify) or 'errors' in new_event: return new_event e = MISPEvent() e.load(new_event) return e - def update_event(self, event: MISPEvent, event_id: int=None, pythonify: bool=True): + def update_event(self, event: MISPEvent, event_id: int=None, pythonify: bool=False): '''Update an event on a MISP instance''' if event_id is None: event_id = self.__get_uuid_or_id_from_abstract_misp(event) updated_event = self._prepare_request('POST', f'events/{event_id}', data=event) updated_event = self._check_response(updated_event, expect_json=True) - if not pythonify or 'errors' in updated_event: + if not (self.global_pythonify or pythonify) or 'errors' in updated_event: return updated_event e = MISPEvent() e.load(updated_event) @@ -199,35 +204,35 @@ class ExpandedPyMISP(PyMISP): # ## BEGIN Object ### - def get_object(self, misp_object: Union[MISPObject, int, str, UUID], pythonify: bool=True): + def get_object(self, misp_object: Union[MISPObject, int, str, UUID], pythonify: bool=False): '''Get an object from the remote MISP instance''' object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object) misp_object = self._prepare_request('GET', f'objects/view/{object_id}') misp_object = self._check_response(misp_object, expect_json=True) - if not pythonify or 'errors' in misp_object: + if not (self.global_pythonify or pythonify) or 'errors' in misp_object: return misp_object o = MISPObject(misp_object['Object']['name']) o.from_dict(**misp_object) return o - def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool=True): + def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool=False): '''Add a MISP Object to an existing MISP event''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) new_object = self._prepare_request('POST', f'objects/add/{event_id}', data=misp_object) new_object = self._check_response(new_object, expect_json=True) - if not pythonify or 'errors' in new_object: + if not (self.global_pythonify or pythonify) or 'errors' in new_object: return new_object o = MISPObject(new_object['Object']['name']) o.from_dict(**new_object) return o - def update_object(self, misp_object: MISPObject, object_id: int=None, pythonify: bool=True): + def update_object(self, misp_object: MISPObject, object_id: int=None, pythonify: bool=False): '''Update an object on a MISP instance''' if object_id is None: object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object) updated_object = self._prepare_request('POST', f'objects/edit/{object_id}', data=misp_object) updated_object = self._check_response(updated_object, expect_json=True) - if not pythonify or 'errors' in updated_object: + if not (self.global_pythonify or pythonify) or 'errors' in updated_object: return updated_object o = MISPObject(updated_object['Object']['name']) o.from_dict(**updated_object) @@ -244,7 +249,7 @@ class ExpandedPyMISP(PyMISP): """Add a reference to an object""" object_reference = self._prepare_request('POST', 'object_references/add', misp_object_reference) object_reference = self._check_response(object_reference, expect_json=True) - if not pythonify or 'errors' in object_reference: + if not (self.global_pythonify or pythonify) or 'errors' in object_reference: return object_reference r = MISPObjectReference() r.from_dict(**object_reference) @@ -258,11 +263,11 @@ class ExpandedPyMISP(PyMISP): # Object templates - def object_templates(self, pythonify=False): + def object_templates(self, pythonify: bool=False): """Get all the object templates.""" object_templates = self._prepare_request('GET', 'objectTemplates') object_templates = self._check_response(object_templates, expect_json=True) - if not pythonify or 'errors' in object_templates: + if not (self.global_pythonify or pythonify) or 'errors' in object_templates: return object_templates to_return = [] for object_template in object_templates: @@ -271,12 +276,12 @@ class ExpandedPyMISP(PyMISP): to_return.append(o) return to_return - def get_object_template(self, object_template: Union[MISPObjectTemplate, int, str, UUID], pythonify=False): + def get_object_template(self, object_template: Union[MISPObjectTemplate, int, str, UUID], pythonify: bool=False): """Gets the full object template corresponting the UUID passed as parameter""" object_template_id = self.__get_uuid_or_id_from_abstract_misp(object_template) object_template = self._prepare_request('GET', f'objectTemplates/view/{object_template_id}') object_template = self._check_response(object_template, expect_json=True) - if not pythonify or 'errors' in object_template: + if not (self.global_pythonify or pythonify) or 'errors' in object_template: return object_template t = MISPObjectTemplate() t.from_dict(**object_template) @@ -291,18 +296,18 @@ class ExpandedPyMISP(PyMISP): # ## BEGIN Attribute ### - def get_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], pythonify: bool=True): + def get_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], pythonify: bool=False): '''Get an attribute from a MISP instance''' attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) attribute = self._prepare_request('GET', f'attributes/view/{attribute_id}') attribute = self._check_response(attribute, expect_json=True) - if not pythonify or 'errors' in attribute: + if not (self.global_pythonify or pythonify) or 'errors' in attribute: return attribute a = MISPAttribute() a.from_dict(**attribute) return a - def add_attribute(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=True): + def add_attribute(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False): '''Add an attribute to an existing MISP event''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) new_attribute = self._prepare_request('POST', f'attributes/add/{event_id}', data=attribute) @@ -312,13 +317,13 @@ class ExpandedPyMISP(PyMISP): # At this point, we assume the user tried to add an attribute on an event they don't own # Re-try with a proposal return self.add_attribute_proposal(event_id, attribute, pythonify) - if not pythonify or 'errors' in new_attribute: + if not (self.global_pythonify or pythonify) or 'errors' in new_attribute: return new_attribute a = MISPAttribute() a.from_dict(**new_attribute) return a - def update_attribute(self, attribute: MISPAttribute, attribute_id: int=None, pythonify: bool=True): + def update_attribute(self, attribute: MISPAttribute, attribute_id: int=None, pythonify: bool=False): '''Update an attribute on a MISP instance''' if attribute_id is None: attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) @@ -330,7 +335,7 @@ class ExpandedPyMISP(PyMISP): # At this point, we assume the user tried to update an attribute on an event they don't own # Re-try with a proposal return self.update_attribute_proposal(attribute_id, attribute, pythonify) - if not pythonify or 'errors' in updated_attribute: + if not (self.global_pythonify or pythonify) or 'errors' in updated_attribute: return updated_attribute a = MISPAttribute() a.from_dict(**updated_attribute) @@ -353,11 +358,11 @@ class ExpandedPyMISP(PyMISP): # ## BEGIN Attribute Proposal ### - def get_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID], pythonify: bool=True): + def get_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID], pythonify: bool=False): proposal_id = self.__get_uuid_or_id_from_abstract_misp(proposal) attribute_proposal = self._prepare_request('GET', f'shadow_attributes/view/{proposal_id}') attribute_proposal = self._check_response(attribute_proposal, expect_json=True) - if not pythonify or 'errors' in attribute_proposal: + if not (self.global_pythonify or pythonify) or 'errors' in attribute_proposal: return attribute_proposal a = MISPShadowAttribute() a.from_dict(**attribute_proposal) @@ -365,26 +370,26 @@ class ExpandedPyMISP(PyMISP): # NOTE: the tree following method have a very specific meaning, look at the comments - def add_attribute_proposal(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=True): + def add_attribute_proposal(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False): '''Propose a new attribute in an event''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) # FIXME: attribute needs to be a complete MISPAttribute: https://github.com/MISP/MISP/issues/4868 new_attribute_proposal = self._prepare_request('POST', f'shadow_attributes/add/{event_id}', data=attribute) new_attribute_proposal = self._check_response(new_attribute_proposal, expect_json=True) - if not pythonify or 'errors' in new_attribute_proposal: + if not (self.global_pythonify or pythonify) or 'errors' in new_attribute_proposal: return new_attribute_proposal a = MISPShadowAttribute() a.from_dict(**new_attribute_proposal) return a - def update_attribute_proposal(self, initial_attribute: Union[MISPAttribute, int, str, UUID], attribute: MISPAttribute, pythonify: bool=True): + def update_attribute_proposal(self, initial_attribute: Union[MISPAttribute, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False): '''Propose a change for an attribute''' # FIXME: inconsistency in MISP: https://github.com/MISP/MISP/issues/4857 initial_attribute_id = self.__get_uuid_or_id_from_abstract_misp(initial_attribute) attribute = {'ShadowAttribute': attribute} update_attribute_proposal = self._prepare_request('POST', f'shadow_attributes/edit/{initial_attribute_id}', data=attribute) update_attribute_proposal = self._check_response(update_attribute_proposal, expect_json=True) - if not pythonify or 'errors' in update_attribute_proposal: + if not (self.global_pythonify or pythonify) or 'errors' in update_attribute_proposal: return update_attribute_proposal a = MISPShadowAttribute() a.from_dict(**update_attribute_proposal) @@ -414,7 +419,7 @@ class ExpandedPyMISP(PyMISP): # ## BEGIN Sighting ### - def sightings(self, misp_entity: AbstractMISP, org: Union[MISPOrganisation, int, str, UUID]=None, pythonify=False): + def sightings(self, misp_entity: AbstractMISP, org: Union[MISPOrganisation, int, str, UUID]=None, pythonify: bool=False): """Get the list of sighting related to a MISPEvent or a MISPAttribute (depending on type of misp_entity)""" # FIXME: https://github.com/MISP/MISP/issues/4875 if isinstance(misp_entity, MISPEvent): @@ -430,7 +435,7 @@ class ExpandedPyMISP(PyMISP): url = f'sightings/listSightings/{misp_entity.id}/{scope}' sightings = self._prepare_request('POST', url) sightings = self._check_response(sightings, expect_json=True) - if not pythonify or 'errors' in sightings: + if not (self.global_pythonify or pythonify) or 'errors' in sightings: return sightings to_return = [] for sighting in sightings: @@ -450,7 +455,7 @@ class ExpandedPyMISP(PyMISP): # Either the ID/UUID is in the sighting, or we want to add a sighting on all the attributes with the given value new_sighting = self._prepare_request('POST', f'sightings/add', data=sighting) new_sighting = self._check_response(new_sighting, expect_json=True) - if not pythonify or 'errors' in new_sighting: + if not (self.global_pythonify or pythonify) or 'errors' in new_sighting: return new_sighting s = MISPSighting() s.from_dict(**new_sighting) @@ -470,7 +475,7 @@ class ExpandedPyMISP(PyMISP): """Get the list of existing tags.""" tags = self._prepare_request('GET', 'tags') tags = self._check_response(tags, expect_json=True) - if not pythonify or 'errors' in tags: + if not (self.global_pythonify or pythonify) or 'errors' in tags: return tags['Tag'] to_return = [] for tag in tags['Tag']: @@ -484,17 +489,17 @@ class ExpandedPyMISP(PyMISP): tag_id = self.__get_uuid_or_id_from_abstract_misp(tag) tag = self._prepare_request('GET', f'tags/view/{tag_id}') tag = self._check_response(tag, expect_json=True) - if not pythonify or 'errors' in tag: + if not (self.global_pythonify or pythonify) or 'errors' in tag: return tag t = MISPTag() t.from_dict(**tag) return t - def add_tag(self, tag: MISPTag, pythonify: bool=True): + def add_tag(self, tag: MISPTag, pythonify: bool=False): '''Add a new tag on a MISP instance''' new_tag = self._prepare_request('POST', 'tags/add', data=tag) new_tag = self._check_response(new_tag, expect_json=True) - if not pythonify or 'errors' in new_tag: + if not (self.global_pythonify or pythonify) or 'errors' in new_tag: return new_tag t = MISPTag() t.from_dict(**new_tag) @@ -518,7 +523,7 @@ class ExpandedPyMISP(PyMISP): tag = {'Tag': tag} updated_tag = self._prepare_request('POST', f'tags/edit/{tag_id}', data=tag) updated_tag = self._check_response(updated_tag, expect_json=True) - if not pythonify or 'errors' in updated_tag: + if not (self.global_pythonify or pythonify) or 'errors' in updated_tag: return updated_tag t = MISPTag() t.from_dict(**updated_tag) @@ -543,7 +548,7 @@ class ExpandedPyMISP(PyMISP): """Get all the taxonomies.""" taxonomies = self._prepare_request('GET', 'taxonomies') taxonomies = self._check_response(taxonomies, expect_json=True) - if not pythonify or 'errors' in taxonomies: + if not (self.global_pythonify or pythonify) or 'errors' in taxonomies: return taxonomies to_return = [] for taxonomy in taxonomies: @@ -557,7 +562,7 @@ class ExpandedPyMISP(PyMISP): taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) taxonomy = self._prepare_request('GET', f'taxonomies/view/{taxonomy_id}') taxonomy = self._check_response(taxonomy, expect_json=True) - if not pythonify or 'errors' in taxonomy: + if not (self.global_pythonify or pythonify) or 'errors' in taxonomy: return taxonomy t = MISPTaxonomy() t.from_dict(**taxonomy) @@ -601,7 +606,7 @@ class ExpandedPyMISP(PyMISP): """Get all the warninglists.""" warninglists = self._prepare_request('GET', 'warninglists') warninglists = self._check_response(warninglists, expect_json=True) - if not pythonify or 'errors' in warninglists: + if not (self.global_pythonify or pythonify) or 'errors' in warninglists: return warninglists['Warninglists'] to_return = [] for warninglist in warninglists['Warninglists']: @@ -615,7 +620,7 @@ class ExpandedPyMISP(PyMISP): warninglist_id = self.__get_uuid_or_id_from_abstract_misp(warninglist) warninglist = self._prepare_request('GET', f'warninglists/view/{warninglist_id}') warninglist = self._check_response(warninglist, expect_json=True) - if not pythonify or 'errors' in warninglist: + if not (self.global_pythonify or pythonify) or 'errors' in warninglist: return warninglist w = MISPWarninglist() w.from_dict(**warninglist) @@ -667,11 +672,11 @@ class ExpandedPyMISP(PyMISP): # ## BEGIN Noticelist ### - def noticelists(self, pythonify=False): + def noticelists(self, pythonify: bool=False): """Get all the noticelists.""" noticelists = self._prepare_request('GET', 'noticelists') noticelists = self._check_response(noticelists, expect_json=True) - if not pythonify or 'errors' in noticelists: + if not (self.global_pythonify or pythonify) or 'errors' in noticelists: return noticelists to_return = [] for noticelist in noticelists: @@ -680,12 +685,12 @@ class ExpandedPyMISP(PyMISP): to_return.append(n) return to_return - def get_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID], pythonify=False): + def get_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID], pythonify: bool=False): """Get a noticelist by id.""" noticelist_id = self.__get_uuid_or_id_from_abstract_misp(noticelist) noticelist = self._prepare_request('GET', f'noticelists/view/{noticelist_id}') noticelist = self._check_response(noticelist, expect_json=True) - if not pythonify or 'errors' in noticelist: + if not (self.global_pythonify or pythonify) or 'errors' in noticelist: return noticelist n = MISPNoticelist() n.from_dict(**noticelist) @@ -716,11 +721,11 @@ class ExpandedPyMISP(PyMISP): # ## BEGIN Galaxy ### - def galaxies(self, pythonify=False): + def galaxies(self, pythonify: bool=False): """Get all the galaxies.""" galaxies = self._prepare_request('GET', 'galaxies') galaxies = self._check_response(galaxies, expect_json=True) - if not pythonify or 'errors' in galaxies: + if not (self.global_pythonify or pythonify) or 'errors' in galaxies: return galaxies to_return = [] for galaxy in galaxies: @@ -729,12 +734,12 @@ class ExpandedPyMISP(PyMISP): to_return.append(g) return to_return - def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], pythonify=False): + def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], pythonify: bool=False): """Get a galaxy by id.""" galaxy_id = self.__get_uuid_or_id_from_abstract_misp(galaxy) galaxy = self._prepare_request('GET', f'galaxies/view/{galaxy_id}') galaxy = self._check_response(galaxy, expect_json=True) - if not pythonify or 'errors' in galaxy: + if not (self.global_pythonify or pythonify) or 'errors' in galaxy: return galaxy g = MISPGalaxy() g.from_dict(**galaxy) @@ -753,7 +758,7 @@ class ExpandedPyMISP(PyMISP): """Get the list of existing feeds.""" feeds = self._prepare_request('GET', 'feeds') feeds = self._check_response(feeds, expect_json=True) - if not pythonify or 'errors' in feeds: + if not (self.global_pythonify or pythonify) or 'errors' in feeds: return feeds to_return = [] for feed in feeds: @@ -767,7 +772,7 @@ class ExpandedPyMISP(PyMISP): feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) feed = self._prepare_request('GET', f'feeds/view/{feed_id}') feed = self._check_response(feed, expect_json=True) - if not pythonify or 'errors' in feed: + if not (self.global_pythonify or pythonify) or 'errors' in feed: return feed f = MISPFeed() f.from_dict(**feed) @@ -779,7 +784,7 @@ class ExpandedPyMISP(PyMISP): feed = {'Feed': feed} new_feed = self._prepare_request('POST', 'feeds/add', data=feed) new_feed = self._check_response(new_feed, expect_json=True) - if not pythonify or 'errors' in new_feed: + if not (self.global_pythonify or pythonify) or 'errors' in new_feed: return new_feed f = MISPFeed() f.from_dict(**new_feed) @@ -829,7 +834,7 @@ class ExpandedPyMISP(PyMISP): feed = {'Feed': feed} updated_feed = self._prepare_request('POST', f'feeds/edit/{feed_id}', data=feed) updated_feed = self._check_response(updated_feed, expect_json=True) - if not pythonify or 'errors' in updated_feed: + if not (self.global_pythonify or pythonify) or 'errors' in updated_feed: return updated_feed f = MISPFeed() f.from_dict(**updated_feed) @@ -877,11 +882,11 @@ class ExpandedPyMISP(PyMISP): # ## BEGIN Server ### - def servers(self, pythonify=False): + def servers(self, pythonify: bool=False): """Get the existing servers the MISP instance can synchronise with""" servers = self._prepare_request('GET', 'servers') servers = self._check_response(servers, expect_json=True) - if not pythonify or 'errors' in servers: + if not (self.global_pythonify or pythonify) or 'errors' in servers: return servers to_return = [] for server in servers: @@ -890,23 +895,23 @@ class ExpandedPyMISP(PyMISP): to_return.append(s) return to_return - def add_server(self, server: MISPServer, pythonify: bool=True): + def add_server(self, server: MISPServer, pythonify: bool=False): """Add a server to synchronise with""" server = self._prepare_request('POST', f'servers/add', data=server) server = self._check_response(server, expect_json=True) - if not pythonify or 'errors' in server: + if not (self.global_pythonify or pythonify) or 'errors' in server: return server s = MISPServer() s.from_dict(**server) return s - def update_server(self, server: MISPServer, server_id: int=None, pythonify: bool=True): + def update_server(self, server: MISPServer, server_id: int=None, pythonify: bool=False): '''Update a server to synchronise with''' if server_id is None: server_id = self.__get_uuid_or_id_from_abstract_misp(server) updated_server = self._prepare_request('POST', f'servers/edit/{server_id}', data=server) updated_server = self._check_response(updated_server, expect_json=True) - if not pythonify or 'errors' in updated_server: + if not (self.global_pythonify or pythonify) or 'errors' in updated_server: return updated_server s = MISPServer() s.from_dict(**updated_server) @@ -952,7 +957,7 @@ class ExpandedPyMISP(PyMISP): """Get the existing sharing groups""" sharing_groups = self._prepare_request('GET', 'sharing_groups') sharing_groups = self._check_response(sharing_groups, expect_json=True) - if not pythonify or 'errors' in sharing_groups: + if not (self.global_pythonify or pythonify) or 'errors' in sharing_groups: return sharing_groups to_return = [] for sharing_group in sharing_groups: @@ -961,13 +966,13 @@ class ExpandedPyMISP(PyMISP): to_return.append(s) return to_return - def add_sharing_group(self, sharing_group: MISPSharingGroup, pythonify: bool=True): + def add_sharing_group(self, sharing_group: MISPSharingGroup, pythonify: bool=False): """Add a new sharing group""" sharing_group = self._prepare_request('POST', f'sharing_groups/add', data=sharing_group) sharing_group = self._check_response(sharing_group, expect_json=True) # FIXME: https://github.com/MISP/MISP/issues/4882 sharing_group = sharing_group[0] - if not pythonify or 'errors' in sharing_group: + if not (self.global_pythonify or pythonify) or 'errors' in sharing_group: return sharing_group s = MISPSharingGroup() s.from_dict(**sharing_group) @@ -1033,11 +1038,11 @@ class ExpandedPyMISP(PyMISP): # ## BEGIN Organisation ### - def organisations(self, scope="local", pythonify=False): + def organisations(self, scope="local", pythonify: bool=False): """Get all the organisations.""" organisations = self._prepare_request('GET', f'organisations/index/scope:{scope}') organisations = self._check_response(organisations, expect_json=True) - if not pythonify or 'errors' in organisations: + if not (self.global_pythonify or pythonify) or 'errors' in organisations: return organisations to_return = [] for organisation in organisations: @@ -1046,34 +1051,34 @@ class ExpandedPyMISP(PyMISP): to_return.append(o) return to_return - def get_organisation(self, organisation: Union[MISPOrganisation, int, str, UUID], pythonify: bool=True): + def get_organisation(self, organisation: Union[MISPOrganisation, int, str, UUID], pythonify: bool=False): '''Get an organisation.''' organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) organisation = self._prepare_request('GET', f'organisations/view/{organisation_id}') organisation = self._check_response(organisation, expect_json=True) - if not pythonify or 'errors' in organisation: + if not (self.global_pythonify or pythonify) or 'errors' in organisation: return organisation o = MISPOrganisation() o.from_dict(**organisation) return o - def add_organisation(self, organisation: MISPOrganisation, pythonify: bool=True): + def add_organisation(self, organisation: MISPOrganisation, pythonify: bool=False): '''Add an organisation''' new_organisation = self._prepare_request('POST', f'admin/organisations/add', data=organisation) new_organisation = self._check_response(new_organisation, expect_json=True) - if not pythonify or 'errors' in new_organisation: + if not (self.global_pythonify or pythonify) or 'errors' in new_organisation: return new_organisation o = MISPOrganisation() o.from_dict(**new_organisation) return o - def update_organisation(self, organisation: MISPOrganisation, organisation_id: int=None, pythonify: bool=True): + def update_organisation(self, organisation: MISPOrganisation, organisation_id: int=None, pythonify: bool=False): '''Update an organisation''' if organisation_id is None: organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) updated_organisation = self._prepare_request('POST', f'admin/organisations/edit/{organisation_id}', data=organisation) updated_organisation = self._check_response(updated_organisation, expect_json=True) - if not pythonify or 'errors' in updated_organisation: + if not (self.global_pythonify or pythonify) or 'errors' in updated_organisation: return updated_organisation o = MISPOrganisation() o.from_dict(**organisation) @@ -1090,11 +1095,11 @@ class ExpandedPyMISP(PyMISP): # ## BEGIN User ### - def users(self, pythonify=False): + def users(self, pythonify: bool=False): """Get all the users.""" users = self._prepare_request('GET', 'admin/users') users = self._check_response(users, expect_json=True) - if not pythonify or 'errors' in users: + if not (self.global_pythonify or pythonify) or 'errors' in users: return users to_return = [] for user in users: @@ -1108,7 +1113,7 @@ class ExpandedPyMISP(PyMISP): user_id = self.__get_uuid_or_id_from_abstract_misp(user) user = self._prepare_request('GET', f'users/view/{user_id}') user = self._check_response(user, expect_json=True) - if not pythonify or 'errors' in user: + if not (self.global_pythonify or pythonify) or 'errors' in user: return user u = MISPUser() u.from_dict(**user) @@ -1118,7 +1123,7 @@ class ExpandedPyMISP(PyMISP): '''Add a new user''' user = self._prepare_request('POST', f'admin/users/add', data=user) user = self._check_response(user, expect_json=True) - if not pythonify or 'errors' in user: + if not (self.global_pythonify or pythonify) or 'errors' in user: return user u = MISPUser() u.from_dict(**user) @@ -1130,7 +1135,7 @@ class ExpandedPyMISP(PyMISP): user_id = self.__get_uuid_or_id_from_abstract_misp(user) updated_user = self._prepare_request('POST', f'admin/users/edit/{user_id}', data=user) updated_user = self._check_response(updated_user, expect_json=True) - if not pythonify or 'errors' in updated_user: + if not (self.global_pythonify or pythonify) or 'errors' in updated_user: return updated_user e = MISPUser() e.from_dict(**updated_user) @@ -1151,7 +1156,7 @@ class ExpandedPyMISP(PyMISP): """Get the existing roles""" roles = self._prepare_request('GET', 'roles') roles = self._check_response(roles, expect_json=True) - if not pythonify or 'errors' in roles: + if not (self.global_pythonify or pythonify) or 'errors' in roles: return roles to_return = [] for role in roles: @@ -1321,11 +1326,11 @@ class ExpandedPyMISP(PyMISP): else: normalized_response = self._check_response(response) - if return_format == 'csv' and pythonify and not headerless: + if return_format == 'csv' and (self.global_pythonify or pythonify) and not headerless: return self._csv_to_dict(normalized_response) elif 'errors' in normalized_response: return normalized_response - elif return_format == 'json' and pythonify: + elif return_format == 'json' and self.global_pythonify or pythonify: # The response is in json, we can convert it to a list of pythonic MISP objects to_return = [] if controller == 'events': @@ -1389,7 +1394,7 @@ class ExpandedPyMISP(PyMISP): response = self._prepare_request('POST', url, data=query) normalized_response = self._check_response(response, expect_json=True) - if not pythonify: + if not (self.global_pythonify or pythonify): return normalized_response to_return = [] for e_meta in normalized_response: @@ -1455,9 +1460,9 @@ class ExpandedPyMISP(PyMISP): url = urljoin(self.root_url, url_path) response = self._prepare_request('POST', url, data=query) normalized_response = self._check_response(response, expect_json=True) - if not pythonify or 'errors' in normalized_response: + if not (self.global_pythonify or pythonify) or 'errors' in normalized_response: return normalized_response - elif pythonify: + elif self.global_pythonify or pythonify: to_return = [] for s in normalized_response: entries = {} @@ -1514,7 +1519,7 @@ class ExpandedPyMISP(PyMISP): response = self._prepare_request('POST', 'admin/logs/index', data=query) normalized_response = self._check_response(response, expect_json=True) - if not pythonify or 'errors' in normalized_response: + if not (self.global_pythonify or pythonify) or 'errors' in normalized_response: return normalized_response to_return = [] @@ -1543,7 +1548,7 @@ class ExpandedPyMISP(PyMISP): return self._check_response(response, lenient_response_type=True) def freetext(self, event: Union[MISPEvent, int, str, UUID], string: str, adhereToWarninglists: Union[bool, str]=False, - distribution: int=None, returnMetaAttributes: bool=False, pythonify=False): + distribution: int=None, returnMetaAttributes: bool=False, pythonify: bool=False): """Pass a text to the freetext importer""" event_id = self.__get_uuid_or_id_from_abstract_misp(event) query = {"value": string} @@ -1558,7 +1563,7 @@ class ExpandedPyMISP(PyMISP): query['returnMetaAttributes'] = returnMetaAttributes attributes = self._prepare_request('POST', f'events/freeTextImport/{event_id}', data=query) attributes = self._check_response(attributes, expect_json=True) - if returnMetaAttributes or not pythonify or 'errors' in attributes: + if returnMetaAttributes or not (self.global_pythonify or pythonify) or 'errors' in attributes: return attributes to_return = [] for attribute in attributes: @@ -1634,18 +1639,18 @@ class ExpandedPyMISP(PyMISP): # ## BEGIN Global helpers ### - def change_sharing_group_on_entity(self, misp_entity: AbstractMISP, sharing_group_id): + def change_sharing_group_on_entity(self, misp_entity: AbstractMISP, sharing_group_id, pythonify: bool=False): """Change the sharing group of an event, an attribute, or an object""" misp_entity.distribution = 4 # Needs to be 'Sharing group' if 'SharingGroup' in misp_entity: # Delete former SharingGroup information del misp_entity.SharingGroup misp_entity.sharing_group_id = sharing_group_id # Set new sharing group id if isinstance(misp_entity, MISPEvent): - return self.update_event(misp_entity) + return self.update_event(misp_entity, pythonify=pythonify) elif isinstance(misp_entity, MISPObject): - return self.update_object(misp_entity) + return self.update_object(misp_entity, pythonify=pythonify) elif isinstance(misp_entity, MISPAttribute): - return self.update_attribute(misp_entity) + return self.update_attribute(misp_entity, pythonify=pythonify) else: raise PyMISPError('The misp_entity must be MISPEvent, MISPObject or MISPAttribute') diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 4eee633..e354c0f 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -53,7 +53,7 @@ class TestComprehensive(unittest.TestCase): # Creates an org organisation = MISPOrganisation() organisation.name = 'Test Org' - cls.test_org = cls.admin_misp_connector.add_organisation(organisation) + cls.test_org = cls.admin_misp_connector.add_organisation(organisation, pythonify=True) # Set the refault role (id 3 on the VM) cls.admin_misp_connector.set_default_role(3) # Creates a user @@ -62,6 +62,7 @@ class TestComprehensive(unittest.TestCase): user.org_id = cls.test_org.id cls.test_usr = cls.admin_misp_connector.add_user(user, pythonify=True) cls.user_misp_connector = ExpandedPyMISP(url, cls.test_usr.authkey, verifycert, debug=False) + cls.user_misp_connector.toggle_global_pythonify() # Creates a publisher user = MISPUser() user.email = 'testpub@user.local' @@ -136,8 +137,8 @@ class TestComprehensive(unittest.TestCase): # Create first and third event as admin # usr won't be able to see the first one - first = self.admin_misp_connector.add_event(first_event) - third = self.admin_misp_connector.add_event(third_event) + first = self.admin_misp_connector.add_event(first_event, pythonify=True) + third = self.admin_misp_connector.add_event(third_event, pythonify=True) # Create second event as user second = self.user_misp_connector.add_event(second_event) return first, second, third @@ -155,12 +156,12 @@ class TestComprehensive(unittest.TestCase): for e in events: self.assertIn(e.id, [first.id, second.id]) # Search as user - events = self.user_misp_connector.search(value=first.attributes[0].value, pythonify=True) + events = self.user_misp_connector.search(value=first.attributes[0].value) self.assertEqual(len(events), 1) for e in events: self.assertIn(e.id, [second.id]) # Non-existing value - events = self.user_misp_connector.search(value=str(uuid4()), pythonify=True) + events = self.user_misp_connector.search(value=str(uuid4())) self.assertEqual(events, []) finally: # Delete events @@ -178,12 +179,12 @@ class TestComprehensive(unittest.TestCase): for a in attributes: self.assertIn(a.event_id, [first.id, second.id]) # Search as user - attributes = self.user_misp_connector.search(controller='attributes', value=first.attributes[0].value, pythonify=True) + attributes = self.user_misp_connector.search(controller='attributes', value=first.attributes[0].value) self.assertEqual(len(attributes), 1) for a in attributes: self.assertIn(a.event_id, [second.id]) # Non-existing value - attributes = self.user_misp_connector.search(controller='attributes', value=str(uuid4()), pythonify=True) + attributes = self.user_misp_connector.search(controller='attributes', value=str(uuid4())) self.assertEqual(attributes, []) finally: # Delete event @@ -254,15 +255,15 @@ class TestComprehensive(unittest.TestCase): for e in events: self.assertIn(e.id, [first.id]) # Search as user - events = self.user_misp_connector.search(tags='tlp:white___test', pythonify=True) + events = self.user_misp_connector.search(tags='tlp:white___test') self.assertEqual(len(events), 2) for e in events: self.assertIn(e.id, [second.id, third.id]) - events = self.user_misp_connector.search(tags='tlp:amber___test', pythonify=True) + events = self.user_misp_connector.search(tags='tlp:amber___test') self.assertEqual(len(events), 2) for e in events: self.assertIn(e.id, [second.id, third.id]) - events = self.user_misp_connector.search(tags='admin_only', pythonify=True) + events = self.user_misp_connector.search(tags='admin_only') self.assertEqual(events, []) finally: # Delete event @@ -282,14 +283,14 @@ class TestComprehensive(unittest.TestCase): attributes = self.admin_misp_connector.search(tags='admin_only', pythonify=True) self.assertEqual(len(attributes), 1) # Search as user - attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:white___test', pythonify=True) + attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:white___test') self.assertEqual(len(attributes), 4) - attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:amber___test', pythonify=True) + attributes = self.user_misp_connector.search(controller='attributes', tags='tlp:amber___test') self.assertEqual(len(attributes), 3) - attributes = self.user_misp_connector.search(tags='admin_only', pythonify=True) + attributes = self.user_misp_connector.search(tags='admin_only') self.assertEqual(attributes, []) attributes_tags_search = self.admin_misp_connector.build_complex_query(or_parameters=['tlp:amber___test'], not_parameters=['tlp:white___test']) - attributes = self.user_misp_connector.search(controller='attributes', tags=attributes_tags_search, pythonify=True) + attributes = self.user_misp_connector.search(controller='attributes', tags=attributes_tags_search) self.assertEqual(len(attributes), 1) finally: # Delete event @@ -361,19 +362,19 @@ class TestComprehensive(unittest.TestCase): second = self.user_misp_connector.add_event(second) # Search as user # # Test - last 4 min - events = self.user_misp_connector.search(timestamp='4m', pythonify=True) + events = self.user_misp_connector.search(timestamp='4m') self.assertEqual(len(events), 1) self.assertEqual(events[0].id, second.id) self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp())) # # Test timestamp of 2nd event - events = self.user_misp_connector.search(timestamp=event_creation_timestamp_second.timestamp(), pythonify=True) + events = self.user_misp_connector.search(timestamp=event_creation_timestamp_second.timestamp()) self.assertEqual(len(events), 1) self.assertEqual(events[0].id, second.id) self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp())) # # Test interval -6 min -> -4 min - events = self.user_misp_connector.search(timestamp=['6m', '4m'], pythonify=True) + events = self.user_misp_connector.search(timestamp=['6m', '4m']) self.assertEqual(len(events), 1) self.assertEqual(events[0].id, first.id) self.assertEqual(events[0].timestamp.timestamp(), int(event_creation_timestamp_first.timestamp())) @@ -399,19 +400,19 @@ class TestComprehensive(unittest.TestCase): second = self.user_misp_connector.add_event(second) # Search as user # # Test - last 4 min - attributes = self.user_misp_connector.search(controller='attributes', timestamp='4m', pythonify=True) + attributes = self.user_misp_connector.search(controller='attributes', timestamp='4m') self.assertEqual(len(attributes), 1) self.assertEqual(attributes[0].event_id, second.id) self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp())) # # Test timestamp of 2nd event - attributes = self.user_misp_connector.search(controller='attributes', timestamp=event_creation_timestamp_second.timestamp(), pythonify=True) + attributes = self.user_misp_connector.search(controller='attributes', timestamp=event_creation_timestamp_second.timestamp()) self.assertEqual(len(attributes), 1) self.assertEqual(attributes[0].event_id, second.id) self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_second.timestamp())) # # Test interval -6 min -> -4 min - attributes = self.user_misp_connector.search(controller='attributes', timestamp=['6m', '4m'], pythonify=True) + attributes = self.user_misp_connector.search(controller='attributes', timestamp=['6m', '4m']) self.assertEqual(len(attributes), 1) self.assertEqual(attributes[0].event_id, first.id) self.assertEqual(attributes[0].timestamp.timestamp(), int(event_creation_timestamp_first.timestamp())) @@ -430,7 +431,7 @@ class TestComprehensive(unittest.TestCase): self.assertFalse(first.published) # Add event as publisher first.publish() - first = self.pub_misp_connector.update_event(first) + first = self.pub_misp_connector.update_event(first, pythonify=True) self.assertTrue(first.published) finally: # Delete event @@ -445,9 +446,9 @@ class TestComprehensive(unittest.TestCase): second = self.create_simple_event() second.publish() try: - first = self.pub_misp_connector.add_event(first) + first = self.pub_misp_connector.add_event(first, pythonify=True) time.sleep(10) - second = self.pub_misp_connector.add_event(second) + second = self.pub_misp_connector.add_event(second, pythonify=True) # Test invalid query events = self.pub_misp_connector.search(publish_timestamp='5x', pythonify=True) self.assertEqual(events, []) @@ -497,7 +498,7 @@ class TestComprehensive(unittest.TestCase): self.assertEqual(first.objects[1].distribution, Distribution.inherit.value) self.assertEqual(first.objects[1].attributes[0].distribution, Distribution.inherit.value) # Attribute create - attribute = self.user_misp_connector.add_attribute(first.id, {'type': 'comment', 'value': 'bar'}, pythonify=True) + attribute = self.user_misp_connector.add_attribute(first.id, {'type': 'comment', 'value': 'bar'}) self.assertEqual(attribute.value, 'bar', attribute.to_json()) self.assertEqual(attribute.distribution, Distribution.inherit.value, attribute.to_json()) # Object - add @@ -547,13 +548,13 @@ class TestComprehensive(unittest.TestCase): second = self.user_misp_connector.add_event(second) timeframe = [first.timestamp.timestamp() - 5, first.timestamp.timestamp() + 5] # Search event we just created in multiple ways. Make sure it doesn't catch it when it shouldn't - events = self.user_misp_connector.search(timestamp=timeframe, pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe) self.assertEqual(len(events), 2) self.assertEqual(events[0].id, first.id) self.assertEqual(events[1].id, second.id) - events = self.user_misp_connector.search(timestamp=timeframe, value='nothere', pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, value='nothere') self.assertEqual(events, []) - events = self.user_misp_connector.search(timestamp=timeframe, value=first.attributes[0].value, pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, value=first.attributes[0].value) self.assertEqual(len(events), 1) self.assertEqual(events[0].id, first.id) events = self.user_misp_connector.search(timestamp=[first.timestamp.timestamp() - 50, @@ -562,34 +563,34 @@ class TestComprehensive(unittest.TestCase): self.assertEqual(events, []) # Test return content - events = self.user_misp_connector.search(timestamp=timeframe, metadata=False, pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, metadata=False) self.assertEqual(len(events), 2) self.assertEqual(len(events[0].attributes), 1) self.assertEqual(len(events[1].attributes), 2) - events = self.user_misp_connector.search(timestamp=timeframe, metadata=True, pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, metadata=True) self.assertEqual(len(events), 2) self.assertEqual(len(events[0].attributes), 0) self.assertEqual(len(events[1].attributes), 0) # other things - events = self.user_misp_connector.search(timestamp=timeframe, published=True, pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, published=True) self.assertEqual(events, []) - events = self.user_misp_connector.search(timestamp=timeframe, published=False, pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, published=False) self.assertEqual(len(events), 2) - events = self.user_misp_connector.search(eventid=first.id, pythonify=True) + events = self.user_misp_connector.search(eventid=first.id) self.assertEqual(len(events), 1) self.assertEqual(events[0].id, first.id) - events = self.user_misp_connector.search(uuid=first.uuid, pythonify=True) + events = self.user_misp_connector.search(uuid=first.uuid) self.assertEqual(len(events), 1) self.assertEqual(events[0].id, first.id) - events = self.user_misp_connector.search(org=first.orgc_id, pythonify=True) + events = self.user_misp_connector.search(org=first.orgc_id) self.assertEqual(len(events), 2) # test like search - events = self.user_misp_connector.search(timestamp=timeframe, value='%{}%'.format(first.attributes[0].value.split('-')[2]), pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, value='%{}%'.format(first.attributes[0].value.split('-')[2])) self.assertEqual(len(events), 1) self.assertEqual(events[0].id, first.id) - events = self.user_misp_connector.search(timestamp=timeframe, eventinfo='%bar blah%', pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, eventinfo='%bar blah%') self.assertEqual(len(events), 1) self.assertEqual(events[0].id, first.id) @@ -602,24 +603,24 @@ class TestComprehensive(unittest.TestCase): # self.assertEqual(events[0].id, second.id) # date_from / date_to - events = self.user_misp_connector.search(timestamp=timeframe, date_from=date.today().isoformat(), pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, date_from=date.today().isoformat()) self.assertEqual(len(events), 1) self.assertEqual(events[0].id, first.id) - events = self.user_misp_connector.search(timestamp=timeframe, date_from='2018-09-01', pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, date_from='2018-09-01') self.assertEqual(len(events), 2) - events = self.user_misp_connector.search(timestamp=timeframe, date_from='2018-09-01', date_to='2018-09-02', pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, date_from='2018-09-01', date_to='2018-09-02') self.assertEqual(len(events), 1) self.assertEqual(events[0].id, second.id) # Category - events = self.user_misp_connector.search(timestamp=timeframe, category='Network activity', pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, category='Network activity') self.assertEqual(len(events), 1) self.assertEqual(events[0].id, second.id) # toids - events = self.user_misp_connector.search(timestamp=timeframe, to_ids='0', pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, to_ids='0') self.assertEqual(len(events), 2) - events = self.user_misp_connector.search(timestamp=timeframe, to_ids='1', pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, to_ids='1') self.assertEqual(len(events), 1) self.assertEqual(events[0].id, second.id) self.assertEqual(len(events[0].attributes), 1) @@ -627,26 +628,26 @@ class TestComprehensive(unittest.TestCase): # deleted second.attributes[1].delete() self.user_misp_connector.update_event(second) - events = self.user_misp_connector.search(eventid=second.id, pythonify=True) + events = self.user_misp_connector.search(eventid=second.id) self.assertEqual(len(events[0].attributes), 1) - events = self.user_misp_connector.search(eventid=second.id, deleted=True, pythonify=True) + events = self.user_misp_connector.search(eventid=second.id, deleted=True) self.assertEqual(len(events[0].attributes), 1) # include_event_uuid - attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, include_event_uuid=True, pythonify=True) + attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, include_event_uuid=True) self.assertEqual(attributes[0].event_uuid, second.uuid) # event_timestamp time.sleep(1) second.add_attribute('ip-src', '8.8.8.9') second = self.user_misp_connector.update_event(second) - events = self.user_misp_connector.search(event_timestamp=second.timestamp.timestamp(), pythonify=True) + events = self.user_misp_connector.search(event_timestamp=second.timestamp.timestamp()) self.assertEqual(len(events), 1) # searchall second.add_attribute('text', 'This is a test for the full text search', comment='Test stuff comment') second = self.user_misp_connector.update_event(second) - events = self.user_misp_connector.search(value='%for the full text%', searchall=True, pythonify=True) + events = self.user_misp_connector.search(value='%for the full text%', searchall=True) self.assertEqual(len(events), 1) # warninglist @@ -656,17 +657,17 @@ class TestComprehensive(unittest.TestCase): second.add_attribute('ip-src', '9.9.9.9') second = self.user_misp_connector.update_event(second) - events = self.user_misp_connector.search(eventid=second.id, pythonify=True) + events = self.user_misp_connector.search(eventid=second.id) self.assertEqual(len(events), 1) self.assertEqual(events[0].id, second.id) self.assertEqual(len(events[0].attributes), 5) - events = self.user_misp_connector.search(eventid=second.id, enforce_warninglist=False, pythonify=True) + events = self.user_misp_connector.search(eventid=second.id, enforce_warninglist=False) self.assertEqual(len(events), 1) self.assertEqual(events[0].id, second.id) self.assertEqual(len(events[0].attributes), 5) - events = self.user_misp_connector.search(eventid=second.id, enforce_warninglist=True, pythonify=True) + events = self.user_misp_connector.search(eventid=second.id, enforce_warninglist=True) self.assertEqual(len(events), 1) self.assertEqual(events[0].id, second.id) self.assertEqual(len(events[0].attributes), 3) @@ -674,10 +675,10 @@ class TestComprehensive(unittest.TestCase): self.assertDictEqual(response, {'saved': True, 'success': '3 warninglist(s) toggled'}) # Page / limit - attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=1, limit=3, pythonify=True) + attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=1, limit=3) self.assertEqual(len(attributes), 3) - attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=2, limit=3, pythonify=True) + attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=2, limit=3) self.assertEqual(len(attributes), 2) time.sleep(1) # make sure the next attribute is added one at least one second later @@ -734,6 +735,8 @@ class TestComprehensive(unittest.TestCase): second = self.user_misp_connector.add_event(second) current_ts = int(time.time()) + # NOTE: no pythonify available yet + # r = self.user_misp_connector.add_sighting({'value': first.attributes[0].value}) r = self.user_misp_connector.add_sighting({'value': first.attributes[0].value}) self.assertEqual(r['message'], 'Sighting added') @@ -741,6 +744,8 @@ class TestComprehensive(unittest.TestCase): s.value = second.attributes[0].value s.source = 'Testcases' s.type = '1' + # NOTE: no pythonify available yet + # r = self.user_misp_connector.add_sighting(s, second.attributes[0].id) r = self.user_misp_connector.add_sighting(s, second.attributes[0].id) self.assertEqual(r['message'], 'Sighting added') @@ -781,15 +786,17 @@ class TestComprehensive(unittest.TestCase): self.assertEqual(s[0]['sighting'].attribute_id, str(second.attributes[0].id)) # Get sightings from event/attribute / org - s = self.user_misp_connector.sightings(first, pythonify=True) + s = self.user_misp_connector.sightings(first) self.assertTrue(isinstance(s, list)) self.assertEqual(int(s[0].attribute_id), first.attributes[0].id) + # NOTE: no pythonify available yet + # r = self.admin_misp_connector.add_sighting(s, second.attributes[0].id, pythonify=True) r = self.admin_misp_connector.add_sighting(s, second.attributes[0].id) self.assertEqual(r['message'], 'Sighting added') - s = self.user_misp_connector.sightings(second.attributes[0], pythonify=True) + s = self.user_misp_connector.sightings(second.attributes[0]) self.assertEqual(len(s), 2) - s = self.user_misp_connector.sightings(second.attributes[0], self.test_org.id, pythonify=True) + s = self.user_misp_connector.sightings(second.attributes[0], self.test_org.id) self.assertEqual(len(s), 1) self.assertEqual(s[0].org_id, self.test_org.id) # Delete sighting @@ -819,39 +826,39 @@ class TestComprehensive(unittest.TestCase): first.attributes[0].to_ids = True first = self.user_misp_connector.update_event(first) self.admin_misp_connector.publish(first.id, alert=False) - csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), pythonify=True) + csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp()) self.assertEqual(len(csv), 1) self.assertEqual(csv[0]['value'], first.attributes[0].value) # eventid - csv = self.user_misp_connector.search(return_format='csv', eventid=first.id, pythonify=True) + csv = self.user_misp_connector.search(return_format='csv', eventid=first.id) self.assertEqual(len(csv), 1) self.assertEqual(csv[0]['value'], first.attributes[0].value) # category - csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), category='Other', pythonify=True) + csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), category='Other') self.assertEqual(len(csv), 1) self.assertEqual(csv[0]['value'], first.attributes[0].value) - csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), category='Person', pythonify=True) + csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), category='Person') self.assertEqual(len(csv), 0) # type_attribute - csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), type_attribute='text', pythonify=True) + csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), type_attribute='text') self.assertEqual(len(csv), 1) self.assertEqual(csv[0]['value'], first.attributes[0].value) - csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), type_attribute='ip-src', pythonify=True) + csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), type_attribute='ip-src') self.assertEqual(len(csv), 0) # context - csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), include_context=True, pythonify=True) + csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), include_context=True) self.assertEqual(len(csv), 1) self.assertTrue('event_info' in csv[0]) # date_from date_to - csv = self.user_misp_connector.search(return_format='csv', date_from=date.today().isoformat(), pythonify=True) + csv = self.user_misp_connector.search(return_format='csv', date_from=date.today().isoformat()) self.assertEqual(len(csv), 1) self.assertEqual(csv[0]['value'], first.attributes[0].value) - csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', pythonify=True) + csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02') self.assertEqual(len(csv), 2) # headerless @@ -862,14 +869,14 @@ class TestComprehensive(unittest.TestCase): # self.assertEqual(len(csv.strip().split('\n')), 2) # include_context - csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', include_context=True, pythonify=True) + csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', include_context=True) event_context_keys = ['event_info', 'event_member_org', 'event_source_org', 'event_distribution', 'event_threat_level_id', 'event_analysis', 'event_date', 'event_tag', 'event_timestamp'] for k in event_context_keys: self.assertTrue(k in csv[0]) # requested_attributes columns = ['value', 'event_id'] - csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', requested_attributes=columns, pythonify=True) + csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', requested_attributes=columns) self.assertEqual(len(csv[0].keys()), 2) for k in columns: self.assertTrue(k in csv[0]) @@ -1048,7 +1055,7 @@ class TestComprehensive(unittest.TestCase): first = self.create_simple_event() first.attributes[0].add_tag('non-exportable tag') try: - first = self.user_misp_connector.add_event(first, pythonify=True) + first = self.user_misp_connector.add_event(first) self.assertFalse(first.attributes[0].tags) first = self.admin_misp_connector.get_event(first, pythonify=True) # Reference: https://github.com/MISP/MISP/issues/1394 @@ -1075,7 +1082,7 @@ class TestComprehensive(unittest.TestCase): r = self.user_misp_connector.add_object(first.id, peo) self.assertEqual(r.name, 'pe', r) for ref in peo.ObjectReference: - r = self.user_misp_connector.add_object_reference(ref, pythonify=True) + r = self.user_misp_connector.add_object_reference(ref) # FIXME: https://github.com/MISP/MISP/issues/4866 # self.assertEqual(r.object_uuid, peo.uuid, r.to_json()) @@ -1083,7 +1090,7 @@ class TestComprehensive(unittest.TestCase): obj_attrs = r.get_attributes_by_relation('ssdeep') self.assertEqual(len(obj_attrs), 1, obj_attrs) self.assertEqual(r.name, 'file', r) - r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0], pythonify=True) + r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0]) # FIXME: https://github.com/MISP/MISP/issues/4866 # self.assertEqual(r.object_uuid, fo.uuid, r.to_json()) self.assertEqual(r.referenced_uuid, peo.uuid, r.to_json()) @@ -1258,7 +1265,7 @@ class TestComprehensive(unittest.TestCase): self.assertEqual(organisation.name, 'Test Org') # Update org organisation.name = 'blah' - organisation = self.admin_misp_connector.update_organisation(organisation) + organisation = self.admin_misp_connector.update_organisation(organisation, pythonify=True) self.assertEqual(organisation.name, 'blah', organisation) def test_attribute(self): @@ -1289,7 +1296,7 @@ class TestComprehensive(unittest.TestCase): new_attribute = self.user_misp_connector.update_attribute(new_attribute) self.assertEqual(new_attribute.value, '5.6.3.4') # Update attribute as proposal - new_proposal_update = self.user_misp_connector.update_attribute_proposal(new_attribute.id, {'to_ids': False}, pythonify=True) + new_proposal_update = self.user_misp_connector.update_attribute_proposal(new_attribute.id, {'to_ids': False}) self.assertEqual(new_proposal_update.to_ids, False) # Delete attribute as proposal proposal_delete = self.user_misp_connector.delete_attribute_proposal(new_attribute.id) @@ -1323,10 +1330,10 @@ class TestComprehensive(unittest.TestCase): self.assertTrue(isinstance(attribute, MISPShadowAttribute)) # Add attribute with the same value as an existing proposal prop_attr.uuid = str(uuid4()) - attribute = self.admin_misp_connector.add_attribute(second.id, prop_attr) + attribute = self.admin_misp_connector.add_attribute(second.id, prop_attr, pythonify=True) prop_attr.uuid = str(uuid4()) # Add a duplicate attribute (same value) - attribute = self.admin_misp_connector.add_attribute(second.id, prop_attr) + attribute = self.admin_misp_connector.add_attribute(second.id, prop_attr, pythonify=True) self.assertTrue('errors' in attribute) # Update attribute owned by someone else attribute = self.user_misp_connector.update_attribute({'comment': 'blah'}, second.attributes[0].id) @@ -1424,6 +1431,8 @@ class TestComprehensive(unittest.TestCase): # FIXME: https://github.com/MISP/MISP/issues/4880 # users_stats = self.admin_misp_connector.users_statistics(context='attributehistogram') + # NOTE Not supported yet + # self.user_misp_connector.add_sighting({'value': first.attributes[0].value}) self.user_misp_connector.add_sighting({'value': first.attributes[0].value}) users_stats = self.user_misp_connector.users_statistics(context='sightings') self.assertEqual(list(users_stats.keys()), ['toplist', 'eventids']) @@ -1502,7 +1511,7 @@ class TestComprehensive(unittest.TestCase): first = self.create_simple_event() try: first = self.user_misp_connector.add_event(first) - first = self.admin_misp_connector.change_sharing_group_on_entity(first, sharing_group.id) + first = self.admin_misp_connector.change_sharing_group_on_entity(first, sharing_group.id, pythonify=True) self.assertEqual(first.SharingGroup['name'], 'Testcases SG') # FIXME https://github.com/MISP/MISP/issues/4891 # first_attribute = self.admin_misp_connector.change_sharing_group_on_entity(first.attributes[0], sharing_group.id) @@ -1606,7 +1615,7 @@ class TestComprehensive(unittest.TestCase): with open('tests/viper-test-files/test_files/whoami.exe', 'rb') as f: first.add_attribute('malware-sample', value='whoami.exe', data=BytesIO(f.read()), expand='binary') first.run_expansions() - first = self.admin_misp_connector.add_event(first) + first = self.admin_misp_connector.add_event(first, pythonify=True) self.assertEqual(len(first.objects), 7) finally: # Delete event @@ -1616,6 +1625,21 @@ class TestComprehensive(unittest.TestCase): # FIXME https://github.com/MISP/MISP/issues/4892 pass + def test_toggle_global_pythonify(self): + first = self.create_simple_event() + second = self.create_simple_event() + try: + self.admin_misp_connector.toggle_global_pythonify() + first = self.admin_misp_connector.add_event(first) + self.assertTrue(isinstance(first, MISPEvent)) + self.admin_misp_connector.toggle_global_pythonify() + second = self.admin_misp_connector.add_event(second) + self.assertTrue(isinstance(second, dict)) + finally: + # Delete event + self.admin_misp_connector.delete_event(first.id) + self.admin_misp_connector.delete_event(second['Event']['id']) + if __name__ == '__main__': unittest.main()