From ce4cb36d0dc92af340cd5eb3f752de5b984ee600 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Wed, 17 Jul 2019 15:37:14 +0200 Subject: [PATCH] chg: Reorganise ExpandedPyMISP methods, normalise the parameters --- pymisp/aping.py | 1873 ++++++++++++++++--------------- tests/testlive_comprehensive.py | 13 +- 2 files changed, 987 insertions(+), 899 deletions(-) diff --git a/pymisp/aping.py b/pymisp/aping.py index 0c549e9..a7c2bfa 100644 --- a/pymisp/aping.py +++ b/pymisp/aping.py @@ -11,6 +11,7 @@ import json import requests from requests.auth import AuthBase import re +from uuid import UUID from . import __version__ from .exceptions import MISPServerError, PyMISPUnexpectedResponse, PyMISPNotImplementedYet, PyMISPError, NoURL, NoKey @@ -143,63 +144,310 @@ class ExpandedPyMISP(PyMISP): return {'version': '{}.{}.{}'.format(master_version['major'], master_version['minor'], master_version['hotfix'])} return {'error': 'Impossible to retrieve the version of the master branch.'} - # ## BEGIN Taxonomies ### + # ## BEGIN Event ## - def update_taxonomies(self): - """Update all the taxonomies.""" - response = self._prepare_request('POST', 'taxonomies/update') + def get_event(self, event: Union[MISPEvent, int, str, UUID], pythonify: bool=True): + '''Get an event from a MISP instance''' + event_id = self.__get_uuid_or_id_from_abstract_misp(event) + event = self._prepare_request('GET', f'events/{event_id}') + event = self._check_response(event, expect_json=True) + if not pythonify or 'errors' in event: + return event + e = MISPEvent() + e.load(event) + return e + + def add_event(self, event: MISPEvent, pythonify: bool=True): + '''Add a new event on a MISP instance''' + new_event = self._prepare_request('POST', 'events', data=event) + new_event = self._check_response(new_event, expect_json=True) + if not pythonify or 'errors' in new_event: + return new_event + e = MISPEvent() + e.load(new_event) + return e + + def update_event(self, event: MISPEvent, event_id: int=None, pythonify: bool=True): + '''Update an event on a MISP instance''' + if event_id is None: + event_id = self.__get_uuid_or_id_from_abstract_misp(event) + updated_event = self._prepare_request('POST', f'events/{event_id}', data=event) + updated_event = self._check_response(updated_event, expect_json=True) + if not pythonify or 'errors' in updated_event: + return updated_event + e = MISPEvent() + e.load(updated_event) + return e + + def delete_event(self, event: Union[MISPEvent, int, str, UUID]): + '''Delete an event from a MISP instance''' + event_id = self.__get_uuid_or_id_from_abstract_misp(event) + response = self._prepare_request('DELETE', f'events/delete/{event_id}') return self._check_response(response, expect_json=True) - def taxonomies(self, pythonify: bool=False): - """Get all the taxonomies.""" - taxonomies = self._prepare_request('GET', 'taxonomies') - taxonomies = self._check_response(taxonomies, expect_json=True) - if not pythonify or 'errors' in taxonomies: - return taxonomies + def publish(self, event_id: int, alert: bool=False): + """Publish the event with one single HTTP POST. + The default is to not send a mail as it is assumed this method is called on update. + """ + if alert: + response = self._prepare_request('POST', f'events/alert/{event_id}') + else: + response = self._prepare_request('POST', f'events/publish/{event_id}') + return self._check_response(response, expect_json=True) + + # ## END Event ### + + # ## BEGIN Object ### + + def get_object(self, misp_object: Union[MISPObject, int, str, UUID], pythonify: bool=True): + '''Get an object from the remote MISP instance''' + object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object) + misp_object = self._prepare_request('GET', f'objects/view/{object_id}') + misp_object = self._check_response(misp_object, expect_json=True) + if not pythonify or 'errors' in misp_object: + return misp_object + o = MISPObject(misp_object['Object']['name']) + o.from_dict(**misp_object) + return o + + def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool=True): + '''Add a MISP Object to an existing MISP event''' + event_id = self.__get_uuid_or_id_from_abstract_misp(event) + new_object = self._prepare_request('POST', f'objects/add/{event_id}', data=misp_object) + new_object = self._check_response(new_object, expect_json=True) + if not pythonify or 'errors' in new_object: + return new_object + o = MISPObject(new_object['Object']['name']) + o.from_dict(**new_object) + return o + + def update_object(self, misp_object: MISPObject, object_id: int=None, pythonify: bool=True): + '''Update an object on a MISP instance''' + if object_id is None: + object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object) + updated_object = self._prepare_request('POST', f'objects/edit/{object_id}', data=misp_object) + updated_object = self._check_response(updated_object, expect_json=True) + if not pythonify or 'errors' in updated_object: + return updated_object + o = MISPObject(updated_object['Object']['name']) + o.from_dict(**updated_object) + return o + + def delete_object(self, misp_object: Union[MISPObject, int, str, UUID]): + '''Delete an object from a MISP instance''' + # FIXME: MISP doesn't support DELETE on this endpoint + object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object) + response = self._prepare_request('POST', f'objects/delete/{object_id}') + return self._check_response(response, expect_json=True) + + def add_object_reference(self, misp_object_reference: MISPObjectReference, pythonify: bool=False): + """Add a reference to an object""" + object_reference = self._prepare_request('POST', 'object_references/add', misp_object_reference) + object_reference = self._check_response(object_reference, expect_json=True) + if not pythonify or 'errors' in object_reference: + return object_reference + r = MISPObjectReference() + r.from_dict(**object_reference) + return r + + def delete_object_reference(self, object_reference: Union[MISPObjectReference, int, str, UUID]): + """Delete a reference to an object""" + object_reference_id = self.__get_uuid_or_id_from_abstract_misp(object_reference) + response = self._prepare_request('POST', f'object_references/delete/{object_reference_id}') + return self._check_response(response, expect_json=True) + + # Object templates + + def object_templates(self, pythonify=False): + """Get all the object templates.""" + object_templates = self._prepare_request('GET', 'objectTemplates') + object_templates = self._check_response(object_templates, expect_json=True) + if not pythonify or 'errors' in object_templates: + return object_templates to_return = [] - for taxonomy in taxonomies: - t = MISPTaxonomy() - t.from_dict(**taxonomy) - to_return.append(t) + for object_template in object_templates: + o = MISPObjectTemplate() + o.from_dict(**object_template) + to_return.append(o) return to_return - def get_taxonomy(self, taxonomy_id: int, pythonify: bool=False): - """Get a taxonomy by id.""" - taxonomy = self._prepare_request('GET', f'taxonomies/view/{taxonomy_id}') - taxonomy = self._check_response(taxonomy, expect_json=True) - if not pythonify or 'errors' in taxonomy: - return taxonomy - t = MISPTaxonomy() - t.from_dict(**taxonomy) + def get_object_template(self, object_template: Union[MISPObjectTemplate, int, str, UUID], pythonify=False): + """Gets the full object template corresponting the UUID passed as parameter""" + object_template_id = self.__get_uuid_or_id_from_abstract_misp(object_template) + object_template = self._prepare_request('GET', f'objectTemplates/view/{object_template_id}') + object_template = self._check_response(object_template, expect_json=True) + if not pythonify or 'errors' in object_template: + return object_template + t = MISPObjectTemplate() + t.from_dict(**object_template) return t - def enable_taxonomy(self, taxonomy_id: int): - """Enable a taxonomy by id.""" - response = self._prepare_request('POST', f'taxonomies/enable/{taxonomy_id}') + def update_object_templates(self): + """Trigger an update of the object templates""" + response = self._prepare_request('POST', 'objectTemplates/update') return self._check_response(response, expect_json=True) - def disable_taxonomy(self, taxonomy_id: int): - """Disable a taxonomy by id.""" - self.disable_taxonomy_tags(taxonomy_id) - response = self._prepare_request('POST', f'taxonomies/disable/{taxonomy_id}') + # ## END Object ### + + # ## BEGIN Attribute ### + + def get_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], pythonify: bool=True): + '''Get an attribute from a MISP instance''' + attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) + attribute = self._prepare_request('GET', f'attributes/view/{attribute_id}') + attribute = self._check_response(attribute, expect_json=True) + if not pythonify or 'errors' in attribute: + return attribute + a = MISPAttribute() + a.from_dict(**attribute) + return a + + def add_attribute(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=True): + '''Add an attribute to an existing MISP event''' + event_id = self.__get_uuid_or_id_from_abstract_misp(event) + new_attribute = self._prepare_request('POST', f'attributes/add/{event_id}', data=attribute) + if new_attribute.status_code == 403: + # Re-try with a proposal + return self.add_attribute_proposal(event_id, attribute, pythonify) + new_attribute = self._check_response(new_attribute, expect_json=True) + if not pythonify or 'errors' in new_attribute: + return new_attribute + a = MISPAttribute() + a.from_dict(**new_attribute) + return a + + def update_attribute(self, attribute: MISPAttribute, attribute_id: int=None, pythonify: bool=True): + '''Update an attribute on a MISP instance''' + if attribute_id is None: + attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) + updated_attribute = self._prepare_request('POST', f'attributes/edit/{attribute_id}', data=attribute) + if updated_attribute.status_code == 403: + # Re-try with a proposal + return self.update_attribute_proposal(attribute_id, attribute, pythonify) + updated_attribute = self._check_response(updated_attribute, expect_json=True) + if not pythonify or 'errors' in updated_attribute: + return updated_attribute + a = MISPAttribute() + a.from_dict(**updated_attribute) + return a + + def delete_attribute(self, attribute: Union[MISPAttribute, int, str, UUID]): + '''Delete an attribute from a MISP instance''' + attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) + response = self._prepare_request('POST', f'attributes/delete/{attribute_id}') + if response.status_code == 403: + # Re-try with a proposal + return self.delete_attribute_proposal(attribute_id) return self._check_response(response, expect_json=True) - def disable_taxonomy_tags(self, taxonomy_id: int): - """Disable all the tags of a taxonomy by id.""" - response = self._prepare_request('POST', 'taxonomies/disableTag/{taxonomy_id}') + # ## END Attribute ### + + # ## BEGIN Attribute Proposal ### + + def get_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID], pythonify: bool=True): + proposal_id = self.__get_uuid_or_id_from_abstract_misp(proposal) + attribute_proposal = self._prepare_request('GET', f'shadow_attributes/view/{proposal_id}') + attribute_proposal = self._check_response(attribute_proposal, expect_json=True) + if not pythonify or 'errors' in attribute_proposal: + return attribute_proposal + a = MISPShadowAttribute() + a.from_dict(**attribute_proposal) + return a + + # NOTE: the tree following method have a very specific meaning, look at the comments + + def add_attribute_proposal(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=True): + '''Propose a new attribute in an event''' + event_id = self.__get_uuid_or_id_from_abstract_misp(event) + # FIXME: attribute needs to be a complete MISPAttribute: https://github.com/MISP/MISP/issues/4868 + new_attribute_proposal = self._prepare_request('POST', f'shadow_attributes/add/{event_id}', data=attribute) + new_attribute_proposal = self._check_response(new_attribute_proposal, expect_json=True) + if not pythonify or 'errors' in new_attribute_proposal: + return new_attribute_proposal + a = MISPShadowAttribute() + a.from_dict(**new_attribute_proposal) + return a + + def update_attribute_proposal(self, initial_attribute: Union[MISPAttribute, int, str, UUID], attribute: MISPAttribute, pythonify: bool=True): + '''Propose a change for an attribute''' + # FIXME: inconsistency in MISP: https://github.com/MISP/MISP/issues/4857 + initial_attribute_id = self.__get_uuid_or_id_from_abstract_misp(initial_attribute) + attribute = {'ShadowAttribute': attribute} + update_attribute_proposal = self._prepare_request('POST', f'shadow_attributes/edit/{initial_attribute_id}', data=attribute) + update_attribute_proposal = self._check_response(update_attribute_proposal, expect_json=True) + if not pythonify or 'errors' in update_attribute_proposal: + return update_attribute_proposal + a = MISPShadowAttribute() + a.from_dict(**update_attribute_proposal) + return a + + def delete_attribute_proposal(self, attribute: Union[MISPAttribute, int, str, UUID]): + '''Propose the deletion of an attribute''' + attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) + response = self._prepare_request('POST', f'shadow_attributes/delete/{attribute_id}') return self._check_response(response, expect_json=True) - def enable_taxonomy_tags(self, taxonomy_id: int): - """Enable all the tags of a taxonomy by id. - NOTE: this automatically done when you call enable_taxonomy.""" - taxonomy = self.get_taxonomy(taxonomy_id) - if not taxonomy['Taxonomy']['enabled']: - raise PyMISPError(f"The taxonomy {taxonomy['Taxonomy']['name']} is not enabled.") - url = urljoin(self.root_url, 'taxonomies/addTag/{}'.format(taxonomy_id)) - response = self._prepare_request('POST', url) + # NOTE: You cannot modify an existing proposal, only accept/discard + + def accept_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID]): + '''Accept a proposal''' + proposal_id = self.__get_uuid_or_id_from_abstract_misp(proposal) + response = self._prepare_request('POST', f'shadow_attributes/accept/{proposal_id}') return self._check_response(response, expect_json=True) - # ## END Taxonomies ### + def discard_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID]): + '''Discard a proposal''' + proposal_id = self.__get_uuid_or_id_from_abstract_misp(proposal) + response = self._prepare_request('POST', f'shadow_attributes/discard/{proposal_id}') + return self._check_response(response, expect_json=True) + + # ## END Attribute Proposal ### + + # ## BEGIN Sighting ### + + def sightings(self, misp_entity: AbstractMISP, org: Union[MISPOrganisation, int, str, UUID]=None, pythonify=False): + """Get the list of sighting related to a MISPEvent or a MISPAttribute (depending on type of misp_entity)""" + # FIXME: https://github.com/MISP/MISP/issues/4875 + if isinstance(misp_entity, MISPEvent): + scope = 'event' + elif isinstance(misp_entity, MISPAttribute): + scope = 'attribute' + else: + raise PyMISPError('misp_entity can only be a MISPEvent or a MISPAttribute') + if org is not None: + org_id = self.__get_uuid_or_id_from_abstract_misp(org) + url = f'sightings/listSightings/{misp_entity.id}/{scope}/{org_id}' + else: + url = f'sightings/listSightings/{misp_entity.id}/{scope}' + sightings = self._prepare_request('POST', url) + sightings = self._check_response(sightings, expect_json=True) + if not pythonify or 'errors' in sightings: + return sightings + to_return = [] + for sighting in sightings: + s = MISPSighting() + s.from_dict(**sighting) + to_return.append(s) + return to_return + + def add_sighting(self, sighting: MISPSighting, attribute: Union[MISPAttribute, int, str, UUID]=None): + '''Add a new sighting (globally, or to a specific attribute)''' + # FIXME: no pythonify possible: https://github.com/MISP/MISP/issues/4867 + pythonify = False + if attribute: + attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) + new_sighting = self._prepare_request('POST', f'sightings/add/{attribute_id}', data=sighting) + else: + # Either the ID/UUID is in the sighting, or we want to add a sighting on all the attributes with the given value + new_sighting = self._prepare_request('POST', f'sightings/add', data=sighting) + new_sighting = self._check_response(new_sighting, expect_json=True) + if not pythonify or 'errors' in new_sighting: + return new_sighting + s = MISPSighting() + s.from_dict(**new_sighting) + return s + + # ## END Sighting ### # ## BEGIN Tags ### @@ -216,8 +464,9 @@ class ExpandedPyMISP(PyMISP): to_return.append(t) return to_return - def get_tag(self, tag_id: int, pythonify: bool=False): + def get_tag(self, tag: Union[MISPTag, int, str, UUID], pythonify: bool=False): """Get a tag by id.""" + tag_id = self.__get_uuid_or_id_from_abstract_misp(tag) tag = self._prepare_request('GET', f'tags/view/{tag_id}') tag = self._check_response(tag, expect_json=True) if not pythonify or 'errors' in tag: @@ -249,11 +498,10 @@ class ExpandedPyMISP(PyMISP): def update_tag(self, tag: MISPTag, tag_id: int=None, pythonify: bool=False): """Edit only the provided parameters of a tag.""" if tag_id is None: - if tag.get('id') is None: - raise PyMISPError('The name of the tag you want to update is required. Either directly in the parameters of the method or in the tag itself.') - tag_id = tag.id + tag_id = self.__get_uuid_or_id_from_abstract_misp(tag) # FIXME: inconsistency in MISP: https://github.com/MISP/MISP/issues/4852 - updated_tag = self._prepare_request('POST', f'tags/edit/{tag_id}', {'Tag': tag}) + tag = {'Tag': tag} + updated_tag = self._prepare_request('POST', f'tags/edit/{tag_id}', data=tag) updated_tag = self._check_response(updated_tag, expect_json=True) if not pythonify or 'errors' in updated_tag: return updated_tag @@ -261,12 +509,77 @@ class ExpandedPyMISP(PyMISP): t.from_dict(**updated_tag) return t - def delete_tag(self, tag_id: int): + def delete_tag(self, tag: Union[MISPTag, int, str, UUID]): + '''Delete an attribute from a MISP instance''' + tag_id = self.__get_uuid_or_id_from_abstract_misp(tag) response = self._prepare_request('POST', f'tags/delete/{tag_id}') return self._check_response(response, expect_json=True) # ## END Tags ### + # ## BEGIN Taxonomies ### + + def update_taxonomies(self): + """Update all the taxonomies.""" + response = self._prepare_request('POST', 'taxonomies/update') + return self._check_response(response, expect_json=True) + + def taxonomies(self, pythonify: bool=False): + """Get all the taxonomies.""" + taxonomies = self._prepare_request('GET', 'taxonomies') + taxonomies = self._check_response(taxonomies, expect_json=True) + if not pythonify or 'errors' in taxonomies: + return taxonomies + to_return = [] + for taxonomy in taxonomies: + t = MISPTaxonomy() + t.from_dict(**taxonomy) + to_return.append(t) + return to_return + + def get_taxonomy(self, taxonomy: Union[MISPTaxonomy, int, str, UUID], pythonify: bool=False): + """Get a taxonomy from a MISP instance.""" + taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) + taxonomy = self._prepare_request('GET', f'taxonomies/view/{taxonomy_id}') + taxonomy = self._check_response(taxonomy, expect_json=True) + if not pythonify or 'errors' in taxonomy: + return taxonomy + t = MISPTaxonomy() + t.from_dict(**taxonomy) + return t + + def enable_taxonomy(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]): + """Enable a taxonomy.""" + taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) + response = self._prepare_request('POST', f'taxonomies/enable/{taxonomy_id}') + return self._check_response(response, expect_json=True) + + def disable_taxonomy(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]): + """Disable a taxonomy.""" + taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) + self.disable_taxonomy_tags(taxonomy_id) + response = self._prepare_request('POST', f'taxonomies/disable/{taxonomy_id}') + return self._check_response(response, expect_json=True) + + def disable_taxonomy_tags(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]): + """Disable all the tags of a taxonomy.""" + taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) + response = self._prepare_request('POST', f'taxonomies/disableTag/{taxonomy_id}') + return self._check_response(response, expect_json=True) + + def enable_taxonomy_tags(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]): + """Enable all the tags of a taxonomy. + NOTE: this automatically done when you call enable_taxonomy.""" + taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) + taxonomy = self.get_taxonomy(taxonomy_id) + if not taxonomy['Taxonomy']['enabled']: + raise PyMISPError(f"The taxonomy {taxonomy['Taxonomy']['name']} is not enabled.") + url = urljoin(self.root_url, 'taxonomies/addTag/{}'.format(taxonomy_id)) + response = self._prepare_request('POST', url) + return self._check_response(response, expect_json=True) + + # ## END Taxonomies ### + # ## BEGIN Warninglists ### def warninglists(self, pythonify: bool=False): @@ -282,8 +595,9 @@ class ExpandedPyMISP(PyMISP): to_return.append(w) return to_return - def get_warninglist(self, warninglist_id: int, pythonify: bool=False): - """Get a warninglist by id.""" + def get_warninglist(self, warninglist: Union[MISPWarninglist, int, str, UUID], pythonify: bool=False): + """Get a warninglist.""" + warninglist_id = self.__get_uuid_or_id_from_abstract_misp(warninglist) warninglist = self._prepare_request('GET', f'warninglists/view/{warninglist_id}') warninglist = self._check_response(warninglist, expect_json=True) if not pythonify or 'errors' in warninglist: @@ -319,12 +633,14 @@ class ExpandedPyMISP(PyMISP): response = self._prepare_request('POST', 'warninglists/update') return self._check_response(response, expect_json=True) - def enable_warninglist(self, warninglist_id: int): - """Enable a warninglist by id.""" + def enable_warninglist(self, warninglist: Union[MISPWarninglist, int, str, UUID]): + """Enable a warninglist.""" + warninglist_id = self.__get_uuid_or_id_from_abstract_misp(warninglist) return self.toggle_warninglist(warninglist_id=warninglist_id, force_enable=True) - def disable_warninglist(self, warninglist_id: int): - """Disable a warninglist by id.""" + def disable_warninglist(self, warninglist: Union[MISPWarninglist, int, str, UUID]): + """Disable a warninglist.""" + warninglist_id = self.__get_uuid_or_id_from_abstract_misp(warninglist) return self.toggle_warninglist(warninglist_id=warninglist_id, force_enable=False) def values_in_warninglist(self, value: list): @@ -334,247 +650,6 @@ class ExpandedPyMISP(PyMISP): # ## END Warninglists ### - # FIXME: ids can be UUID, str, or integer - # ## BEGIN Event ### - - def get_event(self, event_id: int, pythonify: bool=True): - event = self._prepare_request('GET', f'events/{event_id}') - event = self._check_response(event, expect_json=True) - if not pythonify or 'errors' in event: - return event - e = MISPEvent() - e.load(event) - return e - - def add_event(self, event: MISPEvent, pythonify: bool=True): - '''Add a new event on a MISP instance''' - new_event = self._prepare_request('POST', 'events', data=event) - new_event = self._check_response(new_event, expect_json=True) - if not pythonify or 'errors' in new_event: - return new_event - e = MISPEvent() - e.load(new_event) - return e - - def update_event(self, event: MISPEvent, event_id: int=None, pythonify: bool=True): - '''Update an event on a MISP instance''' - if event_id is None: - if event.get('id') is None: - raise PyMISPError('The ID of the event you want to update is required. Either directly in the parameters of the method or in the event itself.') - event_id = event.id - updated_event = self._prepare_request('POST', f'events/{event_id}', data=event) - updated_event = self._check_response(updated_event, expect_json=True) - if not pythonify or 'errors' in updated_event: - return updated_event - e = MISPEvent() - e.load(updated_event) - return e - - def delete_event(self, event_id: int): - response = self._prepare_request('DELETE', f'events/delete/{event_id}') - return self._check_response(response, expect_json=True) - - def publish(self, event_id: int, alert: bool=False): - """Publish the event with one single HTTP POST. - The default is to not send a mail as it is assumed this method is called on update. - """ - if alert: - response = self._prepare_request('POST', f'events/alert/{event_id}') - else: - response = self._prepare_request('POST', f'events/publish/{event_id}') - return self._check_response(response, expect_json=True) - - # ## END Event ### - - # ## BEGIN Object ### - - def get_object(self, object_id: int, pythonify: bool=True): - misp_object = self._prepare_request('GET', f'objects/view/{object_id}') - misp_object = self._check_response(misp_object, expect_json=True) - if not pythonify or 'errors' in misp_object: - return misp_object - o = MISPObject(misp_object['Object']['name']) - o.from_dict(**misp_object) - return o - - def add_object(self, event_id: int, misp_object: MISPObject, pythonify: bool=True): - '''Add a MISP Object to an existing MISP event''' - new_object = self._prepare_request('POST', f'objects/add/{event_id}', data=misp_object) - new_object = self._check_response(new_object, expect_json=True) - if not pythonify or 'errors' in new_object: - return new_object - o = MISPObject(new_object['Object']['name']) - o.from_dict(**new_object) - return o - - def update_object(self, misp_object: MISPObject, object_id: int=None, pythonify: bool=True): - if object_id is None: - if misp_object.get('id') is None: - raise PyMISPError('The ID of the object you want to update is required. Either directly in the parameters of the method or in the object itself.') - object_id = misp_object.id - updated_object = self._prepare_request('POST', f'objects/edit/{object_id}', data=misp_object) - updated_object = self._check_response(updated_object, expect_json=True) - if not pythonify or 'errors' in updated_object: - return updated_object - o = MISPObject(updated_object['Object']['name']) - o.from_dict(**updated_object) - return o - - def delete_object(self, object_id: int): - # FIXME: MISP doesn't support DELETE on this endpoint - response = self._prepare_request('POST', f'objects/delete/{object_id}') - return self._check_response(response, expect_json=True) - - def add_object_reference(self, misp_object_reference: MISPObjectReference, pythonify: bool=False): - """Add a reference to an object""" - object_reference = self._prepare_request('POST', 'object_references/add', misp_object_reference) - object_reference = self._check_response(object_reference, expect_json=True) - if not pythonify or 'errors' in object_reference: - return object_reference - r = MISPObjectReference() - r.from_dict(**object_reference) - return r - - def delete_object_reference(self, object_reference_id: int): - response = self._prepare_request('POST', f'object_references/delete/{object_reference_id}') - return self._check_response(response, expect_json=True) - - def object_templates(self, pythonify=False): - """Get all the object templates.""" - object_templates = self._prepare_request('GET', 'objectTemplates') - object_templates = self._check_response(object_templates, expect_json=True) - if not pythonify or 'errors' in object_templates: - return object_templates - to_return = [] - for object_template in object_templates: - o = MISPObjectTemplate() - o.from_dict(**object_template) - to_return.append(o) - return to_return - - def get_object_template(self, object_id: int, pythonify=False): - """Gets the full object template corresponting the UUID passed as parameter""" - object_template = self._prepare_request('GET', f'objectTemplates/view/{object_id}') - object_template = self._check_response(object_template, expect_json=True) - if not pythonify or 'errors' in object_template: - return object_template - t = MISPObjectTemplate() - t.from_dict(**object_template) - return t - - def update_object_templates(self): - response = self._prepare_request('POST', 'objectTemplates/update') - return self._check_response(response, expect_json=True) - - # ## END Object ### - - # ## BEGIN Attribute ### - - def get_attribute(self, attribute_id: int, pythonify: bool=True): - attribute = self._prepare_request('GET', f'attributes/view/{attribute_id}') - attribute = self._check_response(attribute, expect_json=True) - if not pythonify or 'errors' in attribute: - return attribute - a = MISPAttribute() - a.from_dict(**attribute) - return a - - def add_attribute(self, event_id: int, attribute: MISPAttribute, pythonify: bool=True): - '''Add an attribute to an existing MISP event''' - new_attribute = self._prepare_request('POST', f'attributes/add/{event_id}', data=attribute) - if new_attribute.status_code == 403: - # Re-try with a proposal - return self.add_attribute_proposal(event_id, attribute, pythonify) - new_attribute = self._check_response(new_attribute, expect_json=True) - if not pythonify or 'errors' in new_attribute: - return new_attribute - a = MISPAttribute() - a.from_dict(**new_attribute) - return a - - def update_attribute(self, attribute: MISPAttribute, attribute_id: int=None, pythonify: bool=True): - if attribute_id is None: - if attribute.get('uuid'): - attribute_id = attribute.uuid - elif attribute.get('id'): - attribute_id = attribute.id - else: - raise PyMISPError('The ID of the attribute you want to update is required. Either directly in the parameters of the method or in the attribute itself.') - updated_attribute = self._prepare_request('POST', f'attributes/edit/{attribute_id}', data=attribute) - if updated_attribute.status_code == 403: - # Re-try with a proposal - return self.update_attribute_proposal(attribute_id, attribute, pythonify) - updated_attribute = self._check_response(updated_attribute, expect_json=True) - if not pythonify or 'errors' in updated_attribute: - return updated_attribute - a = MISPAttribute() - a.from_dict(**updated_attribute) - return a - - def delete_attribute(self, attribute_id: int): - response = self._prepare_request('POST', f'attributes/delete/{attribute_id}') - if response.status_code == 403: - # Re-try with a proposal - return self.delete_attribute_proposal(attribute_id) - return self._check_response(response, expect_json=True) - - # ## END Attribute ### - - # ## BEGIN Attribute Proposal ### - - def get_attribute_proposal(self, proposal_id: int, pythonify: bool=True): - attribute_proposal = self._prepare_request('GET', f'shadow_attributes/view/{proposal_id}') - attribute_proposal = self._check_response(attribute_proposal, expect_json=True) - if not pythonify or 'errors' in attribute_proposal: - return attribute_proposal - a = MISPShadowAttribute() - a.from_dict(**attribute_proposal) - return a - - # NOTE: the tree following method have a very specific meaning, look at the comments - - def add_attribute_proposal(self, event_id: int, attribute: MISPAttribute, pythonify: bool=True): - '''Propose a new attribute in an event''' - # FIXME: attribute needs to be a complete MISPAttribute: https://github.com/MISP/MISP/issues/4868 - new_attribute_proposal = self._prepare_request('POST', f'shadow_attributes/add/{event_id}', data=attribute) - new_attribute_proposal = self._check_response(new_attribute_proposal, expect_json=True) - if not pythonify or 'errors' in new_attribute_proposal: - return new_attribute_proposal - a = MISPShadowAttribute() - a.from_dict(**new_attribute_proposal) - return a - - def update_attribute_proposal(self, attribute_id: int, attribute: MISPAttribute, pythonify: bool=True): - '''Propose a change for an attribute''' - # FIXME: inconsistency in MISP: https://github.com/MISP/MISP/issues/4857 - attribute = {'ShadowAttribute': attribute} - update_attribute_proposal = self._prepare_request('POST', f'shadow_attributes/edit/{attribute_id}', data=attribute) - update_attribute_proposal = self._check_response(update_attribute_proposal, expect_json=True) - if not pythonify or 'errors' in update_attribute_proposal: - return update_attribute_proposal - a = MISPShadowAttribute() - a.from_dict(**update_attribute_proposal) - return a - - def delete_attribute_proposal(self, attribute_id: int): - '''Propose the deletion of an attribute''' - response = self._prepare_request('POST', f'shadow_attributes/delete/{attribute_id}') - return self._check_response(response, expect_json=True) - - # NOTE: You cannot modify an existing proposal, only accept/discard - - def accept_attribute_proposal(self, proposal_id: int): - '''Accept a proposal''' - response = self._prepare_request('POST', f'shadow_attributes/accept/{proposal_id}') - return self._check_response(response, expect_json=True) - - def discard_attribute_proposal(self, proposal_id: int): - '''Discard a proposal''' - response = self._prepare_request('POST', f'shadow_attributes/discard/{proposal_id}') - return self._check_response(response, expect_json=True) - - # ## END Attribute Proposal ### - # ## BEGIN Noticelist ### def noticelists(self, pythonify=False): @@ -590,8 +665,9 @@ class ExpandedPyMISP(PyMISP): to_return.append(n) return to_return - def get_noticelist(self, noticelist_id: int, pythonify=False): + def get_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID], pythonify=False): """Get a noticelist by id.""" + noticelist_id = self.__get_uuid_or_id_from_abstract_misp(noticelist) noticelist = self._prepare_request('GET', f'noticelists/view/{noticelist_id}') noticelist = self._check_response(noticelist, expect_json=True) if not pythonify or 'errors' in noticelist: @@ -600,17 +676,19 @@ class ExpandedPyMISP(PyMISP): n.from_dict(**noticelist) return n - def enable_noticelist(self, noticelist_id): + def enable_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID]): """Enable a noticelist by id.""" # FIXME: https://github.com/MISP/MISP/issues/4856 # response = self._prepare_request('POST', f'noticelists/enable/{noticelist_id}') + noticelist_id = self.__get_uuid_or_id_from_abstract_misp(noticelist) response = self._prepare_request('POST', f'noticelists/enableNoticelist/{noticelist_id}/true') return self._check_response(response, expect_json=True) - def disable_noticelist(self, noticelist_id): + def disable_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID]): """Disable a noticelist by id.""" # FIXME: https://github.com/MISP/MISP/issues/4856 # response = self._prepare_request('POST', f'noticelists/disable/{noticelist_id}') + noticelist_id = self.__get_uuid_or_id_from_abstract_misp(noticelist) response = self._prepare_request('POST', f'noticelists/enableNoticelist/{noticelist_id}') return self._check_response(response, expect_json=True) @@ -619,7 +697,7 @@ class ExpandedPyMISP(PyMISP): response = self._prepare_request('POST', 'noticelists/update') return self._check_response(response, expect_json=True) - # ## END Galaxy ### + # ## END Noticelist ### # ## BEGIN Galaxy ### @@ -636,8 +714,9 @@ class ExpandedPyMISP(PyMISP): to_return.append(g) return to_return - def get_galaxy(self, galaxy_id: int, pythonify=False): + def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], pythonify=False): """Get a galaxy by id.""" + galaxy_id = self.__get_uuid_or_id_from_abstract_misp(galaxy) galaxy = self._prepare_request('GET', f'galaxies/view/{galaxy_id}') galaxy = self._check_response(galaxy, expect_json=True) if not pythonify or 'errors' in galaxy: @@ -653,310 +732,6 @@ class ExpandedPyMISP(PyMISP): # ## END Galaxy ### - # ## BEGIN User ### - - def users(self, pythonify=False): - """Get all the users.""" - users = self._prepare_request('GET', 'admin/users') - users = self._check_response(users, expect_json=True) - if not pythonify or 'errors' in users: - return users - to_return = [] - for user in users: - u = MISPUser() - u.from_dict(**user) - to_return.append(u) - return to_return - - def get_user(self, user_id: int='me', pythonify: bool=False): - '''Get a user. `me` means the owner of the API key doing the query.''' - user = self._prepare_request('GET', f'users/view/{user_id}') - user = self._check_response(user, expect_json=True) - if not pythonify or 'errors' in user: - return user - u = MISPUser() - u.from_dict(**user) - return u - - def add_user(self, user: MISPUser, pythonify: bool=False): - user = self._prepare_request('POST', f'admin/users/add', data=user) - user = self._check_response(user, expect_json=True) - if not pythonify or 'errors' in user: - return user - u = MISPUser() - u.from_dict(**user) - return u - - def update_user(self, user: MISPUser, user_id: int=None, pythonify: bool=False): - '''Update an event on a MISP instance''' - if user_id is None: - if user.get('id') is None: - raise PyMISPError('The ID of the user you want to update is required. Either directly in the parameters of the method or in the user itself.') - user_id = user.id - updated_user = self._prepare_request('POST', f'admin/users/edit/{user_id}', data=user) - updated_user = self._check_response(updated_user, expect_json=True) - if not pythonify or 'errors' in updated_user: - return updated_user - e = MISPUser() - e.from_dict(**updated_user) - return e - - def delete_user(self, user_id: int): - # NOTE: MISP in inconsistent and currently require "delete" in the path and doesn't support HTTP DELETE - response = self._prepare_request('POST', f'admin/users/delete/{user_id}') - return self._check_response(response, expect_json=True) - - # ## END User ### - - # ## BEGIN Organisation ### - - def organisations(self, scope="local", pythonify=False): - """Get all the organisations.""" - organisations = self._prepare_request('GET', f'organisations/index/scope:{scope}') - organisations = self._check_response(organisations, expect_json=True) - if not pythonify or 'errors' in organisations: - return organisations - to_return = [] - for organisation in organisations: - o = MISPOrganisation() - o.from_dict(**organisation) - to_return.append(o) - return to_return - - def get_organisation(self, organisation_id: int, pythonify: bool=True): - '''Get an organisation.''' - organisation = self._prepare_request('GET', f'organisations/view/{organisation_id}') - organisation = self._check_response(organisation, expect_json=True) - if not pythonify or 'errors' in organisation: - return organisation - o = MISPOrganisation() - o.from_dict(**organisation) - return o - - def add_organisation(self, organisation: MISPOrganisation, pythonify: bool=True): - new_organisation = self._prepare_request('POST', f'admin/organisations/add', data=organisation) - new_organisation = self._check_response(new_organisation, expect_json=True) - if not pythonify or 'errors' in new_organisation: - return new_organisation - o = MISPOrganisation() - o.from_dict(**new_organisation) - return o - - def update_organisation(self, organisation: MISPOrganisation, organisation_id: int=None, pythonify: bool=True): - '''Update an organisation''' - if organisation_id is None: - if organisation.get('id') is None: - raise PyMISPError('The ID of the organisation you want to update is required. Either directly in the parameters of the method or in the organisation itself.') - organisation_id = organisation.id - updated_organisation = self._prepare_request('POST', f'admin/organisations/edit/{organisation_id}', data=organisation) - updated_organisation = self._check_response(updated_organisation, expect_json=True) - if not pythonify or 'errors' in updated_organisation: - return updated_organisation - o = MISPOrganisation() - o.from_dict(**organisation) - return o - - def delete_organisation(self, organisation_id: int): - # NOTE: MISP in inconsistent and currently require "delete" in the path and doesn't support HTTP DELETE - response = self._prepare_request('POST', f'admin/organisations/delete/{organisation_id}') - return self._check_response(response, expect_json=True) - - # ## END Organisation ### - - # ## BEGIN Sighting ### - - def sightings(self, misp_entity: AbstractMISP, org_id: int=None, pythonify=False): - """Get the list of sighting related to a MISPEvent or a MISPAttribute (depending on type of misp_entity)""" - # FIXME: https://github.com/MISP/MISP/issues/4875 - if isinstance(misp_entity, MISPEvent): - scope = 'event' - elif isinstance(misp_entity, MISPAttribute): - scope = 'attribute' - else: - raise PyMISPError('misp_entity can only be a MISPEvent or a MISPAttribute') - if org_id is not None: - url = f'sightings/listSightings/{misp_entity.id}/{scope}/{org_id}' - else: - url = f'sightings/listSightings/{misp_entity.id}/{scope}' - sightings = self._prepare_request('POST', url) - sightings = self._check_response(sightings, expect_json=True) - if not pythonify or 'errors' in sightings: - return sightings - to_return = [] - for sighting in sightings: - s = MISPSighting() - s.from_dict(**sighting) - to_return.append(s) - return to_return - - def add_sighting(self, sighting: MISPSighting, attribute_id: int=None): - # FIXME: no pythonify possible: https://github.com/MISP/MISP/issues/4867 - pythonify = False - if attribute_id: - new_sighting = self._prepare_request('POST', f'sightings/add/{attribute_id}', data=sighting) - else: - # Either the ID/UUID is in the sighting, or we want to add a sighting on all the attributes with the given value - new_sighting = self._prepare_request('POST', f'sightings/add', data=sighting) - new_sighting = self._check_response(new_sighting, expect_json=True) - if not pythonify or 'errors' in new_sighting: - return new_sighting - s = MISPSighting() - s.from_dict(**new_sighting) - return s - - # ## END Sighting ### - - # ## BEGIN Statistics ### - - def attributes_statistics(self, context: str='type', percentage: bool=False): - """Get attributes statistics from the MISP instance.""" - # FIXME: https://github.com/MISP/MISP/issues/4874 - if context not in ['type', 'category']: - raise PyMISPError('context can only be "type" or "category"') - if percentage: - path = f'attributes/attributeStatistics/{context}/true' - else: - path = f'attributes/attributeStatistics/{context}' - response = self._prepare_request('GET', path) - return self._check_response(response, expect_json=True) - - def tags_statistics(self, percentage: bool=False, name_sort: bool=False): - """Get tags statistics from the MISP instance""" - # FIXME: https://github.com/MISP/MISP/issues/4874 - # NOTE: https://github.com/MISP/MISP/issues/4879 - if percentage: - percentage = 'true' - else: - percentage = 'false' - if name_sort: - name_sort = 'true' - else: - name_sort = 'false' - response = self._prepare_request('GET', f'tags/tagStatistics/{percentage}/{name_sort}') - return self._check_response(response) - - def users_statistics(self, context: str='data'): - """Get users statistics from the MISP instance""" - # FIXME: https://github.com/MISP/MISP/issues/4874 - availables_contexts = ['data', 'orgs', 'users', 'tags', 'attributehistogram', 'sightings', 'galaxyMatrix'] - if context not in availables_contexts: - raise PyMISPError("context can only be {','.join(availables_contexts)}") - response = self._prepare_request('GET', f'users/statistics/{context}.json') - return self._check_response(response) - - # ## END Statistics ### - - # ## BEGIN Others ### - - def push_event_to_ZMQ(self, event_id: int): - """Force push an event on ZMQ""" - response = self._prepare_request('POST', f'events/pushEventToZMQ/{event_id}.json') - return self._check_response(response, expect_json=True) - - def direct_call(self, url: str, data: dict=None, params: dict={}): - '''Very lightweight call that posts a data blob (python dictionary or json string) on the URL''' - if data is None: - response = self._prepare_request('GET', url, params=params) - else: - response = self._prepare_request('POST', url, data=data, params=params) - return self._check_response(response, lenient_response_type=True) - - def freetext(self, event_id: int, string: str, adhereToWarninglists: Union[bool, str]=False, - distribution: int=None, returnMetaAttributes: bool=False, pythonify=False): - """Pass a text to the freetext importer""" - query = {"value": string} - wl_params = [False, True, 'soft'] - if adhereToWarninglists in wl_params: - query['adhereToWarninglists'] = adhereToWarninglists - else: - raise Exception('Invalid parameter, adhereToWarninglists Can only be {}'.format(', '.join(wl_params))) - if distribution is not None: - query['distribution'] = distribution - if returnMetaAttributes: - query['returnMetaAttributes'] = returnMetaAttributes - attributes = self._prepare_request('POST', f'events/freeTextImport/{event_id}', data=query) - attributes = self._check_response(attributes, expect_json=True) - if returnMetaAttributes or not pythonify or 'errors' in attributes: - return attributes - to_return = [] - for attribute in attributes: - a = MISPAttribute() - a.from_dict(**attribute) - to_return.append(a) - return to_return - - # ## END Others ### - - # ## BEGIN Sharing group ### - - def sharing_groups(self, pythonify: bool=False): - """Get the existing sharing groups""" - sharing_groups = self._prepare_request('GET', 'sharing_groups') - sharing_groups = self._check_response(sharing_groups, expect_json=True) - if not pythonify or 'errors' in sharing_groups: - return sharing_groups - to_return = [] - for sharing_group in sharing_groups: - s = MISPSharingGroup() - s.from_dict(**sharing_group) - to_return.append(s) - return to_return - - def add_sharing_group(self, sharing_group: MISPSharingGroup, pythonify: bool=True): - sharing_group = self._prepare_request('POST', f'sharing_groups/add', data=sharing_group) - sharing_group = self._check_response(sharing_group, expect_json=True) - # FIXME: https://github.com/MISP/MISP/issues/4882 - sharing_group = sharing_group[0] - if not pythonify or 'errors' in sharing_group: - return sharing_group - s = MISPSharingGroup() - s.from_dict(**sharing_group) - return s - - def delete_sharing_group(self, sharing_group_id: int): - response = self._prepare_request('POST', f'sharing_groups/delete/{sharing_group_id}') - return self._check_response(response, expect_json=True) - - def add_org_to_sharing_group(self, sharing_group_id: int, organisation_id: int, extend: bool=False): - '''Add an organisation to a sharing group. - :sharing_group: Sharing group's local instance ID, or Sharing group's global UUID - :organisation: Organisation's local instance ID, or Organisation's global UUID, or Organisation's name as known to the curent instance - :extend: Allow the organisation to extend the group - ''' - to_jsonify = {'sg_id': sharing_group_id, 'org_id': organisation_id, 'extend': extend} - response = self._prepare_request('POST', 'sharingGroups/addOrg', data=to_jsonify) - return self._check_response(response) - - def remove_org_from_sharing_group(self, sharing_group_id: int, organisation_id: int): - '''Remove an organisation from a sharing group. - :sharing_group: Sharing group's local instance ID, or Sharing group's global UUID - :organisation: Organisation's local instance ID, or Organisation's global UUID, or Organisation's name as known to the curent instance - ''' - to_jsonify = {'sg_id': sharing_group_id, 'org_id': organisation_id} - response = self._prepare_request('POST', 'sharingGroups/removeOrg', data=to_jsonify) - return self._check_response(response) - - def add_server_to_sharing_group(self, sharing_group_id: int, server_id: int, all_orgs: bool=False): - '''Add a server to a sharing group. - :sharing_group: Sharing group's local instance ID, or Sharing group's global UUID - :server: Server's local instance ID, or URL of the Server, or Server's name as known to the curent instance - :all_orgs: Add all the organisations of the server to the group - ''' - to_jsonify = {'sg_id': sharing_group_id, 'server_id': server_id, 'all_orgs': all_orgs} - response = self._prepare_request('POST', 'sharingGroups/addServer', data=to_jsonify) - return self._check_response(response) - - def remove_server_from_sharing_group(self, sharing_group_id: int, server_id: int): - '''Remove a server from a sharing group. - :sharing_group: Sharing group's local instance ID, or Sharing group's global UUID - :server: Server's local instance ID, or URL of the Server, or Server's name as known to the curent instance - ''' - to_jsonify = {'sg_id': sharing_group_id, 'server_id': server_id} - response = self._prepare_request('POST', 'sharingGroups/removeServer', data=to_jsonify) - return self._check_response(response) - - # ## END Sharing groups ### - # ## BEGIN Feed ### def feeds(self, pythonify: bool=False): @@ -972,8 +747,9 @@ class ExpandedPyMISP(PyMISP): to_return.append(f) return to_return - def get_feed(self, feed_id: int, pythonify: bool=False): + def get_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False): """Get a feed by id.""" + feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) feed = self._prepare_request('GET', f'feeds/view/{feed_id}') feed = self._check_response(feed, expect_json=True) if not pythonify or 'errors' in feed: @@ -994,36 +770,46 @@ class ExpandedPyMISP(PyMISP): f.from_dict(**new_feed) return f - def enable_feed(self, feed_id: int, pythonify: bool=False): - feed = MISPFeed() - feed.id = feed_id - feed.enabled = True + def enable_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False): + '''Enable a feed (fetching it will create event(s)''' + if not isinstance(feed, MISPFeed): + feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) # In case we have a UUID + feed = MISPFeed() + feed.id = feed_id + feed.enabled = True return self.update_feed(feed=feed, pythonify=pythonify) - def disable_feed(self, feed_id: int, pythonify: bool=False): - feed = MISPFeed() - feed.id = feed_id - feed.enabled = False + def disable_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False): + '''Disable a feed''' + if not isinstance(feed, MISPFeed): + feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) # In case we have a UUID + feed = MISPFeed() + feed.id = feed_id + feed.enabled = False return self.update_feed(feed=feed, pythonify=pythonify) - def enable_feed_cache(self, feed_id: int, pythonify: bool=False): - feed = MISPFeed() - feed.id = feed_id - feed.caching_enabled = True + def enable_feed_cache(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False): + '''Enable the caching of a feed''' + if not isinstance(feed, MISPFeed): + feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) # In case we have a UUID + feed = MISPFeed() + feed.id = feed_id + feed.caching_enabled = True return self.update_feed(feed=feed, pythonify=pythonify) - def disable_feed_cache(self, feed_id: int, pythonify: bool=False): - feed = MISPFeed() - feed.id = feed_id - feed.caching_enabled = False + def disable_feed_cache(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False): + '''Disable the caching of a feed''' + if not isinstance(feed, MISPFeed): + feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) # In case we have a UUID + feed = MISPFeed() + feed.id = feed_id + feed.caching_enabled = False return self.update_feed(feed=feed, pythonify=pythonify) def update_feed(self, feed: MISPFeed, feed_id: int=None, pythonify: bool=False): '''Update a feed on a MISP instance''' if feed_id is None: - if feed.get('id') is None: - raise PyMISPError('The ID of the feed you want to update is required. Either directly in the parameters of the method or in the user itself.') - feed_id = feed.id + feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) # FIXME: https://github.com/MISP/MISP/issues/4834 feed = {'Feed': feed} updated_feed = self._prepare_request('POST', f'feeds/edit/{feed_id}', data=feed) @@ -1034,12 +820,15 @@ class ExpandedPyMISP(PyMISP): f.from_dict(**updated_feed) return f - def delete_feed(self, feed_id: int): + def delete_feed(self, feed: Union[MISPFeed, int, str, UUID]): + '''Delete a feed from a MISP instance''' + feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) response = self._prepare_request('POST', f'feeds/delete/{feed_id}') return self._check_response(response, expect_json=True) - def fetch_feed(self, feed_id): + def fetch_feed(self, feed: Union[MISPFeed, int, str, UUID]): """Fetch one single feed""" + feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) response = self._prepare_request('GET', f'feeds/fetchFromFeed/{feed_id}') return self._check_response(response) @@ -1048,8 +837,9 @@ class ExpandedPyMISP(PyMISP): response = self._prepare_request('GET', 'feeds/cacheFeeds/all') return self._check_response(response) - def cache_feed(self, feed_id): + def cache_feed(self, feed: Union[MISPFeed, int, str, UUID]): """Cache a specific feed""" + feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) response = self._prepare_request('GET', f'feeds/cacheFeeds/{feed_id}') return self._check_response(response) @@ -1070,6 +860,276 @@ class ExpandedPyMISP(PyMISP): # ## END Feed ### + # ## BEGIN Server ### + + def servers(self, pythonify=False): + """Get the existing servers the MISP instance can synchronise with""" + servers = self._prepare_request('GET', 'servers') + servers = self._check_response(servers, expect_json=True) + if not pythonify or 'errors' in servers: + return servers + to_return = [] + for server in servers: + s = MISPServer() + s.from_dict(**server) + to_return.append(s) + return to_return + + def add_server(self, server: MISPServer, pythonify: bool=True): + """Add a server to synchronise with""" + server = self._prepare_request('POST', f'servers/add', data=server) + server = self._check_response(server, expect_json=True) + if not pythonify or 'errors' in server: + return server + s = MISPServer() + s.from_dict(**server) + return s + + def update_server(self, server: MISPServer, server_id: int=None, pythonify: bool=True): + '''Update a server to synchronise with''' + if server_id is None: + server_id = self.__get_uuid_or_id_from_abstract_misp(server) + updated_server = self._prepare_request('POST', f'servers/edit/{server_id}', data=server) + updated_server = self._check_response(updated_server, expect_json=True) + if not pythonify or 'errors' in updated_server: + return updated_server + s = MISPServer() + s.from_dict(**updated_server) + return s + + def delete_server(self, server: Union[MISPServer, int, str, UUID]): + '''Delete a sync server''' + server_id = self.__get_uuid_or_id_from_abstract_misp(server) + response = self._prepare_request('POST', f'servers/delete/{server_id}') + return self._check_response(response, expect_json=True) + + def server_pull(self, server: Union[MISPServer, int, str, UUID], event: Union[MISPEvent, int, str, UUID]=None): + '''Initialize a pull from a sync server''' + server_id = self.__get_uuid_or_id_from_abstract_misp(server) + # FIXME: POST & data + if event: + event_id = self.__get_uuid_or_id_from_abstract_misp(event) + url = f'servers/pull/{server_id}/{event_id}' + else: + url = f'servers/pull/{server_id}' + response = self._prepare_request('GET', url) + # FIXME: can we pythonify? + return self._check_response(response) + + def server_push(self, server: Union[MISPServer, int, str, UUID], event: Union[MISPEvent, int, str, UUID]=None): + '''Initialize a push to a sync server''' + server_id = self.__get_uuid_or_id_from_abstract_misp(server) + # FIXME: POST & data + if event: + event_id = self.__get_uuid_or_id_from_abstract_misp(event) + url = f'servers/push/{server_id}/{event_id}' + else: + url = f'servers/push/{server_id}' + response = self._prepare_request('GET', url) + # FIXME: can we pythonify? + return self._check_response(response) + + # ## END Server ### + + # ## BEGIN Sharing group ### + + def sharing_groups(self, pythonify: bool=False): + """Get the existing sharing groups""" + sharing_groups = self._prepare_request('GET', 'sharing_groups') + sharing_groups = self._check_response(sharing_groups, expect_json=True) + if not pythonify or 'errors' in sharing_groups: + return sharing_groups + to_return = [] + for sharing_group in sharing_groups: + s = MISPSharingGroup() + s.from_dict(**sharing_group) + to_return.append(s) + return to_return + + def add_sharing_group(self, sharing_group: MISPSharingGroup, pythonify: bool=True): + """Add a new sharing group""" + sharing_group = self._prepare_request('POST', f'sharing_groups/add', data=sharing_group) + sharing_group = self._check_response(sharing_group, expect_json=True) + # FIXME: https://github.com/MISP/MISP/issues/4882 + sharing_group = sharing_group[0] + if not pythonify or 'errors' in sharing_group: + return sharing_group + s = MISPSharingGroup() + s.from_dict(**sharing_group) + return s + + def delete_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID]): + """Delete a sharing group""" + sharing_group_id = self.__get_uuid_or_id_from_abstract_misp(sharing_group) + response = self._prepare_request('POST', f'sharing_groups/delete/{sharing_group_id}') + return self._check_response(response, expect_json=True) + + def add_org_to_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID], + organisation: Union[MISPOrganisation, int, str, UUID], extend: bool=False): + '''Add an organisation to a sharing group. + :sharing_group: Sharing group's local instance ID, or Sharing group's global UUID + :organisation: Organisation's local instance ID, or Organisation's global UUID, or Organisation's name as known to the curent instance + :extend: Allow the organisation to extend the group + ''' + sharing_group_id = self.__get_uuid_or_id_from_abstract_misp(sharing_group) + organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) + to_jsonify = {'sg_id': sharing_group_id, 'org_id': organisation_id, 'extend': extend} + response = self._prepare_request('POST', 'sharingGroups/addOrg', data=to_jsonify) + return self._check_response(response) + + def remove_org_from_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID], + organisation: Union[MISPOrganisation, int, str, UUID]): + '''Remove an organisation from a sharing group. + :sharing_group: Sharing group's local instance ID, or Sharing group's global UUID + :organisation: Organisation's local instance ID, or Organisation's global UUID, or Organisation's name as known to the curent instance + ''' + sharing_group_id = self.__get_uuid_or_id_from_abstract_misp(sharing_group) + organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) + to_jsonify = {'sg_id': sharing_group_id, 'org_id': organisation_id} + response = self._prepare_request('POST', 'sharingGroups/removeOrg', data=to_jsonify) + return self._check_response(response) + + def add_server_to_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID], + server: Union[MISPServer, int, str, UUID], all_orgs: bool=False): + '''Add a server to a sharing group. + :sharing_group: Sharing group's local instance ID, or Sharing group's global UUID + :server: Server's local instance ID, or URL of the Server, or Server's name as known to the curent instance + :all_orgs: Add all the organisations of the server to the group + ''' + sharing_group_id = self.__get_uuid_or_id_from_abstract_misp(sharing_group) + server_id = self.__get_uuid_or_id_from_abstract_misp(server) + to_jsonify = {'sg_id': sharing_group_id, 'server_id': server_id, 'all_orgs': all_orgs} + response = self._prepare_request('POST', 'sharingGroups/addServer', data=to_jsonify) + return self._check_response(response) + + def remove_server_from_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID], + server: Union[MISPServer, int, str, UUID]): + '''Remove a server from a sharing group. + :sharing_group: Sharing group's local instance ID, or Sharing group's global UUID + :server: Server's local instance ID, or URL of the Server, or Server's name as known to the curent instance + ''' + sharing_group_id = self.__get_uuid_or_id_from_abstract_misp(sharing_group) + server_id = self.__get_uuid_or_id_from_abstract_misp(server) + to_jsonify = {'sg_id': sharing_group_id, 'server_id': server_id} + response = self._prepare_request('POST', 'sharingGroups/removeServer', data=to_jsonify) + return self._check_response(response) + + # ## END Sharing groups ### + + # ## BEGIN Organisation ### + + def organisations(self, scope="local", pythonify=False): + """Get all the organisations.""" + organisations = self._prepare_request('GET', f'organisations/index/scope:{scope}') + organisations = self._check_response(organisations, expect_json=True) + if not pythonify or 'errors' in organisations: + return organisations + to_return = [] + for organisation in organisations: + o = MISPOrganisation() + o.from_dict(**organisation) + to_return.append(o) + return to_return + + def get_organisation(self, organisation: Union[MISPOrganisation, int, str, UUID], pythonify: bool=True): + '''Get an organisation.''' + organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) + organisation = self._prepare_request('GET', f'organisations/view/{organisation_id}') + organisation = self._check_response(organisation, expect_json=True) + if not pythonify or 'errors' in organisation: + return organisation + o = MISPOrganisation() + o.from_dict(**organisation) + return o + + def add_organisation(self, organisation: MISPOrganisation, pythonify: bool=True): + '''Add an organisation''' + new_organisation = self._prepare_request('POST', f'admin/organisations/add', data=organisation) + new_organisation = self._check_response(new_organisation, expect_json=True) + if not pythonify or 'errors' in new_organisation: + return new_organisation + o = MISPOrganisation() + o.from_dict(**new_organisation) + return o + + def update_organisation(self, organisation: MISPOrganisation, organisation_id: int=None, pythonify: bool=True): + '''Update an organisation''' + if organisation_id is None: + organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) + updated_organisation = self._prepare_request('POST', f'admin/organisations/edit/{organisation_id}', data=organisation) + updated_organisation = self._check_response(updated_organisation, expect_json=True) + if not pythonify or 'errors' in updated_organisation: + return updated_organisation + o = MISPOrganisation() + o.from_dict(**organisation) + return o + + def delete_organisation(self, organisation: Union[MISPOrganisation, int, str, UUID]): + '''Delete an organisation''' + # NOTE: MISP in inconsistent and currently require "delete" in the path and doesn't support HTTP DELETE + organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) + response = self._prepare_request('POST', f'admin/organisations/delete/{organisation_id}') + return self._check_response(response, expect_json=True) + + # ## END Organisation ### + + # ## BEGIN User ### + + def users(self, pythonify=False): + """Get all the users.""" + users = self._prepare_request('GET', 'admin/users') + users = self._check_response(users, expect_json=True) + if not pythonify or 'errors' in users: + return users + to_return = [] + for user in users: + u = MISPUser() + u.from_dict(**user) + to_return.append(u) + return to_return + + def get_user(self, user: Union[MISPUser, int, str, UUID]='me', pythonify: bool=False): + '''Get a user. `me` means the owner of the API key doing the query.''' + user_id = self.__get_uuid_or_id_from_abstract_misp(user) + user = self._prepare_request('GET', f'users/view/{user_id}') + user = self._check_response(user, expect_json=True) + if not pythonify or 'errors' in user: + return user + u = MISPUser() + u.from_dict(**user) + return u + + def add_user(self, user: MISPUser, pythonify: bool=False): + '''Add a new user''' + user = self._prepare_request('POST', f'admin/users/add', data=user) + user = self._check_response(user, expect_json=True) + if not pythonify or 'errors' in user: + return user + u = MISPUser() + u.from_dict(**user) + return u + + def update_user(self, user: MISPUser, user_id: int=None, pythonify: bool=False): + '''Update an event on a MISP instance''' + if user_id is None: + user_id = self.__get_uuid_or_id_from_abstract_misp(user) + updated_user = self._prepare_request('POST', f'admin/users/edit/{user_id}', data=user) + updated_user = self._check_response(updated_user, expect_json=True) + if not pythonify or 'errors' in updated_user: + return updated_user + e = MISPUser() + e.from_dict(**updated_user) + return e + + def delete_user(self, user: Union[MISPUser, int, str, UUID]): + '''Delete a user''' + # NOTE: MISP in inconsistent and currently require "delete" in the path and doesn't support HTTP DELETE + user_id = self.__get_uuid_or_id_from_abstract_misp(user) + response = self._prepare_request('POST', f'admin/users/delete/{user_id}') + return self._check_response(response, expect_json=True) + + # ## END User ### + # ## BEGIN Role ### def roles(self, pythonify: bool=False): @@ -1085,195 +1145,16 @@ class ExpandedPyMISP(PyMISP): to_return.append(r) return to_return + def set_default_role(self, role: Union[MISPRole, int, str, UUID]): + role_id = self.__get_uuid_or_id_from_abstract_misp(role) + url = urljoin(self.root_url, f'/admin/roles/set_default/{role_id}') + response = self._prepare_request('POST', url) + return self._check_response(response, expect_json=True) + # ## END Role ### - # ## BEGIN Server ### - - def servers(self, pythonify=False): - """Get the existing servers""" - servers = self._prepare_request('GET', 'servers') - servers = self._check_response(servers, expect_json=True) - if not pythonify or 'errors' in servers: - return servers - to_return = [] - for server in servers: - s = MISPServer() - s.from_dict(**server) - to_return.append(s) - return to_return - - def add_server(self, server: MISPServer, pythonify: bool=True): - server = self._prepare_request('POST', f'servers/add', data=server) - server = self._check_response(server, expect_json=True) - if not pythonify or 'errors' in server: - return server - s = MISPServer() - s.from_dict(**server) - return s - - def update_server(self, server: MISPServer, server_id: int=None, pythonify: bool=True): - '''Update a server on a MISP instance''' - if server_id is None: - if server.get('id') is None: - raise PyMISPError('The ID of the server you want to update is required. Either directly in the parameters of the method or in the user itself.') - server_id = server.id - updated_server = self._prepare_request('POST', f'servers/edit/{server_id}', data=server) - updated_server = self._check_response(updated_server, expect_json=True) - if not pythonify or 'errors' in updated_server: - return updated_server - s = MISPServer() - s.from_dict(**updated_server) - return s - - def delete_server(self, server_id: int): - response = self._prepare_request('POST', f'servers/delete/{server_id}') - return self._check_response(response, expect_json=True) - - def server_pull(self, server_id: int, event_id: int=None): - # FIXME: POST & data - if event_id: - url = f'servers/pull/{server_id}/{event_id}' - else: - url = f'servers/pull/{server_id}' - response = self._prepare_request('GET', url) - # FIXME: can we pythonify? - return self._check_response(response) - - def server_push(self, server_id: int, event_id: int=None): - # FIXME: POST & data - if event_id: - url = f'servers/push/{server_id}/{event_id}' - else: - url = f'servers/push/{server_id}' - response = self._prepare_request('GET', url) - # FIXME: can we pythonify? - return self._check_response(response) - - # ## END Server ### - - # ## BEGIN Global helpers ### - - def change_sharing_group_on_entity(self, misp_entity: AbstractMISP, sharing_group_id): - """Change the sharing group of an event, an attribute, or an object""" - misp_entity.distribution = 4 # Needs to be 'Sharing group' - if 'SharingGroup' in misp_entity: # Delete former SharingGroup information - del misp_entity.SharingGroup - misp_entity.sharing_group_id = sharing_group_id # Set new sharing group id - if isinstance(misp_entity, MISPEvent): - return self.update_event(misp_entity) - elif isinstance(misp_entity, MISPObject): - return self.update_object(misp_entity) - elif isinstance(misp_entity, MISPAttribute): - return self.update_attribute(misp_entity) - else: - raise PyMISPError('The misp_entity must be MISPEvent, MISPObject or MISPAttribute') - - def tag(self, misp_entity: Union[AbstractMISP, str], tag: str): - """Tag an event or an attribute. misp_entity can be a UUID""" - if 'uuid' in misp_entity: - uuid = misp_entity.uuid - else: - uuid = misp_entity - to_post = {'uuid': uuid, 'tag': tag} - response = self._prepare_request('POST', 'tags/attachTagToObject', data=to_post) - return self._check_response(response, expect_json=True) - - def untag(self, misp_entity: Union[AbstractMISP, str], tag: str): - """Untag an event or an attribute. misp_entity can be a UUID""" - if 'uuid' in misp_entity: - uuid = misp_entity.uuid - else: - uuid = misp_entity - to_post = {'uuid': uuid, 'tag': tag} - response = self._prepare_request('POST', 'tags/removeTagFromObject', data=to_post) - return self._check_response(response, expect_json=True) - - # ## END Global helpers ### - # ## BEGIN Search methods ### - def search_sightings(self, context: Optional[str]=None, - context_id: Optional[SearchType]=None, - type_sighting: Optional[str]=None, - date_from: Optional[DateTypes]=None, - date_to: Optional[DateTypes]=None, - publish_timestamp: Optional[DateInterval]=None, last: Optional[DateInterval]=None, - org: Optional[SearchType]=None, - source: Optional[str]=None, - include_attribute: Optional[bool]=None, - include_event_meta: Optional[bool]=None, - pythonify: Optional[bool]=False - ): - '''Search sightings - - :param context: The context of the search. Can be either "attribute", "event", or nothing (will then match on events and attributes). - :param context_id: Only relevant if context is either "attribute" or "event". Then it is the relevant ID. - :param type_sighting: Type of sighting - :param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event. - :param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event. - :param publish_timestamp: Restrict the results by the last publish timestamp (newer than). - :param org: Search by the creator organisation by supplying the organisation identifier. - :param source: Source of the sighting - :param include_attribute: Include the attribute. - :param include_event_meta: Include the meta information of the event. - - Deprecated: - - :param last: synonym for publish_timestamp - - :Example: - - >>> misp.search_sightings(publish_timestamp='30d') # search sightings for the last 30 days on the instance - [ ... ] - >>> misp.search_sightings(context='attribute', context_id=6, include_attribute=True) # return list of sighting for attribute 6 along with the attribute itself - [ ... ] - >>> misp.search_sightings(context='event', context_id=17, include_event_meta=True, org=2) # return list of sighting for event 17 filtered with org id 2 - ''' - query = {'returnFormat': 'json'} - if context is not None: - if context not in ['attribute', 'event']: - raise ValueError('context has to be in {}'.format(', '.join(['attribute', 'event']))) - url_path = f'sightings/restSearch/{context}' - else: - url_path = 'sightings/restSearch' - query['id'] = context_id - query['type'] = type_sighting - query['from'] = date_from - query['to'] = date_to - query['last'] = publish_timestamp - query['org_id'] = org - query['source'] = source - query['includeAttribute'] = include_attribute - query['includeEvent'] = include_event_meta - - url = urljoin(self.root_url, url_path) - response = self._prepare_request('POST', url, data=query) - normalized_response = self._check_response(response, expect_json=True) - if not pythonify or 'errors' in normalized_response: - return normalized_response - elif pythonify: - to_return = [] - for s in normalized_response: - entries = {} - s_data = s['Sighting'] - if include_event_meta: - e = s_data.pop('Event') - me = MISPEvent() - me.from_dict(**e) - entries['event'] = me - if include_attribute: - a = s_data.pop('Attribute') - ma = MISPAttribute() - ma.from_dict(**a) - entries['attribute'] = ma - ms = MISPSighting() - ms.from_dict(**s_data) - entries['sighting'] = ms - to_return.append(entries) - return to_return - else: - return normalized_response - def search(self, controller: str='events', return_format: str='json', limit: Optional[int]=None, page: Optional[int]=None, value: Optional[SearchParameterTypes]=None, @@ -1383,22 +1264,22 @@ class ExpandedPyMISP(PyMISP): query['org'] = org query['tags'] = tags query['quickFilter'] = quick_filter - query['from'] = self.make_timestamp(date_from) - query['to'] = self.make_timestamp(date_to) + query['from'] = self._make_timestamp(date_from) + query['to'] = self._make_timestamp(date_to) query['eventid'] = eventid query['withAttachments'] = with_attachments query['metadata'] = metadata query['uuid'] = uuid if publish_timestamp is not None: if isinstance(publish_timestamp, (list, tuple)): - query['publish_timestamp'] = (self.make_timestamp(publish_timestamp[0]), self.make_timestamp(publish_timestamp[1])) + query['publish_timestamp'] = (self._make_timestamp(publish_timestamp[0]), self._make_timestamp(publish_timestamp[1])) else: - query['publish_timestamp'] = self.make_timestamp(publish_timestamp) + query['publish_timestamp'] = self._make_timestamp(publish_timestamp) if timestamp is not None: if isinstance(timestamp, (list, tuple)): - query['timestamp'] = (self.make_timestamp(timestamp[0]), self.make_timestamp(timestamp[1])) + query['timestamp'] = (self._make_timestamp(timestamp[0]), self._make_timestamp(timestamp[1])) else: - query['timestamp'] = self.make_timestamp(timestamp) + query['timestamp'] = self._make_timestamp(timestamp) query['published'] = published query['enforceWarninglist'] = enforce_warninglist if to_ids is not None: @@ -1409,9 +1290,9 @@ class ExpandedPyMISP(PyMISP): query['includeEventUuid'] = include_event_uuid if event_timestamp is not None: if isinstance(event_timestamp, (list, tuple)): - query['event_timestamp'] = (self.make_timestamp(event_timestamp[0]), self.make_timestamp(event_timestamp[1])) + query['event_timestamp'] = (self._make_timestamp(event_timestamp[0]), self._make_timestamp(event_timestamp[1])) else: - query['event_timestamp'] = self.make_timestamp(event_timestamp) + query['event_timestamp'] = self._make_timestamp(event_timestamp) query['sgReferenceOnly'] = sg_reference_only query['eventinfo'] = eventinfo query['searchall'] = searchall @@ -1448,6 +1329,142 @@ class ExpandedPyMISP(PyMISP): else: return normalized_response + def search_index(self, published: Optional[bool]=None, eventid: Optional[SearchType]=None, + tags: Optional[SearchParameterTypes]=None, + date_from: Optional[DateTypes]=None, + date_to: Optional[DateTypes]=None, + eventinfo: Optional[str]=None, + threatlevel: Optional[List[SearchType]]=None, + distribution: Optional[List[SearchType]]=None, + analysis: Optional[List[SearchType]]=None, + org: Optional[SearchParameterTypes]=None, + timestamp: Optional[DateInterval]=None, + pythonify: Optional[bool]=None): + """Search only at the index level. Using ! in front of a value means NOT (default is OR) + + :param published: Set whether published or unpublished events should be returned. Do not set the parameter if you want both. + :param eventid: The events that should be included / excluded from the search + :param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query` + :param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event. + :param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event. + :param eventinfo: Filter on the event's info field. + :param threatlevel: Threat level(s) (1,2,3,4) | list + :param distribution: Distribution level(s) (0,1,2,3) | list + :param analysis: Analysis level(s) (0,1,2) | list + :param org: Search by the creator organisation by supplying the organisation identifier. + :param timestamp: Restrict the results by the timestamp (last edit). Any event with a timestamp newer than the given timestamp will be returned. In case you are dealing with /attributes as scope, the attribute's timestamp will be used for the lookup. + :param pythonify: Returns a list of PyMISP Objects instead or the plain json output. Warning: it might use a lot of RAM + """ + query = locals() + query.pop('self') + query.pop('pythonify') + if query.get('date_from'): + query['datefrom'] = self._make_timestamp(query.pop('date_from')) + if query.get('date_to'): + query['dateuntil'] = self._make_timestamp(query.pop('date_to')) + + if query.get('timestamp') is not None: + timestamp = query.pop('timestamp') + if isinstance(timestamp, (list, tuple)): + query['timestamp'] = (self._make_timestamp(timestamp[0]), self._make_timestamp(timestamp[1])) + else: + query['timestamp'] = self._make_timestamp(timestamp) + + url = urljoin(self.root_url, 'events/index') + response = self._prepare_request('POST', url, data=query) + normalized_response = self._check_response(response, expect_json=True) + + if not pythonify: + return normalized_response + to_return = [] + for e_meta in normalized_response: + me = MISPEvent() + me.from_dict(**e_meta) + to_return.append(me) + return to_return + + def search_sightings(self, context: Optional[str]=None, + context_id: Optional[SearchType]=None, + type_sighting: Optional[str]=None, + date_from: Optional[DateTypes]=None, + date_to: Optional[DateTypes]=None, + publish_timestamp: Optional[DateInterval]=None, last: Optional[DateInterval]=None, + org: Optional[SearchType]=None, + source: Optional[str]=None, + include_attribute: Optional[bool]=None, + include_event_meta: Optional[bool]=None, + pythonify: Optional[bool]=False + ): + '''Search sightings + + :param context: The context of the search. Can be either "attribute", "event", or nothing (will then match on events and attributes). + :param context_id: Only relevant if context is either "attribute" or "event". Then it is the relevant ID. + :param type_sighting: Type of sighting + :param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event. + :param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event. + :param publish_timestamp: Restrict the results by the last publish timestamp (newer than). + :param org: Search by the creator organisation by supplying the organisation identifier. + :param source: Source of the sighting + :param include_attribute: Include the attribute. + :param include_event_meta: Include the meta information of the event. + + Deprecated: + + :param last: synonym for publish_timestamp + + :Example: + + >>> misp.search_sightings(publish_timestamp='30d') # search sightings for the last 30 days on the instance + [ ... ] + >>> misp.search_sightings(context='attribute', context_id=6, include_attribute=True) # return list of sighting for attribute 6 along with the attribute itself + [ ... ] + >>> misp.search_sightings(context='event', context_id=17, include_event_meta=True, org=2) # return list of sighting for event 17 filtered with org id 2 + ''' + query = {'returnFormat': 'json'} + if context is not None: + if context not in ['attribute', 'event']: + raise ValueError('context has to be in {}'.format(', '.join(['attribute', 'event']))) + url_path = f'sightings/restSearch/{context}' + else: + url_path = 'sightings/restSearch' + query['id'] = context_id + query['type'] = type_sighting + query['from'] = date_from + query['to'] = date_to + query['last'] = publish_timestamp + query['org_id'] = org + query['source'] = source + query['includeAttribute'] = include_attribute + query['includeEvent'] = include_event_meta + + url = urljoin(self.root_url, url_path) + response = self._prepare_request('POST', url, data=query) + normalized_response = self._check_response(response, expect_json=True) + if not pythonify or 'errors' in normalized_response: + return normalized_response + elif pythonify: + to_return = [] + for s in normalized_response: + entries = {} + s_data = s['Sighting'] + if include_event_meta: + e = s_data.pop('Event') + me = MISPEvent() + me.from_dict(**e) + entries['event'] = me + if include_attribute: + a = s_data.pop('Attribute') + ma = MISPAttribute() + ma.from_dict(**a) + entries['attribute'] = ma + ms = MISPSighting() + ms.from_dict(**s_data) + entries['sighting'] = ms + to_return.append(entries) + return to_return + else: + return normalized_response + def search_logs(self, limit: Optional[int]=None, page: Optional[int]=None, log_id: Optional[int]=None, title: Optional[str]=None, created: Optional[DateTypes]=None, model: Optional[str]=None, @@ -1492,65 +1509,49 @@ class ExpandedPyMISP(PyMISP): to_return.append(ml) return to_return - def search_index(self, published: Optional[bool]=None, eventid: Optional[SearchType]=None, - tags: Optional[SearchParameterTypes]=None, - date_from: Optional[DateTypes]=None, - date_to: Optional[DateTypes]=None, - eventinfo: Optional[str]=None, - threatlevel: Optional[List[SearchType]]=None, - distribution: Optional[List[SearchType]]=None, - analysis: Optional[List[SearchType]]=None, - org: Optional[SearchParameterTypes]=None, - timestamp: Optional[DateInterval]=None, - pythonify: Optional[bool]=None): - """Search only at the index level. Using ! in front of a value means NOT (default is OR) + # ## END Search methods ### - :param published: Set whether published or unpublished events should be returned. Do not set the parameter if you want both. - :param eventid: The events that should be included / excluded from the search - :param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query` - :param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event. - :param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event. - :param eventinfo: Filter on the event's info field. - :param threatlevel: Threat level(s) (1,2,3,4) | list - :param distribution: Distribution level(s) (0,1,2,3) | list - :param analysis: Analysis level(s) (0,1,2) | list - :param org: Search by the creator organisation by supplying the organisation identifier. - :param timestamp: Restrict the results by the timestamp (last edit). Any event with a timestamp newer than the given timestamp will be returned. In case you are dealing with /attributes as scope, the attribute's timestamp will be used for the lookup. - :param pythonify: Returns a list of PyMISP Objects instead or the plain json output. Warning: it might use a lot of RAM - """ - query = locals() - query.pop('self') - query.pop('pythonify') - if query.get('date_from'): - query['datefrom'] = self.make_timestamp(query.pop('date_from')) - if query.get('date_to'): - query['dateuntil'] = self.make_timestamp(query.pop('date_to')) + # ## BEGIN Others ### - if query.get('timestamp') is not None: - timestamp = query.pop('timestamp') - if isinstance(timestamp, (list, tuple)): - query['timestamp'] = (self.make_timestamp(timestamp[0]), self.make_timestamp(timestamp[1])) - else: - query['timestamp'] = self.make_timestamp(timestamp) - - url = urljoin(self.root_url, 'events/index') - response = self._prepare_request('POST', url, data=query) - normalized_response = self._check_response(response, expect_json=True) - - if not pythonify: - return normalized_response - to_return = [] - for e_meta in normalized_response: - me = MISPEvent() - me.from_dict(**e_meta) - to_return.append(me) - return to_return - - def set_default_role(self, role_id: int): - url = urljoin(self.root_url, f'/admin/roles/set_default/{role_id}') - response = self._prepare_request('POST', url) + def push_event_to_ZMQ(self, event: Union[MISPEvent, int, str, UUID]): + """Force push an event on ZMQ""" + event_id = self.__get_uuid_or_id_from_abstract_misp(event) + response = self._prepare_request('POST', f'events/pushEventToZMQ/{event_id}.json') return self._check_response(response, expect_json=True) + def direct_call(self, url: str, data: dict=None, params: dict={}): + '''Very lightweight call that posts a data blob (python dictionary or json string) on the URL''' + if data is None: + response = self._prepare_request('GET', url, params=params) + else: + response = self._prepare_request('POST', url, data=data, params=params) + return self._check_response(response, lenient_response_type=True) + + def freetext(self, event: Union[MISPEvent, int, str, UUID], string: str, adhereToWarninglists: Union[bool, str]=False, + distribution: int=None, returnMetaAttributes: bool=False, pythonify=False): + """Pass a text to the freetext importer""" + event_id = self.__get_uuid_or_id_from_abstract_misp(event) + query = {"value": string} + wl_params = [False, True, 'soft'] + if adhereToWarninglists in wl_params: + query['adhereToWarninglists'] = adhereToWarninglists + else: + raise Exception('Invalid parameter, adhereToWarninglists Can only be {}'.format(', '.join(wl_params))) + if distribution is not None: + query['distribution'] = distribution + if returnMetaAttributes: + query['returnMetaAttributes'] = returnMetaAttributes + attributes = self._prepare_request('POST', f'events/freeTextImport/{event_id}', data=query) + attributes = self._check_response(attributes, expect_json=True) + if returnMetaAttributes or not pythonify or 'errors' in attributes: + return attributes + to_return = [] + for attribute in attributes: + a = MISPAttribute() + a.from_dict(**attribute) + to_return.append(a) + return to_return + def upload_stix(self, path, version: str='2'): """Upload a STIX file to MISP. :param path: Path to the STIX on the disk (can be a path-like object, or a pseudofile) @@ -1574,11 +1575,112 @@ class ExpandedPyMISP(PyMISP): return response - # ## END Search methods ### + # ## END Others ### - # ## Helpers ### + # ## BEGIN Statistics ### - def make_timestamp(self, value: DateTypes): + def attributes_statistics(self, context: str='type', percentage: bool=False): + """Get attributes statistics from the MISP instance.""" + # FIXME: https://github.com/MISP/MISP/issues/4874 + if context not in ['type', 'category']: + raise PyMISPError('context can only be "type" or "category"') + if percentage: + path = f'attributes/attributeStatistics/{context}/true' + else: + path = f'attributes/attributeStatistics/{context}' + response = self._prepare_request('GET', path) + return self._check_response(response, expect_json=True) + + def tags_statistics(self, percentage: bool=False, name_sort: bool=False): + """Get tags statistics from the MISP instance""" + # FIXME: https://github.com/MISP/MISP/issues/4874 + # NOTE: https://github.com/MISP/MISP/issues/4879 + if percentage: + percentage = 'true' + else: + percentage = 'false' + if name_sort: + name_sort = 'true' + else: + name_sort = 'false' + response = self._prepare_request('GET', f'tags/tagStatistics/{percentage}/{name_sort}') + return self._check_response(response) + + def users_statistics(self, context: str='data'): + """Get users statistics from the MISP instance""" + # FIXME: https://github.com/MISP/MISP/issues/4874 + availables_contexts = ['data', 'orgs', 'users', 'tags', 'attributehistogram', 'sightings', 'galaxyMatrix'] + if context not in availables_contexts: + raise PyMISPError("context can only be {','.join(availables_contexts)}") + response = self._prepare_request('GET', f'users/statistics/{context}.json') + return self._check_response(response) + + # ## END Statistics ### + + # ## BEGIN Global helpers ### + + def change_sharing_group_on_entity(self, misp_entity: AbstractMISP, sharing_group_id): + """Change the sharing group of an event, an attribute, or an object""" + misp_entity.distribution = 4 # Needs to be 'Sharing group' + if 'SharingGroup' in misp_entity: # Delete former SharingGroup information + del misp_entity.SharingGroup + misp_entity.sharing_group_id = sharing_group_id # Set new sharing group id + if isinstance(misp_entity, MISPEvent): + return self.update_event(misp_entity) + elif isinstance(misp_entity, MISPObject): + return self.update_object(misp_entity) + elif isinstance(misp_entity, MISPAttribute): + return self.update_attribute(misp_entity) + else: + raise PyMISPError('The misp_entity must be MISPEvent, MISPObject or MISPAttribute') + + def tag(self, misp_entity: Union[AbstractMISP, str], tag: str): + """Tag an event or an attribute. misp_entity can be a UUID""" + if 'uuid' in misp_entity: + uuid = misp_entity.uuid + else: + uuid = misp_entity + to_post = {'uuid': uuid, 'tag': tag} + response = self._prepare_request('POST', 'tags/attachTagToObject', data=to_post) + return self._check_response(response, expect_json=True) + + def untag(self, misp_entity: Union[AbstractMISP, str], tag: str): + """Untag an event or an attribute. misp_entity can be a UUID""" + if 'uuid' in misp_entity: + uuid = misp_entity.uuid + else: + uuid = misp_entity + to_post = {'uuid': uuid, 'tag': tag} + response = self._prepare_request('POST', 'tags/removeTagFromObject', data=to_post) + return self._check_response(response, expect_json=True) + + def build_complex_query(self, or_parameters: Optional[List[SearchType]]=None, + and_parameters: Optional[List[SearchType]]=None, + not_parameters: Optional[List[SearchType]]=None): + '''Build a complex search query. MISP expects a dictionary with AND, OR and NOT keys.''' + to_return = {} + if and_parameters: + to_return['AND'] = and_parameters + if not_parameters: + to_return['NOT'] = not_parameters + if or_parameters: + to_return['OR'] = or_parameters + return to_return + + # ## END Global helpers ### + + # ## Internal methods ### + + def __get_uuid_or_id_from_abstract_misp(self, obj: Union[AbstractMISP, int, str, UUID]): + if isinstance(obj, UUID): + return str(obj) + if isinstance(obj, (int, str)): + return obj + elif 'id' in obj: + return obj['id'] + return obj['uuid'] + + def _make_timestamp(self, value: DateTypes): '''Catch-all method to normalize anything that can be converted to a timestamp''' if isinstance(value, datetime): return datetime.timestamp() @@ -1597,21 +1699,6 @@ class ExpandedPyMISP(PyMISP): else: return value - def build_complex_query(self, or_parameters: Optional[List[SearchType]]=None, - and_parameters: Optional[List[SearchType]]=None, - not_parameters: Optional[List[SearchType]]=None): - '''Build a complex search query. MISP expects a dictionary with AND, OR and NOT keys.''' - to_return = {} - if and_parameters: - to_return['AND'] = and_parameters - if not_parameters: - to_return['NOT'] = not_parameters - if or_parameters: - to_return['OR'] = or_parameters - return to_return - - # ## Internal methods ### - def _check_response(self, response, lenient_response_type=False, expect_json=False): """Check if the response from the server is not an unexpected error""" if response.status_code >= 500: @@ -1680,7 +1767,7 @@ class ExpandedPyMISP(PyMISP): settings = s.merge_environment_settings(req.url, proxies=self.proxies or {}, stream=None, verify=self.ssl, cert=self.cert) return s.send(prepped, **settings) - def _csv_to_dict(self, csv_content): + def _csv_to_dict(self, csv_content: str): '''Makes a list of dict out of a csv file (requires headers)''' fieldnames, lines = csv_content.split('\n', 1) fieldnames = fieldnames.split(',') diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index e4fd526..46fab83 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -81,11 +81,11 @@ class TestComprehensive(unittest.TestCase): @classmethod def tearDownClass(cls): # Delete publisher - cls.admin_misp_connector.delete_user(user_id=cls.test_pub.id) + cls.admin_misp_connector.delete_user(cls.test_pub.id) # Delete user - cls.admin_misp_connector.delete_user(user_id=cls.test_usr.id) + cls.admin_misp_connector.delete_user(cls.test_usr.id) # Delete org - cls.admin_misp_connector.delete_organisation(organisation_id=cls.test_org.id) + cls.admin_misp_connector.delete_organisation(cls.test_org.id) def create_simple_event(self, force_timestamps=False): mispevent = MISPEvent(force_timestamps=force_timestamps) @@ -1282,7 +1282,7 @@ class TestComprehensive(unittest.TestCase): # Update org organisation.name = 'blah' organisation = self.admin_misp_connector.update_organisation(organisation) - self.assertEqual(organisation.name, 'blah') + self.assertEqual(organisation.name, 'blah', organisation) def test_attribute(self): first = self.create_simple_event() @@ -1501,12 +1501,12 @@ class TestComprehensive(unittest.TestCase): # add org # FIXME: https://github.com/MISP/MISP/issues/4884 # r = self.admin_misp_connector.add_org_to_sharing_group(sharing_group.id, - # organisation_id=self.test_org.id, extend=True) + # self.test_org.id, extend=True) # delete org # FIXME: https://github.com/MISP/MISP/issues/4884 # r = self.admin_misp_connector.remove_org_from_sharing_group(sharing_group.id, - # organisation_id=self.test_org.id) + # self.test_org.id) # Get list sharing_groups = self.admin_misp_connector.sharing_groups(pythonify=True) @@ -1558,6 +1558,7 @@ class TestComprehensive(unittest.TestCase): self.assertEqual(botvrij.url, "http://www.botvrij.eu/data/feed-osint") # Enable # MISP OSINT + print(feeds[0].id) feed = self.admin_misp_connector.enable_feed(feeds[0].id, pythonify=True) self.assertTrue(feed.enabled) feed = self.admin_misp_connector.enable_feed_cache(feeds[0].id, pythonify=True)