mirror of https://github.com/MISP/PyMISP
chg: on-demand decryption of malware-binary, speeds up pythonify.
parent
4c2ee4fd2f
commit
fe91e10ced
|
@ -207,6 +207,9 @@ class MISPAttribute(AbstractMISP):
|
|||
self.Event: MISPEvent
|
||||
self.RelatedAttribute: List[MISPAttribute]
|
||||
|
||||
# For malware sample
|
||||
self._malware_binary: Optional[BytesIO]
|
||||
|
||||
def add_tag(self, tag: Optional[Union[str, MISPTag, Dict]] = None, **kwargs) -> MISPTag:
|
||||
return super()._add_tag(tag, **kwargs)
|
||||
|
||||
|
@ -246,8 +249,8 @@ class MISPAttribute(AbstractMISP):
|
|||
with f.open(name, pwd=b'infected') as unpacked:
|
||||
self.malware_filename = unpacked.read().decode().strip()
|
||||
else:
|
||||
with f.open(name, pwd=b'infected') as unpacked:
|
||||
self._malware_binary = BytesIO(unpacked.read())
|
||||
# decrypting a zipped file is extremely slow. We do it on-demand in self.malware_binary
|
||||
continue
|
||||
except Exception:
|
||||
# not a encrypted zip file, assuming it is a new malware sample
|
||||
self._prepare_new_malware_sample()
|
||||
|
@ -307,7 +310,19 @@ class MISPAttribute(AbstractMISP):
|
|||
@property
|
||||
def malware_binary(self) -> Optional[BytesIO]:
|
||||
"""Returns a BytesIO of the malware (if the attribute has one, obvs)."""
|
||||
if self.type != 'malware-sample':
|
||||
# Not a malware sample
|
||||
return None
|
||||
if hasattr(self, '_malware_binary'):
|
||||
# Already unpacked
|
||||
return self._malware_binary
|
||||
elif hasattr(self, 'malware_filename'):
|
||||
# Have a binary, but didn't decrypt it yet
|
||||
with ZipFile(self.data) as f: # type: ignore
|
||||
for name in f.namelist():
|
||||
if not name.endswith('.filename.txt'):
|
||||
with f.open(name, pwd=b'infected') as unpacked:
|
||||
self._malware_binary = BytesIO(unpacked.read())
|
||||
return self._malware_binary
|
||||
return None
|
||||
|
||||
|
@ -514,11 +529,7 @@ class MISPAttribute(AbstractMISP):
|
|||
else:
|
||||
# Assuming the user only passed the filename
|
||||
self.malware_filename = self.value
|
||||
# m = hashlib.md5()
|
||||
# m.update(self.data.getvalue())
|
||||
self.value = self.malware_filename
|
||||
# md5 = m.hexdigest()
|
||||
# self.value = '{}|{}'.format(self.malware_filename, md5)
|
||||
self._malware_binary = self.data
|
||||
self.encrypt = True
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ from datetime import datetime, timedelta, date, timezone
|
|||
from io import BytesIO
|
||||
import json
|
||||
from pathlib import Path
|
||||
import hashlib
|
||||
|
||||
import urllib3 # type: ignore
|
||||
import time
|
||||
|
@ -2220,11 +2221,36 @@ class TestComprehensive(unittest.TestCase):
|
|||
def test_expansion(self):
|
||||
first = self.create_simple_event()
|
||||
try:
|
||||
with open('tests/viper-test-files/test_files/whoami.exe', 'rb') as f:
|
||||
first.add_attribute('malware-sample', value='whoami.exe', data=BytesIO(f.read()), expand='binary')
|
||||
md5_disk = hashlib.md5()
|
||||
with open('tests/viper-test-files/test_files/sample2.pe', 'rb') as f:
|
||||
filecontent = f.read()
|
||||
md5_disk.update(filecontent)
|
||||
malware_sample_initial_attribute = first.add_attribute('malware-sample', value='Big PE sample', data=BytesIO(filecontent), expand='binary')
|
||||
md5_init_attribute = hashlib.md5()
|
||||
md5_init_attribute.update(malware_sample_initial_attribute.malware_binary.getvalue())
|
||||
self.assertEqual(md5_init_attribute.digest(), md5_disk.digest())
|
||||
|
||||
first.run_expansions()
|
||||
first = self.admin_misp_connector.add_event(first, pythonify=True)
|
||||
self.assertEqual(len(first.objects), 7)
|
||||
self.assertEqual(len(first.objects), 8, first.objects)
|
||||
# Speed test
|
||||
# # reference time
|
||||
start = time.time()
|
||||
self.admin_misp_connector.get_event(first.id, pythonify=False)
|
||||
ref_time = time.time() - start
|
||||
# # Speed test pythonify
|
||||
start = time.time()
|
||||
first = self.admin_misp_connector.get_event(first.id, pythonify=True)
|
||||
pythonify_time = time.time() - start
|
||||
self.assertTrue((pythonify_time - ref_time) <= 0.5, f'Pythonify too slow: {ref_time} vs. {pythonify_time}.')
|
||||
|
||||
# Test on demand decrypt malware binary
|
||||
file_objects = first.get_objects_by_name('file')
|
||||
samples = file_objects[0].get_attributes_by_relation('malware-sample')
|
||||
binary = samples[0].malware_binary
|
||||
md5_from_server = hashlib.md5()
|
||||
md5_from_server.update(binary.getvalue())
|
||||
self.assertEqual(md5_from_server.digest(), md5_disk.digest())
|
||||
finally:
|
||||
# Delete event
|
||||
self.admin_misp_connector.delete_event(first)
|
||||
|
|
Loading…
Reference in New Issue