mirror of https://github.com/MISP/PyMISP
chg: Better handling of sightings.
parent
eba8b6df24
commit
e95948bcf6
|
@ -549,10 +549,8 @@ class ExpandedPyMISP(PyMISP):
|
|||
to_return.append(s)
|
||||
return to_return
|
||||
|
||||
def add_sighting(self, sighting: MISPSighting, attribute: Union[MISPAttribute, int, str, UUID]=None):
|
||||
def add_sighting(self, sighting: MISPSighting, attribute: Union[MISPAttribute, int, str, UUID]=None, pythonify: bool=False):
|
||||
'''Add a new sighting (globally, or to a specific attribute)'''
|
||||
# FIXME: no pythonify possible: https://github.com/MISP/MISP/issues/4867
|
||||
pythonify = False
|
||||
if attribute:
|
||||
attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute)
|
||||
new_sighting = self._prepare_request('POST', f'sightings/add/{attribute_id}', data=sighting)
|
||||
|
|
|
@ -122,6 +122,7 @@ class MISPAttribute(AbstractMISP):
|
|||
self.__strict = strict
|
||||
self.uuid = str(uuid.uuid4())
|
||||
self.ShadowAttribute = []
|
||||
self.Sighting = []
|
||||
|
||||
@property
|
||||
def known_types(self):
|
||||
|
@ -147,6 +148,18 @@ class MISPAttribute(AbstractMISP):
|
|||
else:
|
||||
raise PyMISPError('All the attributes have to be of type MISPShadowAttribute.')
|
||||
|
||||
@property
|
||||
def sightings(self):
|
||||
return self.Sighting
|
||||
|
||||
@sightings.setter
|
||||
def sightings(self, sightings):
|
||||
"""Set a list of prepared MISPShadowAttribute."""
|
||||
if all(isinstance(x, MISPSighting) for x in sightings):
|
||||
self.Sighting = sightings
|
||||
else:
|
||||
raise PyMISPError('All the attributes have to be of type MISPSighting.')
|
||||
|
||||
def delete(self):
|
||||
"""Mark the attribute as deleted (soft delete)"""
|
||||
self.deleted = True
|
||||
|
@ -156,7 +169,7 @@ class MISPAttribute(AbstractMISP):
|
|||
return self.add_shadow_attribute(shadow_attribute, **kwargs)
|
||||
|
||||
def add_shadow_attribute(self, shadow_attribute=None, **kwargs):
|
||||
"""Add a tag to the attribute (by name or a MISPTag object)"""
|
||||
"""Add a shadow attribute to the attribute (by name or a MISPShadowAttribute object)"""
|
||||
if isinstance(shadow_attribute, MISPShadowAttribute):
|
||||
misp_shadow_attribute = shadow_attribute
|
||||
elif isinstance(shadow_attribute, dict):
|
||||
|
@ -171,6 +184,22 @@ class MISPAttribute(AbstractMISP):
|
|||
self.edited = True
|
||||
return misp_shadow_attribute
|
||||
|
||||
def add_sighting(self, sighting=None, **kwargs):
|
||||
"""Add a sighting to the attribute (by name or a MISPSighting object)"""
|
||||
if isinstance(sighting, MISPSighting):
|
||||
misp_sighting = sighting
|
||||
elif isinstance(sighting, dict):
|
||||
misp_sighting = MISPSighting()
|
||||
misp_sighting.from_dict(**sighting)
|
||||
elif kwargs:
|
||||
misp_sighting = MISPSighting()
|
||||
misp_sighting.from_dict(**kwargs)
|
||||
else:
|
||||
raise PyMISPError("The sighting is in an invalid format (can be either string, MISPShadowAttribute, or an expanded dict): {}".format(sighting))
|
||||
self.sightings.append(misp_sighting)
|
||||
self.edited = True
|
||||
return misp_sighting
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
if kwargs.get('Attribute'):
|
||||
kwargs = kwargs.get('Attribute')
|
||||
|
@ -249,6 +278,9 @@ class MISPAttribute(AbstractMISP):
|
|||
if kwargs.get('Tag'):
|
||||
for tag in kwargs.pop('Tag'):
|
||||
self.add_tag(tag)
|
||||
if kwargs.get('Sighting'):
|
||||
for sighting in kwargs.pop('Sighting'):
|
||||
self.add_sighting(sighting)
|
||||
if kwargs.get('ShadowAttribute'):
|
||||
for s_attr in kwargs.pop('ShadowAttribute'):
|
||||
self.add_shadow_attribute(s_attr)
|
||||
|
@ -1004,9 +1036,9 @@ class MISPSighting(AbstractMISP):
|
|||
if hasattr(self, 'value'):
|
||||
return '<{self.__class__.__name__}(value={self.value})'.format(self=self)
|
||||
if hasattr(self, 'id'):
|
||||
return '<{self.__class__.__name__}(value={self.id})'.format(self=self)
|
||||
return '<{self.__class__.__name__}(id={self.id})'.format(self=self)
|
||||
if hasattr(self, 'uuid'):
|
||||
return '<{self.__class__.__name__}(value={self.uuid})'.format(self=self)
|
||||
return '<{self.__class__.__name__}(uuid={self.uuid})'.format(self=self)
|
||||
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
|
||||
|
||||
|
||||
|
|
|
@ -42,6 +42,8 @@ except ImportError as e:
|
|||
|
||||
urllib3.disable_warnings()
|
||||
|
||||
fast_mode = True
|
||||
|
||||
|
||||
class TestComprehensive(unittest.TestCase):
|
||||
|
||||
|
@ -50,12 +52,17 @@ class TestComprehensive(unittest.TestCase):
|
|||
cls.maxDiff = None
|
||||
# Connect as admin
|
||||
cls.admin_misp_connector = ExpandedPyMISP(url, key, verifycert, debug=False)
|
||||
r = cls.admin_misp_connector.update_misp()
|
||||
print(r)
|
||||
if not fast_mode:
|
||||
r = cls.admin_misp_connector.update_misp()
|
||||
print(r)
|
||||
# Creates an org
|
||||
organisation = MISPOrganisation()
|
||||
organisation.name = 'Test Org'
|
||||
cls.test_org = cls.admin_misp_connector.add_organisation(organisation, pythonify=True)
|
||||
# Create an org to delegate to
|
||||
organisation = MISPOrganisation()
|
||||
organisation.name = 'Test Org - delegate'
|
||||
cls.test_org_delegate = cls.admin_misp_connector.add_organisation(organisation, pythonify=True)
|
||||
# Set the refault role (id 3 on the VM)
|
||||
cls.admin_misp_connector.set_default_role(3)
|
||||
# Creates a user
|
||||
|
@ -72,12 +79,13 @@ class TestComprehensive(unittest.TestCase):
|
|||
user.role_id = 4
|
||||
cls.test_pub = cls.admin_misp_connector.add_user(user, pythonify=True)
|
||||
cls.pub_misp_connector = ExpandedPyMISP(url, cls.test_pub.authkey, verifycert)
|
||||
# Update all json stuff
|
||||
cls.admin_misp_connector.update_object_templates()
|
||||
cls.admin_misp_connector.update_galaxies()
|
||||
cls.admin_misp_connector.update_noticelists()
|
||||
cls.admin_misp_connector.update_warninglists()
|
||||
cls.admin_misp_connector.update_taxonomies()
|
||||
if not fast_mode:
|
||||
# Update all json stuff
|
||||
cls.admin_misp_connector.update_object_templates()
|
||||
cls.admin_misp_connector.update_galaxies()
|
||||
cls.admin_misp_connector.update_noticelists()
|
||||
cls.admin_misp_connector.update_warninglists()
|
||||
cls.admin_misp_connector.update_taxonomies()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
|
@ -87,6 +95,7 @@ class TestComprehensive(unittest.TestCase):
|
|||
cls.admin_misp_connector.delete_user(cls.test_usr)
|
||||
# Delete org
|
||||
cls.admin_misp_connector.delete_organisation(cls.test_org)
|
||||
cls.admin_misp_connector.delete_organisation(cls.test_org_delegate)
|
||||
|
||||
def create_simple_event(self, force_timestamps=False):
|
||||
mispevent = MISPEvent(force_timestamps=force_timestamps)
|
||||
|
@ -809,19 +818,15 @@ class TestComprehensive(unittest.TestCase):
|
|||
second = self.user_misp_connector.add_event(second)
|
||||
|
||||
current_ts = int(time.time())
|
||||
# NOTE: no pythonify available yet
|
||||
# r = self.user_misp_connector.add_sighting({'value': first.attributes[0].value})
|
||||
r = self.user_misp_connector.add_sighting({'value': first.attributes[0].value})
|
||||
self.assertEqual(r['message'], 'Sighting added')
|
||||
self.assertEqual(int(r.attribute_id), first.attributes[0].id)
|
||||
|
||||
s = MISPSighting()
|
||||
s.value = second.attributes[0].value
|
||||
s.source = 'Testcases'
|
||||
s.type = '1'
|
||||
# NOTE: no pythonify available yet
|
||||
# r = self.user_misp_connector.add_sighting(s, second.attributes[0])
|
||||
r = self.user_misp_connector.add_sighting(s, second.attributes[0])
|
||||
self.assertEqual(r['message'], 'Sighting added')
|
||||
self.assertEqual(r.source, 'Testcases')
|
||||
|
||||
s = self.user_misp_connector.search_sightings(publish_timestamp=current_ts, include_attribute=True,
|
||||
include_event_meta=True, pythonify=True)
|
||||
|
@ -864,10 +869,7 @@ class TestComprehensive(unittest.TestCase):
|
|||
self.assertTrue(isinstance(s, list))
|
||||
self.assertEqual(int(s[0].attribute_id), first.attributes[0].id)
|
||||
|
||||
# NOTE: no pythonify available yet
|
||||
# r = self.admin_misp_connector.add_sighting(s, second.attributes[0].id, pythonify=True)
|
||||
r = self.admin_misp_connector.add_sighting(s, second.attributes[0])
|
||||
self.assertEqual(r['message'], 'Sighting added')
|
||||
self.admin_misp_connector.add_sighting(s, second.attributes[0])
|
||||
s = self.user_misp_connector.sightings(second.attributes[0])
|
||||
self.assertEqual(len(s), 2)
|
||||
s = self.user_misp_connector.sightings(second.attributes[0], self.test_org)
|
||||
|
|
Loading…
Reference in New Issue