diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py
index 76e8d6e..0826b8c 100644
--- a/pymisp/mispevent.py
+++ b/pymisp/mispevent.py
@@ -207,6 +207,9 @@ class MISPAttribute(AbstractMISP):
         self.Event: MISPEvent
         self.RelatedAttribute: List[MISPAttribute]
 
+        # For malware sample
+        self._malware_binary: Optional[BytesIO]
+
     def add_tag(self, tag: Optional[Union[str, MISPTag, Dict]] = None, **kwargs) -> MISPTag:
         return super()._add_tag(tag, **kwargs)
 
@@ -246,8 +249,8 @@ class MISPAttribute(AbstractMISP):
                                 with f.open(name, pwd=b'infected') as unpacked:
                                     self.malware_filename = unpacked.read().decode().strip()
                             else:
-                                with f.open(name, pwd=b'infected') as unpacked:
-                                    self._malware_binary = BytesIO(unpacked.read())
+                                # decrypting a zipped file is extremely slow. We do it on-demand in self.malware_binary
+                                continue
                 except Exception:
                     # not a encrypted zip file, assuming it is a new malware sample
                     self._prepare_new_malware_sample()
@@ -307,7 +310,30 @@ class MISPAttribute(AbstractMISP):
     @property
     def malware_binary(self) -> Optional[BytesIO]:
-        """Returns a BytesIO of the malware (if the attribute has one, obvs)."""
+        """Returns a BytesIO of the malware (if the attribute has one, obvs).
+
+        The payload is decrypted from the zip in self.data on first access and
+        kept in self._malware_binary. Returns None when the attribute is not a
+        malware sample or when no payload can be found.
+        """
+        if self.type != 'malware-sample':
+            # Not a malware sample
+            return None
         if hasattr(self, '_malware_binary'):
+            # Already unpacked
+            return self._malware_binary
+        elif hasattr(self, 'malware_filename'):
+            # Have a binary, but didn't decrypt it yet
+            with ZipFile(self.data) as f:  # type: ignore
+                for name in f.namelist():
+                    if not name.endswith('.filename.txt'):
+                        with f.open(name, pwd=b'infected') as unpacked:
+                            self._malware_binary = BytesIO(unpacked.read())
+                        # Decryption is slow: stop at the first payload entry
+                        break
+                else:
+                    # The archive only holds the filename entry: without this,
+                    # the return below would raise AttributeError
+                    return None
             return self._malware_binary
         return None
 
@@ -514,11 +540,7 @@ class MISPAttribute(AbstractMISP):
         else:
             # Assuming the user only passed the filename
             self.malware_filename = self.value
-        # m = hashlib.md5()
-        # m.update(self.data.getvalue())
         self.value = self.malware_filename
-        # md5 = m.hexdigest()
-        # self.value = '{}|{}'.format(self.malware_filename, md5)
         self._malware_binary = self.data
         self.encrypt = True
 
diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py
index 72cdb2e..13ef9cc 100644
--- a/tests/testlive_comprehensive.py
+++ b/tests/testlive_comprehensive.py
@@ -11,6 +11,7 @@ from datetime import datetime, timedelta, date, timezone
 from io import BytesIO
 import json
 from pathlib import Path
+import hashlib
 
 import urllib3  # type: ignore
 import time
@@ -2220,11 +2221,37 @@ class TestComprehensive(unittest.TestCase):
     def test_expansion(self):
         first = self.create_simple_event()
         try:
-            with open('tests/viper-test-files/test_files/whoami.exe', 'rb') as f:
-                first.add_attribute('malware-sample', value='whoami.exe', data=BytesIO(f.read()), expand='binary')
+            # Read the sample once; its digest is the reference for every check below
+            with open('tests/viper-test-files/test_files/sample2.pe', 'rb') as f:
+                filecontent = f.read()
+            md5_disk = hashlib.md5(filecontent).digest()
+            malware_sample_initial_attribute = first.add_attribute('malware-sample', value='Big PE sample', data=BytesIO(filecontent), expand='binary')
+            # The freshly created attribute must expose the untouched binary
+            md5_init_attribute = hashlib.md5(malware_sample_initial_attribute.malware_binary.getvalue()).digest()
+            self.assertEqual(md5_init_attribute, md5_disk)
+
             first.run_expansions()
             first = self.admin_misp_connector.add_event(first, pythonify=True)
-            self.assertEqual(len(first.objects), 7)
+            self.assertEqual(len(first.objects), 8, first.objects)
+            # Speed test: pythonify must not add more than 0.5s to a raw fetch
+            # # reference time (monotonic clock, immune to wall-clock adjustments)
+            start = time.perf_counter()
+            self.admin_misp_connector.get_event(first.id, pythonify=False)
+            ref_time = time.perf_counter() - start
+            # # Speed test pythonify
+            start = time.perf_counter()
+            first = self.admin_misp_connector.get_event(first.id, pythonify=True)
+            pythonify_time = time.perf_counter() - start
+            self.assertLessEqual(pythonify_time - ref_time, 0.5, f'Pythonify too slow: {ref_time} vs. {pythonify_time}.')
+
+            # Test on demand decrypt malware binary
+            file_objects = first.get_objects_by_name('file')
+            samples = file_objects[0].get_attributes_by_relation('malware-sample')
+            binary = samples[0].malware_binary
+            # Fail with a clear message instead of AttributeError on None
+            self.assertIsNotNone(binary, 'malware_binary was not decrypted on demand')
+            md5_from_server = hashlib.md5(binary.getvalue()).digest()
+            self.assertEqual(md5_from_server, md5_disk)
         finally:
             # Delete event
             self.admin_misp_connector.delete_event(first)