chg: Refactorize typing, validate

pull/531/head
Raphaël Vinot 2020-01-23 10:27:40 +01:00
parent adf97dfeff
commit b0e95fd5af
33 changed files with 617 additions and 481 deletions

View File

@ -22,6 +22,7 @@ install:
script:
- bash travis/test_travis.sh
- mypy tests/testlive_comprehensive.py tests/test_mispevent.py tests/testlive_sync.py pymisp
after_success:
- pipenv run codecov

View File

@ -4,7 +4,7 @@
from pymisp import ExpandedPyMISP
from pymisp.tools import EMailObject
import traceback
from keys import misp_url, misp_key, misp_verifycert
from keys import misp_url, misp_key, misp_verifycert # type: ignore
import glob
import argparse

View File

@ -2,7 +2,11 @@
# -*- coding: utf-8 -*-
from pymisp import ExpandedPyMISP
from keys import misp_url, misp_key, misp_verifycert, misp_client_cert
from keys import misp_url, misp_key, misp_verifycert
try:
from keys import misp_client_cert
except ImportError:
misp_client_cert = ''
import argparse
import os

View File

@ -21,7 +21,7 @@ except ImportError:
import logging
from enum import Enum
from typing import Union, Optional
from typing import Union, Optional, List
from .exceptions import PyMISPInvalidFormat, PyMISPError
@ -112,6 +112,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
self.__not_jsonable: list = []
self._fields_for_feed: set = {}
self.__self_defined_describe_types: Union[dict, None] = None
self.uuid: str
if kwargs.get('force_timestamps') is not None:
# Ignore the edited objects and keep the timestamps.
@ -119,14 +120,6 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
else:
self.__force_timestamps: bool = False
# List of classes having tags
from .mispevent import MISPAttribute, MISPEvent
self.__has_tags = (MISPAttribute, MISPEvent)
if isinstance(self, self.__has_tags):
self.Tag = []
setattr(AbstractMISP, 'add_tag', AbstractMISP.__add_tag)
setattr(AbstractMISP, 'tags', property(AbstractMISP.__get_tags, AbstractMISP.__set_tags))
@property
def describe_types(self) -> dict:
if self.__self_defined_describe_types:
@ -288,7 +281,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
return int(d)
return int(d.timestamp())
def __add_tag(self, tag=None, **kwargs):
def _add_tag(self, tag=None, **kwargs):
"""Add a tag to the attribute (by name or a MISPTag object)"""
if isinstance(tag, str):
misp_tag = MISPTag()
@ -308,11 +301,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
self.edited = True
return misp_tag
def __get_tags(self):
"""Returns a lost of tags associated to this Attribute"""
return self.Tag
def __set_tags(self, tags):
def _set_tags(self, tags):
"""Set a list of prepared MISPTag."""
if all(isinstance(x, MISPTag) for x in tags):
self.Tag = tags
@ -337,6 +326,10 @@ class MISPTag(AbstractMISP):
_fields_for_feed: set = {'name', 'colour'}
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.name: str
def from_dict(self, **kwargs):
if kwargs.get('Tag'):
kwargs = kwargs.get('Tag')

File diff suppressed because it is too large Load Diff

View File

@ -240,6 +240,7 @@
"attachment",
"authentihash",
"cdhash",
"chrome-extension-id",
"comment",
"domain",
"email-attachment",
@ -321,6 +322,7 @@
"attachment",
"authentihash",
"cdhash",
"chrome-extension-id",
"comment",
"filename",
"filename|authentihash",
@ -512,6 +514,10 @@
"default_category": "Payload delivery",
"to_ids": 1
},
"chrome-extension-id": {
"default_category": "Payload delivery",
"to_ids": 1
},
"comment": {
"default_category": "Other",
"to_ids": 0
@ -1121,6 +1127,7 @@
"campaign-name",
"cc-number",
"cdhash",
"chrome-extension-id",
"comment",
"community-id",
"cookie",

View File

@ -77,6 +77,14 @@ class MISPOrganisation(AbstractMISP):
kwargs = kwargs['Organisation']
super(MISPOrganisation, self).from_dict(**kwargs)
class MISPSharingGroup(AbstractMISP):
def from_dict(self, **kwargs):
if 'SharingGroup' in kwargs:
kwargs = kwargs['SharingGroup']
super().from_dict(**kwargs)
class MISPShadowAttribute(AbstractMISP):
@ -133,10 +141,29 @@ class MISPAttribute(AbstractMISP):
self.__category_type_mapping: dict = self.describe_types['category_type_mappings']
self.__sane_default: dict = self.describe_types['sane_defaults']
self.__strict: bool = strict
self._data = None
self._data: Optional[BytesIO] = None
self.uuid: str = str(uuid.uuid4())
self.ShadowAttribute: List[MISPShadowAttribute] = []
self.SharingGroup: MISPSharingGroup
self.Sighting: List[MISPSighting] = []
self.Tag: List[MISPTag] = []
# For search
self.Event: MISPEvent
self.RelatedAttribute: List[MISPAttribute]
def add_tag(self, tag: Optional[Union[str, MISPTag, dict]], **kwargs) -> MISPTag:
return super()._add_tag(tag, **kwargs)
@property
def tags(self) -> List[MISPTag]:
"""Returns a lost of tags associated to this Attribute"""
return self.Tag
@tags.setter
def tags(self, tags: List[MISPTag]):
"""Set a list of prepared MISPTag."""
super()._set_tags(tags)
def hash_values(self, algorithm: str='sha512') -> List[str]:
"""Compute the hash of every values for fast lookups"""
@ -333,6 +360,10 @@ class MISPAttribute(AbstractMISP):
if kwargs.get('ShadowAttribute'):
[self.add_shadow_attribute(s_attr) for s_attr in kwargs.pop('ShadowAttribute')]
if kwargs.get('SharingGroup'):
for sg in kwargs.pop('SharingGroup'):
self.SharingGroup = MISPSharingGroup()
self.SharingGroup.from_dict(**sg)
# If the user wants to disable correlation, let them. Defaults to False.
self.disable_correlation = kwargs.pop("disable_correlation", False)
if self.disable_correlation is None:
@ -383,8 +414,8 @@ class MISPAttribute(AbstractMISP):
@data.setter
def data(self, data: Union[Path, str, bytes, BytesIO]):
if isinstance(data, Path):
with data.open('rb') as f:
self._data = BytesIO(f.read())
with data.open('rb') as f_temp:
self._data = BytesIO(f_temp.read())
if isinstance(data, (str, bytes)):
self._data = BytesIO(base64.b64decode(data))
elif isinstance(data, BytesIO):
@ -451,6 +482,9 @@ class MISPObjectReference(AbstractMISP):
def __init__(self):
super().__init__()
self.uuid = str(uuid.uuid4())
self.object_uuid: str
self.referenced_uuid: str
self.relationship_type: str
def _set_default(self):
if not hasattr(self, 'comment'):
@ -492,6 +526,7 @@ class MISPObject(AbstractMISP):
self._strict: bool = strict
self.name: str = name
self._known_template: bool = False
self.id: int
self._set_template(kwargs.get('misp_objects_path_custom'))
@ -499,11 +534,13 @@ class MISPObject(AbstractMISP):
self.__fast_attribute_access: dict = defaultdict(list) # Hashtable object_relation: [attributes]
self.ObjectReference: List[MISPObjectReference] = []
self.Attribute: List[MISPAttribute] = []
self.SharingGroup: MISPSharingGroup
self._default_attributes_parameters: dict
if isinstance(default_attributes_parameters, MISPAttribute):
# Just make sure we're not modifying an existing MISPAttribute
self._default_attributes_parameters: dict = default_attributes_parameters.to_dict()
self._default_attributes_parameters = default_attributes_parameters.to_dict()
else:
self._default_attributes_parameters: dict = default_attributes_parameters
self._default_attributes_parameters = default_attributes_parameters
if self._default_attributes_parameters:
# Let's clean that up
self._default_attributes_parameters.pop('value', None) # duh
@ -558,7 +595,10 @@ class MISPObject(AbstractMISP):
def _set_template(self, misp_objects_path_custom: Optional[Union[Path, str]]=None):
if misp_objects_path_custom:
# If misp_objects_path_custom is given, and an object with the given name exists, use that.
self.misp_objects_path = misp_objects_path_custom
if isinstance(misp_objects_path_custom, str):
self.misp_objects_path = Path(misp_objects_path_custom)
else:
self.misp_objects_path = misp_objects_path_custom
# Try to get the template
self._known_template = self._load_template_path(self.misp_objects_path / self.name / 'definition.json')
@ -628,6 +668,10 @@ class MISPObject(AbstractMISP):
if kwargs.get('ObjectReference'):
[self.add_reference(**r) for r in kwargs.pop('ObjectReference')]
if kwargs.get('SharingGroup'):
for sg in kwargs.pop('SharingGroup'):
self.SharingGroup = MISPSharingGroup()
self.SharingGroup.from_dict(**sg)
# Not supported yet - https://github.com/MISP/PyMISP/issues/168
# if kwargs.get('Tag'):
# for tag in kwargs.pop('Tag'):
@ -763,6 +807,21 @@ class MISPEvent(AbstractMISP):
self.Object: List[MISPObject] = []
self.RelatedEvent: List[MISPEvent] = []
self.ShadowAttribute: List[MISPShadowAttribute] = []
self.SharingGroup: MISPSharingGroup
self.Tag: List[MISPTag] = []
def add_tag(self, tag: Optional[Union[str, MISPTag, dict]], **kwargs) -> MISPTag:
return super()._add_tag(tag, **kwargs)
@property
def tags(self) -> List[MISPTag]:
"""Returns a lost of tags associated to this Event"""
return self.Tag
@tags.setter
def tags(self, tags: List[MISPTag]):
"""Set a list of prepared MISPTag."""
super()._set_tags(tags)
def _set_default(self):
"""There are a few keys that could, or need to be set by default for the feed generator"""
@ -928,13 +987,14 @@ class MISPEvent(AbstractMISP):
def load(self, json_event: Union[IO, str, bytes, dict], validate: bool=False, metadata_only: bool=False):
"""Load a JSON dump from a pseudo file or a JSON string"""
if hasattr(json_event, 'read'):
if isinstance(json_event, IO):
# python2 and python3 compatible to find if we have a file
json_event = json_event.read()
if isinstance(json_event, (str, bytes)):
json_event = json.loads(json_event)
if json_event.get('response'):
event = json_event.get('response')[0]
if isinstance(json_event, dict) and 'response' in json_event and isinstance(json_event['response'], list):
event = json_event['response'][0]
else:
event = json_event
if not event:
@ -1028,7 +1088,9 @@ class MISPEvent(AbstractMISP):
if kwargs.get('Orgc'):
self.Orgc = MISPOrganisation()
self.Orgc.from_dict(**kwargs.pop('Orgc'))
if kwargs.get('SharingGroup'):
self.SharingGroup = MISPSharingGroup()
self.SharingGroup.from_dict(**kwargs.pop('SharingGroup'))
super(MISPEvent, self).from_dict(**kwargs)
def to_dict(self) -> dict:
@ -1276,6 +1338,10 @@ class MISPObjectTemplate(AbstractMISP):
class MISPUser(AbstractMISP):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.email: str
def from_dict(self, **kwargs):
if 'User' in kwargs:
kwargs = kwargs['User']
@ -1331,6 +1397,11 @@ class MISPNoticelist(AbstractMISP):
class MISPRole(AbstractMISP):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.perm_admin: int
self.perm_site_admin: int
def from_dict(self, **kwargs):
if 'Role' in kwargs:
kwargs = kwargs['Role']
@ -1345,16 +1416,14 @@ class MISPServer(AbstractMISP):
super().from_dict(**kwargs)
class MISPSharingGroup(AbstractMISP):
def from_dict(self, **kwargs):
if 'SharingGroup' in kwargs:
kwargs = kwargs['SharingGroup']
super().from_dict(**kwargs)
class MISPLog(AbstractMISP):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.model: str
self.action: str
self.title: str
def from_dict(self, **kwargs):
if 'Log' in kwargs:
kwargs = kwargs['Log']
@ -1366,6 +1435,12 @@ class MISPLog(AbstractMISP):
class MISPEventDelegation(AbstractMISP):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.org_id: int
self.requester_org_id: int
self.event_id: int
def from_dict(self, **kwargs):
if 'EventDelegation' in kwargs:
kwargs = kwargs['EventDelegation']
@ -1384,7 +1459,9 @@ class MISPObjectAttribute(MISPAttribute):
super().__init__()
self._definition = definition
def from_dict(self, object_relation: str, value: Union[str, int, float], **kwargs):
def from_dict(self, object_relation: str, value: Union[str, int, float], **kwargs): # type: ignore
# NOTE: Signature of "from_dict" incompatible with supertype "MISPAttribute"
self.object_relation = object_relation
self.value = value
if 'Attribute' in kwargs:

0
pymisp/py.typed Normal file
View File

View File

@ -5,11 +5,11 @@ from .. import MISPObject
from ..exceptions import InvalidMISPObject
from datetime import datetime, date
from dateutil.parser import parse
from typing import Union, Optional
class AbstractMISPObjectGenerator(MISPObject):
def _detect_epoch(self, timestamp):
def _detect_epoch(self, timestamp: Union[str, int, float]) -> bool:
try:
tmp = float(timestamp)
if tmp < 30000000:
@ -20,7 +20,7 @@ class AbstractMISPObjectGenerator(MISPObject):
except ValueError:
return False
def _sanitize_timestamp(self, timestamp):
def _sanitize_timestamp(self, timestamp: Optional[Union[datetime, date, dict, str, int, float]]=None) -> datetime:
if not timestamp:
return datetime.now()
@ -31,12 +31,14 @@ class AbstractMISPObjectGenerator(MISPObject):
elif isinstance(timestamp, dict):
if not isinstance(timestamp['value'], datetime):
timestamp['value'] = parse(timestamp['value'])
return timestamp
elif not isinstance(timestamp, datetime): # Supported: float/int/string
if self._detect_epoch(timestamp):
return timestamp['value']
else: # Supported: float/int/string
if isinstance(timestamp, (str, int, float)) and self._detect_epoch(timestamp):
return datetime.fromtimestamp(float(timestamp))
return parse(timestamp)
return timestamp
elif isinstance(timestamp, str):
return parse(timestamp)
else:
raise Exception(f'Unable to convert {timestamp} to a datetime.')
def generate_attributes(self):
"""Contains the logic where all the values of the object are gathered"""

View File

@ -9,7 +9,7 @@ logger = logging.getLogger('pymisp')
class ASNObject(AbstractMISPObjectGenerator):
def __init__(self, parameters, strict=True, standalone=True, **kwargs):
def __init__(self, parameters: dict, strict: bool=True, standalone: bool=True, **kwargs):
super(ASNObject, self).__init__('asn', strict=strict, standalone=standalone, **kwargs)
self._parameters = parameters
self.generate_attributes()

View File

@ -2,16 +2,18 @@
# -*- coding: utf-8 -*-
import sys
from io import BytesIO
from . import FileObject, PEObject, ELFObject, MachOObject
from ..exceptions import MISPObjectException
import logging
from typing import Optional
logger = logging.getLogger('pymisp')
try:
import lief
from lief import Logger
import lief # type: ignore
from lief import Logger # type: ignore
Logger.disable()
HAS_LIEF = True
except ImportError:
@ -22,7 +24,7 @@ class FileTypeNotImplemented(MISPObjectException):
pass
def make_pe_objects(lief_parsed, misp_file, standalone=True, default_attributes_parameters={}):
def make_pe_objects(lief_parsed: lief.Binary, misp_file: FileObject, standalone: bool=True, default_attributes_parameters: dict={}):
pe_object = PEObject(parsed=lief_parsed, standalone=standalone, default_attributes_parameters=default_attributes_parameters)
misp_file.add_reference(pe_object.uuid, 'includes', 'PE indicators')
pe_sections = []
@ -31,7 +33,7 @@ def make_pe_objects(lief_parsed, misp_file, standalone=True, default_attributes_
return misp_file, pe_object, pe_sections
def make_elf_objects(lief_parsed, misp_file, standalone=True, default_attributes_parameters={}):
def make_elf_objects(lief_parsed: lief.Binary, misp_file: FileObject, standalone: bool=True, default_attributes_parameters: dict={}):
elf_object = ELFObject(parsed=lief_parsed, standalone=standalone, default_attributes_parameters=default_attributes_parameters)
misp_file.add_reference(elf_object.uuid, 'includes', 'ELF indicators')
elf_sections = []
@ -40,7 +42,7 @@ def make_elf_objects(lief_parsed, misp_file, standalone=True, default_attributes
return misp_file, elf_object, elf_sections
def make_macho_objects(lief_parsed, misp_file, standalone=True, default_attributes_parameters={}):
def make_macho_objects(lief_parsed: lief.Binary, misp_file: FileObject, standalone: bool=True, default_attributes_parameters: dict={}):
macho_object = MachOObject(parsed=lief_parsed, standalone=standalone, default_attributes_parameters=default_attributes_parameters)
misp_file.add_reference(macho_object.uuid, 'includes', 'MachO indicators')
macho_sections = []
@ -49,19 +51,22 @@ def make_macho_objects(lief_parsed, misp_file, standalone=True, default_attribut
return misp_file, macho_object, macho_sections
def make_binary_objects(filepath=None, pseudofile=None, filename=None, standalone=True, default_attributes_parameters={}):
def make_binary_objects(filepath: Optional[str]=None, pseudofile: Optional[BytesIO]=None, filename: Optional[str]=None, standalone: bool=True, default_attributes_parameters: dict={}):
misp_file = FileObject(filepath=filepath, pseudofile=pseudofile, filename=filename,
standalone=standalone, default_attributes_parameters=default_attributes_parameters)
if HAS_LIEF and (filepath or (pseudofile and filename)):
try:
if filepath:
lief_parsed = lief.parse(filepath=filepath)
else:
elif pseudofile and filename:
if sys.version_info < (3, 0):
logger.critical('Pseudofile is not supported in python2. Just update.')
lief_parsed = None
else:
lief_parsed = lief.parse(raw=pseudofile.getvalue(), name=filename)
else:
logger.critical('You need either a filepath, or a pseudofile and a filename.')
lief_parsed = None
if isinstance(lief_parsed, lief.PE.Binary):
return make_pe_objects(lief_parsed, misp_file, standalone, default_attributes_parameters)
elif isinstance(lief_parsed, lief.ELF.Binary):

View File

@ -9,7 +9,7 @@ logger = logging.getLogger('pymisp')
class DomainIPObject(AbstractMISPObjectGenerator):
def __init__(self, parameters, strict=True, standalone=True, **kwargs):
def __init__(self, parameters: dict, strict: bool=True, standalone: bool=True, **kwargs):
super(DomainIPObject, self).__init__('domain-ip', strict=strict, standalone=standalone, **kwargs)
self._parameters = parameters
self.generate_attributes()

View File

@ -6,17 +6,19 @@ from ..exceptions import InvalidMISPObject
from io import BytesIO
from hashlib import md5, sha1, sha256, sha512
import logging
from typing import Union
from pathlib import Path
logger = logging.getLogger('pymisp')
try:
import lief
import lief # type: ignore
HAS_LIEF = True
except ImportError:
HAS_LIEF = False
try:
import pydeep
import pydeep # type: ignore
HAS_PYDEEP = True
except ImportError:
HAS_PYDEEP = False
@ -24,7 +26,7 @@ except ImportError:
class ELFObject(AbstractMISPObjectGenerator):
def __init__(self, parsed=None, filepath=None, pseudofile=None, standalone=True, **kwargs):
def __init__(self, parsed: lief.ELF.Binary=None, filepath: Union[Path, str]=None, pseudofile: Union[BytesIO, bytes]=None, standalone: bool=True, **kwargs):
super(ELFObject, self).__init__('elf', standalone=standalone, **kwargs)
if not HAS_PYDEEP:
logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
@ -67,7 +69,7 @@ class ELFObject(AbstractMISPObjectGenerator):
class ELFSectionObject(AbstractMISPObjectGenerator):
def __init__(self, section, standalone=True, **kwargs):
def __init__(self, section: lief.ELF.Section, standalone: bool=True, **kwargs):
# Python3 way
# super().__init__('pe-section')
super(ELFSectionObject, self).__init__('elf-section', standalone=standalone, **kwargs)

View File

@ -6,13 +6,15 @@ from .abstractgenerator import AbstractMISPObjectGenerator
from io import BytesIO
import logging
from email import message_from_bytes, policy
from pathlib import Path
from typing import Union
logger = logging.getLogger('pymisp')
class EMailObject(AbstractMISPObjectGenerator):
def __init__(self, filepath=None, pseudofile=None, attach_original_email=True, standalone=True, **kwargs):
def __init__(self, filepath: Union[Path, str]=None, pseudofile: BytesIO=None, attach_original_email: bool=True, standalone: bool=True, **kwargs):
# PY3 way:
# super().__init__('file')
super(EMailObject, self).__init__('email', standalone=standalone, **kwargs)

View File

@ -2,13 +2,13 @@
# -*- coding: utf-8 -*-
try:
from pymispgalaxies import Clusters
from pymispgalaxies import Clusters # type: ignore
has_pymispgalaxies = True
except ImportError:
has_pymispgalaxies = False
try:
from pytaxonomies import Taxonomies
from pytaxonomies import Taxonomies # type: ignore
has_pymispgalaxies = True
except ImportError:
has_pymispgalaxies = False

View File

@ -9,7 +9,7 @@ logger = logging.getLogger('pymisp')
class Fail2BanObject(AbstractMISPObjectGenerator):
def __init__(self, parameters, strict=True, standalone=True, **kwargs):
def __init__(self, parameters: dict, strict: bool=True, standalone: bool=True, **kwargs):
super(Fail2BanObject, self).__init__('fail2ban', strict=strict, standalone=standalone, **kwargs)
self._parameters = parameters
self.generate_attributes()

View File

@ -4,11 +4,12 @@
from pathlib import Path
from pymisp import MISPEvent
import json
from typing import List
def feed_meta_generator(path: Path):
manifests = {}
hashes = []
hashes: List[str] = []
for f_name in path.glob('*.json'):
if str(f_name.name) == 'manifest.json':

View File

@ -9,18 +9,20 @@ from hashlib import md5, sha1, sha256, sha512
import math
from collections import Counter
import logging
from pathlib import Path
from typing import Union
logger = logging.getLogger('pymisp')
try:
import pydeep
import pydeep # type: ignore
HAS_PYDEEP = True
except ImportError:
HAS_PYDEEP = False
try:
import magic
import magic # type: ignore
HAS_MAGIC = True
except ImportError:
HAS_MAGIC = False
@ -28,7 +30,7 @@ except ImportError:
class FileObject(AbstractMISPObjectGenerator):
def __init__(self, filepath=None, pseudofile=None, filename=None, standalone=True, **kwargs):
def __init__(self, filepath: Union[Path, str]=None, pseudofile: BytesIO=None, filename: str=None, standalone: bool=True, **kwargs):
# PY3 way:
# super().__init__('file')
super(FileObject, self).__init__('file', standalone=standalone, **kwargs)
@ -70,7 +72,7 @@ class FileObject(AbstractMISPObjectGenerator):
if HAS_PYDEEP:
self.add_attribute('ssdeep', value=pydeep.hash_buf(self.__data).decode())
def __entropy_H(self, data):
def __entropy_H(self, data: bytes) -> float:
"""Calculate the entropy of a chunk of data."""
# NOTE: copy of the entropy function from pefile
@ -79,7 +81,7 @@ class FileObject(AbstractMISPObjectGenerator):
occurences = Counter(bytearray(data))
entropy = 0
entropy = 0.0
for x in occurences.values():
p_x = float(x) / len(data)
entropy -= p_x * math.log(p_x, 2)

View File

@ -2,11 +2,13 @@
# -*- coding: utf-8 -*-
from .abstractgenerator import AbstractMISPObjectGenerator
from typing import List
class GenericObjectGenerator(AbstractMISPObjectGenerator):
def generate_attributes(self, attributes):
# FIXME: this method is different from the master one, and that's probably not a good idea.
def generate_attributes(self, attributes: List[dict]): # type: ignore
"""Generates MISPObjectAttributes from a list of dictionaries.
Each entry if the list must be in one of the two following formats:
* {<object_relation>: <value>}

View File

@ -9,7 +9,7 @@ logger = logging.getLogger('pymisp')
class GeolocationObject(AbstractMISPObjectGenerator):
def __init__(self, parameters, strict=True, standalone=True, **kwargs):
def __init__(self, parameters: dict, strict: bool=True, standalone: bool=True, **kwargs):
super(GeolocationObject, self).__init__('asn', strict=strict, standalone=standalone, **kwargs)
self._parameters = parameters
self.generate_attributes()

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
try:
from pymispwarninglists import WarningLists
from pymispwarninglists import WarningLists # type: ignore
has_pymispwarninglists = True
except ImportError:
has_pymispwarninglists = False

View File

@ -6,18 +6,20 @@ from .abstractgenerator import AbstractMISPObjectGenerator
from io import BytesIO
from hashlib import md5, sha1, sha256, sha512
import logging
from typing import Optional, Union
from pathlib import Path
logger = logging.getLogger('pymisp')
try:
import lief
import lief # type: ignore
HAS_LIEF = True
except ImportError:
HAS_LIEF = False
try:
import pydeep
import pydeep # type: ignore
HAS_PYDEEP = True
except ImportError:
HAS_PYDEEP = False
@ -25,7 +27,7 @@ except ImportError:
class MachOObject(AbstractMISPObjectGenerator):
def __init__(self, parsed=None, filepath=None, pseudofile=None, standalone=True, **kwargs):
def __init__(self, parsed: Optional[lief.MachO.Binary]=None, filepath: Optional[Union[Path, str]]=None, pseudofile: Optional[BytesIO]=None, standalone: bool=True, **kwargs):
# Python3 way
# super().__init__('elf')
super(MachOObject, self).__init__('macho', standalone=standalone, **kwargs)
@ -70,7 +72,7 @@ class MachOObject(AbstractMISPObjectGenerator):
class MachOSectionObject(AbstractMISPObjectGenerator):
def __init__(self, section, standalone=True, **kwargs):
def __init__(self, section: lief.MachO.Section, standalone: bool=True, **kwargs):
# Python3 way
# super().__init__('pe-section')
super(MachOSectionObject, self).__init__('macho-section', standalone=standalone, **kwargs)

View File

@ -5,7 +5,7 @@ import os
from .. import MISPEvent
try:
from py2neo import authenticate, Graph, Node, Relationship
from py2neo import authenticate, Graph, Node, Relationship # type: ignore
has_py2neo = True
except ImportError:
has_py2neo = False

View File

@ -5,7 +5,7 @@ import os
from .. import MISPEvent
try:
from bs4 import BeautifulSoup
from bs4 import BeautifulSoup # type: ignore
has_bs4 = True
except ImportError:
has_bs4 = False

View File

@ -7,17 +7,19 @@ from io import BytesIO
from hashlib import md5, sha1, sha256, sha512
from datetime import datetime
import logging
from typing import Optional, Union
from pathlib import Path
logger = logging.getLogger('pymisp')
try:
import lief
import lief # type: ignore
HAS_LIEF = True
except ImportError:
HAS_LIEF = False
try:
import pydeep
import pydeep # type: ignore
HAS_PYDEEP = True
except ImportError:
HAS_PYDEEP = False
@ -25,7 +27,7 @@ except ImportError:
class PEObject(AbstractMISPObjectGenerator):
def __init__(self, parsed=None, filepath=None, pseudofile=None, standalone=True, **kwargs):
def __init__(self, parsed: Optional[lief.PE.Binary]=None, filepath: Optional[Union[Path, str]]=None, pseudofile: Optional[BytesIO]=None, standalone: bool=True, **kwargs):
# Python3 way
# super().__init__('pe')
super(PEObject, self).__init__('pe', standalone=standalone, **kwargs)
@ -117,7 +119,7 @@ class PEObject(AbstractMISPObjectGenerator):
class PESectionObject(AbstractMISPObjectGenerator):
def __init__(self, section, standalone=True, **kwargs):
def __init__(self, section: lief.PE.Section, standalone: bool=True, **kwargs):
# Python3 way
# super().__init__('pe-section')
super(PESectionObject, self).__init__('pe-section', standalone=standalone, **kwargs)

View File

@ -20,17 +20,17 @@ logger = logging.getLogger('pymisp')
# Potentially not installed imports
try:
from reportlab.pdfgen import canvas
from reportlab.pdfbase.pdfmetrics import stringWidth, registerFont
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas # type: ignore
from reportlab.pdfbase.pdfmetrics import stringWidth, registerFont # type: ignore
from reportlab.pdfbase.ttfonts import TTFont # type: ignore
from reportlab.lib import colors # type: ignore
from reportlab.lib.pagesizes import A4 # type: ignore
from reportlab.platypus import SimpleDocTemplate, Paragraph, PageBreak, Table, TableStyle, Flowable, Image, Indenter
from reportlab.platypus import SimpleDocTemplate, Paragraph, PageBreak, Table, TableStyle, Flowable, Image, Indenter # type: ignore
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import mm
from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY, TA_LEFT
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle # type: ignore
from reportlab.lib.units import mm # type: ignore
from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY, TA_LEFT # type: ignore
HAS_REPORTLAB = True
except ImportError:

View File

@ -8,7 +8,7 @@ class SBSignatureObject(AbstractMISPObjectGenerator):
'''
Sandbox Analyzer
'''
def __init__(self, software, report, standalone=True, **kwargs):
def __init__(self, software: str, report: list, standalone: bool=True, **kwargs):
super(SBSignatureObject, self).__init__("sb-signature", **kwargs)
self._software = software
self._report = report

View File

@ -5,13 +5,15 @@ from ..exceptions import InvalidMISPObject
from .abstractgenerator import AbstractMISPObjectGenerator
from io import StringIO
import logging
from typing import Optional, Union
from pathlib import Path
logger = logging.getLogger('pymisp')
class SSHAuthorizedKeysObject(AbstractMISPObjectGenerator):
def __init__(self, authorized_keys_path=None, authorized_keys_pseudofile=None, standalone=True, **kwargs):
def __init__(self, authorized_keys_path: Optional[Union[Path, str]]=None, authorized_keys_pseudofile: Optional[StringIO]=None, standalone: bool=True, **kwargs):
# PY3 way:
# super().__init__('file')
super(SSHAuthorizedKeysObject, self).__init__('ssh-authorized-keys', standalone=standalone, **kwargs)
@ -19,7 +21,7 @@ class SSHAuthorizedKeysObject(AbstractMISPObjectGenerator):
with open(authorized_keys_path, 'r') as f:
self.__pseudofile = StringIO(f.read())
elif authorized_keys_pseudofile and isinstance(authorized_keys_pseudofile, StringIO):
self.__pseudofile = authorized_keys_path
self.__pseudofile = authorized_keys_pseudofile
else:
raise InvalidMISPObject('File buffer (StringIO) or a path is required.')
self.__data = self.__pseudofile.getvalue()

View File

@ -1,15 +1,15 @@
# -*- coding: utf-8 -*-
try:
from misp_stix_converter.converters.buildMISPAttribute import buildEvent
from misp_stix_converter.converters import convert
from misp_stix_converter.converters.convert import MISPtoSTIX
from misp_stix_converter.converters.buildMISPAttribute import buildEvent # type: ignore
from misp_stix_converter.converters import convert # type: ignore
from misp_stix_converter.converters.convert import MISPtoSTIX # type: ignore
has_misp_stix_converter = True
except ImportError:
has_misp_stix_converter = False
def load_stix(stix, distribution=3, threat_level_id=2, analysis=0):
def load_stix(stix, distribution: int=3, threat_level_id: int=2, analysis: int=0):
'''Returns a MISPEvent object from a STIX package'''
if not has_misp_stix_converter:
raise Exception('You need to install misp_stix_converter: pip install git+https://github.com/MISP/MISP-STIX-Converter.git')
@ -18,7 +18,7 @@ def load_stix(stix, distribution=3, threat_level_id=2, analysis=0):
threat_level_id=threat_level_id, analysis=analysis)
def make_stix_package(misp_event, to_json=False, to_xml=False):
def make_stix_package(misp_event, to_json: bool=False, to_xml: bool=False):
'''Returns a STIXPackage from a MISPEvent.
Optionally can return the package in json or xml.

View File

@ -3,7 +3,7 @@
from .abstractgenerator import AbstractMISPObjectGenerator
import logging
from pyfaup.faup import Faup
from pyfaup.faup import Faup # type: ignore
from urllib.parse import unquote_plus
logger = logging.getLogger('pymisp')
@ -13,7 +13,7 @@ faup = Faup()
class URLObject(AbstractMISPObjectGenerator):
def __init__(self, url, standalone=True, **kwargs):
def __init__(self, url: str, standalone: bool=True, **kwargs):
# PY3 way:
# super().__init__('file')
super(URLObject, self).__init__('url', standalone=standalone, **kwargs)

View File

@ -2,10 +2,11 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
import requests
try:
import validators
import validators # type: ignore
has_validators = True
except ImportError:
has_validators = False
@ -23,7 +24,7 @@ class VTReportObject(AbstractMISPObjectGenerator):
:indicator: IOC to search VirusTotal for
'''
def __init__(self, apikey, indicator, vt_proxies=None, standalone=True, **kwargs):
def __init__(self, apikey: str, indicator: str, vt_proxies: Optional[dict]=None, standalone: bool=True, **kwargs):
# PY3 way:
# super().__init__("virustotal-report")
super(VTReportObject, self).__init__("virustotal-report", standalone=standalone, **kwargs)
@ -47,7 +48,7 @@ class VTReportObject(AbstractMISPObjectGenerator):
ratio = "{}/{}".format(self._report["positives"], self._report["total"])
self.add_attribute("detection-ratio", value=ratio)
def __validate_resource(self, ioc):
def __validate_resource(self, ioc: str):
'''
Validate the data type of an indicator.
Domains and IP addresses aren't supported because
@ -63,7 +64,7 @@ class VTReportObject(AbstractMISPObjectGenerator):
return "file"
return False
def __query_virustotal(self, apikey, resource):
def __query_virustotal(self, apikey: str, resource: str):
'''
Query VirusTotal for information about an indicator
@ -78,9 +79,9 @@ class VTReportObject(AbstractMISPObjectGenerator):
report = requests.get(url, params=params, proxies=self._proxies)
else:
report = requests.get(url, params=params)
report = report.json()
if report["response_code"] == 1:
report_json = report.json()
if report_json["response_code"] == 1:
return report
else:
error_msg = "{}: {}".format(resource, report["verbose_msg"])
error_msg = "{}: {}".format(resource, report_json["verbose_msg"])
raise InvalidMISPObject(error_msg)

View File

@ -12,7 +12,7 @@ from io import BytesIO
import json
from pathlib import Path
import urllib3
import urllib3 # type: ignore
import time
from uuid import uuid4
@ -37,7 +37,7 @@ except ImportError:
raise
try:
from keys import url, key
from keys import url, key # type: ignore
verifycert = False
except ImportError as e:
print(e)

View File

@ -6,7 +6,7 @@ import sys
import unittest
import subprocess
import urllib3
import urllib3 # type: ignore
import logging
logging.disable(logging.CRITICAL)