Merge branch 'ldbo-issue-593-change-standalone-policy' into main

pull/604/head
Raphaël Vinot 2020-07-01 18:25:22 +02:00
commit ec28820cf4
20 changed files with 85 additions and 49 deletions

View File

@ -170,6 +170,14 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
"""Set __not_jsonable to a new list""" """Set __not_jsonable to a new list"""
self.__not_jsonable = args self.__not_jsonable = args
def _remove_from_not_jsonable(self, *args) -> None:
"""Remove the entries that are in the __not_jsonable list"""
for entry in args:
try:
self.__not_jsonable.remove(entry)
except ValueError:
pass
def from_json(self, json_string: str) -> None: def from_json(self, json_string: str) -> None:
"""Load a JSON string""" """Load a JSON string"""
self.from_dict(**loads(json_string)) self.from_dict(**loads(json_string))

View File

@ -331,7 +331,7 @@ class PyMISP:
misp_object_r = self._check_json_response(r) misp_object_r = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in misp_object_r: if not (self.global_pythonify or pythonify) or 'errors' in misp_object_r:
return misp_object_r return misp_object_r
o = MISPObject(misp_object_r['Object']['name']) o = MISPObject(misp_object_r['Object']['name'], standalone=False)
o.from_dict(**misp_object_r) o.from_dict(**misp_object_r)
return o return o
@ -342,7 +342,7 @@ class PyMISP:
new_object = self._check_json_response(r) new_object = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in new_object: if not (self.global_pythonify or pythonify) or 'errors' in new_object:
return new_object return new_object
o = MISPObject(new_object['Object']['name']) o = MISPObject(new_object['Object']['name'], standalone=False)
o.from_dict(**new_object) o.from_dict(**new_object)
return o return o
@ -356,7 +356,7 @@ class PyMISP:
updated_object = self._check_json_response(r) updated_object = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in updated_object: if not (self.global_pythonify or pythonify) or 'errors' in updated_object:
return updated_object return updated_object
o = MISPObject(updated_object['Object']['name']) o = MISPObject(updated_object['Object']['name'], standalone=False)
o.from_dict(**updated_object) o.from_dict(**updated_object)
return o return o

View File

@ -603,7 +603,7 @@ class MISPObject(AbstractMISP):
'sharing_group_id', 'comment', 'first_seen', 'last_seen', 'sharing_group_id', 'comment', 'first_seen', 'last_seen',
'deleted'} 'deleted'}
def __init__(self, name: str, strict: bool=False, standalone: bool=False, default_attributes_parameters: dict={}, **kwargs): def __init__(self, name: str, strict: bool=False, standalone: bool=True, default_attributes_parameters: dict={}, **kwargs):
''' Master class representing a generic MISP object ''' Master class representing a generic MISP object
:name: Name of the object :name: Name of the object
@ -629,6 +629,7 @@ class MISPObject(AbstractMISP):
self.last_seen: datetime self.last_seen: datetime
self.__fast_attribute_access: dict = defaultdict(list) # Hashtable object_relation: [attributes] self.__fast_attribute_access: dict = defaultdict(list) # Hashtable object_relation: [attributes]
self.ObjectReference: List[MISPObjectReference] = [] self.ObjectReference: List[MISPObjectReference] = []
self._standalone: bool = False
self.Attribute: List[MISPObjectAttribute] = [] self.Attribute: List[MISPObjectAttribute] = []
self.SharingGroup: MISPSharingGroup self.SharingGroup: MISPSharingGroup
self._default_attributes_parameters: dict self._default_attributes_parameters: dict
@ -656,10 +657,7 @@ class MISPObject(AbstractMISP):
else: else:
self.distribution = 5 # Default to inherit self.distribution = 5 # Default to inherit
self.sharing_group_id = 0 self.sharing_group_id = 0
self._standalone = standalone self.standalone = standalone
if self._standalone:
# Mark as non_jsonable because we need to add the references manually after the object(s) have been created
self.update_not_jsonable('ObjectReference')
def _load_template_path(self, template_path: Union[Path, str]) -> bool: def _load_template_path(self, template_path: Union[Path, str]) -> bool:
self._definition: Optional[Dict] = self._load_json(template_path) self._definition: Optional[Dict] = self._load_json(template_path)
@ -742,6 +740,21 @@ class MISPObject(AbstractMISP):
else: else:
raise PyMISPError('All the attributes have to be of type MISPObjectReference.') raise PyMISPError('All the attributes have to be of type MISPObjectReference.')
@property
def standalone(self):
return self._standalone
@standalone.setter
def standalone(self, new_standalone: bool):
if self._standalone != new_standalone:
if new_standalone:
self.update_not_jsonable("ObjectReference")
else:
self._remove_from_not_jsonable("ObjectReference")
self._standalone = new_standalone
else:
pass
def from_dict(self, **kwargs): def from_dict(self, **kwargs):
if 'Object' in kwargs: if 'Object' in kwargs:
kwargs = kwargs['Object'] kwargs = kwargs['Object']
@ -1385,6 +1398,7 @@ class MISPEvent(AbstractMISP):
misp_obj.from_dict(**kwargs) misp_obj.from_dict(**kwargs)
else: else:
raise InvalidMISPObject("An object to add to an existing Event needs to be either a MISPObject, or a plain python dictionary") raise InvalidMISPObject("An object to add to an existing Event needs to be either a MISPObject, or a plain python dictionary")
misp_obj.standalone = False
self.Object.append(misp_obj) self.Object.append(misp_obj)
self.edited = True self.edited = True
return misp_obj return misp_obj

View File

@ -9,8 +9,8 @@ logger = logging.getLogger('pymisp')
class ASNObject(AbstractMISPObjectGenerator): class ASNObject(AbstractMISPObjectGenerator):
def __init__(self, parameters: dict, strict: bool=True, standalone: bool=True, **kwargs): def __init__(self, parameters: dict, strict: bool=True, **kwargs):
super(ASNObject, self).__init__('asn', strict=strict, standalone=standalone, **kwargs) super(ASNObject, self).__init__('asn', strict=strict, **kwargs)
self._parameters = parameters self._parameters = parameters
self.generate_attributes() self.generate_attributes()

View File

@ -9,8 +9,8 @@ logger = logging.getLogger('pymisp')
class DomainIPObject(AbstractMISPObjectGenerator): class DomainIPObject(AbstractMISPObjectGenerator):
def __init__(self, parameters: dict, strict: bool=True, standalone: bool=True, **kwargs): def __init__(self, parameters: dict, strict: bool=True, **kwargs):
super(DomainIPObject, self).__init__('domain-ip', strict=strict, standalone=standalone, **kwargs) super(DomainIPObject, self).__init__('domain-ip', strict=strict, **kwargs)
self._parameters = parameters self._parameters = parameters
self.generate_attributes() self.generate_attributes()

View File

@ -32,8 +32,8 @@ def make_elf_objects(lief_parsed: lief.Binary, misp_file: FileObject, standalone
class ELFObject(AbstractMISPObjectGenerator): class ELFObject(AbstractMISPObjectGenerator):
def __init__(self, parsed: lief.ELF.Binary=None, filepath: Union[Path, str]=None, pseudofile: Union[BytesIO, bytes]=None, standalone: bool=True, **kwargs): def __init__(self, parsed: lief.ELF.Binary=None, filepath: Union[Path, str]=None, pseudofile: Union[BytesIO, bytes]=None, **kwargs):
super(ELFObject, self).__init__('elf', standalone=standalone, **kwargs) super(ELFObject, self).__init__('elf', **kwargs)
if not HAS_PYDEEP: if not HAS_PYDEEP:
logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git") logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
if pseudofile: if pseudofile:
@ -64,7 +64,7 @@ class ELFObject(AbstractMISPObjectGenerator):
if self.__elf.sections: if self.__elf.sections:
pos = 0 pos = 0
for section in self.__elf.sections: for section in self.__elf.sections:
s = ELFSectionObject(section, self._standalone, default_attributes_parameters=self._default_attributes_parameters) s = ELFSectionObject(section, standalone=self._standalone, default_attributes_parameters=self._default_attributes_parameters)
self.add_reference(s.uuid, 'includes', 'Section {} of ELF'.format(pos)) self.add_reference(s.uuid, 'includes', 'Section {} of ELF'.format(pos))
pos += 1 pos += 1
self.sections.append(s) self.sections.append(s)
@ -73,10 +73,10 @@ class ELFObject(AbstractMISPObjectGenerator):
class ELFSectionObject(AbstractMISPObjectGenerator): class ELFSectionObject(AbstractMISPObjectGenerator):
def __init__(self, section: lief.ELF.Section, standalone: bool=True, **kwargs): def __init__(self, section: lief.ELF.Section, **kwargs):
# Python3 way # Python3 way
# super().__init__('pe-section') # super().__init__('pe-section')
super(ELFSectionObject, self).__init__('elf-section', standalone=standalone, **kwargs) super(ELFSectionObject, self).__init__('elf-section', **kwargs)
self.__section = section self.__section = section
self.__data = bytes(self.__section.content) self.__data = bytes(self.__section.content)
self.generate_attributes() self.generate_attributes()

View File

@ -14,10 +14,10 @@ logger = logging.getLogger('pymisp')
class EMailObject(AbstractMISPObjectGenerator): class EMailObject(AbstractMISPObjectGenerator):
def __init__(self, filepath: Union[Path, str]=None, pseudofile: BytesIO=None, attach_original_email: bool=True, standalone: bool=True, **kwargs): def __init__(self, filepath: Union[Path, str]=None, pseudofile: BytesIO=None, attach_original_email: bool=True, **kwargs):
# PY3 way: # PY3 way:
# super().__init__('file') # super().__init__('file')
super(EMailObject, self).__init__('email', standalone=standalone, **kwargs) super(EMailObject, self).__init__('email', **kwargs)
if filepath: if filepath:
with open(filepath, 'rb') as f: with open(filepath, 'rb') as f:
self.__pseudofile = BytesIO(f.read()) self.__pseudofile = BytesIO(f.read())

View File

@ -9,8 +9,8 @@ logger = logging.getLogger('pymisp')
class Fail2BanObject(AbstractMISPObjectGenerator): class Fail2BanObject(AbstractMISPObjectGenerator):
def __init__(self, parameters: dict, strict: bool=True, standalone: bool=True, **kwargs): def __init__(self, parameters: dict, strict: bool=True, **kwargs):
super(Fail2BanObject, self).__init__('fail2ban', strict=strict, standalone=standalone, **kwargs) super(Fail2BanObject, self).__init__('fail2ban', strict=strict, **kwargs)
self._parameters = parameters self._parameters = parameters
self.generate_attributes() self.generate_attributes()

View File

@ -30,10 +30,10 @@ except ImportError:
class FileObject(AbstractMISPObjectGenerator): class FileObject(AbstractMISPObjectGenerator):
def __init__(self, filepath: Union[Path, str]=None, pseudofile: BytesIO=None, filename: str=None, standalone: bool=True, **kwargs): def __init__(self, filepath: Union[Path, str]=None, pseudofile: BytesIO=None, filename: str=None, **kwargs):
# PY3 way: # PY3 way:
# super().__init__('file') # super().__init__('file')
super(FileObject, self).__init__('file', standalone=standalone, **kwargs) super(FileObject, self).__init__('file', **kwargs)
if not HAS_PYDEEP: if not HAS_PYDEEP:
logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git") logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
if not HAS_MAGIC: if not HAS_MAGIC:

View File

@ -9,8 +9,8 @@ logger = logging.getLogger('pymisp')
class GeolocationObject(AbstractMISPObjectGenerator): class GeolocationObject(AbstractMISPObjectGenerator):
def __init__(self, parameters: dict, strict: bool=True, standalone: bool=True, **kwargs): def __init__(self, parameters: dict, strict: bool=True, **kwargs):
super(GeolocationObject, self).__init__('asn', strict=strict, standalone=standalone, **kwargs) super(GeolocationObject, self).__init__('asn', strict=strict, **kwargs)
self._parameters = parameters self._parameters = parameters
self.generate_attributes() self.generate_attributes()

View File

@ -9,8 +9,8 @@ logger = logging.getLogger('pymisp')
class GitVulnFinderObject(AbstractMISPObjectGenerator): class GitVulnFinderObject(AbstractMISPObjectGenerator):
def __init__(self, parameters: dict, strict: bool=True, standalone: bool=True, **kwargs): def __init__(self, parameters: dict, strict: bool=True, **kwargs):
super(GitVulnFinderObject, self).__init__('git-vuln-finder', strict=strict, standalone=standalone, **kwargs) super(GitVulnFinderObject, self).__init__('git-vuln-finder', strict=strict, **kwargs)
self._parameters = parameters self._parameters = parameters
self.generate_attributes() self.generate_attributes()

View File

@ -32,10 +32,10 @@ def make_macho_objects(lief_parsed: lief.Binary, misp_file: FileObject, standalo
class MachOObject(AbstractMISPObjectGenerator): class MachOObject(AbstractMISPObjectGenerator):
def __init__(self, parsed: Optional[lief.MachO.Binary]=None, filepath: Optional[Union[Path, str]]=None, pseudofile: Optional[BytesIO]=None, standalone: bool=True, **kwargs): def __init__(self, parsed: Optional[lief.MachO.Binary]=None, filepath: Optional[Union[Path, str]]=None, pseudofile: Optional[BytesIO]=None, **kwargs):
# Python3 way # Python3 way
# super().__init__('elf') # super().__init__('elf')
super(MachOObject, self).__init__('macho', standalone=standalone, **kwargs) super(MachOObject, self).__init__('macho', **kwargs)
if not HAS_PYDEEP: if not HAS_PYDEEP:
logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git") logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
if pseudofile: if pseudofile:
@ -66,7 +66,7 @@ class MachOObject(AbstractMISPObjectGenerator):
if self.__macho.sections: if self.__macho.sections:
pos = 0 pos = 0
for section in self.__macho.sections: for section in self.__macho.sections:
s = MachOSectionObject(section, self._standalone, default_attributes_parameters=self._default_attributes_parameters) s = MachOSectionObject(section, standalone=self._standalone, default_attributes_parameters=self._default_attributes_parameters)
self.add_reference(s.uuid, 'includes', 'Section {} of MachO'.format(pos)) self.add_reference(s.uuid, 'includes', 'Section {} of MachO'.format(pos))
pos += 1 pos += 1
self.sections.append(s) self.sections.append(s)
@ -75,10 +75,10 @@ class MachOObject(AbstractMISPObjectGenerator):
class MachOSectionObject(AbstractMISPObjectGenerator): class MachOSectionObject(AbstractMISPObjectGenerator):
def __init__(self, section: lief.MachO.Section, standalone: bool=True, **kwargs): def __init__(self, section: lief.MachO.Section, **kwargs):
# Python3 way # Python3 way
# super().__init__('pe-section') # super().__init__('pe-section')
super(MachOSectionObject, self).__init__('macho-section', standalone=standalone, **kwargs) super(MachOSectionObject, self).__init__('macho-section', **kwargs)
self.__section = section self.__section = section
self.__data = bytes(self.__section.content) self.__data = bytes(self.__section.content)
self.generate_attributes() self.generate_attributes()

View File

@ -11,8 +11,8 @@ logger = logging.getLogger('pymisp')
class MicroblogObject(AbstractMISPObjectGenerator): class MicroblogObject(AbstractMISPObjectGenerator):
def __init__(self, parameters: dict, strict: bool = True, standalone: bool = True, **kwargs): def __init__(self, parameters: dict, strict: bool = True, **kwargs):
super(MicroblogObject, self).__init__('microblog', strict=strict, standalone=standalone, **kwargs) super(MicroblogObject, self).__init__('microblog', strict=strict, **kwargs)
self._parameters = parameters self._parameters = parameters
self.generate_attributes() self.generate_attributes()

View File

@ -34,10 +34,10 @@ def make_pe_objects(lief_parsed: lief.Binary, misp_file: FileObject, standalone:
class PEObject(AbstractMISPObjectGenerator): class PEObject(AbstractMISPObjectGenerator):
def __init__(self, parsed: Optional[lief.PE.Binary]=None, filepath: Optional[Union[Path, str]]=None, pseudofile: Optional[BytesIO]=None, standalone: bool=True, **kwargs): def __init__(self, parsed: Optional[lief.PE.Binary]=None, filepath: Optional[Union[Path, str]]=None, pseudofile: Optional[BytesIO]=None, **kwargs):
# Python3 way # Python3 way
# super().__init__('pe') # super().__init__('pe')
super(PEObject, self).__init__('pe', standalone=standalone, **kwargs) super(PEObject, self).__init__('pe', **kwargs)
if not HAS_PYDEEP: if not HAS_PYDEEP:
logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git") logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
if pseudofile: if pseudofile:
@ -111,7 +111,7 @@ class PEObject(AbstractMISPObjectGenerator):
if self.__pe.sections: if self.__pe.sections:
pos = 0 pos = 0
for section in self.__pe.sections: for section in self.__pe.sections:
s = PESectionObject(section, self._standalone, default_attributes_parameters=self._default_attributes_parameters) s = PESectionObject(section, standalone=self._standalone, default_attributes_parameters=self._default_attributes_parameters)
self.add_reference(s.uuid, 'includes', 'Section {} of PE'.format(pos)) self.add_reference(s.uuid, 'includes', 'Section {} of PE'.format(pos))
if ((self.__pe.entrypoint >= section.virtual_address) if ((self.__pe.entrypoint >= section.virtual_address)
and (self.__pe.entrypoint < (section.virtual_address + section.virtual_size))): and (self.__pe.entrypoint < (section.virtual_address + section.virtual_size))):
@ -124,10 +124,10 @@ class PEObject(AbstractMISPObjectGenerator):
class PESectionObject(AbstractMISPObjectGenerator): class PESectionObject(AbstractMISPObjectGenerator):
def __init__(self, section: lief.PE.Section, standalone: bool=True, **kwargs): def __init__(self, section: lief.PE.Section, **kwargs):
# Python3 way # Python3 way
# super().__init__('pe-section') # super().__init__('pe-section')
super(PESectionObject, self).__init__('pe-section', standalone=standalone, **kwargs) super(PESectionObject, self).__init__('pe-section', **kwargs)
self.__section = section self.__section = section
self.__data = bytes(self.__section.content) self.__data = bytes(self.__section.content)
self.generate_attributes() self.generate_attributes()

View File

@ -8,7 +8,7 @@ class SBSignatureObject(AbstractMISPObjectGenerator):
''' '''
Sandbox Analyzer Sandbox Analyzer
''' '''
def __init__(self, software: str, report: list, standalone: bool=True, **kwargs): def __init__(self, software: str, report: list, **kwargs):
super(SBSignatureObject, self).__init__("sb-signature", **kwargs) super(SBSignatureObject, self).__init__("sb-signature", **kwargs)
self._software = software self._software = software
self._report = report self._report = report

View File

@ -13,10 +13,10 @@ logger = logging.getLogger('pymisp')
class SSHAuthorizedKeysObject(AbstractMISPObjectGenerator): class SSHAuthorizedKeysObject(AbstractMISPObjectGenerator):
def __init__(self, authorized_keys_path: Optional[Union[Path, str]]=None, authorized_keys_pseudofile: Optional[StringIO]=None, standalone: bool=True, **kwargs): def __init__(self, authorized_keys_path: Optional[Union[Path, str]]=None, authorized_keys_pseudofile: Optional[StringIO]=None, **kwargs):
# PY3 way: # PY3 way:
# super().__init__('file') # super().__init__('file')
super(SSHAuthorizedKeysObject, self).__init__('ssh-authorized-keys', standalone=standalone, **kwargs) super(SSHAuthorizedKeysObject, self).__init__('ssh-authorized-keys', **kwargs)
if authorized_keys_path: if authorized_keys_path:
with open(authorized_keys_path, 'r') as f: with open(authorized_keys_path, 'r') as f:
self.__pseudofile = StringIO(f.read()) self.__pseudofile = StringIO(f.read())

View File

@ -13,10 +13,10 @@ faup = Faup()
class URLObject(AbstractMISPObjectGenerator): class URLObject(AbstractMISPObjectGenerator):
def __init__(self, url: str, standalone: bool=True, **kwargs): def __init__(self, url: str, **kwargs):
# PY3 way: # PY3 way:
# super().__init__('file') # super().__init__('file')
super(URLObject, self).__init__('url', standalone=standalone, **kwargs) super(URLObject, self).__init__('url', **kwargs)
faup.decode(unquote_plus(url)) faup.decode(unquote_plus(url))
self.generate_attributes() self.generate_attributes()

View File

@ -17,8 +17,8 @@ class VehicleObject(AbstractMISPObjectGenerator):
'uk': "http://www.regcheck.org.uk/api/reg.asmx/Check" 'uk': "http://www.regcheck.org.uk/api/reg.asmx/Check"
} }
def __init__(self, country: str, registration: str, username: str, standalone=True, **kwargs): def __init__(self, country: str, registration: str, username: str, **kwargs):
super(VehicleObject, self).__init__("vehicle", standalone=standalone, **kwargs) super(VehicleObject, self).__init__("vehicle", **kwargs)
self._country = country self._country = country
self._registration = registration self._registration = registration
self._username = username self._username = username

View File

@ -24,10 +24,10 @@ class VTReportObject(AbstractMISPObjectGenerator):
:indicator: IOC to search VirusTotal for :indicator: IOC to search VirusTotal for
''' '''
def __init__(self, apikey: str, indicator: str, vt_proxies: Optional[dict]=None, standalone: bool=True, **kwargs): def __init__(self, apikey: str, indicator: str, vt_proxies: Optional[dict]=None, **kwargs):
# PY3 way: # PY3 way:
# super().__init__("virustotal-report") # super().__init__("virustotal-report")
super(VTReportObject, self).__init__("virustotal-report", standalone=standalone, **kwargs) super(VTReportObject, self).__init__("virustotal-report", **kwargs)
indicator = indicator.strip() indicator = indicator.strip()
self._resource_type = self.__validate_resource(indicator) self._resource_type = self.__validate_resource(indicator)
if self._resource_type: if self._resource_type:

View File

@ -9,7 +9,8 @@ import glob
import hashlib import hashlib
from datetime import date, datetime from datetime import date, datetime
from pymisp import MISPEvent, MISPSighting, MISPTag, MISPOrganisation from pymisp import (MISPEvent, MISPSighting, MISPTag, MISPOrganisation,
MISPObject)
from pymisp.exceptions import InvalidMISPObject from pymisp.exceptions import InvalidMISPObject
from pymisp.tools import GitVulnFinderObject from pymisp.tools import GitVulnFinderObject
@ -201,6 +202,19 @@ class TestMISPEvent(unittest.TestCase):
del self.mispevent.uuid del self.mispevent.uuid
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2)) self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_obj_references_export(self):
self.init_event()
obj1 = MISPObject(name="file")
obj2 = MISPObject(name="url", standalone=False)
obj1.add_reference(obj2, "downloads")
obj2.add_reference(obj1, "downloaded-by")
self.assertFalse("ObjectReference" in obj1.jsonable())
self.assertTrue("ObjectReference" in obj2.jsonable())
self.mispevent.add_object(obj1)
obj2.standalone = True
self.assertTrue("ObjectReference" in obj1.jsonable())
self.assertFalse("ObjectReference" in obj2.jsonable())
def test_event_not_edited(self): def test_event_not_edited(self):
self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json') self.mispevent.load_file('tests/mispevent_testfiles/existing_event.json')
self.assertFalse(self.mispevent.edited) self.assertFalse(self.mispevent.edited)