chg: Use lief 0.11.0, generate authenticode entries

pull/684/head
Raphaël Vinot 2021-01-19 15:44:58 +01:00
parent b610b388f8
commit 76c4f92c17
5 changed files with 123 additions and 13 deletions

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from pymisp import ExpandedPyMISP from pymisp import PyMISP
from pymisp.tools import make_binary_objects from pymisp.tools import make_binary_objects
import traceback import traceback
from keys import misp_url, misp_key, misp_verifycert from keys import misp_url, misp_key, misp_verifycert
@ -14,7 +14,7 @@ if __name__ == '__main__':
parser.add_argument("-p", "--path", required=True, help="Path to process (expanded using glob).") parser.add_argument("-p", "--path", required=True, help="Path to process (expanded using glob).")
args = parser.parse_args() args = parser.parse_args()
pymisp = ExpandedPyMISP(misp_url, misp_key, misp_verifycert) pymisp = PyMISP(misp_url, misp_key, misp_verifycert)
for f in glob.glob(args.path): for f in glob.glob(args.path):
try: try:
@ -28,6 +28,15 @@ if __name__ == '__main__':
r = pymisp.add_object(args.event, s) r = pymisp.add_object(args.event, s)
if peo: if peo:
if hasattr(peo, 'certificates') and hasattr(peo, 'signers'):
# special authenticode case for PE objects
for c in peo.certificates:
pymisp.add_object(args.event, c, pythonify=True)
for s in peo.signers:
pymisp.add_object(args.event, s, pythonify=True)
del peo.certificates
del peo.signers
del peo.sections
r = pymisp.add_object(args.event, peo, pythonify=True) r = pymisp.add_object(args.event, peo, pythonify=True)
for ref in peo.ObjectReference: for ref in peo.ObjectReference:
r = pymisp.add_object_reference(ref) r = pymisp.add_object_reference(ref)

View File

@ -906,9 +906,18 @@ class MISPObject(AbstractMISP):
if simple_value is not None: # /!\ The value *can* be 0 if simple_value is not None: # /!\ The value *can* be 0
value = {'value': simple_value} value = {'value': simple_value}
if value.get('value') is None: if value.get('value') is None:
logger.warning("The value of the attribute you're trying to add is None, skipping it. Object relation: {}".format(object_relation)) logger.warning("The value of the attribute you're trying to add is None or empty string, skipping it. Object relation: {}".format(object_relation))
return None return None
else: else:
if isinstance(value['value'], bytes):
# That shouldn't happen, but we live in the real world, and it does.
# So we try to decode (otherwise, MISP barf), and raise a warning if needed.
try:
value['value'] = value['value'].decode()
except Exception:
logger.warning("The value of the attribute you're trying to add is a bytestream ({!r}), and we're unable to make it a string.".format(value['value']))
return None
# Make sure we're not adding an empty value. # Make sure we're not adding an empty value.
if isinstance(value['value'], str): if isinstance(value['value'], str):
value['value'] = value['value'].strip() value['value'] = value['value'].strip()

View File

@ -13,8 +13,7 @@ logger = logging.getLogger('pymisp')
try: try:
import lief # type: ignore import lief # type: ignore
from lief import Logger # type: ignore lief.logging.disable()
Logger.disable()
HAS_LIEF = True HAS_LIEF = True
from .peobject import make_pe_objects from .peobject import make_pe_objects

View File

@ -9,6 +9,7 @@ from datetime import datetime
import logging import logging
from typing import Optional, Union from typing import Optional, Union
from pathlib import Path from pathlib import Path
from base64 import b64encode
from . import FileObject from . import FileObject
@ -36,8 +37,7 @@ class PEObject(AbstractMISPObjectGenerator):
def __init__(self, parsed: Optional[lief.PE.Binary] = None, filepath: Optional[Union[Path, str]] = None, pseudofile: Optional[BytesIO] = None, **kwargs): def __init__(self, parsed: Optional[lief.PE.Binary] = None, filepath: Optional[Union[Path, str]] = None, pseudofile: Optional[BytesIO] = None, **kwargs):
"""Creates an PE object, with lief""" """Creates an PE object, with lief"""
# Python3 way super().__init__('pe')
# super().__init__('pe')
super(PEObject, self).__init__('pe', **kwargs) super(PEObject, self).__init__('pe', **kwargs)
if not HAS_PYDEEP: if not HAS_PYDEEP:
logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git") logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
@ -88,7 +88,8 @@ class PEObject(AbstractMISPObjectGenerator):
# General information # General information
self.add_attribute('entrypoint-address', value=self.__pe.entrypoint) self.add_attribute('entrypoint-address', value=self.__pe.entrypoint)
self.add_attribute('compilation-timestamp', value=datetime.utcfromtimestamp(self.__pe.header.time_date_stamps).isoformat()) self.add_attribute('compilation-timestamp', value=datetime.utcfromtimestamp(self.__pe.header.time_date_stamps).isoformat())
# self.imphash = self.__pe.get_imphash() self.add_attribute('imphash', value=lief.PE.get_imphash(self.__pe, lief.PE.IMPHASH_MODE.PEFILE))
self.add_attribute('authentihash', value=self.__pe.authentihash_sha256.hex())
try: try:
if (self.__pe.has_resources if (self.__pe.has_resources
and self.__pe.resources_manager.has_version and self.__pe.resources_manager.has_version
@ -120,16 +121,64 @@ class PEObject(AbstractMISPObjectGenerator):
pos += 1 pos += 1
self.sections.append(s) self.sections.append(s)
self.add_attribute('number-sections', value=len(self.sections)) self.add_attribute('number-sections', value=len(self.sections))
# TODO: TLSSection / DIRECTORY_ENTRY_TLS # Signatures
self.certificates = []
self.signers = []
for sign in self.__pe.signatures:
for c in sign.certificates:
cert_obj = PECertificate(c)
self.add_reference(cert_obj.uuid, 'signed-by')
self.certificates.append(cert_obj)
for s_info in sign.signers:
signer_obj = PESigners(s_info)
self.add_reference(signer_obj.uuid, 'signed-by')
self.signers.append(signer_obj)
class PECertificate(AbstractMISPObjectGenerator):
def __init__(self, certificate: lief.PE.x509, **kwargs):
super().__init__('x509')
self.__certificate = certificate
self.generate_attributes()
def generate_attributes(self):
self.add_attribute('issuer', value=self.__certificate.issuer)
self.add_attribute('serial-number', value=self.__certificate.serial_number)
self.add_attribute('validity-not-before', value=datetime(*self.__certificate.valid_from))
self.add_attribute('validity-not-after', value=datetime(*self.__certificate.valid_to))
self.add_attribute('version', value=self.__certificate.version)
self.add_attribute('subject', value=self.__certificate.subject)
self.add_attribute('signature_algorithm', value=self.__certificate.signature_algorithm)
self.add_attribute('serial-number', value=self.__certificate.serial_number)
self.add_attribute('raw-base64', value=b64encode(self.__certificate.raw))
class PESigners(AbstractMISPObjectGenerator):
def __init__(self, signer: lief.PE.SignerInfo, **kwargs):
super().__init__('authenticode-signerinfo')
self.__signer = signer
self.generate_attributes()
def generate_attributes(self):
self.add_attribute('issuer', value=self.__signer.issuer)
self.add_attribute('serial-number', value=self.__signer.serial_number)
self.add_attribute('version', value=self.__signer.version)
self.add_attribute('digest_algorithm', value=self.__signer.digest_algorithm.name)
self.add_attribute('encryption_algorithm', value=self.__signer.encryption_algorithm.name)
self.add_attribute('digest-base64', value=b64encode(self.__signer.encrypted_digest))
info = self.__signer.get_attribute(lief.PE.SIG_ATTRIBUTE_TYPES.SPC_SP_OPUS_INFO)
if info:
self.add_attribute('program-name', value=info.program_name)
self.add_attribute('url', value=info.more_info)
class PESectionObject(AbstractMISPObjectGenerator): class PESectionObject(AbstractMISPObjectGenerator):
def __init__(self, section: lief.PE.Section, **kwargs): def __init__(self, section: lief.PE.Section, **kwargs):
"""Creates an PE Section object. Object generated by PEObject.""" """Creates an PE Section object. Object generated by PEObject."""
# Python3 way super().__init__('pe-section')
# super().__init__('pe-section')
super(PESectionObject, self).__init__('pe-section', **kwargs)
self.__section = section self.__section = section
self.__data = bytes(self.__section.content) self.__data = bytes(self.__section.content)
self.generate_attributes() self.generate_attributes()

View File

@ -1434,7 +1434,7 @@ class TestComprehensive(unittest.TestCase):
r = self.user_misp_connector.add_object(first, s) r = self.user_misp_connector.add_object(first, s)
self.assertEqual(r.name, 'pe-section', r) self.assertEqual(r.name, 'pe-section', r)
r = self.user_misp_connector.add_object(first, peo) r = self.user_misp_connector.add_object(first, peo, pythonify=True)
self.assertEqual(r.name, 'pe', r) self.assertEqual(r.name, 'pe', r)
for ref in peo.ObjectReference: for ref in peo.ObjectReference:
r = self.user_misp_connector.add_object_reference(ref) r = self.user_misp_connector.add_object_reference(ref)
@ -1453,6 +1453,50 @@ class TestComprehensive(unittest.TestCase):
# Delete event # Delete event
self.admin_misp_connector.delete_event(first) self.admin_misp_connector.delete_event(first)
def test_lief_and_sign(self):
first = self.create_simple_event()
try:
first = self.user_misp_connector.add_event(first)
fo, peo, seos = make_binary_objects('tests/viper-test-files/test_files/chromeinstall-8u31.exe')
# Make sure VT imphash is the same as the one generated by lief
vtimphash = '697c52d3bf08cccfd62da7bc503fdceb'
imphash = peo.get_attributes_by_relation('imphash')[0]
self.assertEqual(imphash.value, vtimphash)
# Make sure VT authentihash is the same as the one generated by lief
vtauthentihash = 'eb7be5a6f8ef4c2da5a183b4a3177153183e344038c56a00f5d88570a373d858'
authentihash = peo.get_attributes_by_relation('authentihash')[0]
self.assertEqual(authentihash.value, vtauthentihash)
# The following is a duplicate of examples/add_file_object.py
if seos:
for s in seos:
self.user_misp_connector.add_object(first, s)
if peo:
if hasattr(peo, 'certificates') and hasattr(peo, 'signers'):
# special authenticode case for PE objects
for c in peo.certificates:
self.user_misp_connector.add_object(first, c, pythonify=True)
for s in peo.signers:
self.user_misp_connector.add_object(first, s, pythonify=True)
del peo.certificates
del peo.signers
del peo.sections
self.user_misp_connector.add_object(first, peo, pythonify=True)
for ref in peo.ObjectReference:
self.user_misp_connector.add_object_reference(ref)
if fo:
self.user_misp_connector.add_object(first, fo, pythonify=True)
for ref in fo.ObjectReference:
self.user_misp_connector.add_object_reference(ref)
first = self.user_misp_connector.get_event(first, pythonify=True)
self.assertEqual(len(first.objects), 10, first.objects)
finally:
# Delete event
self.admin_misp_connector.delete_event(first)
def test_add_event_with_attachment(self): def test_add_event_with_attachment(self):
first = self.create_simple_event() first = self.create_simple_event()
try: try: