mirror of https://github.com/MISP/PyMISP
fix: do not skip data in add_attribute methods
parent
35377399e8
commit
94c2a644af
|
@ -181,7 +181,7 @@ class MISPAttribute(AbstractMISP):
|
|||
self.__category_type_mapping: dict = self.describe_types['category_type_mappings']
|
||||
self.__sane_default: dict = self.describe_types['sane_defaults']
|
||||
self.__strict: bool = strict
|
||||
self._data: Optional[BytesIO] = None
|
||||
self.data: Optional[BytesIO] = None
|
||||
self.first_seen: datetime
|
||||
self.last_seen: datetime
|
||||
self.uuid: str = str(uuid.uuid4())
|
||||
|
@ -207,6 +207,37 @@ class MISPAttribute(AbstractMISP):
|
|||
"""Set a list of prepared MISPTag."""
|
||||
super()._set_tags(tags)
|
||||
|
||||
def _prepare_data(self, data: Union[Path, str, bytes, BytesIO, None]):
|
||||
if not data:
|
||||
super().__setattr__('data', None)
|
||||
return
|
||||
|
||||
if isinstance(data, BytesIO):
|
||||
super().__setattr__('data', data)
|
||||
elif isinstance(data, Path):
|
||||
with data.open('rb') as f_temp:
|
||||
super().__setattr__('data', BytesIO(f_temp.read()))
|
||||
elif isinstance(data, (str, bytes)):
|
||||
super().__setattr__('data', BytesIO(base64.b64decode(data)))
|
||||
else:
|
||||
raise PyMISPError(f'Invalid type ({type(data)}) for the data key: {data}')
|
||||
|
||||
if self.type == 'malware-sample':
|
||||
try:
|
||||
with ZipFile(self.data) as f:
|
||||
if not self.__is_misp_encrypted_file(f):
|
||||
raise PyMISPError('Not an existing malware sample')
|
||||
for name in f.namelist():
|
||||
if name.endswith('.filename.txt'):
|
||||
with f.open(name, pwd=b'infected') as unpacked:
|
||||
self.malware_filename = unpacked.read().decode().strip()
|
||||
else:
|
||||
with f.open(name, pwd=b'infected') as unpacked:
|
||||
self._malware_binary = BytesIO(unpacked.read())
|
||||
except Exception:
|
||||
# not a encrypted zip file, assuming it is a new malware sample
|
||||
self._prepare_new_malware_sample()
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
if name in ['first_seen', 'last_seen']:
|
||||
value = _make_datetime(value)
|
||||
|
@ -215,7 +246,11 @@ class MISPAttribute(AbstractMISP):
|
|||
raise PyMISPError('last_seen ({value}) has to be after first_seen ({self.first_seen})')
|
||||
if name == 'first_seen' and hasattr(self, 'last_seen') and self.last_seen < value:
|
||||
raise PyMISPError('first_seen ({value}) has to be before last_seen ({self.last_seen})')
|
||||
super().__setattr__(name, value)
|
||||
super().__setattr__(name, value)
|
||||
elif name == 'data':
|
||||
self._prepare_data(value)
|
||||
else:
|
||||
super().__setattr__(name, value)
|
||||
|
||||
def hash_values(self, algorithm: str='sha512') -> List[str]:
|
||||
"""Compute the hash of every values for fast lookups"""
|
||||
|
@ -488,35 +523,6 @@ class MISPAttribute(AbstractMISP):
|
|||
return False
|
||||
return True
|
||||
|
||||
@property
|
||||
def data(self):
|
||||
return self._data if self._data else None
|
||||
|
||||
@data.setter
|
||||
def data(self, data: Union[Path, str, bytes, BytesIO]):
|
||||
if isinstance(data, Path):
|
||||
with data.open('rb') as f_temp:
|
||||
self._data = BytesIO(f_temp.read())
|
||||
if isinstance(data, (str, bytes)):
|
||||
self._data = BytesIO(base64.b64decode(data))
|
||||
elif isinstance(data, BytesIO):
|
||||
self._data = data
|
||||
if self.type == 'malware-sample':
|
||||
try:
|
||||
with ZipFile(self.data) as f:
|
||||
if not self.__is_misp_encrypted_file(f):
|
||||
raise PyMISPError('Not an existing malware sample')
|
||||
for name in f.namelist():
|
||||
if name.endswith('.filename.txt'):
|
||||
with f.open(name, pwd=b'infected') as unpacked:
|
||||
self.malware_filename = unpacked.read().decode().strip()
|
||||
else:
|
||||
with f.open(name, pwd=b'infected') as unpacked:
|
||||
self._malware_binary = BytesIO(unpacked.read())
|
||||
except Exception:
|
||||
# not a encrypted zip file, assuming it is a new malware sample
|
||||
self._prepare_new_malware_sample()
|
||||
|
||||
def __repr__(self):
|
||||
if hasattr(self, 'value'):
|
||||
return '<{self.__class__.__name__}(type={self.type}, value={self.value})'.format(self=self)
|
||||
|
|
|
@ -1953,6 +1953,8 @@ class TestComprehensive(unittest.TestCase):
|
|||
self.assertEqual(e.org.name, 'Test Org - delegate')
|
||||
r = self.delegate_user_misp_connector.delete_event(e)
|
||||
self.assertEqual(r['message'], 'Event deleted.', r)
|
||||
# Change base_event UUID do we can add it
|
||||
base_event.uuid = str(uuid4())
|
||||
e = test_roles_user_connector.add_event(base_event)
|
||||
delegation = test_roles_user_connector.delegate_event(e, self.test_org_delegate)
|
||||
r = test_roles_user_connector.discard_event_delegation(delegation.id)
|
||||
|
|
Loading…
Reference in New Issue