From b0e95fd5af08891c0829c8482179548231ad1843 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Thu, 23 Jan 2020 10:27:40 +0100 Subject: [PATCH] chg: Refactorize typing, validate --- .travis.yml | 1 + examples/add_email_object.py | 2 +- examples/last.py | 6 +- pymisp/abstract.py | 23 +- pymisp/api.py | 775 +++++++++++++++------------- pymisp/data/describeTypes.json | 7 + pymisp/mispevent.py | 115 ++++- pymisp/py.typed | 0 pymisp/tools/abstractgenerator.py | 18 +- pymisp/tools/asnobject.py | 2 +- pymisp/tools/create_misp_object.py | 19 +- pymisp/tools/domainipobject.py | 2 +- pymisp/tools/elfobject.py | 10 +- pymisp/tools/emailobject.py | 4 +- pymisp/tools/ext_lookups.py | 4 +- pymisp/tools/fail2banobject.py | 2 +- pymisp/tools/feed.py | 3 +- pymisp/tools/fileobject.py | 12 +- pymisp/tools/genericgenerator.py | 4 +- pymisp/tools/geolocationobject.py | 2 +- pymisp/tools/load_warninglists.py | 2 +- pymisp/tools/machoobject.py | 10 +- pymisp/tools/neo4j.py | 2 +- pymisp/tools/openioc.py | 2 +- pymisp/tools/peobject.py | 10 +- pymisp/tools/reportlab_generator.py | 18 +- pymisp/tools/sbsignatureobject.py | 2 +- pymisp/tools/sshauthkeyobject.py | 6 +- pymisp/tools/stix.py | 10 +- pymisp/tools/urlobject.py | 4 +- pymisp/tools/vtreportobject.py | 15 +- tests/testlive_comprehensive.py | 4 +- tests/testlive_sync.py | 2 +- 33 files changed, 617 insertions(+), 481 deletions(-) create mode 100644 pymisp/py.typed diff --git a/.travis.yml b/.travis.yml index 951befd..40a3227 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,6 +22,7 @@ install: script: - bash travis/test_travis.sh + - mypy tests/testlive_comprehensive.py tests/test_mispevent.py tests/testlive_sync.py pymisp after_success: - pipenv run codecov diff --git a/examples/add_email_object.py b/examples/add_email_object.py index 298b140..756a562 100755 --- a/examples/add_email_object.py +++ b/examples/add_email_object.py @@ -4,7 +4,7 @@ from pymisp import ExpandedPyMISP from pymisp.tools import EMailObject import traceback -from keys import misp_url, misp_key, misp_verifycert +from keys import misp_url, misp_key, misp_verifycert # type: ignore import glob import argparse diff --git a/examples/last.py b/examples/last.py index 207b0f8..3fabf2c 100755 --- a/examples/last.py +++ b/examples/last.py @@ -2,7 +2,11 @@ # -*- coding: utf-8 -*- from pymisp import ExpandedPyMISP -from keys import misp_url, misp_key, misp_verifycert, misp_client_cert +from keys import misp_url, misp_key, misp_verifycert +try: + from keys import misp_client_cert +except ImportError: + misp_client_cert = '' import argparse import os diff --git a/pymisp/abstract.py b/pymisp/abstract.py index 437021f..51b232a 100644 --- a/pymisp/abstract.py +++ b/pymisp/abstract.py @@ -21,7 +21,7 @@ except ImportError: import logging from enum import Enum -from typing import Union, Optional +from typing import Union, Optional, List from .exceptions import PyMISPInvalidFormat, PyMISPError @@ -112,6 +112,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): self.__not_jsonable: list = [] self._fields_for_feed: set = {} self.__self_defined_describe_types: Union[dict, None] = None + self.uuid: str if kwargs.get('force_timestamps') is not None: # Ignore the edited objects and keep the timestamps. @@ -119,14 +120,6 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): else: self.__force_timestamps: bool = False - # List of classes having tags - from .mispevent import MISPAttribute, MISPEvent - self.__has_tags = (MISPAttribute, MISPEvent) - if isinstance(self, self.__has_tags): - self.Tag = [] - setattr(AbstractMISP, 'add_tag', AbstractMISP.__add_tag) - setattr(AbstractMISP, 'tags', property(AbstractMISP.__get_tags, AbstractMISP.__set_tags)) - @property def describe_types(self) -> dict: if self.__self_defined_describe_types: @@ -288,7 +281,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): return int(d) return int(d.timestamp()) - def __add_tag(self, tag=None, **kwargs): + def _add_tag(self, tag=None, **kwargs): """Add a tag to the attribute (by name or a MISPTag object)""" if isinstance(tag, str): misp_tag = MISPTag() @@ -308,11 +301,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta): self.edited = True return misp_tag - def __get_tags(self): - """Returns a lost of tags associated to this Attribute""" - return self.Tag - - def __set_tags(self, tags): + def _set_tags(self, tags): """Set a list of prepared MISPTag.""" if all(isinstance(x, MISPTag) for x in tags): self.Tag = tags @@ -337,6 +326,10 @@ class MISPTag(AbstractMISP): _fields_for_feed: set = {'name', 'colour'} + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.name: str + def from_dict(self, **kwargs): if kwargs.get('Tag'): kwargs = kwargs.get('Tag') diff --git a/pymisp/api.py b/pymisp/api.py index 27c337a..1789777 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from typing import TypeVar, Optional, Tuple, List, Dict, Union +from typing import TypeVar, Optional, Tuple, List, Dict, Union, Any from datetime import date, datetime import csv from pathlib import Path @@ -14,6 +14,7 @@ import re from uuid import UUID import warnings import sys +import traceback from . import __version__, everything_broken from .exceptions import MISPServerError, PyMISPUnexpectedResponse, PyMISPNotImplementedYet, PyMISPError, NoURL, NoKey @@ -25,9 +26,7 @@ from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types SearchType = TypeVar('SearchType', str, int) # str: string to search / list: values to search (OR) / dict: {'OR': [list], 'NOT': [list], 'AND': [list]} -SearchParameterTypes = TypeVar('SearchParameterTypes', str, List[SearchType], Dict[str, SearchType]) -DateTypes = TypeVar('DateTypes', datetime, date, SearchType, float) -DateInterval = TypeVar('DateInterval', DateTypes, Tuple[DateTypes, DateTypes]) +SearchParameterTypes = TypeVar('SearchParameterTypes', str, List[Union[str, int]], Dict[str, Union[str, int]]) ToIDSType = TypeVar('ToIDSType', str, int, bool) @@ -72,8 +71,8 @@ class PyMISP: try: # Make sure the MISP instance is working and the URL is valid response = self.recommended_pymisp_version - if response.get('errors'): - logger.warning(response.get('errors')[0]) + if 'errors' in response: + logger.warning(response['errors'][0]) else: pymisp_version_tup = tuple(int(x) for x in __version__.split('.')) recommended_version_tup = tuple(int(x) for x in response['version'].split('.')) @@ -87,8 +86,12 @@ class PyMISP: self._misp_version = tuple(int(v) for v in misp_version['version'].split('.')) # Get the user information + self._current_user: MISPUser + self._current_role: MISPRole + self._current_user_settings: List[MISPUserSetting] self._current_user, self._current_role, self._current_user_settings = self.get_user(pythonify=True, expanded=True) except Exception as e: + traceback.print_exc() raise PyMISPError(f'Unable to connect to MISP ({self.root_url}). Please make sure the API key and the URL are correct (http/https is required): {e}') try: @@ -101,12 +104,12 @@ class PyMISP: self.category_type_mapping = self.describe_types['category_type_mappings'] self.sane_default = self.describe_types['sane_defaults'] - def remote_acl(self, debug_type: str='findMissingFunctionNames'): + def remote_acl(self, debug_type: str='findMissingFunctionNames') -> dict: """This should return an empty list, unless the ACL is outdated. debug_type can only be printAllFunctionNames, findMissingFunctionNames, or printRoleAccess """ response = self._prepare_request('GET', f'events/queryACL/{debug_type}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) @property def describe_types_local(self) -> dict: @@ -117,14 +120,14 @@ class PyMISP: def describe_types_remote(self) -> dict: '''Returns the content of describe types from the remote instance''' response = self._prepare_request('GET', 'attributes/describeTypes.json') - remote_describe_types = self._check_response(response, expect_json=True) + remote_describe_types = self._check_json_response(response) return remote_describe_types['result'] @property def recommended_pymisp_version(self) -> dict: """Returns the recommended API version from the server""" response = self._prepare_request('GET', 'servers/getPyMISPVersion.json') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) @property def version(self) -> dict: @@ -144,7 +147,7 @@ class PyMISP: def misp_instance_version(self) -> dict: """Returns the version of the instance.""" response = self._prepare_request('GET', 'servers/getVersion.json') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) @property def misp_instance_version_master(self) -> dict: @@ -157,37 +160,37 @@ class PyMISP: def update_misp(self) -> dict: response = self._prepare_request('POST', '/servers/update') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def set_server_setting(self, setting: str, value: Union[str, int, bool], force: bool=False) -> dict: data = {'value': value, 'force': force} response = self._prepare_request('POST', f'/servers/serverSettingsEdit/{setting}', data=data) - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def get_server_setting(self, setting: str) -> dict: response = self._prepare_request('GET', f'/servers/getSetting/{setting}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def server_settings(self) -> dict: response = self._prepare_request('GET', f'/servers/serverSettings') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def restart_workers(self) -> dict: response = self._prepare_request('POST', f'/servers/restartWorkers') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def db_schema_diagnostic(self) -> dict: response = self._prepare_request('GET', f'/servers/dbSchemaDiagnostic') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def toggle_global_pythonify(self) -> None: self.global_pythonify = not self.global_pythonify # ## BEGIN Event ## - def events(self, pythonify: bool=False) -> List[Union[dict, MISPEvent]]: + def events(self, pythonify: bool=False) -> Union[dict, List[MISPEvent]]: r = self._prepare_request('GET', 'events') - events_r = self._check_response(r, expect_json=True) + events_r = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in events_r: return events_r to_return = [] @@ -197,7 +200,7 @@ class PyMISP: to_return.append(e) return to_return - def get_event(self, event: Union[MISPEvent, int, str, UUID], deleted: [bool, int, list]=False, pythonify: bool=False) -> Union[dict, MISPEvent]: + def get_event(self, event: Union[MISPEvent, int, str, UUID], deleted: Union[bool, int, list]=False, pythonify: bool=False) -> Union[dict, MISPEvent]: '''Get an event from a MISP instance''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) if deleted: @@ -205,7 +208,7 @@ class PyMISP: r = self._prepare_request('POST', f'events/view/{event_id}', data=data) else: r = self._prepare_request('GET', f'events/view/{event_id}') - event_r = self._check_response(r, expect_json=True) + event_r = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in event_r: return event_r e = MISPEvent() @@ -215,7 +218,7 @@ class PyMISP: def add_event(self, event: MISPEvent, pythonify: bool=False) -> Union[dict, MISPEvent]: '''Add a new event on a MISP instance''' r = self._prepare_request('POST', 'events', data=event) - new_event = self._check_response(r, expect_json=True) + new_event = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in new_event: return new_event e = MISPEvent() @@ -229,7 +232,7 @@ class PyMISP: else: eid = self.__get_uuid_or_id_from_abstract_misp(event_id) r = self._prepare_request('POST', f'events/{eid}', data=event) - updated_event = self._check_response(r, expect_json=True) + updated_event = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in updated_event: return updated_event e = MISPEvent() @@ -240,7 +243,7 @@ class PyMISP: '''Delete an event from a MISP instance''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) response = self._prepare_request('DELETE', f'events/delete/{event_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def publish(self, event: Union[MISPEvent, int, str, UUID], alert: bool=False) -> dict: """Publish the event with one single HTTP POST. @@ -251,14 +254,14 @@ class PyMISP: response = self._prepare_request('POST', f'events/alert/{event_id}') else: response = self._prepare_request('POST', f'events/publish/{event_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def contact_event_reporter(self, event: Union[MISPEvent, int, str, UUID], message: str) -> dict: """Send a message to the reporter of an event""" event_id = self.__get_uuid_or_id_from_abstract_misp(event) to_post = {'message': message} response = self._prepare_request('POST', f'events/contact/{event_id}', data=to_post) - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # ## END Event ### @@ -268,7 +271,7 @@ class PyMISP: '''Get an object from the remote MISP instance''' object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object) r = self._prepare_request('GET', f'objects/view/{object_id}') - misp_object_r = self._check_response(r, expect_json=True) + misp_object_r = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in misp_object_r: return misp_object_r o = MISPObject(misp_object_r['Object']['name']) @@ -279,7 +282,7 @@ class PyMISP: '''Add a MISP Object to an existing MISP event''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) r = self._prepare_request('POST', f'objects/add/{event_id}', data=misp_object) - new_object = self._check_response(r, expect_json=True) + new_object = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in new_object: return new_object o = MISPObject(new_object['Object']['name']) @@ -293,7 +296,7 @@ class PyMISP: else: oid = self.__get_uuid_or_id_from_abstract_misp(object_id) r = self._prepare_request('POST', f'objects/edit/{oid}', data=misp_object) - updated_object = self._check_response(r, expect_json=True) + updated_object = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in updated_object: return updated_object o = MISPObject(updated_object['Object']['name']) @@ -304,12 +307,12 @@ class PyMISP: '''Delete an object from a MISP instance''' object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object) response = self._prepare_request('POST', f'objects/delete/{object_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def add_object_reference(self, misp_object_reference: MISPObjectReference, pythonify: bool=False) -> Union[dict, MISPObjectReference]: """Add a reference to an object""" r = self._prepare_request('POST', 'object_references/add', misp_object_reference) - object_reference = self._check_response(r, expect_json=True) + object_reference = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in object_reference: return object_reference ref = MISPObjectReference() @@ -320,14 +323,14 @@ class PyMISP: """Delete a reference to an object""" object_reference_id = self.__get_uuid_or_id_from_abstract_misp(object_reference) response = self._prepare_request('POST', f'object_references/delete/{object_reference_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # Object templates - def object_templates(self, pythonify: bool=False) -> List[Union[dict, MISPObjectTemplate]]: + def object_templates(self, pythonify: bool=False) -> Union[dict, List[MISPObjectTemplate]]: """Get all the object templates.""" r = self._prepare_request('GET', 'objectTemplates') - templates = self._check_response(r, expect_json=True) + templates = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in templates: return templates to_return = [] @@ -341,7 +344,7 @@ class PyMISP: """Gets the full object template corresponting the UUID passed as parameter""" object_template_id = self.__get_uuid_or_id_from_abstract_misp(object_template) r = self._prepare_request('GET', f'objectTemplates/view/{object_template_id}') - object_template_r = self._check_response(r, expect_json=True) + object_template_r = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in object_template_r: return object_template_r t = MISPObjectTemplate() @@ -351,15 +354,15 @@ class PyMISP: def update_object_templates(self) -> dict: """Trigger an update of the object templates""" response = self._prepare_request('POST', 'objectTemplates/update') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # ## END Object ### # ## BEGIN Attribute ### - def attributes(self, pythonify: bool=False) -> List[Union[dict, MISPAttribute]]: + def attributes(self, pythonify: bool=False) -> Union[dict, List[MISPAttribute]]: r = self._prepare_request('GET', f'attributes/index') - attributes_r = self._check_response(r, expect_json=True) + attributes_r = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in attributes_r: return attributes_r to_return = [] @@ -373,7 +376,7 @@ class PyMISP: '''Get an attribute from a MISP instance''' attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) r = self._prepare_request('GET', f'attributes/view/{attribute_id}') - attribute_r = self._check_response(r, expect_json=True) + attribute_r = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in attribute_r: return attribute_r a = MISPAttribute() @@ -386,7 +389,7 @@ class PyMISP: In that case, the pythonified response is the following: {'attributes': [MISPAttribute], 'errors': {errors by attributes}}''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) r = self._prepare_request('POST', f'attributes/add/{event_id}', data=attribute) - new_attribute = self._check_response(r, expect_json=True) + new_attribute = self._check_json_response(r) if isinstance(attribute, list): # Multiple attributes were passed at once, the handling is totally different if not (self.global_pythonify or pythonify): @@ -419,7 +422,7 @@ class PyMISP: else: aid = self.__get_uuid_or_id_from_abstract_misp(attribute_id) r = self._prepare_request('POST', f'attributes/edit/{aid}', data=attribute) - updated_attribute = self._check_response(r, expect_json=True) + updated_attribute = self._check_json_response(r) if 'errors' in updated_attribute: if (updated_attribute['errors'][0] == 403 and updated_attribute['errors'][1]['message'] == 'You do not have permission to do that.'): @@ -439,7 +442,7 @@ class PyMISP: if hard: data['hard'] = 1 r = self._prepare_request('POST', f'attributes/delete/{attribute_id}', data=data) - response = self._check_response(r, expect_json=True) + response = self._check_json_response(r) if ('errors' in response and response['errors'][0] == 403 and response['errors'][1]['message'] == 'You do not have permission to do that.'): # FIXME: https://github.com/MISP/MISP/issues/4913 @@ -452,13 +455,13 @@ class PyMISP: # ## BEGIN Attribute Proposal ### - def attribute_proposals(self, event: Union[MISPEvent, int, str, UUID]=None, pythonify: bool=False) -> List[Union[dict, MISPShadowAttribute]]: + def attribute_proposals(self, event: Union[MISPEvent, int, str, UUID]=None, pythonify: bool=False) -> Union[dict, List[MISPShadowAttribute]]: if event: event_id = self.__get_uuid_or_id_from_abstract_misp(event) r = self._prepare_request('GET', f'shadow_attributes/index/{event_id}') else: r = self._prepare_request('GET', f'shadow_attributes') - attribute_proposals = self._check_response(r, expect_json=True) + attribute_proposals = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in attribute_proposals: return attribute_proposals to_return = [] @@ -471,7 +474,7 @@ class PyMISP: def get_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPShadowAttribute]: proposal_id = self.__get_uuid_or_id_from_abstract_misp(proposal) r = self._prepare_request('GET', f'shadow_attributes/view/{proposal_id}') - attribute_proposal = self._check_response(r, expect_json=True) + attribute_proposal = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in attribute_proposal: return attribute_proposal a = MISPShadowAttribute() @@ -484,7 +487,7 @@ class PyMISP: '''Propose a new attribute in an event''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) r = self._prepare_request('POST', f'shadow_attributes/add/{event_id}', data=attribute) - new_attribute_proposal = self._check_response(r, expect_json=True) + new_attribute_proposal = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in new_attribute_proposal: return new_attribute_proposal a = MISPShadowAttribute() @@ -495,7 +498,7 @@ class PyMISP: '''Propose a change for an attribute''' initial_attribute_id = self.__get_uuid_or_id_from_abstract_misp(initial_attribute) r = self._prepare_request('POST', f'shadow_attributes/edit/{initial_attribute_id}', data=attribute) - update_attribute_proposal = self._check_response(r, expect_json=True) + update_attribute_proposal = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in update_attribute_proposal: return update_attribute_proposal a = MISPShadowAttribute() @@ -506,7 +509,7 @@ class PyMISP: '''Propose the deletion of an attribute''' attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) response = self._prepare_request('POST', f'shadow_attributes/delete/{attribute_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # NOTE: You cannot modify an existing proposal, only accept/discard @@ -514,42 +517,36 @@ class PyMISP: '''Accept a proposal''' proposal_id = self.__get_uuid_or_id_from_abstract_misp(proposal) response = self._prepare_request('POST', f'shadow_attributes/accept/{proposal_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def discard_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID]) -> dict: '''Discard a proposal''' proposal_id = self.__get_uuid_or_id_from_abstract_misp(proposal) response = self._prepare_request('POST', f'shadow_attributes/discard/{proposal_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # ## END Attribute Proposal ### # ## BEGIN Sighting ### - def sightings(self, misp_entity: AbstractMISP=None, org: Union[MISPOrganisation, int, str, UUID]=None, pythonify: bool=False) -> List[Union[dict, MISPSighting]]: + def sightings(self, misp_entity: AbstractMISP=None, org: Union[MISPOrganisation, int, str, UUID]=None, pythonify: bool=False) -> Union[dict, List[MISPSighting]]: """Get the list of sighting related to a MISPEvent or a MISPAttribute (depending on type of misp_entity)""" if isinstance(misp_entity, MISPEvent): - context = 'event' + url = 'sightings/listSightings' + to_post = {'context': 'event', 'id': misp_entity.id} elif isinstance(misp_entity, MISPAttribute): - context = 'attribute' + url = 'sightings/listSightings' + to_post = {'context': 'attribute', 'id': misp_entity.id} else: - context = None - if org is not None: - org_id = self.__get_uuid_or_id_from_abstract_misp(org) - else: - org_id = None - - if context is None: url = 'sightings' to_post = {} - else: - url = 'sightings/listSightings' - to_post = {'id': misp_entity.id, 'context': context} - if org_id: - to_post['org_id'] = org_id - sightings = self._prepare_request('POST', url, data=to_post) - sightings = self._check_response(sightings, expect_json=True) + if org is not None: + org_id = self.__get_uuid_or_id_from_abstract_misp(org) + to_post['org_id'] = org_id + + r = self._prepare_request('POST', url, data=to_post) + sightings = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in sightings: return sightings to_return = [] @@ -567,7 +564,7 @@ class PyMISP: else: # Either the ID/UUID is in the sighting, or we want to add a sighting on all the attributes with the given value r = self._prepare_request('POST', f'sightings/add', data=sighting) - new_sighting = self._check_response(r, expect_json=True) + new_sighting = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in new_sighting: return new_sighting s = MISPSighting() @@ -578,16 +575,16 @@ class PyMISP: '''Delete a sighting from a MISP instance''' sighting_id = self.__get_uuid_or_id_from_abstract_misp(sighting) response = self._prepare_request('POST', f'sightings/delete/{sighting_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # ## END Sighting ### # ## BEGIN Tags ### - def tags(self, pythonify: bool=False) -> List[Union[dict, MISPTag]]: + def tags(self, pythonify: bool=False) -> Union[dict, List[MISPTag]]: """Get the list of existing tags.""" r = self._prepare_request('GET', 'tags') - tags = self._check_response(r, expect_json=True) + tags = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in tags: return tags['Tag'] to_return = [] @@ -601,11 +598,11 @@ class PyMISP: """Get a tag by id.""" tag_id = self.__get_uuid_or_id_from_abstract_misp(tag) r = self._prepare_request('GET', f'tags/view/{tag_id}') - tag = self._check_response(r, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in tag: - return tag + tag_r = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in tag_r: + return tag_r t = MISPTag() - t.from_dict(**tag) + t.from_dict(**tag_r) return t def add_tag(self, tag: MISPTag, pythonify: bool=False) -> Union[dict, MISPTag]: @@ -615,7 +612,7 @@ class PyMISP: * It doesn't add a tag to an event, simply create it on a MISP instance. ''' r = self._prepare_request('POST', 'tags/add', data=tag) - new_tag = self._check_response(r, expect_json=True) + new_tag = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in new_tag: return new_tag t = MISPTag() @@ -632,14 +629,14 @@ class PyMISP: tag.hide_tag = True return self.update_tag(tag, pythonify=pythonify) - def update_tag(self, tag: MISPTag, tag_id: int=None, pythonify: bool=False) -> Union[dict, MISPTag]: + def update_tag(self, tag: MISPTag, tag_id: Optional[int]=None, pythonify: bool=False) -> Union[dict, MISPTag]: """Edit only the provided parameters of a tag.""" if tag_id is None: - tag_id = self.__get_uuid_or_id_from_abstract_misp(tag) + tid = self.__get_uuid_or_id_from_abstract_misp(tag) else: - tag_id = self.__get_uuid_or_id_from_abstract_misp(tag_id) - updated_tag = self._prepare_request('POST', f'tags/edit/{tag_id}', data=tag) - updated_tag = self._check_response(updated_tag, expect_json=True) + tid = self.__get_uuid_or_id_from_abstract_misp(tag_id) + r = self._prepare_request('POST', f'tags/edit/{tid}', data=tag) + updated_tag = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in updated_tag: return updated_tag t = MISPTag() @@ -650,16 +647,16 @@ class PyMISP: '''Delete an attribute from a MISP instance''' tag_id = self.__get_uuid_or_id_from_abstract_misp(tag) response = self._prepare_request('POST', f'tags/delete/{tag_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # ## END Tags ### # ## BEGIN Taxonomies ### - def taxonomies(self, pythonify: bool=False) -> List[Union[dict, MISPTaxonomy]]: + def taxonomies(self, pythonify: bool=False) -> Union[dict, List[MISPTaxonomy]]: """Get all the taxonomies.""" r = self._prepare_request('GET', 'taxonomies') - taxonomies = self._check_response(r, expect_json=True) + taxonomies = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in taxonomies: return taxonomies to_return = [] @@ -673,7 +670,7 @@ class PyMISP: """Get a taxonomy from a MISP instance.""" taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) r = self._prepare_request('GET', f'taxonomies/view/{taxonomy_id}') - taxonomy_r = self._check_response(r, expect_json=True) + taxonomy_r = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in taxonomy_r: return taxonomy_r t = MISPTaxonomy() @@ -684,45 +681,45 @@ class PyMISP: """Enable a taxonomy.""" taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) response = self._prepare_request('POST', f'taxonomies/enable/{taxonomy_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def disable_taxonomy(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]) -> dict: """Disable a taxonomy.""" taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) self.disable_taxonomy_tags(taxonomy_id) response = self._prepare_request('POST', f'taxonomies/disable/{taxonomy_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def disable_taxonomy_tags(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]) -> dict: """Disable all the tags of a taxonomy.""" taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) response = self._prepare_request('POST', f'taxonomies/disableTag/{taxonomy_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def enable_taxonomy_tags(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]) -> dict: """Enable all the tags of a taxonomy. NOTE: this automatically done when you call enable_taxonomy.""" taxonomy_id = self.__get_uuid_or_id_from_abstract_misp(taxonomy) - taxonomy = self.get_taxonomy(taxonomy_id) - if not taxonomy['Taxonomy']['enabled']: - raise PyMISPError(f"The taxonomy {taxonomy['Taxonomy']['name']} is not enabled.") + t = self.get_taxonomy(taxonomy_id) + if not t['Taxonomy']['enabled']: + raise PyMISPError(f"The taxonomy {t['Taxonomy']['name']} is not enabled.") url = urljoin(self.root_url, 'taxonomies/addTag/{}'.format(taxonomy_id)) response = self._prepare_request('POST', url) - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def update_taxonomies(self) -> dict: """Update all the taxonomies.""" response = self._prepare_request('POST', 'taxonomies/update') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # ## END Taxonomies ### # ## BEGIN Warninglists ### - def warninglists(self, pythonify: bool=False) -> List[Union[dict, MISPWarninglist]]: + def warninglists(self, pythonify: bool=False) -> Union[dict, List[MISPWarninglist]]: """Get all the warninglists.""" - warninglists = self._prepare_request('GET', 'warninglists') - warninglists = self._check_response(warninglists, expect_json=True) + r = self._prepare_request('GET', 'warninglists') + warninglists = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in warninglists: return warninglists['Warninglists'] to_return = [] @@ -735,34 +732,36 @@ class PyMISP: def get_warninglist(self, warninglist: Union[MISPWarninglist, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPWarninglist]: """Get a warninglist.""" warninglist_id = self.__get_uuid_or_id_from_abstract_misp(warninglist) - warninglist = self._prepare_request('GET', f'warninglists/view/{warninglist_id}') - warninglist = self._check_response(warninglist, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in warninglist: - return warninglist + r = self._prepare_request('GET', f'warninglists/view/{warninglist_id}') + wl = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in wl: + return wl w = MISPWarninglist() - w.from_dict(**warninglist) + w.from_dict(**wl) return w - def toggle_warninglist(self, warninglist_id: List[int]=None, warninglist_name: List[str]=None, force_enable: bool=False) -> dict: + def toggle_warninglist(self, warninglist_id: Optional[Union[str, int, List[int]]]=None, warninglist_name: Optional[Union[str, List[str]]]=None, force_enable: bool=False) -> dict: '''Toggle (enable/disable) the status of a warninglist by ID. :param warninglist_id: ID of the WarningList :param force_enable: Force the warning list in the enabled state (does nothing is already enabled) ''' if warninglist_id is None and warninglist_name is None: raise PyMISPError('Either warninglist_id or warninglist_name is required.') - query = {} + query: Dict[str, Union[List[str], List[int], bool]] = {} if warninglist_id is not None: - if not isinstance(warninglist_id, list): - warninglist_id = [warninglist_id] - query['id'] = warninglist_id + if isinstance(warninglist_id, list): + query['id'] = warninglist_id + else: + query['id'] = [warninglist_id] # type: ignore if warninglist_name is not None: - if not isinstance(warninglist_name, list): - warninglist_name = [warninglist_name] - query['name'] = warninglist_name + if isinstance(warninglist_name, list): + query['name'] = warninglist_name + else: + query['name'] = [warninglist_name] if force_enable: query['enabled'] = force_enable - response = self._prepare_request('POST', 'warninglists/toggleEnable', data=json.dumps(query)) - return self._check_response(response, expect_json=True) + response = self._prepare_request('POST', 'warninglists/toggleEnable', data=query) + return self._check_json_response(response) def enable_warninglist(self, warninglist: Union[MISPWarninglist, int, str, UUID]) -> dict: """Enable a warninglist.""" @@ -776,22 +775,22 @@ class PyMISP: def values_in_warninglist(self, value: list) -> dict: """Check if IOC values are in warninglist""" - response = self._prepare_request('POST', 'warninglists/checkValue', data=json.dumps(value)) - return self._check_response(response, expect_json=True) + response = self._prepare_request('POST', 'warninglists/checkValue', data=value) + return self._check_json_response(response) def update_warninglists(self) -> dict: """Update all the warninglists.""" response = self._prepare_request('POST', 'warninglists/update') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # ## END Warninglists ### # ## BEGIN Noticelist ### - def noticelists(self, pythonify: bool=False) -> List[Union[dict, MISPNoticelist]]: + def noticelists(self, pythonify: bool=False) -> Union[dict, List[MISPNoticelist]]: """Get all the noticelists.""" - noticelists = self._prepare_request('GET', 'noticelists') - noticelists = self._check_response(noticelists, expect_json=True) + r = self._prepare_request('GET', 'noticelists') + noticelists = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in noticelists: return noticelists to_return = [] @@ -804,12 +803,12 @@ class PyMISP: def get_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPNoticelist]: """Get a noticelist by id.""" noticelist_id = self.__get_uuid_or_id_from_abstract_misp(noticelist) - noticelist = self._prepare_request('GET', f'noticelists/view/{noticelist_id}') - noticelist = self._check_response(noticelist, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in noticelist: - return noticelist + r = self._prepare_request('GET', f'noticelists/view/{noticelist_id}') + noticelist_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in noticelist_j: + return noticelist_j n = MISPNoticelist() - n.from_dict(**noticelist) + n.from_dict(**noticelist_j) return n def enable_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID]) -> dict: @@ -818,7 +817,7 @@ class PyMISP: # response = self._prepare_request('POST', f'noticelists/enable/{noticelist_id}') noticelist_id = self.__get_uuid_or_id_from_abstract_misp(noticelist) response = self._prepare_request('POST', f'noticelists/enableNoticelist/{noticelist_id}/true') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def disable_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID]) -> dict: """Disable a noticelist by id.""" @@ -826,21 +825,21 @@ class PyMISP: # response = self._prepare_request('POST', f'noticelists/disable/{noticelist_id}') noticelist_id = self.__get_uuid_or_id_from_abstract_misp(noticelist) response = self._prepare_request('POST', f'noticelists/enableNoticelist/{noticelist_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def update_noticelists(self) -> dict: """Update all the noticelists.""" response = self._prepare_request('POST', 'noticelists/update') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # ## END Noticelist ### # ## BEGIN Galaxy ### - def galaxies(self, pythonify: bool=False) -> List[Union[dict, MISPGalaxy]]: + def galaxies(self, pythonify: bool=False) -> Union[dict, List[MISPGalaxy]]: """Get all the galaxies.""" - galaxies = self._prepare_request('GET', 'galaxies') - galaxies = self._check_response(galaxies, expect_json=True) + r = self._prepare_request('GET', 'galaxies') + galaxies = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in galaxies: return galaxies to_return = [] @@ -853,27 +852,27 @@ class PyMISP: def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPGalaxy]: """Get a galaxy by id.""" galaxy_id = self.__get_uuid_or_id_from_abstract_misp(galaxy) - galaxy = self._prepare_request('GET', f'galaxies/view/{galaxy_id}') - galaxy = self._check_response(galaxy, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in galaxy: - return galaxy + r = self._prepare_request('GET', f'galaxies/view/{galaxy_id}') + galaxy_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in galaxy_j: + return galaxy_j g = MISPGalaxy() - g.from_dict(**galaxy) + g.from_dict(**galaxy_j) return g def update_galaxies(self) -> dict: """Update all the galaxies.""" response = self._prepare_request('POST', 'galaxies/update') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # ## END Galaxy ### # ## BEGIN Feed ### - def feeds(self, pythonify: bool=False) -> List[Union[dict, MISPFeed]]: + def feeds(self, pythonify: bool=False) -> Union[dict, List[MISPFeed]]: """Get the list of existing feeds.""" - feeds = self._prepare_request('GET', 'feeds') - feeds = self._check_response(feeds, expect_json=True) + r = self._prepare_request('GET', 'feeds') + feeds = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in feeds: return feeds to_return = [] @@ -886,20 +885,19 @@ class PyMISP: def get_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPFeed]: """Get a feed by id.""" feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) - feed = self._prepare_request('GET', f'feeds/view/{feed_id}') - feed = self._check_response(feed, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in feed: - return feed + r = self._prepare_request('GET', f'feeds/view/{feed_id}') + feed_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in feed_j: + return feed_j f = MISPFeed() - f.from_dict(**feed) + f.from_dict(**feed_j) return f def add_feed(self, feed: MISPFeed, pythonify: bool=False) -> Union[dict, MISPFeed]: '''Add a new feed on a MISP instance''' # FIXME: https://github.com/MISP/MISP/issues/4834 - feed = {'Feed': feed} - new_feed = self._prepare_request('POST', 'feeds/add', data=feed) - new_feed = self._check_response(new_feed, expect_json=True) + r = self._prepare_request('POST', 'feeds/add', data={'Feed': feed}) + new_feed = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in new_feed: return new_feed f = MISPFeed() @@ -910,48 +908,47 @@ class PyMISP: '''Enable a feed (fetching it will create event(s)''' if not isinstance(feed, MISPFeed): feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) # In case we have a UUID - feed = MISPFeed() - feed.id = feed_id - feed.enabled = True - return self.update_feed(feed=feed, pythonify=pythonify) + f = MISPFeed() + f.id = feed_id + f.enabled = True + return self.update_feed(feed=f, pythonify=pythonify) def disable_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPFeed]: '''Disable a feed''' if not isinstance(feed, MISPFeed): feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) # In case we have a UUID - feed = MISPFeed() - feed.id = feed_id - feed.enabled = False - return self.update_feed(feed=feed, pythonify=pythonify) + f = MISPFeed() + f.id = feed_id + f.enabled = False + return self.update_feed(feed=f, pythonify=pythonify) def enable_feed_cache(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPFeed]: '''Enable the caching of a feed''' if not isinstance(feed, MISPFeed): feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) # In case we have a UUID - feed = MISPFeed() - feed.id = feed_id - feed.caching_enabled = True - return self.update_feed(feed=feed, pythonify=pythonify) + f = MISPFeed() + f.id = feed_id + f.caching_enabled = True + return self.update_feed(feed=f, pythonify=pythonify) def disable_feed_cache(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPFeed]: '''Disable the caching of a feed''' if not isinstance(feed, MISPFeed): feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) # In case we have a UUID - feed = MISPFeed() - feed.id = feed_id - feed.caching_enabled = False - return self.update_feed(feed=feed, pythonify=pythonify) + f = MISPFeed() + f.id = feed_id + f.caching_enabled = False + return self.update_feed(feed=f, pythonify=pythonify) def update_feed(self, feed: MISPFeed, feed_id: int=None, pythonify: bool=False) -> Union[dict, MISPFeed]: '''Update a feed on a MISP instance''' if feed_id is None: - feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) + fid = self.__get_uuid_or_id_from_abstract_misp(feed) else: - feed_id = self.__get_uuid_or_id_from_abstract_misp(feed_id) + fid = self.__get_uuid_or_id_from_abstract_misp(feed_id) # FIXME: https://github.com/MISP/MISP/issues/4834 - feed = {'Feed': feed} - updated_feed = self._prepare_request('POST', f'feeds/edit/{feed_id}', data=feed) - updated_feed = self._check_response(updated_feed, expect_json=True) + r = self._prepare_request('POST', f'feeds/edit/{fid}', data={'Feed': feed}) + updated_feed = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in updated_feed: return updated_feed f = MISPFeed() @@ -962,48 +959,48 @@ class PyMISP: '''Delete a feed from a MISP instance''' feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) response = self._prepare_request('POST', f'feeds/delete/{feed_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def fetch_feed(self, feed: Union[MISPFeed, int, str, UUID]) -> dict: """Fetch one single feed""" feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) response = self._prepare_request('GET', f'feeds/fetchFromFeed/{feed_id}') - return self._check_response(response) + return self._check_json_response(response) def cache_all_feeds(self) -> dict: """ Cache all the feeds""" response = self._prepare_request('GET', 'feeds/cacheFeeds/all') - return self._check_response(response) + return self._check_json_response(response) def cache_feed(self, feed: Union[MISPFeed, int, str, UUID]) -> dict: """Cache a specific feed""" feed_id = self.__get_uuid_or_id_from_abstract_misp(feed) response = self._prepare_request('GET', f'feeds/cacheFeeds/{feed_id}') - return self._check_response(response) + return self._check_json_response(response) def cache_freetext_feeds(self) -> dict: """Cache all the freetext feeds""" response = self._prepare_request('GET', 'feeds/cacheFeeds/freetext') - return self._check_response(response) + return self._check_json_response(response) def cache_misp_feeds(self) -> dict: """Cache all the MISP feeds""" response = self._prepare_request('GET', 'feeds/cacheFeeds/misp') - return self._check_response(response) + return self._check_json_response(response) def compare_feeds(self) -> dict: """Generate the comparison matrix for all the MISP feeds""" response = self._prepare_request('GET', 'feeds/compareFeeds') - return self._check_response(response) + return self._check_json_response(response) # ## END Feed ### # ## BEGIN Server ### - def servers(self, pythonify: bool=False) -> List[Union[dict, MISPServer]]: + def servers(self, pythonify: bool=False) -> Union[dict, List[MISPServer]]: """Get the existing servers the MISP instance can synchronise with""" - servers = self._prepare_request('GET', 'servers') - servers = self._check_response(servers, expect_json=True) + r = self._prepare_request('GET', 'servers') + servers = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in servers: return servers to_return = [] @@ -1015,8 +1012,8 @@ class PyMISP: def get_sync_config(self, pythonify: bool=False) -> Union[dict, MISPServer]: '''WARNING: This method only works if the user calling it is a sync user''' - server = self._prepare_request('GET', 'servers/createSync') - server = self._check_response(server, expect_json=True) + r = self._prepare_request('GET', 'servers/createSync') + server = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in server: return server s = MISPServer() @@ -1025,33 +1022,33 @@ class PyMISP: def import_server(self, server: MISPServer, pythonify: bool=False) -> Union[dict, MISPServer]: """Import a sync server config received from get_sync_config""" - server = self._prepare_request('POST', f'servers/import', data=server) - server = self._check_response(server, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in server: - return server + r = self._prepare_request('POST', f'servers/import', data=server) + server_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in server_j: + return server_j s = MISPServer() - s.from_dict(**server) + s.from_dict(**server_j) return s def add_server(self, server: MISPServer, pythonify: bool=False) -> Union[dict, MISPServer]: """Add a server to synchronise with. Note: You probably want to use ExpandedPyMISP.get_sync_config and ExpandedPyMISP.import_server instead""" - server = self._prepare_request('POST', f'servers/add', data=server) - server = self._check_response(server, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in server: - return server + r = self._prepare_request('POST', f'servers/add', data=server) + server_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in server_j: + return server_j s = MISPServer() - s.from_dict(**server) + s.from_dict(**server_j) return s def update_server(self, server: MISPServer, server_id: int=None, pythonify: bool=False) -> Union[dict, MISPServer]: '''Update a server to synchronise with''' if server_id is None: - server_id = self.__get_uuid_or_id_from_abstract_misp(server) + sid = self.__get_uuid_or_id_from_abstract_misp(server) else: - server_id = self.__get_uuid_or_id_from_abstract_misp(server_id) - updated_server = self._prepare_request('POST', f'servers/edit/{server_id}', data=server) - updated_server = self._check_response(updated_server, expect_json=True) + sid = self.__get_uuid_or_id_from_abstract_misp(server_id) + r = self._prepare_request('POST', f'servers/edit/{sid}', data=server) + updated_server = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in updated_server: return updated_server s = MISPServer() @@ -1062,7 +1059,7 @@ class PyMISP: '''Delete a sync server''' server_id = self.__get_uuid_or_id_from_abstract_misp(server) response = self._prepare_request('POST', f'servers/delete/{server_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def server_pull(self, server: Union[MISPServer, int, str, UUID], event: Optional[Union[MISPEvent, int, str, UUID]]=None) -> dict: '''Initialize a pull from a sync server''' @@ -1074,7 +1071,7 @@ class PyMISP: url = f'servers/pull/{server_id}' response = self._prepare_request('GET', url) # FIXME: can we pythonify? - return self._check_response(response) + return self._check_json_response(response) def server_push(self, server: Union[MISPServer, int, str, UUID], event: Optional[Union[MISPEvent, int, str, UUID]]=None) -> dict: '''Initialize a push to a sync server''' @@ -1086,21 +1083,21 @@ class PyMISP: url = f'servers/push/{server_id}' response = self._prepare_request('GET', url) # FIXME: can we pythonify? - return self._check_response(response) + return self._check_json_response(response) def test_server(self, server: Union[MISPServer, int, str, UUID]) -> dict: server_id = self.__get_uuid_or_id_from_abstract_misp(server) response = self._prepare_request('POST', f'servers/testConnection/{server_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # ## END Server ### # ## BEGIN Sharing group ### - def sharing_groups(self, pythonify: bool=False) -> List[Union[dict, MISPSharingGroup]]: + def sharing_groups(self, pythonify: bool=False) -> Union[dict, List[MISPSharingGroup]]: """Get the existing sharing groups""" - sharing_groups = self._prepare_request('GET', 'sharing_groups') - sharing_groups = self._check_response(sharing_groups, expect_json=True) + r = self._prepare_request('GET', 'sharing_groups') + sharing_groups = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in sharing_groups: return sharing_groups to_return = [] @@ -1112,19 +1109,19 @@ class PyMISP: def add_sharing_group(self, sharing_group: MISPSharingGroup, pythonify: bool=False) -> Union[dict, MISPSharingGroup]: """Add a new sharing group""" - sharing_group = self._prepare_request('POST', f'sharing_groups/add', data=sharing_group) - sharing_group = self._check_response(sharing_group, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in sharing_group: - return sharing_group + r = self._prepare_request('POST', f'sharing_groups/add', data=sharing_group) + sharing_group_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in sharing_group_j: + return sharing_group_j s = MISPSharingGroup() - s.from_dict(**sharing_group) + s.from_dict(**sharing_group_j) return s def delete_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID]) -> dict: """Delete a sharing group""" sharing_group_id = self.__get_uuid_or_id_from_abstract_misp(sharing_group) response = self._prepare_request('POST', f'sharing_groups/delete/{sharing_group_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def add_org_to_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID], organisation: Union[MISPOrganisation, int, str, UUID], extend: bool=False) -> dict: @@ -1137,7 +1134,7 @@ class PyMISP: organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) to_jsonify = {'sg_id': sharing_group_id, 'org_id': organisation_id, 'extend': extend} response = self._prepare_request('POST', 'sharingGroups/addOrg', data=to_jsonify) - return self._check_response(response) + return self._check_json_response(response) def remove_org_from_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID], organisation: Union[MISPOrganisation, int, str, UUID]) -> dict: @@ -1149,7 +1146,7 @@ class PyMISP: organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) to_jsonify = {'sg_id': sharing_group_id, 'org_id': organisation_id} response = self._prepare_request('POST', 'sharingGroups/removeOrg', data=to_jsonify) - return self._check_response(response) + return self._check_json_response(response) def add_server_to_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID], server: Union[MISPServer, int, str, UUID], all_orgs: bool=False) -> dict: @@ -1162,7 +1159,7 @@ class PyMISP: server_id = self.__get_uuid_or_id_from_abstract_misp(server) to_jsonify = {'sg_id': sharing_group_id, 'server_id': server_id, 'all_orgs': all_orgs} response = self._prepare_request('POST', 'sharingGroups/addServer', data=to_jsonify) - return self._check_response(response) + return self._check_json_response(response) def remove_server_from_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID], server: Union[MISPServer, int, str, UUID]) -> dict: @@ -1174,16 +1171,16 @@ class PyMISP: server_id = self.__get_uuid_or_id_from_abstract_misp(server) to_jsonify = {'sg_id': sharing_group_id, 'server_id': server_id} response = self._prepare_request('POST', 'sharingGroups/removeServer', data=to_jsonify) - return self._check_response(response) + return self._check_json_response(response) # ## END Sharing groups ### # ## BEGIN Organisation ### - def organisations(self, scope="local", pythonify: bool=False) -> List[Union[dict, MISPOrganisation]]: + def organisations(self, scope="local", pythonify: bool=False) -> Union[dict, List[MISPOrganisation]]: """Get all the organisations.""" - organisations = self._prepare_request('GET', f'organisations/index/scope:{scope}') - organisations = self._check_response(organisations, expect_json=True) + r = self._prepare_request('GET', f'organisations/index/scope:{scope}') + organisations = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in organisations: return organisations to_return = [] @@ -1196,18 +1193,18 @@ class PyMISP: def get_organisation(self, organisation: Union[MISPOrganisation, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPOrganisation]: '''Get an organisation.''' organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) - organisation = self._prepare_request('GET', f'organisations/view/{organisation_id}') - organisation = self._check_response(organisation, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in organisation: - return organisation + r = self._prepare_request('GET', f'organisations/view/{organisation_id}') + organisation_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in organisation_j: + return organisation_j o = MISPOrganisation() - o.from_dict(**organisation) + o.from_dict(**organisation_j) return o def add_organisation(self, organisation: MISPOrganisation, pythonify: bool=False) -> Union[dict, MISPOrganisation]: '''Add an organisation''' - new_organisation = self._prepare_request('POST', f'admin/organisations/add', data=organisation) - new_organisation = self._check_response(new_organisation, expect_json=True) + r = self._prepare_request('POST', f'admin/organisations/add', data=organisation) + new_organisation = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in new_organisation: return new_organisation o = MISPOrganisation() @@ -1217,11 +1214,11 @@ class PyMISP: def update_organisation(self, organisation: MISPOrganisation, organisation_id: int=None, pythonify: bool=False) -> Union[dict, MISPOrganisation]: '''Update an organisation''' if organisation_id is None: - organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) + oid = self.__get_uuid_or_id_from_abstract_misp(organisation) else: - organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation_id) - updated_organisation = self._prepare_request('POST', f'admin/organisations/edit/{organisation_id}', data=organisation) - updated_organisation = self._check_response(updated_organisation, expect_json=True) + oid = self.__get_uuid_or_id_from_abstract_misp(organisation_id) + r = self._prepare_request('POST', f'admin/organisations/edit/{oid}', data=organisation) + updated_organisation = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in updated_organisation: return updated_organisation o = MISPOrganisation() @@ -1233,16 +1230,16 @@ class PyMISP: # NOTE: MISP in inconsistent and currently require "delete" in the path and doesn't support HTTP DELETE organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) response = self._prepare_request('POST', f'admin/organisations/delete/{organisation_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # ## END Organisation ### # ## BEGIN User ### - def users(self, pythonify: bool=False) -> List[Union[dict, MISPUser]]: + def users(self, pythonify: bool=False) -> Union[dict, List[MISPUser]]: """Get all the users.""" - users = self._prepare_request('GET', 'admin/users') - users = self._check_response(users, expect_json=True) + r = self._prepare_request('GET', 'admin/users') + users = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in users: return users to_return = [] @@ -1252,50 +1249,50 @@ class PyMISP: to_return.append(u) return to_return - def get_user(self, user: Union[MISPUser, int, str, UUID]='me', pythonify: bool=False, expanded: bool=False) -> Union[dict, MISPUser]: + def get_user(self, user: Union[MISPUser, int, str, UUID]='me', pythonify: bool=False, expanded: bool=False) -> Union[dict, MISPUser, Tuple[MISPUser, MISPRole, List[MISPUserSetting]]]: '''Get a user. `me` means the owner of the API key doing the query. expanded also returns a MISPRole and a MISPUserSetting''' user_id = self.__get_uuid_or_id_from_abstract_misp(user) - user = self._prepare_request('GET', f'users/view/{user_id}') - user = self._check_response(user, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in user: - return user + r = self._prepare_request('GET', f'users/view/{user_id}') + user_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in user_j: + return user_j u = MISPUser() - u.from_dict(**user) + u.from_dict(**user_j) if not expanded: return u else: - r = MISPRole() - r.from_dict(**user['Role']) + role = MISPRole() + role.from_dict(**user_j['Role']) usersettings = [] - if user['UserSetting']: - for name, value in user['UserSetting'].items(): + if user_j['UserSetting']: + for name, value in user_j['UserSetting'].items(): us = MISPUserSetting() us.from_dict(**{'name': name, 'value': value}) usersettings.append(us) - return u, r, usersettings + return u, role, usersettings def add_user(self, user: MISPUser, pythonify: bool=False) -> Union[dict, MISPUser]: '''Add a new user''' - user = self._prepare_request('POST', f'admin/users/add', data=user) - user = self._check_response(user, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in user: - return user + r = self._prepare_request('POST', f'admin/users/add', data=user) + user_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in user_j: + return user_j u = MISPUser() - u.from_dict(**user) + u.from_dict(**user_j) return u def update_user(self, user: MISPUser, user_id: int=None, pythonify: bool=False) -> Union[dict, MISPUser]: '''Update an event on a MISP instance''' if user_id is None: - user_id = self.__get_uuid_or_id_from_abstract_misp(user) + uid = self.__get_uuid_or_id_from_abstract_misp(user) else: - user_id = self.__get_uuid_or_id_from_abstract_misp(user_id) - url = f'users/edit/{user_id}' + uid = self.__get_uuid_or_id_from_abstract_misp(user_id) + url = f'users/edit/{uid}' if self._current_role.perm_admin or self._current_role.perm_site_admin: url = f'admin/{url}' - updated_user = self._prepare_request('POST', url, data=user) - updated_user = self._check_response(updated_user, expect_json=True) + r = self._prepare_request('POST', url, data=user) + updated_user = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in updated_user: return updated_user e = MISPUser() @@ -1307,34 +1304,34 @@ class PyMISP: # NOTE: MISP in inconsistent and currently require "delete" in the path and doesn't support HTTP DELETE user_id = self.__get_uuid_or_id_from_abstract_misp(user) response = self._prepare_request('POST', f'admin/users/delete/{user_id}') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def change_user_password(self, new_password: str, user: Optional[Union[MISPUser, int, str, UUID]]=None) -> dict: response = self._prepare_request('POST', f'users/change_pw', data={'password': new_password}) - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # ## END User ### # ## BEGIN Role ### - def roles(self, pythonify: bool=False) -> List[Union[dict, MISPRole]]: + def roles(self, pythonify: bool=False) -> Union[dict, List[MISPRole]]: """Get the existing roles""" - roles = self._prepare_request('GET', 'roles') - roles = self._check_response(roles, expect_json=True) + r = self._prepare_request('GET', 'roles') + roles = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in roles: return roles to_return = [] for role in roles: - r = MISPRole() - r.from_dict(**role) - to_return.append(r) + nr = MISPRole() + nr.from_dict(**role) + to_return.append(nr) return to_return def set_default_role(self, role: Union[MISPRole, int, str, UUID]) -> dict: role_id = self.__get_uuid_or_id_from_abstract_misp(role) url = urljoin(self.root_url, f'/admin/roles/set_default/{role_id}') response = self._prepare_request('POST', url) - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # ## END Role ### @@ -1348,21 +1345,31 @@ class PyMISP: org: Optional[SearchParameterTypes]=None, tags: Optional[SearchParameterTypes]=None, quick_filter: Optional[str]=None, quickFilter: Optional[str]=None, - date_from: Optional[DateTypes]=None, - date_to: Optional[DateTypes]=None, + date_from: Optional[Union[datetime, date, int, str, float, None]]=None, + date_to: Optional[Union[datetime, date, int, str, float, None]]=None, eventid: Optional[SearchType]=None, with_attachments: Optional[bool]=None, withAttachments: Optional[bool]=None, metadata: Optional[bool]=None, uuid: Optional[str]=None, - publish_timestamp: Optional[DateInterval]=None, last: Optional[DateInterval]=None, - timestamp: Optional[DateInterval]=None, + publish_timestamp: Optional[Union[Union[datetime, date, int, str, float, None], + Tuple[Union[datetime, date, int, str, float, None], + Union[datetime, date, int, str, float, None]] + ]]=None, + last: Optional[Union[Union[datetime, date, int, str, float, None], + Tuple[Union[datetime, date, int, str, float, None], + Union[datetime, date, int, str, float, None]] + ]]=None, + timestamp: Optional[Union[Union[datetime, date, int, str, float, None], + Tuple[Union[datetime, date, int, str, float, None], + Union[datetime, date, int, str, float, None]] + ]]=None, published: Optional[bool]=None, enforce_warninglist: Optional[bool]=None, enforceWarninglist: Optional[bool]=None, to_ids: Optional[Union[ToIDSType, List[ToIDSType]]]=None, deleted: Optional[str]=None, include_event_uuid: Optional[bool]=None, includeEventUuid: Optional[bool]=None, include_event_tags: Optional[bool]=None, includeEventTags: Optional[bool]=None, - event_timestamp: Optional[DateTypes]=None, + event_timestamp: Optional[Union[datetime, date, int, str, float, None]]=None, sg_reference_only: Optional[bool]=None, eventinfo: Optional[str]=None, searchall: Optional[bool]=None, @@ -1372,7 +1379,7 @@ class PyMISP: include_sightings: Optional[bool]=None, includeSightings: Optional[bool]=None, include_correlations: Optional[bool]=None, includeCorrelations: Optional[bool]=None, pythonify: Optional[bool]=False, - **kwargs) -> List[Union[dict, MISPEvent, MISPAttribute]]: + **kwargs) -> Union[dict, str, List[Union[MISPEvent, MISPAttribute]]]: '''Search in the MISP instance :param return_format: Set the return format of the search (Currently supported: json, xml, openioc, suricata, snort - more formats are being moved to restSearch with the goal being that all searches happen through this API). Can be passed as the first parameter after restSearch or via the JSON payload. @@ -1480,8 +1487,8 @@ class PyMISP: query['published'] = published query['enforceWarninglist'] = self._make_misp_bool(enforce_warninglist) if to_ids is not None: - if int(to_ids) not in [0, 1]: - raise ValueError('to_ids has to be in {}'.format(', '.join([0, 1]))) + if to_ids not in [0, 1, '0', '1']: + raise ValueError('to_ids has to be in 0 or 1') query['to_ids'] = to_ids query['deleted'] = deleted query['includeEventUuid'] = self._make_misp_bool(include_event_uuid) @@ -1501,20 +1508,22 @@ class PyMISP: query['includeCorrelations'] = self._make_misp_bool(include_correlations) url = urljoin(self.root_url, f'{controller}/restSearch') response = self._prepare_request('POST', url, data=query) - if return_format == 'json': - normalized_response = self._check_response(response, expect_json=True) - else: - normalized_response = self._check_response(response) - if return_format == 'csv' and (self.global_pythonify or pythonify) and not headerless: - return self._csv_to_dict(normalized_response) + if return_format == 'csv': + normalized_response_text = self._check_response(response) + if (self.global_pythonify or pythonify) and not headerless: + return self._csv_to_dict(normalized_response_text) # type: ignore + else: + return normalized_response_text + + normalized_response = self._check_json_response(response) if 'errors' in normalized_response: return normalized_response if return_format == 'json' and self.global_pythonify or pythonify: # The response is in json, we can convert it to a list of pythonic MISP objects - to_return = [] + to_return: List[Union[MISPEvent, MISPAttribute]] = [] if controller == 'events': for e in normalized_response: me = MISPEvent() @@ -1522,7 +1531,7 @@ class PyMISP: to_return.append(me) elif controller == 'attributes': # FIXME: obvs, this is hurting my soul. We need something generic. - for a in normalized_response.get('Attribute'): + for a in normalized_response['Attribute']: ma = MISPAttribute() ma.from_dict(**a) if 'Event' in ma: @@ -1556,15 +1565,18 @@ class PyMISP: def search_index(self, published: Optional[bool]=None, eventid: Optional[SearchType]=None, tags: Optional[SearchParameterTypes]=None, - date_from: Optional[DateTypes]=None, - date_to: Optional[DateTypes]=None, + date_from: Optional[Union[datetime, date, int, str, float, None]]=None, + date_to: Optional[Union[datetime, date, int, str, float, None]]=None, eventinfo: Optional[str]=None, threatlevel: Optional[List[SearchType]]=None, distribution: Optional[List[SearchType]]=None, analysis: Optional[List[SearchType]]=None, org: Optional[SearchParameterTypes]=None, - timestamp: Optional[DateInterval]=None, - pythonify: Optional[bool]=None) -> List[Union[dict, MISPEvent]]: + timestamp: Optional[Union[Union[datetime, date, int, str, float, None], + Tuple[Union[datetime, date, int, str, float, None], + Union[datetime, date, int, str, float, None]] + ]]=None, + pythonify: Optional[bool]=None) -> Union[dict, List[MISPEvent]]: """Search only at the index level. Using ! in front of a value means NOT (default is OR) :param published: Set whether published or unpublished events should be returned. Do not set the parameter if you want both. @@ -1597,7 +1609,7 @@ class PyMISP: url = urljoin(self.root_url, 'events/index') response = self._prepare_request('POST', url, data=query) - normalized_response = self._check_response(response, expect_json=True) + normalized_response = self._check_json_response(response) if not (self.global_pythonify or pythonify): return normalized_response @@ -1611,15 +1623,22 @@ class PyMISP: def search_sightings(self, context: Optional[str]=None, context_id: Optional[SearchType]=None, type_sighting: Optional[str]=None, - date_from: Optional[DateTypes]=None, - date_to: Optional[DateTypes]=None, - publish_timestamp: Optional[DateInterval]=None, last: Optional[DateInterval]=None, + date_from: Optional[Union[datetime, date, int, str, float, None]]=None, + date_to: Optional[Union[datetime, date, int, str, float, None]]=None, + publish_timestamp: Optional[Union[Union[datetime, date, int, str, float, None], + Tuple[Union[datetime, date, int, str, float, None], + Union[datetime, date, int, str, float, None]] + ]]=None, + last: Optional[Union[Union[datetime, date, int, str, float, None], + Tuple[Union[datetime, date, int, str, float, None], + Union[datetime, date, int, str, float, None]] + ]]=None, org: Optional[SearchType]=None, source: Optional[str]=None, include_attribute: Optional[bool]=None, include_event_meta: Optional[bool]=None, pythonify: Optional[bool]=False - ) -> List[Union[dict, MISPSighting]]: + ) -> Union[dict, List[Dict[str, Union[MISPEvent, MISPAttribute, MISPSighting]]]]: '''Search sightings :param context: The context of the search. Can be either "attribute", "event", or nothing (will then match on events and attributes). @@ -1645,7 +1664,7 @@ class PyMISP: [ ... ] >>> misp.search_sightings(context='event', context_id=17, include_event_meta=True, org=2) # return list of sighting for event 17 filtered with org id 2 ''' - query = {'returnFormat': 'json'} + query: Dict[str, Any] = {'returnFormat': 'json'} if context is not None: if context not in ['attribute', 'event']: raise ValueError('context has to be in {}'.format(', '.join(['attribute', 'event']))) @@ -1666,14 +1685,14 @@ class PyMISP: url = urljoin(self.root_url, url_path) response = self._prepare_request('POST', url, data=query) - normalized_response = self._check_response(response, expect_json=True) + normalized_response = self._check_json_response(response) if not (self.global_pythonify or pythonify) or 'errors' in normalized_response: return normalized_response if self.global_pythonify or pythonify: to_return = [] for s in normalized_response: - entries = {} + entries: Dict[str, Union[MISPEvent, MISPAttribute, MISPSighting]] = {} s_data = s['Sighting'] if include_event_meta: e = s_data.pop('Event') @@ -1694,11 +1713,11 @@ class PyMISP: def search_logs(self, limit: Optional[int]=None, page: Optional[int]=None, log_id: Optional[int]=None, title: Optional[str]=None, - created: Optional[DateTypes]=None, model: Optional[str]=None, + created: Optional[Union[datetime, date, int, str, float, None]]=None, model: Optional[str]=None, action: Optional[str]=None, user_id: Optional[int]=None, change: Optional[str]=None, email: Optional[str]=None, org: Optional[str]=None, description: Optional[str]=None, - ip: Optional[str]=None, pythonify: Optional[bool]=False) -> List[Union[dict, MISPLog]]: + ip: Optional[str]=None, pythonify: Optional[bool]=False) -> Union[dict, List[MISPLog]]: '''Search in logs Note: to run substring queries simply append/prepend/encapsulate the search term with % @@ -1725,7 +1744,7 @@ class PyMISP: query['id'] = query.pop('log_id') response = self._prepare_request('POST', 'admin/logs/index', data=query) - normalized_response = self._check_response(response, expect_json=True) + normalized_response = self._check_json_response(response) if not (self.global_pythonify or pythonify) or 'errors' in normalized_response: return normalized_response @@ -1736,10 +1755,10 @@ class PyMISP: to_return.append(ml) return to_return - def search_feeds(self, value: Optional[SearchParameterTypes]=None, pythonify: Optional[bool]=False) -> List[Union[dict, MISPFeed]]: + def search_feeds(self, value: Optional[SearchParameterTypes]=None, pythonify: Optional[bool]=False) -> Union[dict, List[MISPFeed]]: '''Search in the feeds cached on the servers''' response = self._prepare_request('POST', '/feeds/searchCaches', data={'value': value}) - normalized_response = self._check_response(response, expect_json=True) + normalized_response = self._check_json_response(response) if not (self.global_pythonify or pythonify) or 'errors' in normalized_response: return normalized_response to_return = [] @@ -1753,10 +1772,10 @@ class PyMISP: # ## BEGIN Communities ### - def communities(self, pythonify: bool=False) -> List[Union[dict, MISPCommunity]]: + def communities(self, pythonify: bool=False) -> Union[dict, List[MISPCommunity]]: """Get all the communities.""" - communities = self._prepare_request('GET', 'communities') - communities = self._check_response(communities, expect_json=True) + r = self._prepare_request('GET', 'communities') + communities = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in communities: return communities to_return = [] @@ -1769,12 +1788,12 @@ class PyMISP: def get_community(self, community: Union[MISPCommunity, int, str, UUID], pythonify: bool=False) -> Union[dict, MISPCommunity]: '''Get an community from a MISP instance''' community_id = self.__get_uuid_or_id_from_abstract_misp(community) - community = self._prepare_request('GET', f'communities/view/{community_id}') - community = self._check_response(community, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in community: - return community + r = self._prepare_request('GET', f'communities/view/{community_id}') + community_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in community_j: + return community_j c = MISPCommunity() - c.from_dict(**community) + c.from_dict(**community_j) return c def request_community_access(self, community: Union[MISPCommunity, int, str, UUID], @@ -1794,16 +1813,16 @@ class PyMISP: 'message': message, 'anonymise': anonymise_requestor_server, 'sync': sync, 'mock': mock} r = self._prepare_request('POST', f'communities/requestAccess/{community_id}', data=to_post) - return self._check_response(r, expect_json=True) + return self._check_json_response(r) # ## END Communities ### # ## BEGIN Event Delegation ### - def event_delegations(self, pythonify: bool=False) -> List[Union[dict, MISPEventDelegation]]: + def event_delegations(self, pythonify: bool=False) -> Union[dict, List[MISPEventDelegation]]: """Get all the event delegations.""" - delegations = self._prepare_request('GET', 'event_delegations') - delegations = self._check_response(delegations, expect_json=True) + r = self._prepare_request('GET', 'event_delegations') + delegations = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in delegations: return delegations to_return = [] @@ -1815,13 +1834,13 @@ class PyMISP: def accept_event_delegation(self, delegation: Union[MISPEventDelegation, int, str], pythonify: bool=False) -> dict: delegation_id = self.__get_uuid_or_id_from_abstract_misp(delegation) - delegation = self._prepare_request('POST', f'event_delegations/acceptDelegation/{delegation_id}') - return self._check_response(delegation, expect_json=True) + r = self._prepare_request('POST', f'event_delegations/acceptDelegation/{delegation_id}') + return self._check_json_response(r) def discard_event_delegation(self, delegation: Union[MISPEventDelegation, int, str], pythonify: bool=False) -> dict: delegation_id = self.__get_uuid_or_id_from_abstract_misp(delegation) - delegation = self._prepare_request('POST', f'event_delegations/deleteDelegation/{delegation_id}') - return self._check_response(delegation, expect_json=True) + r = self._prepare_request('POST', f'event_delegations/deleteDelegation/{delegation_id}') + return self._check_json_response(r) def delegate_event(self, event: Optional[Union[MISPEvent, int, str, UUID]]=None, organisation: Optional[Union[MISPOrganisation, int, str, UUID]]=None, @@ -1832,16 +1851,16 @@ class PyMISP: event_id = self.__get_uuid_or_id_from_abstract_misp(event) organisation_id = self.__get_uuid_or_id_from_abstract_misp(organisation) data = {'event_id': event_id, 'org_id': organisation_id, 'distribution': distribution, 'message': message} + r = self._prepare_request('POST', f'event_delegations/delegateEvent/{event_id}', data=data) elif event_delegation: - data = event_delegation + r = self._prepare_request('POST', f'event_delegations/delegateEvent/{event_id}', data=event_delegation) else: raise PyMISPError('Either event and organisation OR event_delegation are required.') - delegation = self._prepare_request('POST', f'event_delegations/delegateEvent/{event_id}', data=data) - delegation = self._check_response(delegation, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in delegation: - return delegation + delegation_j = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in delegation_j: + return delegation_j d = MISPEventDelegation() - d.from_dict(**delegation) + d.from_dict(**delegation_j) return d # ## END Event Delegation ### @@ -1852,9 +1871,9 @@ class PyMISP: """Force push an event on ZMQ""" event_id = self.__get_uuid_or_id_from_abstract_misp(event) response = self._prepare_request('POST', f'events/pushEventToZMQ/{event_id}.json') - return self._check_response(response, expect_json=True) + return self._check_json_response(response) - def direct_call(self, url: str, data: Optional[dict]=None, params: dict={}, kw_params: dict={}) -> dict: + def direct_call(self, url: str, data: Optional[dict]=None, params: dict={}, kw_params: dict={}) -> Any: '''Very lightweight call that posts a data blob (python dictionary or json string) on the URL''' if data is None: response = self._prepare_request('GET', url, params=params, kw_params=kw_params) @@ -1863,21 +1882,21 @@ class PyMISP: return self._check_response(response, lenient_response_type=True) def freetext(self, event: Union[MISPEvent, int, str, UUID], string: str, adhereToWarninglists: Union[bool, str]=False, - distribution: Optional[int]=None, returnMetaAttributes: bool=False, pythonify: bool=False, **kwargs) -> List[Union[dict, MISPAttribute]]: + distribution: Optional[int]=None, returnMetaAttributes: bool=False, pythonify: bool=False, **kwargs) -> Union[dict, List[MISPAttribute]]: """Pass a text to the freetext importer""" event_id = self.__get_uuid_or_id_from_abstract_misp(event) - query = {"value": string} + query: Dict[str, Any] = {"value": string} wl_params = [False, True, 'soft'] if adhereToWarninglists in wl_params: query['adhereToWarninglists'] = adhereToWarninglists else: - raise PyMISPError('Invalid parameter, adhereToWarninglists Can only be {}'.format(', '.join(wl_params))) + raise PyMISPError('Invalid parameter, adhereToWarninglists Can only be False, True, or soft') if distribution is not None: query['distribution'] = distribution if returnMetaAttributes: query['returnMetaAttributes'] = returnMetaAttributes - attributes = self._prepare_request('POST', f'events/freeTextImport/{event_id}', data=query, **kwargs) - attributes = self._check_response(attributes, expect_json=True) + r = self._prepare_request('POST', f'events/freeTextImport/{event_id}', data=query, **kwargs) + attributes = self._check_json_response(r) if returnMetaAttributes or not (self.global_pythonify or pythonify) or 'errors' in attributes: return attributes to_return = [] @@ -1887,7 +1906,7 @@ class PyMISP: to_return.append(a) return to_return - def upload_stix(self, path, version: str='2') -> dict: + def upload_stix(self, path, version: str='2'): """Upload a STIX file to MISP. :param path: Path to the STIX on the disk (can be a path-like object, or a pseudofile) :param version: Can be 1 or 2 @@ -1899,15 +1918,14 @@ class PyMISP: to_post = path.read() if isinstance(to_post, bytes): - to_post = to_post.decode() + to_post = to_post.decode() # type: ignore if str(version) == '1': url = urljoin(self.root_url, '/events/upload_stix') - response = self._prepare_request('POST', url, data=to_post, output_type='xml') + response = self._prepare_request('POST', url, data=to_post, output_type='xml') # type: ignore else: url = urljoin(self.root_url, '/events/upload_stix/2') - response = self._prepare_request('POST', url, data=to_post) - + response = self._prepare_request('POST', url, data=to_post) # type: ignore return response # ## END Others ### @@ -1924,22 +1942,22 @@ class PyMISP: else: path = f'attributes/attributeStatistics/{context}' response = self._prepare_request('GET', path) - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def tags_statistics(self, percentage: bool=False, name_sort: bool=False) -> dict: """Get tags statistics from the MISP instance""" # FIXME: https://github.com/MISP/MISP/issues/4874 # NOTE: https://github.com/MISP/MISP/issues/4879 if percentage: - percentage = 'true' + p = 'true' else: - percentage = 'false' + p = 'false' if name_sort: - name_sort = 'true' + ns = 'true' else: - name_sort = 'false' - response = self._prepare_request('GET', f'tags/tagStatistics/{percentage}/{name_sort}') - return self._check_response(response) + ns = 'false' + response = self._prepare_request('GET', f'tags/tagStatistics/{p}/{ns}') + return self._check_json_response(response) def users_statistics(self, context: str='data') -> dict: """Get users statistics from the MISP instance""" @@ -1947,16 +1965,16 @@ class PyMISP: if context not in availables_contexts: raise PyMISPError("context can only be {','.join(availables_contexts)}") response = self._prepare_request('GET', f'users/statistics/{context}') - return self._check_response(response) + return self._check_json_response(response) # ## END Statistics ### # ## BEGIN User Settings ### - def user_settings(self, pythonify: bool=False) -> List[Union[dict, MISPUserSetting]]: + def user_settings(self, pythonify: bool=False) -> Union[dict, List[MISPUserSetting]]: """Get all the user settings.""" - user_settings = self._prepare_request('GET', 'user_settings') - user_settings = self._check_response(user_settings, expect_json=True) + r = self._prepare_request('GET', 'user_settings') + user_settings = self._check_json_response(r) if not (self.global_pythonify or pythonify) or 'errors' in user_settings: return user_settings to_return = [] @@ -1969,47 +1987,47 @@ class PyMISP: def get_user_setting(self, user_setting: str, user: Optional[Union[MISPUser, int, str, UUID]]=None, pythonify: bool=False) -> Union[dict, MISPUserSetting]: '''Get an user setting''' - query = {'setting': user_setting} + query: Dict[str, Any] = {'setting': user_setting} if user: query['user_id'] = self.__get_uuid_or_id_from_abstract_misp(user) response = self._prepare_request('POST', f'user_settings/getSetting') - user_setting = self._check_response(response, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in user_setting: - return user_setting + user_setting_j = self._check_json_response(response) + if not (self.global_pythonify or pythonify) or 'errors' in user_setting_j: + return user_setting_j u = MISPUserSetting() - u.from_dict(**user_setting) + u.from_dict(**user_setting_j) return u def set_user_setting(self, user_setting: str, value: Union[str, dict], user: Optional[Union[MISPUser, int, str, UUID]]=None, pythonify: bool=False) -> Union[dict, MISPUserSetting]: '''Get an user setting''' - query = {'setting': user_setting} + query: Dict[str, Any] = {'setting': user_setting} if isinstance(value, dict): value = json.dumps(value) query['value'] = value if user: query['user_id'] = self.__get_uuid_or_id_from_abstract_misp(user) response = self._prepare_request('POST', f'user_settings/setSetting', data=query) - user_setting = self._check_response(response, expect_json=True) - if not (self.global_pythonify or pythonify) or 'errors' in user_setting: - return user_setting + user_setting_j = self._check_json_response(response) + if not (self.global_pythonify or pythonify) or 'errors' in user_setting_j: + return user_setting_j u = MISPUserSetting() - u.from_dict(**user_setting) + u.from_dict(**user_setting_j) return u def delete_user_setting(self, user_setting: str, user: Union[MISPUser, int, str, UUID]=None) -> dict: '''Delete a user setting''' - query = {'setting': user_setting} + query: Dict[str, Any] = {'setting': user_setting} if user: query['user_id'] = self.__get_uuid_or_id_from_abstract_misp(user) response = self._prepare_request('POST', f'user_settings/delete', data=query) - return self._check_response(response, expect_json=True) + return self._check_json_response(response) # ## END User Settings ### # ## BEGIN Global helpers ### - def change_sharing_group_on_entity(self, misp_entity: AbstractMISP, sharing_group_id, pythonify: bool=False) -> Union[dict, MISPEvent, MISPObject, MISPAttribute]: + def change_sharing_group_on_entity(self, misp_entity: Union[MISPEvent, MISPAttribute, MISPObject], sharing_group_id, pythonify: bool=False) -> Union[dict, MISPEvent, MISPObject, MISPAttribute, MISPShadowAttribute]: """Change the sharing group of an event, an attribute, or an object""" misp_entity.distribution = 4 # Needs to be 'Sharing group' if 'SharingGroup' in misp_entity: # Delete former SharingGroup information @@ -2028,27 +2046,30 @@ class PyMISP: def tag(self, misp_entity: Union[AbstractMISP, str], tag: Union[MISPTag, str], local: bool=False) -> dict: """Tag an event or an attribute. misp_entity can be a MISPEvent, a MISP Attribute, or a UUID""" - if 'uuid' in misp_entity: + if isinstance(misp_entity, AbstractMISP) and 'uuid' in misp_entity: uuid = misp_entity.uuid - else: + elif isinstance(misp_entity, str): uuid = misp_entity if isinstance(tag, MISPTag): tag = tag.name to_post = {'uuid': uuid, 'tag': tag, 'local': local} response = self._prepare_request('POST', 'tags/attachTagToObject', data=to_post) - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def untag(self, misp_entity: Union[AbstractMISP, str], tag: Union[MISPTag, str]) -> dict: """Untag an event or an attribute. misp_entity can be a UUID""" - if 'uuid' in misp_entity: + if isinstance(misp_entity, AbstractMISP) and 'uuid' in misp_entity: uuid = misp_entity.uuid - else: + elif isinstance(misp_entity, str): uuid = misp_entity if isinstance(tag, MISPTag): - tag = tag.name - to_post = {'uuid': uuid, 'tag': tag} + if 'name' in tag: + tag_name = tag.name + else: + tag_name = tag + to_post = {'uuid': uuid, 'tag': tag_name} response = self._prepare_request('POST', 'tags/removeTagFromObject', data=to_post) - return self._check_response(response, expect_json=True) + return self._check_json_response(response) def build_complex_query(self, or_parameters: Optional[List[SearchType]]=None, and_parameters: Optional[List[SearchType]]=None, @@ -2105,8 +2126,10 @@ class PyMISP: return 0 return 1 if int(parameter) else 0 - def _make_timestamp(self, value: DateTypes) -> Union[str, int]: + def _make_timestamp(self, value: Union[datetime, date, int, str, float, None]) -> Union[str, int, float, None]: '''Catch-all method to normalize anything that can be converted to a timestamp''' + if not value: + return None if isinstance(value, datetime): return value.timestamp() @@ -2124,7 +2147,13 @@ class PyMISP: return value return value - def _check_response(self, response: requests.Response, lenient_response_type: bool=False, expect_json: bool=False) -> dict: + def _check_json_response(self, response: requests.Response) -> dict: # type: ignore + r = self._check_response(response, expect_json=True) + if isinstance(r, (dict, list)): + return r + # Else: an exception was raised anyway + + def _check_response(self, response: requests.Response, lenient_response_type: bool=False, expect_json: bool=False) -> Union[dict, str]: """Check if the response from the server is not an unexpected error""" if response.status_code >= 500: logger.critical(everything_broken.format(response.request.headers, response.request.body, response.text)) @@ -2143,19 +2172,19 @@ class PyMISP: # At this point, we had no error. try: - response = response.json() + response_json = response.json() if logger.isEnabledFor(logging.DEBUG): - logger.debug(response) - if isinstance(response, dict) and response.get('response') is not None: + logger.debug(response_json) + if isinstance(response_json, dict) and response_json.get('response') is not None: # Cleanup. - response = response['response'] - return response + response_json = response_json['response'] + return response_json except Exception: if logger.isEnabledFor(logging.DEBUG): logger.debug(response.text) if expect_json: raise PyMISPUnexpectedResponse(f'Unexpected response from server: {response.text}') - if lenient_response_type and not response.headers.get('content-type').startswith('application/json'): + if lenient_response_type and not response.headers['content-type'].startswith('application/json'): return response.text if not response.content: # Empty response @@ -2166,27 +2195,29 @@ class PyMISP: def __repr__(self): return f'<{self.__class__.__name__}(url={self.root_url})' - def _prepare_request(self, request_type: str, url: str, data: Union[dict, AbstractMISP]={}, params: dict={}, + def _prepare_request(self, request_type: str, url: str, data: Union[str, list, dict, AbstractMISP]={}, params: dict={}, kw_params: dict={}, output_type: str='json') -> requests.Response: '''Prepare a request for python-requests''' url = urljoin(self.root_url, url) - if data: + if data == {} or isinstance(data, str): + d = data + elif data: if not isinstance(data, str): # Else, we already have a text blob to send if isinstance(data, dict): # Else, we can directly json encode. # Remove None values. data = {k: v for k, v in data.items() if v is not None} - data = json.dumps(data, default=pymisp_json_default) + d = json.dumps(data, default=pymisp_json_default) if logger.isEnabledFor(logging.DEBUG): logger.debug(f'{request_type} - {url}') - if data is not None: - logger.debug(data) + if d is not None: + logger.debug(d) if kw_params: # CakePHP params in URL to_append_url = '/'.join([f'{k}:{v}' for k, v in kw_params.items()]) url = f'{url}/{to_append_url}' - req = requests.Request(request_type, url, data=data, params=params) + req = requests.Request(request_type, url, data=d, params=params) with requests.Session() as s: user_agent = f'PyMISP {__version__} - Python {".".join(str(x) for x in sys.version_info[:2])}' if self.tool: @@ -2206,9 +2237,9 @@ class PyMISP: def _csv_to_dict(self, csv_content: str) -> List[dict]: '''Makes a list of dict out of a csv file (requires headers)''' fieldnames, lines = csv_content.split('\n', 1) - fieldnames = fieldnames.split(',') + fields = fieldnames.split(',') to_return = [] for line in csv.reader(lines.split('\n')): if line: - to_return.append({fname: value for fname, value in zip(fieldnames, line)}) + to_return.append({fname: value for fname, value in zip(fields, line)}) return to_return diff --git a/pymisp/data/describeTypes.json b/pymisp/data/describeTypes.json index eb6fda5..3440529 100644 --- a/pymisp/data/describeTypes.json +++ b/pymisp/data/describeTypes.json @@ -240,6 +240,7 @@ "attachment", "authentihash", "cdhash", + "chrome-extension-id", "comment", "domain", "email-attachment", @@ -321,6 +322,7 @@ "attachment", "authentihash", "cdhash", + "chrome-extension-id", "comment", "filename", "filename|authentihash", @@ -512,6 +514,10 @@ "default_category": "Payload delivery", "to_ids": 1 }, + "chrome-extension-id": { + "default_category": "Payload delivery", + "to_ids": 1 + }, "comment": { "default_category": "Other", "to_ids": 0 @@ -1121,6 +1127,7 @@ "campaign-name", "cc-number", "cdhash", + "chrome-extension-id", "comment", "community-id", "cookie", diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 9bfaa00..9489fb1 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -77,6 +77,14 @@ class MISPOrganisation(AbstractMISP): kwargs = kwargs['Organisation'] super(MISPOrganisation, self).from_dict(**kwargs) +class MISPSharingGroup(AbstractMISP): + + def from_dict(self, **kwargs): + if 'SharingGroup' in kwargs: + kwargs = kwargs['SharingGroup'] + super().from_dict(**kwargs) + + class MISPShadowAttribute(AbstractMISP): @@ -133,10 +141,29 @@ class MISPAttribute(AbstractMISP): self.__category_type_mapping: dict = self.describe_types['category_type_mappings'] self.__sane_default: dict = self.describe_types['sane_defaults'] self.__strict: bool = strict - self._data = None + self._data: Optional[BytesIO] = None self.uuid: str = str(uuid.uuid4()) self.ShadowAttribute: List[MISPShadowAttribute] = [] + self.SharingGroup: MISPSharingGroup self.Sighting: List[MISPSighting] = [] + self.Tag: List[MISPTag] = [] + + # For search + self.Event: MISPEvent + self.RelatedAttribute: List[MISPAttribute] + + def add_tag(self, tag: Optional[Union[str, MISPTag, dict]], **kwargs) -> MISPTag: + return super()._add_tag(tag, **kwargs) + + @property + def tags(self) -> List[MISPTag]: + """Returns a lost of tags associated to this Attribute""" + return self.Tag + + @tags.setter + def tags(self, tags: List[MISPTag]): + """Set a list of prepared MISPTag.""" + super()._set_tags(tags) def hash_values(self, algorithm: str='sha512') -> List[str]: """Compute the hash of every values for fast lookups""" @@ -333,6 +360,10 @@ class MISPAttribute(AbstractMISP): if kwargs.get('ShadowAttribute'): [self.add_shadow_attribute(s_attr) for s_attr in kwargs.pop('ShadowAttribute')] + if kwargs.get('SharingGroup'): + for sg in kwargs.pop('SharingGroup'): + self.SharingGroup = MISPSharingGroup() + self.SharingGroup.from_dict(**sg) # If the user wants to disable correlation, let them. Defaults to False. self.disable_correlation = kwargs.pop("disable_correlation", False) if self.disable_correlation is None: @@ -383,8 +414,8 @@ class MISPAttribute(AbstractMISP): @data.setter def data(self, data: Union[Path, str, bytes, BytesIO]): if isinstance(data, Path): - with data.open('rb') as f: - self._data = BytesIO(f.read()) + with data.open('rb') as f_temp: + self._data = BytesIO(f_temp.read()) if isinstance(data, (str, bytes)): self._data = BytesIO(base64.b64decode(data)) elif isinstance(data, BytesIO): @@ -451,6 +482,9 @@ class MISPObjectReference(AbstractMISP): def __init__(self): super().__init__() self.uuid = str(uuid.uuid4()) + self.object_uuid: str + self.referenced_uuid: str + self.relationship_type: str def _set_default(self): if not hasattr(self, 'comment'): @@ -492,6 +526,7 @@ class MISPObject(AbstractMISP): self._strict: bool = strict self.name: str = name self._known_template: bool = False + self.id: int self._set_template(kwargs.get('misp_objects_path_custom')) @@ -499,11 +534,13 @@ class MISPObject(AbstractMISP): self.__fast_attribute_access: dict = defaultdict(list) # Hashtable object_relation: [attributes] self.ObjectReference: List[MISPObjectReference] = [] self.Attribute: List[MISPAttribute] = [] + self.SharingGroup: MISPSharingGroup + self._default_attributes_parameters: dict if isinstance(default_attributes_parameters, MISPAttribute): # Just make sure we're not modifying an existing MISPAttribute - self._default_attributes_parameters: dict = default_attributes_parameters.to_dict() + self._default_attributes_parameters = default_attributes_parameters.to_dict() else: - self._default_attributes_parameters: dict = default_attributes_parameters + self._default_attributes_parameters = default_attributes_parameters if self._default_attributes_parameters: # Let's clean that up self._default_attributes_parameters.pop('value', None) # duh @@ -558,7 +595,10 @@ class MISPObject(AbstractMISP): def _set_template(self, misp_objects_path_custom: Optional[Union[Path, str]]=None): if misp_objects_path_custom: # If misp_objects_path_custom is given, and an object with the given name exists, use that. - self.misp_objects_path = misp_objects_path_custom + if isinstance(misp_objects_path_custom, str): + self.misp_objects_path = Path(misp_objects_path_custom) + else: + self.misp_objects_path = misp_objects_path_custom # Try to get the template self._known_template = self._load_template_path(self.misp_objects_path / self.name / 'definition.json') @@ -628,6 +668,10 @@ class MISPObject(AbstractMISP): if kwargs.get('ObjectReference'): [self.add_reference(**r) for r in kwargs.pop('ObjectReference')] + if kwargs.get('SharingGroup'): + for sg in kwargs.pop('SharingGroup'): + self.SharingGroup = MISPSharingGroup() + self.SharingGroup.from_dict(**sg) # Not supported yet - https://github.com/MISP/PyMISP/issues/168 # if kwargs.get('Tag'): # for tag in kwargs.pop('Tag'): @@ -763,6 +807,21 @@ class MISPEvent(AbstractMISP): self.Object: List[MISPObject] = [] self.RelatedEvent: List[MISPEvent] = [] self.ShadowAttribute: List[MISPShadowAttribute] = [] + self.SharingGroup: MISPSharingGroup + self.Tag: List[MISPTag] = [] + + def add_tag(self, tag: Optional[Union[str, MISPTag, dict]], **kwargs) -> MISPTag: + return super()._add_tag(tag, **kwargs) + + @property + def tags(self) -> List[MISPTag]: + """Returns a lost of tags associated to this Event""" + return self.Tag + + @tags.setter + def tags(self, tags: List[MISPTag]): + """Set a list of prepared MISPTag.""" + super()._set_tags(tags) def _set_default(self): """There are a few keys that could, or need to be set by default for the feed generator""" @@ -928,13 +987,14 @@ class MISPEvent(AbstractMISP): def load(self, json_event: Union[IO, str, bytes, dict], validate: bool=False, metadata_only: bool=False): """Load a JSON dump from a pseudo file or a JSON string""" - if hasattr(json_event, 'read'): + if isinstance(json_event, IO): # python2 and python3 compatible to find if we have a file json_event = json_event.read() if isinstance(json_event, (str, bytes)): json_event = json.loads(json_event) - if json_event.get('response'): - event = json_event.get('response')[0] + + if isinstance(json_event, dict) and 'response' in json_event and isinstance(json_event['response'], list): + event = json_event['response'][0] else: event = json_event if not event: @@ -1028,7 +1088,9 @@ class MISPEvent(AbstractMISP): if kwargs.get('Orgc'): self.Orgc = MISPOrganisation() self.Orgc.from_dict(**kwargs.pop('Orgc')) - + if kwargs.get('SharingGroup'): + self.SharingGroup = MISPSharingGroup() + self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) super(MISPEvent, self).from_dict(**kwargs) def to_dict(self) -> dict: @@ -1276,6 +1338,10 @@ class MISPObjectTemplate(AbstractMISP): class MISPUser(AbstractMISP): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.email: str + def from_dict(self, **kwargs): if 'User' in kwargs: kwargs = kwargs['User'] @@ -1331,6 +1397,11 @@ class MISPNoticelist(AbstractMISP): class MISPRole(AbstractMISP): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.perm_admin: int + self.perm_site_admin: int + def from_dict(self, **kwargs): if 'Role' in kwargs: kwargs = kwargs['Role'] @@ -1345,16 +1416,14 @@ class MISPServer(AbstractMISP): super().from_dict(**kwargs) -class MISPSharingGroup(AbstractMISP): - - def from_dict(self, **kwargs): - if 'SharingGroup' in kwargs: - kwargs = kwargs['SharingGroup'] - super().from_dict(**kwargs) - - class MISPLog(AbstractMISP): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.model: str + self.action: str + self.title: str + def from_dict(self, **kwargs): if 'Log' in kwargs: kwargs = kwargs['Log'] @@ -1366,6 +1435,12 @@ class MISPLog(AbstractMISP): class MISPEventDelegation(AbstractMISP): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.org_id: int + self.requester_org_id: int + self.event_id: int + def from_dict(self, **kwargs): if 'EventDelegation' in kwargs: kwargs = kwargs['EventDelegation'] @@ -1384,7 +1459,9 @@ class MISPObjectAttribute(MISPAttribute): super().__init__() self._definition = definition - def from_dict(self, object_relation: str, value: Union[str, int, float], **kwargs): + + def from_dict(self, object_relation: str, value: Union[str, int, float], **kwargs): # type: ignore + # NOTE: Signature of "from_dict" incompatible with supertype "MISPAttribute" self.object_relation = object_relation self.value = value if 'Attribute' in kwargs: diff --git a/pymisp/py.typed b/pymisp/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/pymisp/tools/abstractgenerator.py b/pymisp/tools/abstractgenerator.py index 194d029..9985ccd 100644 --- a/pymisp/tools/abstractgenerator.py +++ b/pymisp/tools/abstractgenerator.py @@ -5,11 +5,11 @@ from .. import MISPObject from ..exceptions import InvalidMISPObject from datetime import datetime, date from dateutil.parser import parse - +from typing import Union, Optional class AbstractMISPObjectGenerator(MISPObject): - def _detect_epoch(self, timestamp): + def _detect_epoch(self, timestamp: Union[str, int, float]) -> bool: try: tmp = float(timestamp) if tmp < 30000000: @@ -20,7 +20,7 @@ class AbstractMISPObjectGenerator(MISPObject): except ValueError: return False - def _sanitize_timestamp(self, timestamp): + def _sanitize_timestamp(self, timestamp: Optional[Union[datetime, date, dict, str, int, float]]=None) -> datetime: if not timestamp: return datetime.now() @@ -31,12 +31,14 @@ class AbstractMISPObjectGenerator(MISPObject): elif isinstance(timestamp, dict): if not isinstance(timestamp['value'], datetime): timestamp['value'] = parse(timestamp['value']) - return timestamp - elif not isinstance(timestamp, datetime): # Supported: float/int/string - if self._detect_epoch(timestamp): + return timestamp['value'] + else: # Supported: float/int/string + if isinstance(timestamp, (str, int, float)) and self._detect_epoch(timestamp): return datetime.fromtimestamp(float(timestamp)) - return parse(timestamp) - return timestamp + elif isinstance(timestamp, str): + return parse(timestamp) + else: + raise Exception(f'Unable to convert {timestamp} to a datetime.') def generate_attributes(self): """Contains the logic where all the values of the object are gathered""" diff --git a/pymisp/tools/asnobject.py b/pymisp/tools/asnobject.py index ce80029..11033a8 100644 --- a/pymisp/tools/asnobject.py +++ b/pymisp/tools/asnobject.py @@ -9,7 +9,7 @@ logger = logging.getLogger('pymisp') class ASNObject(AbstractMISPObjectGenerator): - def __init__(self, parameters, strict=True, standalone=True, **kwargs): + def __init__(self, parameters: dict, strict: bool=True, standalone: bool=True, **kwargs): super(ASNObject, self).__init__('asn', strict=strict, standalone=standalone, **kwargs) self._parameters = parameters self.generate_attributes() diff --git a/pymisp/tools/create_misp_object.py b/pymisp/tools/create_misp_object.py index 8326f5d..f7a6eee 100644 --- a/pymisp/tools/create_misp_object.py +++ b/pymisp/tools/create_misp_object.py @@ -2,16 +2,18 @@ # -*- coding: utf-8 -*- import sys +from io import BytesIO from . import FileObject, PEObject, ELFObject, MachOObject from ..exceptions import MISPObjectException import logging +from typing import Optional logger = logging.getLogger('pymisp') try: - import lief - from lief import Logger + import lief # type: ignore + from lief import Logger # type: ignore Logger.disable() HAS_LIEF = True except ImportError: @@ -22,7 +24,7 @@ class FileTypeNotImplemented(MISPObjectException): pass -def make_pe_objects(lief_parsed, misp_file, standalone=True, default_attributes_parameters={}): +def make_pe_objects(lief_parsed: lief.Binary, misp_file: FileObject, standalone: bool=True, default_attributes_parameters: dict={}): pe_object = PEObject(parsed=lief_parsed, standalone=standalone, default_attributes_parameters=default_attributes_parameters) misp_file.add_reference(pe_object.uuid, 'includes', 'PE indicators') pe_sections = [] @@ -31,7 +33,7 @@ def make_pe_objects(lief_parsed, misp_file, standalone=True, default_attributes_ return misp_file, pe_object, pe_sections -def make_elf_objects(lief_parsed, misp_file, standalone=True, default_attributes_parameters={}): +def make_elf_objects(lief_parsed: lief.Binary, misp_file: FileObject, standalone: bool=True, default_attributes_parameters: dict={}): elf_object = ELFObject(parsed=lief_parsed, standalone=standalone, default_attributes_parameters=default_attributes_parameters) misp_file.add_reference(elf_object.uuid, 'includes', 'ELF indicators') elf_sections = [] @@ -40,7 +42,7 @@ def make_elf_objects(lief_parsed, misp_file, standalone=True, default_attributes return misp_file, elf_object, elf_sections -def make_macho_objects(lief_parsed, misp_file, standalone=True, default_attributes_parameters={}): +def make_macho_objects(lief_parsed: lief.Binary, misp_file: FileObject, standalone: bool=True, default_attributes_parameters: dict={}): macho_object = MachOObject(parsed=lief_parsed, standalone=standalone, default_attributes_parameters=default_attributes_parameters) misp_file.add_reference(macho_object.uuid, 'includes', 'MachO indicators') macho_sections = [] @@ -49,19 +51,22 @@ def make_macho_objects(lief_parsed, misp_file, standalone=True, default_attribut return misp_file, macho_object, macho_sections -def make_binary_objects(filepath=None, pseudofile=None, filename=None, standalone=True, default_attributes_parameters={}): +def make_binary_objects(filepath: Optional[str]=None, pseudofile: Optional[BytesIO]=None, filename: Optional[str]=None, standalone: bool=True, default_attributes_parameters: dict={}): misp_file = FileObject(filepath=filepath, pseudofile=pseudofile, filename=filename, standalone=standalone, default_attributes_parameters=default_attributes_parameters) if HAS_LIEF and (filepath or (pseudofile and filename)): try: if filepath: lief_parsed = lief.parse(filepath=filepath) - else: + elif pseudofile and filename: if sys.version_info < (3, 0): logger.critical('Pseudofile is not supported in python2. Just update.') lief_parsed = None else: lief_parsed = lief.parse(raw=pseudofile.getvalue(), name=filename) + else: + logger.critical('You need either a filepath, or a pseudofile and a filename.') + lief_parsed = None if isinstance(lief_parsed, lief.PE.Binary): return make_pe_objects(lief_parsed, misp_file, standalone, default_attributes_parameters) elif isinstance(lief_parsed, lief.ELF.Binary): diff --git a/pymisp/tools/domainipobject.py b/pymisp/tools/domainipobject.py index 62dc554..0fe7a86 100644 --- a/pymisp/tools/domainipobject.py +++ b/pymisp/tools/domainipobject.py @@ -9,7 +9,7 @@ logger = logging.getLogger('pymisp') class DomainIPObject(AbstractMISPObjectGenerator): - def __init__(self, parameters, strict=True, standalone=True, **kwargs): + def __init__(self, parameters: dict, strict: bool=True, standalone: bool=True, **kwargs): super(DomainIPObject, self).__init__('domain-ip', strict=strict, standalone=standalone, **kwargs) self._parameters = parameters self.generate_attributes() diff --git a/pymisp/tools/elfobject.py b/pymisp/tools/elfobject.py index e57654b..210b806 100644 --- a/pymisp/tools/elfobject.py +++ b/pymisp/tools/elfobject.py @@ -6,17 +6,19 @@ from ..exceptions import InvalidMISPObject from io import BytesIO from hashlib import md5, sha1, sha256, sha512 import logging +from typing import Union +from pathlib import Path logger = logging.getLogger('pymisp') try: - import lief + import lief # type: ignore HAS_LIEF = True except ImportError: HAS_LIEF = False try: - import pydeep + import pydeep # type: ignore HAS_PYDEEP = True except ImportError: HAS_PYDEEP = False @@ -24,7 +26,7 @@ except ImportError: class ELFObject(AbstractMISPObjectGenerator): - def __init__(self, parsed=None, filepath=None, pseudofile=None, standalone=True, **kwargs): + def __init__(self, parsed: lief.ELF.Binary=None, filepath: Union[Path, str]=None, pseudofile: Union[BytesIO, bytes]=None, standalone: bool=True, **kwargs): super(ELFObject, self).__init__('elf', standalone=standalone, **kwargs) if not HAS_PYDEEP: logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git") @@ -67,7 +69,7 @@ class ELFObject(AbstractMISPObjectGenerator): class ELFSectionObject(AbstractMISPObjectGenerator): - def __init__(self, section, standalone=True, **kwargs): + def __init__(self, section: lief.ELF.Section, standalone: bool=True, **kwargs): # Python3 way # super().__init__('pe-section') super(ELFSectionObject, self).__init__('elf-section', standalone=standalone, **kwargs) diff --git a/pymisp/tools/emailobject.py b/pymisp/tools/emailobject.py index c665ad5..758d7c8 100644 --- a/pymisp/tools/emailobject.py +++ b/pymisp/tools/emailobject.py @@ -6,13 +6,15 @@ from .abstractgenerator import AbstractMISPObjectGenerator from io import BytesIO import logging from email import message_from_bytes, policy +from pathlib import Path +from typing import Union logger = logging.getLogger('pymisp') class EMailObject(AbstractMISPObjectGenerator): - def __init__(self, filepath=None, pseudofile=None, attach_original_email=True, standalone=True, **kwargs): + def __init__(self, filepath: Union[Path, str]=None, pseudofile: BytesIO=None, attach_original_email: bool=True, standalone: bool=True, **kwargs): # PY3 way: # super().__init__('file') super(EMailObject, self).__init__('email', standalone=standalone, **kwargs) diff --git a/pymisp/tools/ext_lookups.py b/pymisp/tools/ext_lookups.py index 7a439c4..75ca0e1 100644 --- a/pymisp/tools/ext_lookups.py +++ b/pymisp/tools/ext_lookups.py @@ -2,13 +2,13 @@ # -*- coding: utf-8 -*- try: - from pymispgalaxies import Clusters + from pymispgalaxies import Clusters # type: ignore has_pymispgalaxies = True except ImportError: has_pymispgalaxies = False try: - from pytaxonomies import Taxonomies + from pytaxonomies import Taxonomies # type: ignore has_pymispgalaxies = True except ImportError: has_pymispgalaxies = False diff --git a/pymisp/tools/fail2banobject.py b/pymisp/tools/fail2banobject.py index e49cd50..3077ee9 100644 --- a/pymisp/tools/fail2banobject.py +++ b/pymisp/tools/fail2banobject.py @@ -9,7 +9,7 @@ logger = logging.getLogger('pymisp') class Fail2BanObject(AbstractMISPObjectGenerator): - def __init__(self, parameters, strict=True, standalone=True, **kwargs): + def __init__(self, parameters: dict, strict: bool=True, standalone: bool=True, **kwargs): super(Fail2BanObject, self).__init__('fail2ban', strict=strict, standalone=standalone, **kwargs) self._parameters = parameters self.generate_attributes() diff --git a/pymisp/tools/feed.py b/pymisp/tools/feed.py index 9ff53d2..f3d937a 100644 --- a/pymisp/tools/feed.py +++ b/pymisp/tools/feed.py @@ -4,11 +4,12 @@ from pathlib import Path from pymisp import MISPEvent import json +from typing import List def feed_meta_generator(path: Path): manifests = {} - hashes = [] + hashes: List[str] = [] for f_name in path.glob('*.json'): if str(f_name.name) == 'manifest.json': diff --git a/pymisp/tools/fileobject.py b/pymisp/tools/fileobject.py index 55e946c..5350a67 100644 --- a/pymisp/tools/fileobject.py +++ b/pymisp/tools/fileobject.py @@ -9,18 +9,20 @@ from hashlib import md5, sha1, sha256, sha512 import math from collections import Counter import logging +from pathlib import Path +from typing import Union logger = logging.getLogger('pymisp') try: - import pydeep + import pydeep # type: ignore HAS_PYDEEP = True except ImportError: HAS_PYDEEP = False try: - import magic + import magic # type: ignore HAS_MAGIC = True except ImportError: HAS_MAGIC = False @@ -28,7 +30,7 @@ except ImportError: class FileObject(AbstractMISPObjectGenerator): - def __init__(self, filepath=None, pseudofile=None, filename=None, standalone=True, **kwargs): + def __init__(self, filepath: Union[Path, str]=None, pseudofile: BytesIO=None, filename: str=None, standalone: bool=True, **kwargs): # PY3 way: # super().__init__('file') super(FileObject, self).__init__('file', standalone=standalone, **kwargs) @@ -70,7 +72,7 @@ class FileObject(AbstractMISPObjectGenerator): if HAS_PYDEEP: self.add_attribute('ssdeep', value=pydeep.hash_buf(self.__data).decode()) - def __entropy_H(self, data): + def __entropy_H(self, data: bytes) -> float: """Calculate the entropy of a chunk of data.""" # NOTE: copy of the entropy function from pefile @@ -79,7 +81,7 @@ class FileObject(AbstractMISPObjectGenerator): occurences = Counter(bytearray(data)) - entropy = 0 + entropy = 0.0 for x in occurences.values(): p_x = float(x) / len(data) entropy -= p_x * math.log(p_x, 2) diff --git a/pymisp/tools/genericgenerator.py b/pymisp/tools/genericgenerator.py index cb339a2..eeed742 100644 --- a/pymisp/tools/genericgenerator.py +++ b/pymisp/tools/genericgenerator.py @@ -2,11 +2,13 @@ # -*- coding: utf-8 -*- from .abstractgenerator import AbstractMISPObjectGenerator +from typing import List class GenericObjectGenerator(AbstractMISPObjectGenerator): - def generate_attributes(self, attributes): + # FIXME: this method is different from the master one, and that's probably not a good idea. + def generate_attributes(self, attributes: List[dict]): # type: ignore """Generates MISPObjectAttributes from a list of dictionaries. Each entry if the list must be in one of the two following formats: * {: } diff --git a/pymisp/tools/geolocationobject.py b/pymisp/tools/geolocationobject.py index b8b1c24..3f59417 100644 --- a/pymisp/tools/geolocationobject.py +++ b/pymisp/tools/geolocationobject.py @@ -9,7 +9,7 @@ logger = logging.getLogger('pymisp') class GeolocationObject(AbstractMISPObjectGenerator): - def __init__(self, parameters, strict=True, standalone=True, **kwargs): + def __init__(self, parameters: dict, strict: bool=True, standalone: bool=True, **kwargs): super(GeolocationObject, self).__init__('asn', strict=strict, standalone=standalone, **kwargs) self._parameters = parameters self.generate_attributes() diff --git a/pymisp/tools/load_warninglists.py b/pymisp/tools/load_warninglists.py index b94b936..89ec192 100644 --- a/pymisp/tools/load_warninglists.py +++ b/pymisp/tools/load_warninglists.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- try: - from pymispwarninglists import WarningLists + from pymispwarninglists import WarningLists # type: ignore has_pymispwarninglists = True except ImportError: has_pymispwarninglists = False diff --git a/pymisp/tools/machoobject.py b/pymisp/tools/machoobject.py index 984798b..a3b3ae3 100644 --- a/pymisp/tools/machoobject.py +++ b/pymisp/tools/machoobject.py @@ -6,18 +6,20 @@ from .abstractgenerator import AbstractMISPObjectGenerator from io import BytesIO from hashlib import md5, sha1, sha256, sha512 import logging +from typing import Optional, Union +from pathlib import Path logger = logging.getLogger('pymisp') try: - import lief + import lief # type: ignore HAS_LIEF = True except ImportError: HAS_LIEF = False try: - import pydeep + import pydeep # type: ignore HAS_PYDEEP = True except ImportError: HAS_PYDEEP = False @@ -25,7 +27,7 @@ except ImportError: class MachOObject(AbstractMISPObjectGenerator): - def __init__(self, parsed=None, filepath=None, pseudofile=None, standalone=True, **kwargs): + def __init__(self, parsed: Optional[lief.MachO.Binary]=None, filepath: Optional[Union[Path, str]]=None, pseudofile: Optional[BytesIO]=None, standalone: bool=True, **kwargs): # Python3 way # super().__init__('elf') super(MachOObject, self).__init__('macho', standalone=standalone, **kwargs) @@ -70,7 +72,7 @@ class MachOObject(AbstractMISPObjectGenerator): class MachOSectionObject(AbstractMISPObjectGenerator): - def __init__(self, section, standalone=True, **kwargs): + def __init__(self, section: lief.MachO.Section, standalone: bool=True, **kwargs): # Python3 way # super().__init__('pe-section') super(MachOSectionObject, self).__init__('macho-section', standalone=standalone, **kwargs) diff --git a/pymisp/tools/neo4j.py b/pymisp/tools/neo4j.py index e77d0c0..2656a5b 100644 --- a/pymisp/tools/neo4j.py +++ b/pymisp/tools/neo4j.py @@ -5,7 +5,7 @@ import os from .. import MISPEvent try: - from py2neo import authenticate, Graph, Node, Relationship + from py2neo import authenticate, Graph, Node, Relationship # type: ignore has_py2neo = True except ImportError: has_py2neo = False diff --git a/pymisp/tools/openioc.py b/pymisp/tools/openioc.py index 9d337b1..662b444 100755 --- a/pymisp/tools/openioc.py +++ b/pymisp/tools/openioc.py @@ -5,7 +5,7 @@ import os from .. import MISPEvent try: - from bs4 import BeautifulSoup + from bs4 import BeautifulSoup # type: ignore has_bs4 = True except ImportError: has_bs4 = False diff --git a/pymisp/tools/peobject.py b/pymisp/tools/peobject.py index fa1b4e5..f93ecea 100644 --- a/pymisp/tools/peobject.py +++ b/pymisp/tools/peobject.py @@ -7,17 +7,19 @@ from io import BytesIO from hashlib import md5, sha1, sha256, sha512 from datetime import datetime import logging +from typing import Optional, Union +from pathlib import Path logger = logging.getLogger('pymisp') try: - import lief + import lief # type: ignore HAS_LIEF = True except ImportError: HAS_LIEF = False try: - import pydeep + import pydeep # type: ignore HAS_PYDEEP = True except ImportError: HAS_PYDEEP = False @@ -25,7 +27,7 @@ except ImportError: class PEObject(AbstractMISPObjectGenerator): - def __init__(self, parsed=None, filepath=None, pseudofile=None, standalone=True, **kwargs): + def __init__(self, parsed: Optional[lief.PE.Binary]=None, filepath: Optional[Union[Path, str]]=None, pseudofile: Optional[BytesIO]=None, standalone: bool=True, **kwargs): # Python3 way # super().__init__('pe') super(PEObject, self).__init__('pe', standalone=standalone, **kwargs) @@ -117,7 +119,7 @@ class PEObject(AbstractMISPObjectGenerator): class PESectionObject(AbstractMISPObjectGenerator): - def __init__(self, section, standalone=True, **kwargs): + def __init__(self, section: lief.PE.Section, standalone: bool=True, **kwargs): # Python3 way # super().__init__('pe-section') super(PESectionObject, self).__init__('pe-section', standalone=standalone, **kwargs) diff --git a/pymisp/tools/reportlab_generator.py b/pymisp/tools/reportlab_generator.py index 98127e5..ed6c52c 100644 --- a/pymisp/tools/reportlab_generator.py +++ b/pymisp/tools/reportlab_generator.py @@ -20,17 +20,17 @@ logger = logging.getLogger('pymisp') # Potentially not installed imports try: - from reportlab.pdfgen import canvas - from reportlab.pdfbase.pdfmetrics import stringWidth, registerFont - from reportlab.pdfbase.ttfonts import TTFont - from reportlab.lib import colors - from reportlab.lib.pagesizes import A4 + from reportlab.pdfgen import canvas # type: ignore + from reportlab.pdfbase.pdfmetrics import stringWidth, registerFont # type: ignore + from reportlab.pdfbase.ttfonts import TTFont # type: ignore + from reportlab.lib import colors # type: ignore + from reportlab.lib.pagesizes import A4 # type: ignore - from reportlab.platypus import SimpleDocTemplate, Paragraph, PageBreak, Table, TableStyle, Flowable, Image, Indenter + from reportlab.platypus import SimpleDocTemplate, Paragraph, PageBreak, Table, TableStyle, Flowable, Image, Indenter # type: ignore - from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle - from reportlab.lib.units import mm - from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY, TA_LEFT + from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle # type: ignore + from reportlab.lib.units import mm # type: ignore + from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY, TA_LEFT # type: ignore HAS_REPORTLAB = True except ImportError: diff --git a/pymisp/tools/sbsignatureobject.py b/pymisp/tools/sbsignatureobject.py index 8b7f3c1..66b6667 100644 --- a/pymisp/tools/sbsignatureobject.py +++ b/pymisp/tools/sbsignatureobject.py @@ -8,7 +8,7 @@ class SBSignatureObject(AbstractMISPObjectGenerator): ''' Sandbox Analyzer ''' - def __init__(self, software, report, standalone=True, **kwargs): + def __init__(self, software: str, report: list, standalone: bool=True, **kwargs): super(SBSignatureObject, self).__init__("sb-signature", **kwargs) self._software = software self._report = report diff --git a/pymisp/tools/sshauthkeyobject.py b/pymisp/tools/sshauthkeyobject.py index 95bb937..aac8dda 100644 --- a/pymisp/tools/sshauthkeyobject.py +++ b/pymisp/tools/sshauthkeyobject.py @@ -5,13 +5,15 @@ from ..exceptions import InvalidMISPObject from .abstractgenerator import AbstractMISPObjectGenerator from io import StringIO import logging +from typing import Optional, Union +from pathlib import Path logger = logging.getLogger('pymisp') class SSHAuthorizedKeysObject(AbstractMISPObjectGenerator): - def __init__(self, authorized_keys_path=None, authorized_keys_pseudofile=None, standalone=True, **kwargs): + def __init__(self, authorized_keys_path: Optional[Union[Path, str]]=None, authorized_keys_pseudofile: Optional[StringIO]=None, standalone: bool=True, **kwargs): # PY3 way: # super().__init__('file') super(SSHAuthorizedKeysObject, self).__init__('ssh-authorized-keys', standalone=standalone, **kwargs) @@ -19,7 +21,7 @@ class SSHAuthorizedKeysObject(AbstractMISPObjectGenerator): with open(authorized_keys_path, 'r') as f: self.__pseudofile = StringIO(f.read()) elif authorized_keys_pseudofile and isinstance(authorized_keys_pseudofile, StringIO): - self.__pseudofile = authorized_keys_path + self.__pseudofile = authorized_keys_pseudofile else: raise InvalidMISPObject('File buffer (StringIO) or a path is required.') self.__data = self.__pseudofile.getvalue() diff --git a/pymisp/tools/stix.py b/pymisp/tools/stix.py index fe8430a..1e6cfb2 100644 --- a/pymisp/tools/stix.py +++ b/pymisp/tools/stix.py @@ -1,15 +1,15 @@ # -*- coding: utf-8 -*- try: - from misp_stix_converter.converters.buildMISPAttribute import buildEvent - from misp_stix_converter.converters import convert - from misp_stix_converter.converters.convert import MISPtoSTIX + from misp_stix_converter.converters.buildMISPAttribute import buildEvent # type: ignore + from misp_stix_converter.converters import convert # type: ignore + from misp_stix_converter.converters.convert import MISPtoSTIX # type: ignore has_misp_stix_converter = True except ImportError: has_misp_stix_converter = False -def load_stix(stix, distribution=3, threat_level_id=2, analysis=0): +def load_stix(stix, distribution: int=3, threat_level_id: int=2, analysis: int=0): '''Returns a MISPEvent object from a STIX package''' if not has_misp_stix_converter: raise Exception('You need to install misp_stix_converter: pip install git+https://github.com/MISP/MISP-STIX-Converter.git') @@ -18,7 +18,7 @@ def load_stix(stix, distribution=3, threat_level_id=2, analysis=0): threat_level_id=threat_level_id, analysis=analysis) -def make_stix_package(misp_event, to_json=False, to_xml=False): +def make_stix_package(misp_event, to_json: bool=False, to_xml: bool=False): '''Returns a STIXPackage from a MISPEvent. Optionally can return the package in json or xml. diff --git a/pymisp/tools/urlobject.py b/pymisp/tools/urlobject.py index 1e8df2d..f499a3d 100644 --- a/pymisp/tools/urlobject.py +++ b/pymisp/tools/urlobject.py @@ -3,7 +3,7 @@ from .abstractgenerator import AbstractMISPObjectGenerator import logging -from pyfaup.faup import Faup +from pyfaup.faup import Faup # type: ignore from urllib.parse import unquote_plus logger = logging.getLogger('pymisp') @@ -13,7 +13,7 @@ faup = Faup() class URLObject(AbstractMISPObjectGenerator): - def __init__(self, url, standalone=True, **kwargs): + def __init__(self, url: str, standalone: bool=True, **kwargs): # PY3 way: # super().__init__('file') super(URLObject, self).__init__('url', standalone=standalone, **kwargs) diff --git a/pymisp/tools/vtreportobject.py b/pymisp/tools/vtreportobject.py index 69e856e..336225e 100644 --- a/pymisp/tools/vtreportobject.py +++ b/pymisp/tools/vtreportobject.py @@ -2,10 +2,11 @@ # -*- coding: utf-8 -*- import re +from typing import Optional import requests try: - import validators + import validators # type: ignore has_validators = True except ImportError: has_validators = False @@ -23,7 +24,7 @@ class VTReportObject(AbstractMISPObjectGenerator): :indicator: IOC to search VirusTotal for ''' - def __init__(self, apikey, indicator, vt_proxies=None, standalone=True, **kwargs): + def __init__(self, apikey: str, indicator: str, vt_proxies: Optional[dict]=None, standalone: bool=True, **kwargs): # PY3 way: # super().__init__("virustotal-report") super(VTReportObject, self).__init__("virustotal-report", standalone=standalone, **kwargs) @@ -47,7 +48,7 @@ class VTReportObject(AbstractMISPObjectGenerator): ratio = "{}/{}".format(self._report["positives"], self._report["total"]) self.add_attribute("detection-ratio", value=ratio) - def __validate_resource(self, ioc): + def __validate_resource(self, ioc: str): ''' Validate the data type of an indicator. Domains and IP addresses aren't supported because @@ -63,7 +64,7 @@ class VTReportObject(AbstractMISPObjectGenerator): return "file" return False - def __query_virustotal(self, apikey, resource): + def __query_virustotal(self, apikey: str, resource: str): ''' Query VirusTotal for information about an indicator @@ -78,9 +79,9 @@ class VTReportObject(AbstractMISPObjectGenerator): report = requests.get(url, params=params, proxies=self._proxies) else: report = requests.get(url, params=params) - report = report.json() - if report["response_code"] == 1: + report_json = report.json() + if report_json["response_code"] == 1: return report else: - error_msg = "{}: {}".format(resource, report["verbose_msg"]) + error_msg = "{}: {}".format(resource, report_json["verbose_msg"]) raise InvalidMISPObject(error_msg) diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index cc4dfab..5346035 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -12,7 +12,7 @@ from io import BytesIO import json from pathlib import Path -import urllib3 +import urllib3 # type: ignore import time from uuid import uuid4 @@ -37,7 +37,7 @@ except ImportError: raise try: - from keys import url, key + from keys import url, key # type: ignore verifycert = False except ImportError as e: print(e) diff --git a/tests/testlive_sync.py b/tests/testlive_sync.py index 9435dd2..24971c4 100644 --- a/tests/testlive_sync.py +++ b/tests/testlive_sync.py @@ -6,7 +6,7 @@ import sys import unittest import subprocess -import urllib3 +import urllib3 # type: ignore import logging logging.disable(logging.CRITICAL)