From aa17663b58356bd2a8cffedba1c891378cdaaec4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Fri, 3 Jan 2020 15:42:15 +0100 Subject: [PATCH] chg: Add more typing information --- pymisp/api.py | 491 ++++++++++++++++++++++---------------------- pymisp/mispevent.py | 2 +- 2 files changed, 247 insertions(+), 246 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index 137fc8d..2a03d6a 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -109,30 +109,30 @@ class PyMISP: return self._check_response(response, expect_json=True) @property - def describe_types_local(self): + def describe_types_local(self) -> dict: '''Returns the content of describe types from the package''' return describe_types @property - def describe_types_remote(self): + def describe_types_remote(self) -> dict: '''Returns the content of describe types from the remote instance''' response = self._prepare_request('GET', 'attributes/describeTypes.json') remote_describe_types = self._check_response(response, expect_json=True) return remote_describe_types['result'] @property - def recommended_pymisp_version(self): + def recommended_pymisp_version(self) -> dict: """Returns the recommended API version from the server""" response = self._prepare_request('GET', 'servers/getPyMISPVersion.json') return self._check_response(response, expect_json=True) @property - def version(self): + def version(self) -> dict: """Returns the version of PyMISP you're curently using""" return {'version': __version__} @property - def pymisp_version_master(self): + def pymisp_version_master(self) -> dict: """Get the most recent version of PyMISP from github""" r = requests.get('https://raw.githubusercontent.com/MISP/PyMISP/master/pymisp/__init__.py') if r.status_code == 200: @@ -141,13 +141,13 @@ class PyMISP: return {'error': 'Impossible to retrieve the version of the master branch.'} @property - def misp_instance_version(self): + def misp_instance_version(self) -> dict: """Returns the version of the instance.""" response = self._prepare_request('GET', 'servers/getVersion.json') return self._check_response(response, expect_json=True) @property - def misp_instance_version_master(self): + def misp_instance_version_master(self) -> dict: """Get the most recent version from github""" r = requests.get('https://raw.githubusercontent.com/MISP/MISP/2.4/VERSION.json') if r.status_code == 200: @@ -155,94 +155,94 @@ class PyMISP: return {'version': '{}.{}.{}'.format(master_version['major'], master_version['minor'], master_version['hotfix'])} return {'error': 'Impossible to retrieve the version of the master branch.'} - def update_misp(self): + def update_misp(self) -> dict: response = self._prepare_request('POST', '/servers/update') return self._check_response(response, expect_json=True) - def set_server_setting(self, setting: str, value: Union[str, int, bool], force: bool=False): + def set_server_setting(self, setting: str, value: Union[str, int, bool], force: bool=False) -> dict: data = {'value': value, 'force': force} response = self._prepare_request('POST', f'/servers/serverSettingsEdit/{setting}', data=data) return self._check_response(response, expect_json=True) - def get_server_setting(self, setting: str): + def get_server_setting(self, setting: str) -> dict: response = self._prepare_request('GET', f'/servers/getSetting/{setting}') return self._check_response(response, expect_json=True) - def server_settings(self): + def server_settings(self) -> dict: response = self._prepare_request('GET', f'/servers/serverSettings') return self._check_response(response, expect_json=True) - def restart_workers(self): + def restart_workers(self) -> dict: response = self._prepare_request('POST', f'/servers/restartWorkers') return self._check_response(response, expect_json=True) - def db_schema_diagnostic(self): + def db_schema_diagnostic(self) -> dict: response = self._prepare_request('GET', f'/servers/dbSchemaDiagnostic') return self._check_response(response, expect_json=True) - def toggle_global_pythonify(self): + def toggle_global_pythonify(self) -> None: self.global_pythonify = not self.global_pythonify # ## BEGIN Event ## - def events(self, pythonify: bool=False): - events = self._prepare_request('GET', 'events') - events = self._check_response(events, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in events: - return events + def events(self, pythonify: bool=False) -> List[Union[dict, MISPEvent]]: + r = self._prepare_request('GET', 'events') + events_r = self._check_response(r, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in events_r: + return events_r to_return = [] - for event in events: + for event in events_r: e = MISPEvent() e.from_dict(**event) to_return.append(e) return to_return - def get_event(self, event: Union[MISPEvent, int, str, UUID], deleted: [bool, int, list]=False, pythonify: bool=False): + def get_event(self, event: Union[MISPEvent, int, str, UUID], deleted: [bool, int, list]=False, pythonify: bool=False) -> Union[dict, MISPEvent]: '''Get an event from a MISP instance''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) if deleted: data = {'deleted': deleted} - event = self._prepare_request('POST', f'events/view/{event_id}', data=data) + r = self._prepare_request('POST', f'events/view/{event_id}', data=data) else: - event = self._prepare_request('GET', f'events/view/{event_id}') - event = self._check_response(event, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in event: - return event + r = self._prepare_request('GET', f'events/view/{event_id}') + event_r = self._check_response(r, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in event_r: + return event_r e = MISPEvent() - e.load(event) + e.load(event_r) return e - def add_event(self, event: MISPEvent, pythonify: bool=False): + def add_event(self, event: MISPEvent, pythonify: bool=False) -> Union[dict, MISPEvent]: '''Add a new event on a MISP instance''' - new_event = self._prepare_request('POST', 'events', data=event) - new_event = self._check_response(new_event, expect_json=True) + r = self._prepare_request('POST', 'events', data=event) + new_event = self._check_response(r, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in new_event: return new_event e = MISPEvent() e.load(new_event) return e - def update_event(self, event: MISPEvent, event_id: int=None, pythonify: bool=False): + def update_event(self, event: MISPEvent, event_id: Optional[int]=None, pythonify: bool=False) -> Union[dict, MISPEvent]: '''Update an event on a MISP instance''' if event_id is None: - event_id = self.__get_uuid_or_id_from_abstract_misp(event) + eid = self.__get_uuid_or_id_from_abstract_misp(event) else: - event_id = self.__get_uuid_or_id_from_abstract_misp(event_id) - updated_event = self._prepare_request('POST', f'events/{event_id}', data=event) - updated_event = self._check_response(updated_event, expect_json=True) + eid = self.__get_uuid_or_id_from_abstract_misp(event_id) + r = self._prepare_request('POST', f'events/{eid}', data=event) + updated_event = self._check_response(r, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in updated_event: return updated_event e = MISPEvent() e.load(updated_event) return e - def delete_event(self, event: Union[MISPEvent, int, str, UUID]): + def delete_event(self, event: Union[MISPEvent, int, str, UUID]) -> dict: '''Delete an event from a MISP instance''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) response = self._prepare_request('DELETE', f'events/delete/{event_id}') return self._check_response(response, expect_json=True) - def publish(self, event: Union[MISPEvent, int, str, UUID], alert: bool=False): + def publish(self, event: Union[MISPEvent, int, str, UUID], alert: bool=False) -> dict: """Publish the event with one single HTTP POST. The default is to not send a mail as it is assumed this method is called on update. """ @@ -253,7 +253,7 @@ class PyMISP: response = self._prepare_request('POST', f'events/publish/{event_id}') return self._check_response(response, expect_json=True) - def contact_event_reporter(self, event: Union[MISPEvent, int, str, UUID], message: str): + def contact_event_reporter(self, event: Union[MISPEvent, int, str, UUID], message: str) -> dict: """Send a message to the reporter of an event""" event_id = self.__get_uuid_or_id_from_abstract_misp(event) to_post = {'message': message} @@ -264,59 +264,59 @@ class PyMISP: # ## BEGIN Object ### - def get_object(self, misp_object: Union[MISPObject, int, str, UUID], pythonify: bool=False): + def get_object(self, misp_object: Union[MISPObject, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPObject]: '''Get an object from the remote MISP instance''' object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object) - misp_object = self._prepare_request('GET', f'objects/view/{object_id}') - misp_object = self._check_response(misp_object, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in misp_object: - return misp_object - o = MISPObject(misp_object['Object']['name']) - o.from_dict(**misp_object) + r = self._prepare_request('GET', f'objects/view/{object_id}') + misp_object_r = self._check_response(r, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in misp_object_r: + return misp_object_r + o = MISPObject(misp_object_r['Object']['name']) + o.from_dict(**misp_object_r) return o - def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool=False): + def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool=False) -> Union[dict, MISPObject]: '''Add a MISP Object to an existing MISP event''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) - new_object = self._prepare_request('POST', f'objects/add/{event_id}', data=misp_object) - new_object = self._check_response(new_object, expect_json=True) + r = self._prepare_request('POST', f'objects/add/{event_id}', data=misp_object) + new_object = self._check_response(r, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in new_object: return new_object o = MISPObject(new_object['Object']['name']) o.from_dict(**new_object) return o - def update_object(self, misp_object: MISPObject, object_id: int=None, pythonify: bool=False): + def update_object(self, misp_object: MISPObject, object_id: int=None, pythonify: bool=False) -> Union[dict, MISPObject]: '''Update an object on a MISP instance''' if object_id is None: - object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object) + oid = self.__get_uuid_or_id_from_abstract_misp(misp_object) else: - object_id = self.__get_uuid_or_id_from_abstract_misp(object_id) - updated_object = self._prepare_request('POST', f'objects/edit/{object_id}', data=misp_object) - updated_object = self._check_response(updated_object, expect_json=True) + oid = self.__get_uuid_or_id_from_abstract_misp(object_id) + r = self._prepare_request('POST', f'objects/edit/{oid}', data=misp_object) + updated_object = self._check_response(r, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in updated_object: return updated_object o = MISPObject(updated_object['Object']['name']) o.from_dict(**updated_object) return o - def delete_object(self, misp_object: Union[MISPObject, int, str, UUID]): + def delete_object(self, misp_object: Union[MISPObject, int, str, UUID]) -> dict: '''Delete an object from a MISP instance''' object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object) response = self._prepare_request('POST', f'objects/delete/{object_id}') return self._check_response(response, expect_json=True) - def add_object_reference(self, misp_object_reference: MISPObjectReference, pythonify: bool=False): + def add_object_reference(self, misp_object_reference: MISPObjectReference, pythonify: bool=False) -> Union[dict, MISPObjectReference]: """Add a reference to an object""" - object_reference = self._prepare_request('POST', 'object_references/add', misp_object_reference) - object_reference = self._check_response(object_reference, expect_json=True) + r = self._prepare_request('POST', 'object_references/add', misp_object_reference) + object_reference = self._check_response(r, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in object_reference: return object_reference - r = MISPObjectReference() - r.from_dict(**object_reference) - return r + ref = MISPObjectReference() + ref.from_dict(**object_reference) + return ref - def delete_object_reference(self, object_reference: Union[MISPObjectReference, int, str, UUID]): + def delete_object_reference(self, object_reference: Union[MISPObjectReference, int, str, UUID]) -> dict: """Delete a reference to an object""" object_reference_id = self.__get_uuid_or_id_from_abstract_misp(object_reference) response = self._prepare_request('POST', f'object_references/delete/{object_reference_id}') @@ -324,31 +324,31 @@ class PyMISP: # Object templates - def object_templates(self, pythonify: bool=False): + def object_templates(self, pythonify: bool=False) -> List[Union[dict, MISPObjectTemplate]]: """Get all the object templates.""" - object_templates = self._prepare_request('GET', 'objectTemplates') - object_templates = self._check_response(object_templates, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in object_templates: - return object_templates + r = self._prepare_request('GET', 'objectTemplates') + templates = self._check_response(r, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in templates: + return templates to_return = [] - for object_template in object_templates: + for object_template in templates: o = MISPObjectTemplate() o.from_dict(**object_template) to_return.append(o) return to_return - def get_object_template(self, object_template: Union[MISPObjectTemplate, int, str, UUID], pythonify: bool=False): + def get_object_template(self, object_template: Union[MISPObjectTemplate, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPObjectTemplate]: """Gets the full object template corresponting the UUID passed as parameter""" object_template_id = self.__get_uuid_or_id_from_abstract_misp(object_template) - object_template = self._prepare_request('GET', f'objectTemplates/view/{object_template_id}') - object_template = self._check_response(object_template, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in object_template: - return object_template + r = self._prepare_request('GET', f'objectTemplates/view/{object_template_id}') + object_template_r = self._check_response(r, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in object_template_r: + return object_template_r t = MISPObjectTemplate() - t.from_dict(**object_template) + t.from_dict(**object_template_r) return t - def update_object_templates(self): + def update_object_templates(self) -> dict: """Trigger an update of the object templates""" response = self._prepare_request('POST', 'objectTemplates/update') return self._check_response(response, expect_json=True) @@ -357,36 +357,36 @@ class PyMISP: # ## BEGIN Attribute ### - def attributes(self, pythonify: bool=False): - attributes = self._prepare_request('GET', f'attributes/index') - attributes = self._check_response(attributes, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in attributes: - return attributes + def attributes(self, pythonify: bool=False) -> List[Union[dict, MISPAttribute]]: + r = self._prepare_request('GET', f'attributes/index') + attributes_r = self._check_response(r, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in attributes_r: + return attributes_r to_return = [] - for attribute in attributes: + for attribute in attributes_r: a = MISPAttribute() a.from_dict(**attribute) to_return.append(a) return to_return - def get_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], pythonify: bool=False): + def get_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPAttribute]: '''Get an attribute from a MISP instance''' attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) - attribute = self._prepare_request('GET', f'attributes/view/{attribute_id}') - attribute = self._check_response(attribute, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in attribute: - return attribute + r = self._prepare_request('GET', f'attributes/view/{attribute_id}') + attribute_r = self._check_response(r, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in attribute_r: + return attribute_r a = MISPAttribute() - a.from_dict(**attribute) + a.from_dict(**attribute_r) return a - def add_attribute(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False): + def add_attribute(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False) -> Union[dict, MISPAttribute, MISPShadowAttribute]: '''Add an attribute to an existing MISP event NOTE MISP 2.4.113+: you can pass a list of attributes. In that case, the pythonified response is the following: {'attributes': [MISPAttribute], 'errors': {errors by attributes}}''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) - new_attribute = self._prepare_request('POST', f'attributes/add/{event_id}', data=attribute) - new_attribute = self._check_response(new_attribute, expect_json=True) + r = self._prepare_request('POST', f'attributes/add/{event_id}', data=attribute) + new_attribute = self._check_response(r, expect_json=True) if isinstance(attribute, list): # Multiple attributes were passed at once, the handling is totally different if not (self.global_pythonify or pythonify): @@ -412,34 +412,34 @@ class PyMISP: a.from_dict(**new_attribute) return a - def update_attribute(self, attribute: MISPAttribute, attribute_id: int=None, pythonify: bool=False): + def update_attribute(self, attribute: MISPAttribute, attribute_id: int=None, pythonify: bool=False) -> Union[dict, MISPAttribute, MISPShadowAttribute]: '''Update an attribute on a MISP instance''' if attribute_id is None: - attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) + aid = self.__get_uuid_or_id_from_abstract_misp(attribute) else: - attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute_id) - updated_attribute = self._prepare_request('POST', f'attributes/edit/{attribute_id}', data=attribute) - updated_attribute = self._check_response(updated_attribute, expect_json=True) + aid = self.__get_uuid_or_id_from_abstract_misp(attribute_id) + r = self._prepare_request('POST', f'attributes/edit/{aid}', data=attribute) + updated_attribute = self._check_response(r, expect_json=True) if 'errors' in updated_attribute: if (updated_attribute['errors'][0] == 403 and updated_attribute['errors'][1]['message'] == 'You do not have permission to do that.'): # At this point, we assume the user tried to update an attribute on an event they don't own # Re-try with a proposal - return self.update_attribute_proposal(attribute_id, attribute, pythonify) + return self.update_attribute_proposal(aid, attribute, pythonify) if not (self.global_pythonify or pythonify) or 'errors' in updated_attribute: return updated_attribute a = MISPAttribute() a.from_dict(**updated_attribute) return a - def delete_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], hard: bool=False): + def delete_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], hard: bool=False) -> dict: '''Delete an attribute from a MISP instance''' attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) data = {} if hard: data['hard'] = 1 - response = self._prepare_request('POST', f'attributes/delete/{attribute_id}', data=data) - response = self._check_response(response, expect_json=True) + r = self._prepare_request('POST', f'attributes/delete/{attribute_id}', data=data) + response = self._check_response(r, expect_json=True) if ('errors' in response and response['errors'][0] == 403 and response['errors'][1]['message'] == 'You do not have permission to do that.'): # FIXME: https://github.com/MISP/MISP/issues/4913 @@ -452,13 +452,13 @@ class PyMISP: # ## BEGIN Attribute Proposal ### - def attribute_proposals(self, event: Union[MISPEvent, int, str, UUID]=None, pythonify: bool=False): + def attribute_proposals(self, event: Union[MISPEvent, int, str, UUID]=None, pythonify: bool=False) -> List[Union[dict, MISPShadowAttribute]]: if event: event_id = self.__get_uuid_or_id_from_abstract_misp(event) - attribute_proposals = self._prepare_request('GET', f'shadow_attributes/index/{event_id}') + r = self._prepare_request('GET', f'shadow_attributes/index/{event_id}') else: - attribute_proposals = self._prepare_request('GET', f'shadow_attributes') - attribute_proposals = self._check_response(attribute_proposals, expect_json=True) + r = self._prepare_request('GET', f'shadow_attributes') + attribute_proposals = self._check_response(r, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in attribute_proposals: return attribute_proposals to_return = [] @@ -468,10 +468,10 @@ class PyMISP: to_return.append(a) return to_return - def get_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID], pythonify: bool=False): + def get_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPShadowAttribute]: proposal_id = self.__get_uuid_or_id_from_abstract_misp(proposal) - attribute_proposal = self._prepare_request('GET', f'shadow_attributes/view/{proposal_id}') - attribute_proposal = self._check_response(attribute_proposal, expect_json=True) + r = self._prepare_request('GET', f'shadow_attributes/view/{proposal_id}') + attribute_proposal = self._check_response(r, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in attribute_proposal: return attribute_proposal a = MISPShadowAttribute() @@ -480,29 +480,29 @@ class PyMISP: # NOTE: the tree following method have a very specific meaning, look at the comments - def add_attribute_proposal(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False): + def add_attribute_proposal(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False) -> Union[dict, MISPShadowAttribute]: '''Propose a new attribute in an event''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) - new_attribute_proposal = self._prepare_request('POST', f'shadow_attributes/add/{event_id}', data=attribute) - new_attribute_proposal = self._check_response(new_attribute_proposal, expect_json=True) + r = self._prepare_request('POST', f'shadow_attributes/add/{event_id}', data=attribute) + new_attribute_proposal = self._check_response(r, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in new_attribute_proposal: return new_attribute_proposal a = MISPShadowAttribute() a.from_dict(**new_attribute_proposal) return a - def update_attribute_proposal(self, initial_attribute: Union[MISPAttribute, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False): + def update_attribute_proposal(self, initial_attribute: Union[MISPAttribute, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False) -> Union[dict, MISPShadowAttribute]: '''Propose a change for an attribute''' initial_attribute_id = self.__get_uuid_or_id_from_abstract_misp(initial_attribute) - update_attribute_proposal = self._prepare_request('POST', f'shadow_attributes/edit/{initial_attribute_id}', data=attribute) - update_attribute_proposal = self._check_response(update_attribute_proposal, expect_json=True) + r = self._prepare_request('POST', f'shadow_attributes/edit/{initial_attribute_id}', data=attribute) + update_attribute_proposal = self._check_response(r, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in update_attribute_proposal: return update_attribute_proposal a = MISPShadowAttribute() a.from_dict(**update_attribute_proposal) return a - def delete_attribute_proposal(self, attribute: Union[MISPAttribute, int, str, UUID]): + def delete_attribute_proposal(self, attribute: Union[MISPAttribute, int, str, UUID]) -> dict: '''Propose the deletion of an attribute''' attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) response = self._prepare_request('POST', f'shadow_attributes/delete/{attribute_id}') @@ -510,13 +510,13 @@ class PyMISP: # NOTE: You cannot modify an existing proposal, only accept/discard - def accept_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID]): + def accept_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID]) -> dict: '''Accept a proposal''' proposal_id = self.__get_uuid_or_id_from_abstract_misp(proposal) response = self._prepare_request('POST', f'shadow_attributes/accept/{proposal_id}') return self._check_response(response, expect_json=True) - def discard_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID]): + def discard_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID]) -> dict: '''Discard a proposal''' proposal_id = self.__get_uuid_or_id_from_abstract_misp(proposal) response = self._prepare_request('POST', f'shadow_attributes/discard/{proposal_id}') @@ -526,7 +526,7 @@ class PyMISP: # ## BEGIN Sighting ### - def sightings(self, misp_entity: AbstractMISP=None, org: Union[MISPOrganisation, int, str, UUID]=None, pythonify: bool=False): + def sightings(self, misp_entity: AbstractMISP=None, org: Union[MISPOrganisation, int, str, UUID]=None, pythonify: bool=False) -> List[Union[dict, MISPSighting]]: """Get the list of sighting related to a MISPEvent or a MISPAttribute (depending on type of misp_entity)""" if isinstance(misp_entity, MISPEvent): context = 'event' @@ -559,22 +559,22 @@ class PyMISP: to_return.append(s) return to_return - def add_sighting(self, sighting: MISPSighting, attribute: Union[MISPAttribute, int, str, UUID]=None, pythonify: bool=False): + def add_sighting(self, sighting: MISPSighting, attribute: Union[MISPAttribute, int, str, UUID]=None, pythonify: bool=False) -> Union[dict, MISPSighting]: '''Add a new sighting (globally, or to a specific attribute)''' if attribute: attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) - new_sighting = self._prepare_request('POST', f'sightings/add/{attribute_id}', data=sighting) + r = self._prepare_request('POST', f'sightings/add/{attribute_id}', data=sighting) else: # Either the ID/UUID is in the sighting, or we want to add a sighting on all the attributes with the given value - new_sighting = self._prepare_request('POST', f'sightings/add', data=sighting) - new_sighting = self._check_response(new_sighting, expect_json=True) + r = self._prepare_request('POST', f'sightings/add', data=sighting) + new_sighting = self._check_response(r, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in new_sighting: return new_sighting s = MISPSighting() s.from_dict(**new_sighting) return s - def delete_sighting(self, sighting: Union[MISPSighting, int, str, UUID]): + def delete_sighting(self, sighting: Union[MISPSighting, int, str, UUID]) -> dict: '''Delete a sighting from a MISP instance''' sighting_id = self.__get_uuid_or_id_from_abstract_misp(sighting) response = self._prepare_request('POST', f'sightings/delete/{sighting_id}') @@ -584,10 +584,10 @@ class PyMISP: # ## BEGIN Tags ### - def tags(self, pythonify: bool=False): + def tags(self, pythonify: bool=False) -> List[Union[dict, MISPTag]]: """Get the list of existing tags.""" - tags = self._prepare_request('GET', 'tags') - tags = self._check_response(tags, expect_json=True) + r = self._prepare_request('GET', 'tags') + tags = self._check_response(r, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in tags: return tags['Tag'] to_return = [] @@ -597,42 +597,42 @@ class PyMISP: to_return.append(t) return to_return - def get_tag(self, tag: Union[MISPTag, int, str, UUID], pythonify: bool=False): + def get_tag(self, tag: Union[MISPTag, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPTag]: """Get a tag by id.""" tag_id = self.__get_uuid_or_id_from_abstract_misp(tag) - tag = self._prepare_request('GET', f'tags/view/{tag_id}') - tag = self._check_response(tag, expect_json=True) + r = self._prepare_request('GET', f'tags/view/{tag_id}') + tag = self._check_response(r, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in tag: return tag t = MISPTag() t.from_dict(**tag) return t - def add_tag(self, tag: MISPTag, pythonify: bool=False): + def add_tag(self, tag: MISPTag, pythonify: bool=False) -> Union[dict, MISPTag]: '''Add a new tag on a MISP instance Notes: * The user calling this method needs the Tag Editor permission * It doesn't add a tag to an event, simply create it on a MISP instance. ''' - new_tag = self._prepare_request('POST', 'tags/add', data=tag) - new_tag = self._check_response(new_tag, expect_json=True) + r = self._prepare_request('POST', 'tags/add', data=tag) + new_tag = self._check_response(r, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in new_tag: return new_tag t = MISPTag() t.from_dict(**new_tag) return t - def enable_tag(self, tag: MISPTag, pythonify: bool=False): + def enable_tag(self, tag: MISPTag, pythonify: bool=False) -> Union[dict, MISPTag]: """Enable a tag.""" tag.hide_tag = False return self.update_tag(tag, pythonify=pythonify) - def disable_tag(self, tag: MISPTag, pythonify: bool=False): + def disable_tag(self, tag: MISPTag, pythonify: bool=False) -> Union[dict, MISPTag]: """Disable a tag.""" tag.hide_tag = True return self.update_tag(tag, pythonify=pythonify) - def update_tag(self, tag: MISPTag, tag_id: int=None, pythonify: bool=False): + def update_tag(self, tag: MISPTag, tag_id: int=None, pythonify: bool=False) -> Union[dict, MISPTag]: """Edit only the provided parameters of a tag.""" if tag_id is None: tag_id = self.__get_uuid_or_id_from_abstract_misp(tag) @@ -646,7 +646,7 @@ class PyMISP: t.from_dict(**updated_tag) return t - def delete_tag(self, tag: Union[MISPTag, int, str, UUID]): + def delete_tag(self, tag: Union[MISPTag, int, str, UUID]) -> dict: '''Delete an attribute from a MISP instance''' tag_id = self.__get_uuid_or_id_from_abstract_misp(tag) response = self._prepare_request('POST', f'tags/delete/{tag_id}') @@ -656,10 +656,10 @@ class PyMISP: # ## BEGIN Taxonomies ### - def taxonomies(self, pythonify: bool=False): + def taxonomies(self, pythonify: bool=False) -> List[Union[dict, MISPTaxonomy]]: """Get all the taxonomies.""" - taxonomies = self._prepare_request('GET', 'taxonomies') - taxonomies = self._check_response(taxonomies, expect_json=True) + r = self._prepare_request('GET', 'taxonomies') + taxonomies = self._check_response(r, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in taxonomies: return taxonomies to_return = [] @@ -669,37 +669,37 @@ class PyMISP: to_return.append(t) return to_return - def get_taxonomy(self, taxonomy: Union[MISPTaxonomy, int, str, UUID], pythonify: bool=False): + def get_taxonomy(self, taxonomy: Union[MISPTaxonomy, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPTaxonomy]: """Get a taxonomy from a MISP instance.""" taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) - taxonomy = self._prepare_request('GET', f'taxonomies/view/{taxonomy_id}') - taxonomy = self._check_response(taxonomy, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in taxonomy: - return taxonomy + r = self._prepare_request('GET', f'taxonomies/view/{taxonomy_id}') + taxonomy_r = self._check_response(r, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in taxonomy_r: + return taxonomy_r t = MISPTaxonomy() - t.from_dict(**taxonomy) + t.from_dict(**taxonomy_r) return t - def enable_taxonomy(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]): + def enable_taxonomy(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]) -> dict: """Enable a taxonomy.""" taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) response = self._prepare_request('POST', f'taxonomies/enable/{taxonomy_id}') return self._check_response(response, expect_json=True) - def disable_taxonomy(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]): + def disable_taxonomy(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]) -> dict: """Disable a taxonomy.""" taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) self.disable_taxonomy_tags(taxonomy_id) response = self._prepare_request('POST', f'taxonomies/disable/{taxonomy_id}') return self._check_response(response, expect_json=True) - def disable_taxonomy_tags(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]): + def disable_taxonomy_tags(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]) -> dict: """Disable all the tags of a taxonomy.""" taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) response = self._prepare_request('POST', f'taxonomies/disableTag/{taxonomy_id}') return self._check_response(response, expect_json=True) - def enable_taxonomy_tags(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]): + def enable_taxonomy_tags(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]) -> dict: """Enable all the tags of a taxonomy. NOTE: this automatically done when you call enable_taxonomy.""" taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) @@ -710,7 +710,7 @@ class PyMISP: response = self._prepare_request('POST', url) return self._check_response(response, expect_json=True) - def update_taxonomies(self): + def update_taxonomies(self) -> dict: """Update all the taxonomies.""" response = self._prepare_request('POST', 'taxonomies/update') return self._check_response(response, expect_json=True) @@ -719,7 +719,7 @@ class PyMISP: # ## BEGIN Warninglists ### - def warninglists(self, pythonify: bool=False): + def warninglists(self, pythonify: bool=False) -> List[Union[dict, MISPWarninglist]]: """Get all the warninglists.""" warninglists = self._prepare_request('GET', 'warninglists') warninglists = self._check_response(warninglists, expect_json=True) @@ -732,7 +732,7 @@ class PyMISP: to_return.append(w) return to_return - def get_warninglist(self, warninglist: Union[MISPWarninglist, int, str, UUID], pythonify: bool=False): + def get_warninglist(self, warninglist: Union[MISPWarninglist, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPWarninglist]: """Get a warninglist.""" warninglist_id = self.__get_uuid_or_id_from_abstract_misp(warninglist) warninglist = self._prepare_request('GET', f'warninglists/view/{warninglist_id}') @@ -743,8 +743,7 @@ class PyMISP: w.from_dict(**warninglist) return w - def toggle_warninglist(self, warninglist_id: List[int]=None, warninglist_name: List[str]=None, - force_enable: bool=False): + def toggle_warninglist(self, warninglist_id: List[int]=None, warninglist_name: List[str]=None, force_enable: bool=False) -> dict: '''Toggle (enable/disable) the status of a warninglist by ID. :param warninglist_id: ID of the WarningList :param force_enable: Force the warning list in the enabled state (does nothing is already enabled) @@ -765,22 +764,22 @@ class PyMISP: response = self._prepare_request('POST', 'warninglists/toggleEnable', data=json.dumps(query)) return self._check_response(response, expect_json=True) - def enable_warninglist(self, warninglist: Union[MISPWarninglist, int, str, UUID]): + def enable_warninglist(self, warninglist: Union[MISPWarninglist, int, str, UUID]) -> dict: """Enable a warninglist.""" warninglist_id = self.__get_uuid_or_id_from_abstract_misp(warninglist) return self.toggle_warninglist(warninglist_id=warninglist_id, force_enable=True) - def disable_warninglist(self, warninglist: Union[MISPWarninglist, int, str, UUID]): + def disable_warninglist(self, warninglist: Union[MISPWarninglist, int, str, UUID]) -> dict: """Disable a warninglist.""" warninglist_id = self.__get_uuid_or_id_from_abstract_misp(warninglist) return self.toggle_warninglist(warninglist_id=warninglist_id, force_enable=False) - def values_in_warninglist(self, value: list): + def values_in_warninglist(self, value: list) -> dict: """Check if IOC values are in warninglist""" response = self._prepare_request('POST', 'warninglists/checkValue', data=json.dumps(value)) return self._check_response(response, expect_json=True) - def update_warninglists(self): + def update_warninglists(self) -> dict: """Update all the warninglists.""" response = self._prepare_request('POST', 'warninglists/update') return self._check_response(response, expect_json=True) @@ -789,7 +788,7 @@ class PyMISP: # ## BEGIN Noticelist ### - def noticelists(self, pythonify: bool=False): + def noticelists(self, pythonify: bool=False) -> List[Union[dict, MISPNoticelist]]: """Get all the noticelists.""" noticelists = self._prepare_request('GET', 'noticelists') noticelists = self._check_response(noticelists, expect_json=True) @@ -802,7 +801,7 @@ class PyMISP: to_return.append(n) return to_return - def get_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID], pythonify: bool=False): + def get_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPNoticelist]: """Get a noticelist by id.""" noticelist_id = self.__get_uuid_or_id_from_abstract_misp(noticelist) noticelist = self._prepare_request('GET', f'noticelists/view/{noticelist_id}') @@ -813,7 +812,7 @@ class PyMISP: n.from_dict(**noticelist) return n - def enable_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID]): + def enable_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID]) -> dict: """Enable a noticelist by id.""" # FIXME: https://github.com/MISP/MISP/issues/4856 # response = self._prepare_request('POST', f'noticelists/enable/{noticelist_id}') @@ -821,7 +820,7 @@ class PyMISP: response = self._prepare_request('POST', f'noticelists/enableNoticelist/{noticelist_id}/true') return self._check_response(response, expect_json=True) - def disable_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID]): + def disable_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID]) -> dict: """Disable a noticelist by id.""" # FIXME: https://github.com/MISP/MISP/issues/4856 # response = self._prepare_request('POST', f'noticelists/disable/{noticelist_id}') @@ -829,7 +828,7 @@ class PyMISP: response = self._prepare_request('POST', f'noticelists/enableNoticelist/{noticelist_id}') return self._check_response(response, expect_json=True) - def update_noticelists(self): + def update_noticelists(self) -> dict: """Update all the noticelists.""" response = self._prepare_request('POST', 'noticelists/update') return self._check_response(response, expect_json=True) @@ -838,7 +837,7 @@ class PyMISP: # ## BEGIN Galaxy ### - def galaxies(self, pythonify: bool=False): + def galaxies(self, pythonify: bool=False) -> List[Union[dict, MISPGalaxy]]: """Get all the galaxies.""" galaxies = self._prepare_request('GET', 'galaxies') galaxies = self._check_response(galaxies, expect_json=True) @@ -851,7 +850,7 @@ class PyMISP: to_return.append(g) return to_return - def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], pythonify: bool=False): + def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPGalaxy]: """Get a galaxy by id.""" galaxy_id = self.__get_uuid_or_id_from_abstract_misp(galaxy) galaxy = self._prepare_request('GET', f'galaxies/view/{galaxy_id}') @@ -862,7 +861,7 @@ class PyMISP: g.from_dict(**galaxy) return g - def update_galaxies(self): + def update_galaxies(self) -> dict: """Update all the galaxies.""" response = self._prepare_request('POST', 'galaxies/update') return self._check_response(response, expect_json=True) @@ -871,7 +870,7 @@ class PyMISP: # ## BEGIN Feed ### - def feeds(self, pythonify: bool=False): + def feeds(self, pythonify: bool=False) -> List[Union[dict, MISPFeed]]: """Get the list of existing feeds.""" feeds = self._prepare_request('GET', 'feeds') feeds = self._check_response(feeds, expect_json=True) @@ -884,7 +883,7 @@ class PyMISP: to_return.append(f) return to_return - def get_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False): + def get_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPFeed]: """Get a feed by id.""" feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) feed = self._prepare_request('GET', f'feeds/view/{feed_id}') @@ -895,7 +894,7 @@ class PyMISP: f.from_dict(**feed) return f - def add_feed(self, feed: MISPFeed, pythonify: bool=False): + def add_feed(self, feed: MISPFeed, pythonify: bool=False) -> Union[dict, MISPFeed]: '''Add a new feed on a MISP instance''' # FIXME: https://github.com/MISP/MISP/issues/4834 feed = {'Feed': feed} @@ -907,7 +906,7 @@ class PyMISP: f.from_dict(**new_feed) return f - def enable_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False): + def enable_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPFeed]: '''Enable a feed (fetching it will create event(s)''' if not isinstance(feed, MISPFeed): feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) # In case we have a UUID @@ -916,7 +915,7 @@ class PyMISP: feed.enabled = True return self.update_feed(feed=feed, pythonify=pythonify) - def disable_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False): + def disable_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPFeed]: '''Disable a feed''' if not isinstance(feed, MISPFeed): feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) # In case we have a UUID @@ -925,7 +924,7 @@ class PyMISP: feed.enabled = False return self.update_feed(feed=feed, pythonify=pythonify) - def enable_feed_cache(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False): + def enable_feed_cache(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPFeed]: '''Enable the caching of a feed''' if not isinstance(feed, MISPFeed): feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) # In case we have a UUID @@ -934,7 +933,7 @@ class PyMISP: feed.caching_enabled = True return self.update_feed(feed=feed, pythonify=pythonify) - def disable_feed_cache(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False): + def disable_feed_cache(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPFeed]: '''Disable the caching of a feed''' if not isinstance(feed, MISPFeed): feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) # In case we have a UUID @@ -943,7 +942,7 @@ class PyMISP: feed.caching_enabled = False return self.update_feed(feed=feed, pythonify=pythonify) - def update_feed(self, feed: MISPFeed, feed_id: int=None, pythonify: bool=False): + def update_feed(self, feed: MISPFeed, feed_id: int=None, pythonify: bool=False) -> Union[dict, MISPFeed]: '''Update a feed on a MISP instance''' if feed_id is None: feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) @@ -959,40 +958,40 @@ class PyMISP: f.from_dict(**updated_feed) return f - def delete_feed(self, feed: Union[MISPFeed, int, str, UUID]): + def delete_feed(self, feed: Union[MISPFeed, int, str, UUID]) -> dict: '''Delete a feed from a MISP instance''' feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) response = self._prepare_request('POST', f'feeds/delete/{feed_id}') return self._check_response(response, expect_json=True) - def fetch_feed(self, feed: Union[MISPFeed, int, str, UUID]): + def fetch_feed(self, feed: Union[MISPFeed, int, str, UUID]) -> dict: """Fetch one single feed""" feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) response = self._prepare_request('GET', f'feeds/fetchFromFeed/{feed_id}') return self._check_response(response) - def cache_all_feeds(self): + def cache_all_feeds(self) -> dict: """ Cache all the feeds""" response = self._prepare_request('GET', 'feeds/cacheFeeds/all') return self._check_response(response) - def cache_feed(self, feed: Union[MISPFeed, int, str, UUID]): + def cache_feed(self, feed: Union[MISPFeed, int, str, UUID]) -> dict: """Cache a specific feed""" feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) response = self._prepare_request('GET', f'feeds/cacheFeeds/{feed_id}') return self._check_response(response) - def cache_freetext_feeds(self): + def cache_freetext_feeds(self) -> dict: """Cache all the freetext feeds""" response = self._prepare_request('GET', 'feeds/cacheFeeds/freetext') return self._check_response(response) - def cache_misp_feeds(self): + def cache_misp_feeds(self) -> dict: """Cache all the MISP feeds""" response = self._prepare_request('GET', 'feeds/cacheFeeds/misp') return self._check_response(response) - def compare_feeds(self): + def compare_feeds(self) -> dict: """Generate the comparison matrix for all the MISP feeds""" response = self._prepare_request('GET', 'feeds/compareFeeds') return self._check_response(response) @@ -1001,7 +1000,7 @@ class PyMISP: # ## BEGIN Server ### - def servers(self, pythonify: bool=False): + def servers(self, pythonify: bool=False) -> List[Union[dict, MISPServer]]: """Get the existing servers the MISP instance can synchronise with""" servers = self._prepare_request('GET', 'servers') servers = self._check_response(servers, expect_json=True) @@ -1014,7 +1013,7 @@ class PyMISP: to_return.append(s) return to_return - def get_sync_config(self, pythonify: bool=False): + def get_sync_config(self, pythonify: bool=False) -> Union[dict, MISPServer]: '''WARNING: This method only works if the user calling it is a sync user''' server = self._prepare_request('GET', 'servers/createSync') server = self._check_response(server, expect_json=True) @@ -1024,7 +1023,7 @@ class PyMISP: s.from_dict(**server) return s - def import_server(self, server: MISPServer, pythonify: bool=False): + def import_server(self, server: MISPServer, pythonify: bool=False) -> Union[dict, MISPServer]: """Import a sync server config received from get_sync_config""" server = self._prepare_request('POST', f'servers/import', data=server) server = self._check_response(server, expect_json=True) @@ -1034,7 +1033,7 @@ class PyMISP: s.from_dict(**server) return s - def add_server(self, server: MISPServer, pythonify: bool=False): + def add_server(self, server: MISPServer, pythonify: bool=False) -> Union[dict, MISPServer]: """Add a server to synchronise with. Note: You probably fant to use ExpandedPyMISP.get_sync_config and ExpandedPyMISP.import_server instead""" server = self._prepare_request('POST', f'servers/add', data=server) @@ -1045,7 +1044,7 @@ class PyMISP: s.from_dict(**server) return s - def update_server(self, server: MISPServer, server_id: int=None, pythonify: bool=False): + def update_server(self, server: MISPServer, server_id: int=None, pythonify: bool=False) -> Union[dict, MISPServer]: '''Update a server to synchronise with''' if server_id is None: server_id = self.__get_uuid_or_id_from_abstract_misp(server) @@ -1059,13 +1058,13 @@ class PyMISP: s.from_dict(**updated_server) return s - def delete_server(self, server: Union[MISPServer, int, str, UUID]): + def delete_server(self, server: Union[MISPServer, int, str, UUID]) -> dict: '''Delete a sync server''' server_id = self.__get_uuid_or_id_from_abstract_misp(server) response = self._prepare_request('POST', f'servers/delete/{server_id}') return self._check_response(response, expect_json=True) - def server_pull(self, server: Union[MISPServer, int, str, UUID], event: Union[MISPEvent, int, str, UUID]=None): + def server_pull(self, server: Union[MISPServer, int, str, UUID], event: Optional[Union[MISPEvent, int, str, UUID]]=None) -> dict: '''Initialize a pull from a sync server''' server_id = self.__get_uuid_or_id_from_abstract_misp(server) if event: @@ -1077,7 +1076,7 @@ class PyMISP: # FIXME: can we pythonify? return self._check_response(response) - def server_push(self, server: Union[MISPServer, int, str, UUID], event: Union[MISPEvent, int, str, UUID]=None): + def server_push(self, server: Union[MISPServer, int, str, UUID], event: Optional[Union[MISPEvent, int, str, UUID]]=None) -> dict: '''Initialize a push to a sync server''' server_id = self.__get_uuid_or_id_from_abstract_misp(server) if event: @@ -1089,7 +1088,7 @@ class PyMISP: # FIXME: can we pythonify? return self._check_response(response) - def test_server(self, server: Union[MISPServer, int, str, UUID]): + def test_server(self, server: Union[MISPServer, int, str, UUID]) -> dict: server_id = self.__get_uuid_or_id_from_abstract_misp(server) response = self._prepare_request('POST', f'servers/testConnection/{server_id}') return self._check_response(response, expect_json=True) @@ -1098,7 +1097,7 @@ class PyMISP: # ## BEGIN Sharing group ### - def sharing_groups(self, pythonify: bool=False): + def sharing_groups(self, pythonify: bool=False) -> List[Union[dict, MISPSharingGroup]]: """Get the existing sharing groups""" sharing_groups = self._prepare_request('GET', 'sharing_groups') sharing_groups = self._check_response(sharing_groups, expect_json=True) @@ -1111,7 +1110,7 @@ class PyMISP: to_return.append(s) return to_return - def add_sharing_group(self, sharing_group: MISPSharingGroup, pythonify: bool=False): + def add_sharing_group(self, sharing_group: MISPSharingGroup, pythonify: bool=False) -> Union[dict, MISPSharingGroup]: """Add a new sharing group""" sharing_group = self._prepare_request('POST', f'sharing_groups/add', data=sharing_group) sharing_group = self._check_response(sharing_group, expect_json=True) @@ -1121,14 +1120,14 @@ class PyMISP: s.from_dict(**sharing_group) return s - def delete_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID]): + def delete_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID]) -> dict: """Delete a sharing group""" sharing_group_id = self.__get_uuid_or_id_from_abstract_misp(sharing_group) response = self._prepare_request('POST', f'sharing_groups/delete/{sharing_group_id}') return self._check_response(response, expect_json=True) def add_org_to_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID], - organisation: Union[MISPOrganisation, int, str, UUID], extend: bool=False): + organisation: Union[MISPOrganisation, int, str, UUID], extend: bool=False) -> dict: '''Add an organisation to a sharing group. :sharing_group: Sharing group's local instance ID, or Sharing group's global UUID :organisation: Organisation's local instance ID, or Organisation's global UUID, or Organisation's name as known to the curent instance @@ -1141,7 +1140,7 @@ class PyMISP: return self._check_response(response) def remove_org_from_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID], - organisation: Union[MISPOrganisation, int, str, UUID]): + organisation: Union[MISPOrganisation, int, str, UUID]) -> dict: '''Remove an organisation from a sharing group. :sharing_group: Sharing group's local instance ID, or Sharing group's global UUID :organisation: Organisation's local instance ID, or Organisation's global UUID, or Organisation's name as known to the curent instance @@ -1153,7 +1152,7 @@ class PyMISP: return self._check_response(response) def add_server_to_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID], - server: Union[MISPServer, int, str, UUID], all_orgs: bool=False): + server: Union[MISPServer, int, str, UUID], all_orgs: bool=False) -> dict: '''Add a server to a sharing group. :sharing_group: Sharing group's local instance ID, or Sharing group's global UUID :server: Server's local instance ID, or URL of the Server, or Server's name as known to the curent instance @@ -1166,7 +1165,7 @@ class PyMISP: return self._check_response(response) def remove_server_from_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID], - server: Union[MISPServer, int, str, UUID]): + server: Union[MISPServer, int, str, UUID]) -> dict: '''Remove a server from a sharing group. :sharing_group: Sharing group's local instance ID, or Sharing group's global UUID :server: Server's local instance ID, or URL of the Server, or Server's name as known to the curent instance @@ -1181,7 +1180,7 @@ class PyMISP: # ## BEGIN Organisation ### - def organisations(self, scope="local", pythonify: bool=False): + def organisations(self, scope="local", pythonify: bool=False) -> List[Union[dict, MISPOrganisation]]: """Get all the organisations.""" organisations = self._prepare_request('GET', f'organisations/index/scope:{scope}') organisations = self._check_response(organisations, expect_json=True) @@ -1194,7 +1193,7 @@ class PyMISP: to_return.append(o) return to_return - def get_organisation(self, organisation: Union[MISPOrganisation, int, str, UUID], pythonify: bool=False): + def get_organisation(self, organisation: Union[MISPOrganisation, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPOrganisation]: '''Get an organisation.''' organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) organisation = self._prepare_request('GET', f'organisations/view/{organisation_id}') @@ -1205,7 +1204,7 @@ class PyMISP: o.from_dict(**organisation) return o - def add_organisation(self, organisation: MISPOrganisation, pythonify: bool=False): + def add_organisation(self, organisation: MISPOrganisation, pythonify: bool=False) -> Union[dict, MISPOrganisation]: '''Add an organisation''' new_organisation = self._prepare_request('POST', f'admin/organisations/add', data=organisation) new_organisation = self._check_response(new_organisation, expect_json=True) @@ -1215,7 +1214,7 @@ class PyMISP: o.from_dict(**new_organisation) return o - def update_organisation(self, organisation: MISPOrganisation, organisation_id: int=None, pythonify: bool=False): + def update_organisation(self, organisation: MISPOrganisation, organisation_id: int=None, pythonify: bool=False) -> Union[dict, MISPOrganisation]: '''Update an organisation''' if organisation_id is None: organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) @@ -1229,7 +1228,7 @@ class PyMISP: o.from_dict(**organisation) return o - def delete_organisation(self, organisation: Union[MISPOrganisation, int, str, UUID]): + def delete_organisation(self, organisation: Union[MISPOrganisation, int, str, UUID]) -> dict: '''Delete an organisation''' # NOTE: MISP in inconsistent and currently require "delete" in the path and doesn't support HTTP DELETE organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) @@ -1240,7 +1239,7 @@ class PyMISP: # ## BEGIN User ### - def users(self, pythonify: bool=False): + def users(self, pythonify: bool=False) -> List[Union[dict, MISPUser]]: """Get all the users.""" users = self._prepare_request('GET', 'admin/users') users = self._check_response(users, expect_json=True) @@ -1253,7 +1252,7 @@ class PyMISP: to_return.append(u) return to_return - def get_user(self, user: Union[MISPUser, int, str, UUID]='me', pythonify: bool=False, expanded: bool=False): + def get_user(self, user: Union[MISPUser, int, str, UUID]='me', pythonify: bool=False, expanded: bool=False) -> Union[dict, MISPUser]: '''Get a user. `me` means the owner of the API key doing the query. expanded also returns a MISPRole and a MISPUserSetting''' user_id = self.__get_uuid_or_id_from_abstract_misp(user) @@ -1276,7 +1275,7 @@ class PyMISP: usersettings.append(us) return u, r, usersettings - def add_user(self, user: MISPUser, pythonify: bool=False): + def add_user(self, user: MISPUser, pythonify: bool=False) -> Union[dict, MISPUser]: '''Add a new user''' user = self._prepare_request('POST', f'admin/users/add', data=user) user = self._check_response(user, expect_json=True) @@ -1286,7 +1285,7 @@ class PyMISP: u.from_dict(**user) return u - def update_user(self, user: MISPUser, user_id: int=None, pythonify: bool=False): + def update_user(self, user: MISPUser, user_id: int=None, pythonify: bool=False) -> Union[dict, MISPUser]: '''Update an event on a MISP instance''' if user_id is None: user_id = self.__get_uuid_or_id_from_abstract_misp(user) @@ -1303,14 +1302,14 @@ class PyMISP: e.from_dict(**updated_user) return e - def delete_user(self, user: Union[MISPUser, int, str, UUID]): + def delete_user(self, user: Union[MISPUser, int, str, UUID]) -> dict: '''Delete a user''' # NOTE: MISP in inconsistent and currently require "delete" in the path and doesn't support HTTP DELETE user_id = self.__get_uuid_or_id_from_abstract_misp(user) response = self._prepare_request('POST', f'admin/users/delete/{user_id}') return self._check_response(response, expect_json=True) - def change_user_password(self, new_password: str, user: Union[MISPUser, int, str, UUID]=None): + def change_user_password(self, new_password: str, user: Optional[Union[MISPUser, int, str, UUID]]=None) -> dict: response = self._prepare_request('POST', f'users/change_pw', data={'password': new_password}) return self._check_response(response, expect_json=True) @@ -1318,7 +1317,7 @@ class PyMISP: # ## BEGIN Role ### - def roles(self, pythonify: bool=False): + def roles(self, pythonify: bool=False) -> List[Union[dict, MISPRole]]: """Get the existing roles""" roles = self._prepare_request('GET', 'roles') roles = self._check_response(roles, expect_json=True) @@ -1331,7 +1330,7 @@ class PyMISP: to_return.append(r) return to_return - def set_default_role(self, role: Union[MISPRole, int, str, UUID]): + def set_default_role(self, role: Union[MISPRole, int, str, UUID]) -> dict: role_id = self.__get_uuid_or_id_from_abstract_misp(role) url = urljoin(self.root_url, f'/admin/roles/set_default/{role_id}') response = self._prepare_request('POST', url) @@ -1373,7 +1372,7 @@ class PyMISP: include_sightings: Optional[bool]=None, includeSightings: Optional[bool]=None, include_correlations: Optional[bool]=None, includeCorrelations: Optional[bool]=None, pythonify: Optional[bool]=False, - **kwargs): + **kwargs) -> List[Union[dict, MISPEvent, MISPAttribute]]: '''Search in the MISP instance :param return_format: Set the return format of the search (Currently supported: json, xml, openioc, suricata, snort - more formats are being moved to restSearch with the goal being that all searches happen through this API). Can be passed as the first parameter after restSearch or via the JSON payload. @@ -1565,7 +1564,7 @@ class PyMISP: analysis: Optional[List[SearchType]]=None, org: Optional[SearchParameterTypes]=None, timestamp: Optional[DateInterval]=None, - pythonify: Optional[bool]=None): + pythonify: Optional[bool]=None) -> List[Union[dict, MISPEvent]]: """Search only at the index level. Using ! in front of a value means NOT (default is OR) :param published: Set whether published or unpublished events should be returned. Do not set the parameter if you want both. @@ -1620,7 +1619,7 @@ class PyMISP: include_attribute: Optional[bool]=None, include_event_meta: Optional[bool]=None, pythonify: Optional[bool]=False - ): + ) -> List[Union[dict, MISPSighting]]: '''Search sightings :param context: The context of the search. Can be either "attribute", "event", or nothing (will then match on events and attributes). @@ -1699,7 +1698,7 @@ class PyMISP: action: Optional[str]=None, user_id: Optional[int]=None, change: Optional[str]=None, email: Optional[str]=None, org: Optional[str]=None, description: Optional[str]=None, - ip: Optional[str]=None, pythonify: Optional[bool]=False): + ip: Optional[str]=None, pythonify: Optional[bool]=False) -> List[Union[dict, MISPLog]]: '''Search in logs Note: to run substring queries simply append/prepend/encapsulate the search term with % @@ -1737,7 +1736,7 @@ class PyMISP: to_return.append(ml) return to_return - def search_feeds(self, value: Optional[SearchParameterTypes]=None, pythonify: Optional[bool]=False): + def search_feeds(self, value: Optional[SearchParameterTypes]=None, pythonify: Optional[bool]=False) -> List[Union[dict, MISPFeed]]: '''Search in the feeds cached on the servers''' response = self._prepare_request('POST', '/feeds/searchCaches', data={'value': value}) normalized_response = self._check_response(response, expect_json=True) @@ -1754,7 +1753,7 @@ class PyMISP: # ## BEGIN Communities ### - def communities(self, pythonify: bool=False): + def communities(self, pythonify: bool=False) -> List[Union[dict, MISPCommunity]]: """Get all the communities.""" communities = self._prepare_request('GET', 'communities') communities = self._check_response(communities, expect_json=True) @@ -1767,7 +1766,7 @@ class PyMISP: to_return.append(c) return to_return - def get_community(self, community: Union[MISPCommunity, int, str, UUID], pythonify: bool=False): + def get_community(self, community: Union[MISPCommunity, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPCommunity]: '''Get an community from a MISP instance''' community_id = self.__get_uuid_or_id_from_abstract_misp(community) community = self._prepare_request('GET', f'communities/view/{community_id}') @@ -1786,7 +1785,7 @@ class PyMISP: requestor_organisation_description: str=None, message: str=None, sync: bool=False, anonymise_requestor_server: bool=False, - mock: bool=False): + mock: bool=False) -> dict: community_id = self.__get_uuid_or_id_from_abstract_misp(community) to_post = {'org_name': requestor_organisation_name, 'org_uuid': requestor_organisation_uuid, @@ -1801,7 +1800,7 @@ class PyMISP: # ## BEGIN Event Delegation ### - def event_delegations(self, pythonify: bool=False): + def event_delegations(self, pythonify: bool=False) -> List[Union[dict, MISPEventDelegation]]: """Get all the event delegations.""" delegations = self._prepare_request('GET', 'event_delegations') delegations = self._check_response(delegations, expect_json=True) @@ -1814,20 +1813,20 @@ class PyMISP: to_return.append(d) return to_return - def accept_event_delegation(self, delegation: Union[MISPEventDelegation, int, str], pythonify: bool=False): + def accept_event_delegation(self, delegation: Union[MISPEventDelegation, int, str], pythonify: bool=False) -> dict: delegation_id = self.__get_uuid_or_id_from_abstract_misp(delegation) delegation = self._prepare_request('POST', f'event_delegations/acceptDelegation/{delegation_id}') return self._check_response(delegation, expect_json=True) - def discard_event_delegation(self, delegation: Union[MISPEventDelegation, int, str], pythonify: bool=False): + def discard_event_delegation(self, delegation: Union[MISPEventDelegation, int, str], pythonify: bool=False) -> dict: delegation_id = self.__get_uuid_or_id_from_abstract_misp(delegation) delegation = self._prepare_request('POST', f'event_delegations/deleteDelegation/{delegation_id}') return self._check_response(delegation, expect_json=True) - def delegate_event(self, event: Union[MISPEvent, int, str, UUID]=None, - organisation: Union[MISPOrganisation, int, str, UUID]=None, - event_delegation: MISPEventDelegation=None, - distribution: int=-1, message: str='', pythonify: bool=False): + def delegate_event(self, event: Optional[Union[MISPEvent, int, str, UUID]]=None, + organisation: Optional[Union[MISPOrganisation, int, str, UUID]]=None, + event_delegation: Optional[MISPEventDelegation]=None, + distribution: int=-1, message: str='', pythonify: bool=False) -> Union[dict, MISPEventDelegation]: '''Note: distribution == -1 means recipient decides''' if event and organisation: event_id = self.__get_uuid_or_id_from_abstract_misp(event) @@ -1849,13 +1848,13 @@ class PyMISP: # ## BEGIN Others ### - def push_event_to_ZMQ(self, event: Union[MISPEvent, int, str, UUID]): + def push_event_to_ZMQ(self, event: Union[MISPEvent, int, str, UUID]) -> dict: """Force push an event on ZMQ""" event_id = self.__get_uuid_or_id_from_abstract_misp(event) response = self._prepare_request('POST', f'events/pushEventToZMQ/{event_id}.json') return self._check_response(response, expect_json=True) - def direct_call(self, url: str, data: dict=None, params: dict={}, kw_params: dict={}): + def direct_call(self, url: str, data: Optional[dict]=None, params: dict={}, kw_params: dict={}) -> dict: '''Very lightweight call that posts a data blob (python dictionary or json string) on the URL''' if data is None: response = self._prepare_request('GET', url, params=params, kw_params=kw_params) @@ -1864,7 +1863,7 @@ class PyMISP: return self._check_response(response, lenient_response_type=True) def freetext(self, event: Union[MISPEvent, int, str, UUID], string: str, adhereToWarninglists: Union[bool, str]=False, - distribution: int=None, returnMetaAttributes: bool=False, pythonify: bool=False, **kwargs): + distribution: Optional[int]=None, returnMetaAttributes: bool=False, pythonify: bool=False, **kwargs) -> List[Union[dict, MISPAttribute]]: """Pass a text to the freetext importer""" event_id = self.__get_uuid_or_id_from_abstract_misp(event) query = {"value": string} @@ -1888,7 +1887,7 @@ class PyMISP: to_return.append(a) return to_return - def upload_stix(self, path, version: str='2'): + def upload_stix(self, path, version: str='2') -> dict: """Upload a STIX file to MISP. :param path: Path to the STIX on the disk (can be a path-like object, or a pseudofile) :param version: Can be 1 or 2 @@ -1915,7 +1914,7 @@ class PyMISP: # ## BEGIN Statistics ### - def attributes_statistics(self, context: str='type', percentage: bool=False): + def attributes_statistics(self, context: str='type', percentage: bool=False) -> dict: """Get attributes statistics from the MISP instance.""" # FIXME: https://github.com/MISP/MISP/issues/4874 if context not in ['type', 'category']: @@ -1927,7 +1926,7 @@ class PyMISP: response = self._prepare_request('GET', path) return self._check_response(response, expect_json=True) - def tags_statistics(self, percentage: bool=False, name_sort: bool=False): + def tags_statistics(self, percentage: bool=False, name_sort: bool=False) -> dict: """Get tags statistics from the MISP instance""" # FIXME: https://github.com/MISP/MISP/issues/4874 # NOTE: https://github.com/MISP/MISP/issues/4879 @@ -1942,7 +1941,7 @@ class PyMISP: response = self._prepare_request('GET', f'tags/tagStatistics/{percentage}/{name_sort}') return self._check_response(response) - def users_statistics(self, context: str='data'): + def users_statistics(self, context: str='data') -> dict: """Get users statistics from the MISP instance""" availables_contexts = ['data', 'orgs', 'users', 'tags', 'attributehistogram', 'sightings', 'galaxyMatrix'] if context not in availables_contexts: @@ -1954,7 +1953,7 @@ class PyMISP: # ## BEGIN User Settings ### - def user_settings(self, pythonify: bool=False): + def user_settings(self, pythonify: bool=False) -> List[Union[dict, MISPUserSetting]]: """Get all the user settings.""" user_settings = self._prepare_request('GET', 'user_settings') user_settings = self._check_response(user_settings, expect_json=True) @@ -1967,7 +1966,8 @@ class PyMISP: to_return.append(u) return to_return - def get_user_setting(self, user_setting: str, user: Union[MISPUser, int, str, UUID]=None, pythonify: bool=False): + def get_user_setting(self, user_setting: str, user: Optional[Union[MISPUser, int, str, UUID]]=None, + pythonify: bool=False) -> Union[dict, MISPUserSetting]: '''Get an user setting''' query = {'setting': user_setting} if user: @@ -1980,7 +1980,8 @@ class PyMISP: u.from_dict(**user_setting) return u - def set_user_setting(self, user_setting: str, value: Union[str, dict], user: Union[MISPUser, int, str, UUID]=None, pythonify: bool=False): + def set_user_setting(self, user_setting: str, value: Union[str, dict], user: Optional[Union[MISPUser, int, str, UUID]]=None, + pythonify: bool=False) -> Union[dict, MISPUserSetting]: '''Get an user setting''' query = {'setting': user_setting} if isinstance(value, dict): @@ -1996,7 +1997,7 @@ class PyMISP: u.from_dict(**user_setting) return u - def delete_user_setting(self, user_setting: str, user: Union[MISPUser, int, str, UUID]=None): + def delete_user_setting(self, user_setting: str, user: Union[MISPUser, int, str, UUID]=None) -> dict: '''Delete a user setting''' query = {'setting': user_setting} if user: @@ -2008,7 +2009,7 @@ class PyMISP: # ## BEGIN Global helpers ### - def change_sharing_group_on_entity(self, misp_entity: AbstractMISP, sharing_group_id, pythonify: bool=False): + def change_sharing_group_on_entity(self, misp_entity: AbstractMISP, sharing_group_id, pythonify: bool=False) -> Union[dict, MISPEvent, MISPObject, MISPAttribute]: """Change the sharing group of an event, an attribute, or an object""" misp_entity.distribution = 4 # Needs to be 'Sharing group' if 'SharingGroup' in misp_entity: # Delete former SharingGroup information @@ -2025,7 +2026,7 @@ class PyMISP: raise PyMISPError('The misp_entity must be MISPEvent, MISPObject or MISPAttribute') - def tag(self, misp_entity: Union[AbstractMISP, str], tag: Union[MISPTag, str], local: bool=False): + def tag(self, misp_entity: Union[AbstractMISP, str], tag: Union[MISPTag, str], local: bool=False) -> dict: """Tag an event or an attribute. misp_entity can be a MISPEvent, a MISP Attribute, or a UUID""" if 'uuid' in misp_entity: uuid = misp_entity.uuid @@ -2037,7 +2038,7 @@ class PyMISP: response = self._prepare_request('POST', 'tags/attachTagToObject', data=to_post) return self._check_response(response, expect_json=True) - def untag(self, misp_entity: Union[AbstractMISP, str], tag: Union[MISPTag, str]): + def untag(self, misp_entity: Union[AbstractMISP, str], tag: Union[MISPTag, str]) -> dict: """Untag an event or an attribute. misp_entity can be a UUID""" if 'uuid' in misp_entity: uuid = misp_entity.uuid @@ -2051,7 +2052,7 @@ class PyMISP: def build_complex_query(self, or_parameters: Optional[List[SearchType]]=None, and_parameters: Optional[List[SearchType]]=None, - not_parameters: Optional[List[SearchType]]=None): + not_parameters: Optional[List[SearchType]]=None) -> dict: '''Build a complex search query. MISP expects a dictionary with AND, OR and NOT keys.''' to_return = {} if and_parameters: @@ -2066,7 +2067,7 @@ class PyMISP: # ## Internal methods ### - def _old_misp(self, minimal_version_required: tuple, removal_date: Union[str, date, datetime], method: str=None, message: str=None): + def _old_misp(self, minimal_version_required: tuple, removal_date: Union[str, date, datetime], method: Optional[str]=None, message: Optional[str]=None) -> bool: if self._misp_version >= minimal_version_required: return False if isinstance(removal_date, (datetime, date)): @@ -2077,7 +2078,7 @@ class PyMISP: warnings.warn(to_print, DeprecationWarning) return True - def __get_uuid_or_id_from_abstract_misp(self, obj: Union[AbstractMISP, int, str, UUID]): + def __get_uuid_or_id_from_abstract_misp(self, obj: Union[AbstractMISP, int, str, UUID]) -> Union[str, int]: if isinstance(obj, UUID): return str(obj) if isinstance(obj, (int, str)): @@ -2098,13 +2099,13 @@ class PyMISP: return obj['uuid'] return obj['id'] - def _make_misp_bool(self, parameter: Union[bool, str, None]): + def _make_misp_bool(self, parameter: Optional[Union[bool, str]]=None) -> int: '''MISP wants 0 or 1 for bool, so we avoid True/False '0', '1' ''' if parameter is None: return 0 return 1 if int(parameter) else 0 - def _make_timestamp(self, value: DateTypes): + def _make_timestamp(self, value: DateTypes) -> Union[str, int]: '''Catch-all method to normalize anything that can be converted to a timestamp''' if isinstance(value, datetime): return value.timestamp() @@ -2123,7 +2124,7 @@ class PyMISP: return value return value - def _check_response(self, response, lenient_response_type=False, expect_json=False): + def _check_response(self, response: requests.Response, lenient_response_type: bool=False, expect_json: bool=False) -> dict: """Check if the response from the server is not an unexpected error""" if response.status_code >= 500: logger.critical(everything_broken.format(response.request.headers, response.request.body, response.text)) @@ -2165,8 +2166,8 @@ class PyMISP: def __repr__(self): return f'<{self.__class__.__name__}(url={self.root_url})' - def _prepare_request(self, request_type: str, url: str, data: dict={}, params: dict={}, - kw_params: dict={}, output_type: str='json'): + def _prepare_request(self, request_type: str, url: str, data: Union[dict, AbstractMISP]={}, params: dict={}, + kw_params: dict={}, output_type: str='json') -> requests.Response: '''Prepare a request for python-requests''' url = urljoin(self.root_url, url) if data: @@ -2202,7 +2203,7 @@ class PyMISP: settings = s.merge_environment_settings(req.url, proxies=self.proxies or {}, stream=None, verify=self.ssl, cert=self.cert) return s.send(prepped, **settings) - def _csv_to_dict(self, csv_content: str): + def _csv_to_dict(self, csv_content: str) -> List[dict]: '''Makes a list of dict out of a csv file (requires headers)''' fieldnames, lines = csv_content.split('\n', 1) fieldnames = fieldnames.split(',') diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index e1685b2..9bfaa00 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -926,7 +926,7 @@ class MISPEvent(AbstractMISP): with open(event_path, 'rb') as f: self.load(f, validate, metadata_only) - def load(self, json_event: Union[IO, str, bytes], validate: bool=False, metadata_only: bool=False): + def load(self, json_event: Union[IO, str, bytes, dict], validate: bool=False, metadata_only: bool=False): """Load a JSON dump from a pseudo file or a JSON string""" if hasattr(json_event, 'read'): # python2 and python3 compatible to find if we have a file