Merge pull request #809 from cvandeplas/feature-feedgenerator-sharinggroups

chg: [feed-generator] support for distribution and sharing groups
pull/810/head
Raphaël Vinot 2021-11-29 07:46:06 -08:00 committed by GitHub
commit 167438ba13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 11 deletions

View File

@ -6,6 +6,10 @@ import json
import os
from pymisp import ExpandedPyMISP
from settings import url, key, ssl, outputdir, filters, valid_attribute_distribution_levels
try:
from settings import with_distribution
except ImportError:
with_distribution = False
try:
from settings import include_deleted
@ -79,7 +83,7 @@ if __name__ == '__main__':
for i, attribute in enumerate(e.attributes):
if attribute.type in exclude_attribute_types:
e.attributes.pop(i)
e_feed = e.to_feed(valid_distributions=valid_attribute_distributions, with_meta=True)
e_feed = e.to_feed(valid_distributions=valid_attribute_distributions, with_meta=True, with_distribution=with_distribution)
except Exception as err:
print(err, event['uuid'])
continue

View File

@ -45,4 +45,9 @@ valid_attribute_distribution_levels = ['0', '1', '2', '3', '4', '5']
# will not be able to get these attributes back unless their events get updated.
# For example:
# exclude_attribute_types = ['malware-sample']
exclude_attribute_types = []
exclude_attribute_types = []
# Include the distribution and sharing group information (and names/UUIDs of organisations in those Sharing Groups)
# Set this to False if you want to discard the distribution metadata. That way all data will inherit the distribution
# the feed
with_distribution = False

View File

@ -126,6 +126,8 @@ class MISPOrganisation(AbstractMISP):
class MISPSharingGroup(AbstractMISP):
_fields_for_feed: set = {'uuid', 'name', 'roaming', 'created', 'organisation_uuid', 'Organisation', 'SharingGroupOrg', 'SharingGroupServer'}
def __init__(self):
super().__init__()
self.name: str
@ -161,6 +163,17 @@ class MISPSharingGroup(AbstractMISP):
return f'<{self.__class__.__name__}(name={self.name})'
return f'<{self.__class__.__name__}(NotInitialized)'
def _to_feed(self) -> Dict:
to_return = super()._to_feed()
to_return['SharingGroupOrg'] = [org._to_feed() for org in self.SharingGroupOrg]
to_return['Organisation'].pop('id', None)
for server in to_return['SharingGroupServer']:
server.pop('id', None)
server.pop('sharing_group_id', None)
server.pop('server_id', None)
server['Server'].pop('id', None)
return to_return
class MISPShadowAttribute(AbstractMISP):
@ -332,12 +345,19 @@ class MISPAttribute(AbstractMISP):
if not hasattr(self, 'timestamp'):
self.timestamp = datetime.timestamp(datetime.now())
def _to_feed(self) -> Dict:
def _to_feed(self, with_distribution=False) -> Dict:
if with_distribution:
self._fields_for_feed.add('distribution')
to_return = super()._to_feed()
if self.data:
to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
if self.tags:
to_return['Tag'] = list(filter(None, [tag._to_feed() for tag in self.tags]))
if with_distribution:
try:
to_return['SharingGroup'] = self.SharingGroup._to_feed()
except AttributeError:
pass
return to_return
@property
@ -659,9 +679,8 @@ class MISPObjectReference(AbstractMISP):
class MISPObject(AbstractMISP):
_fields_for_feed: set = {'name', 'meta-category', 'description', 'template_uuid',
'template_version', 'uuid', 'timestamp', 'distribution',
'sharing_group_id', 'comment', 'first_seen', 'last_seen',
'deleted'}
'template_version', 'uuid', 'timestamp', 'comment',
'first_seen', 'last_seen', 'deleted'}
def __init__(self, name: str, strict: bool = False, standalone: bool = True, default_attributes_parameters: Dict = {}, **kwargs):
''' Master class representing a generic MISP object
@ -745,10 +764,17 @@ class MISPObject(AbstractMISP):
if not hasattr(self, 'timestamp'):
self.timestamp = datetime.timestamp(datetime.now())
def _to_feed(self) -> Dict:
def _to_feed(self, with_distribution=False) -> Dict:
if with_distribution:
self._fields_for_feed.add('distribution')
to_return = super(MISPObject, self)._to_feed()
if self.references:
to_return['ObjectReference'] = [reference._to_feed() for reference in self.references]
if with_distribution:
try:
to_return['SharingGroup'] = self.SharingGroup._to_feed()
except AttributeError:
pass
return to_return
def __setattr__(self, name, value):
@ -1506,10 +1532,11 @@ class MISPEvent(AbstractMISP):
to_return += attribute.hash_values(algorithm)
return to_return
def to_feed(self, valid_distributions: List[int] = [0, 1, 2, 3, 4, 5], with_meta: bool = False) -> Dict:
def to_feed(self, valid_distributions: List[int] = [0, 1, 2, 3, 4, 5], with_meta: bool = False, with_distribution=False) -> Dict:
""" Generate a json output for MISP Feed.
:param valid_distributions: only makes sense if the distribution key is set; i.e., the event is exported from a MISP instance.
:param with_distribution: exports distribution and Sharing Group info; otherwise all SharingGroup information is discarded (protecting privacy)
"""
required = ['info', 'Orgc']
for r in required:
@ -1521,6 +1548,9 @@ class MISPEvent(AbstractMISP):
and int(self.distribution) not in valid_distributions):
return {}
if with_distribution:
self._fields_for_feed.add('distribution')
to_return = super()._to_feed()
if with_meta:
to_return['_hashes'] = []
@ -1533,7 +1563,7 @@ class MISPEvent(AbstractMISP):
for attribute in self.attributes:
if (valid_distributions and attribute.get('distribution') is not None and attribute.distribution not in valid_distributions):
continue
to_return['Attribute'].append(attribute._to_feed())
to_return['Attribute'].append(attribute._to_feed(with_distribution=with_distribution))
if with_meta:
to_return['_hashes'] += attribute.hash_values('md5')
@ -1542,16 +1572,24 @@ class MISPEvent(AbstractMISP):
for obj in self.objects:
if (valid_distributions and obj.get('distribution') is not None and obj.distribution not in valid_distributions):
continue
obj_to_attach = obj._to_feed()
if with_distribution:
obj._fields_for_feed.add('distribution')
obj_to_attach = obj._to_feed(with_distribution=with_distribution)
obj_to_attach['Attribute'] = []
for attribute in obj.attributes:
if (valid_distributions and attribute.get('distribution') is not None and attribute.distribution not in valid_distributions):
continue
obj_to_attach['Attribute'].append(attribute._to_feed())
obj_to_attach['Attribute'].append(attribute._to_feed(with_distribution=with_distribution))
if with_meta:
to_return['_hashes'] += attribute.hash_values('md5')
to_return['Object'].append(obj_to_attach)
if with_distribution:
try:
to_return['SharingGroup'] = self.SharingGroup._to_feed()
except AttributeError:
pass
return {'Event': to_return}
@property