mirror of https://github.com/MISP/PyMISP
chg: [feed-generator] support for distribution and sharing groups
parent
8b66d5f753
commit
a9970d3078
|
@ -6,6 +6,10 @@ import json
|
|||
import os
|
||||
from pymisp import ExpandedPyMISP
|
||||
from settings import url, key, ssl, outputdir, filters, valid_attribute_distribution_levels
|
||||
try:
|
||||
from settings import with_distribution
|
||||
except:
|
||||
with_distribution = False
|
||||
|
||||
try:
|
||||
from settings import include_deleted
|
||||
|
@ -79,7 +83,7 @@ if __name__ == '__main__':
|
|||
for i, attribute in enumerate(e.attributes):
|
||||
if attribute.type in exclude_attribute_types:
|
||||
e.attributes.pop(i)
|
||||
e_feed = e.to_feed(valid_distributions=valid_attribute_distributions, with_meta=True)
|
||||
e_feed = e.to_feed(valid_distributions=valid_attribute_distributions, with_meta=True, with_distribution=with_distribution)
|
||||
except Exception as err:
|
||||
print(err, event['uuid'])
|
||||
continue
|
||||
|
|
|
@ -45,4 +45,9 @@ valid_attribute_distribution_levels = ['0', '1', '2', '3', '4', '5']
|
|||
# will not be able to get these attributes back unless their events get updated.
|
||||
# For example:
|
||||
# exclude_attribute_types = ['malware-sample']
|
||||
exclude_attribute_types = []
|
||||
exclude_attribute_types = []
|
||||
|
||||
# Include the distribution and sharing group information (and names/UUIDs of organisations in those Sharing Groups)
|
||||
# Set this to False if you want to discard the distribution metadata. That way all data will inherit the distribution
|
||||
# the feed
|
||||
with_distribution = False
|
|
@ -126,6 +126,8 @@ class MISPOrganisation(AbstractMISP):
|
|||
|
||||
|
||||
class MISPSharingGroup(AbstractMISP):
|
||||
_fields_for_feed: set = {'uuid', 'name', 'roaming', 'created', 'organisation_uuid', 'Organisation', 'SharingGroupOrg', 'SharingGroupServer'}
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.name: str
|
||||
|
@ -161,6 +163,16 @@ class MISPSharingGroup(AbstractMISP):
|
|||
return f'<{self.__class__.__name__}(name={self.name})'
|
||||
return f'<{self.__class__.__name__}(NotInitialized)'
|
||||
|
||||
def _to_feed(self) -> Dict:
|
||||
to_return = super()._to_feed()
|
||||
to_return['SharingGroupOrg'] = [org._to_feed() for org in self.SharingGroupOrg]
|
||||
to_return['Organisation'].pop('id', None)
|
||||
for server in to_return['SharingGroupServer']:
|
||||
server.pop('id', None)
|
||||
server.pop('sharing_group_id', None)
|
||||
server.pop('server_id', None)
|
||||
server['Server'].pop('id', None)
|
||||
return to_return
|
||||
|
||||
class MISPShadowAttribute(AbstractMISP):
|
||||
|
||||
|
@ -332,12 +344,19 @@ class MISPAttribute(AbstractMISP):
|
|||
if not hasattr(self, 'timestamp'):
|
||||
self.timestamp = datetime.timestamp(datetime.now())
|
||||
|
||||
def _to_feed(self) -> Dict:
|
||||
def _to_feed(self, with_distribution = True) -> Dict:
|
||||
if with_distribution:
|
||||
self._fields_for_feed.add('distribution')
|
||||
to_return = super()._to_feed()
|
||||
if self.data:
|
||||
to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
|
||||
if self.tags:
|
||||
to_return['Tag'] = list(filter(None, [tag._to_feed() for tag in self.tags]))
|
||||
if with_distribution:
|
||||
try:
|
||||
to_return['SharingGroup'] = self.SharingGroup._to_feed()
|
||||
except AttributeError:
|
||||
pass
|
||||
return to_return
|
||||
|
||||
@property
|
||||
|
@ -659,9 +678,8 @@ class MISPObjectReference(AbstractMISP):
|
|||
class MISPObject(AbstractMISP):
|
||||
|
||||
_fields_for_feed: set = {'name', 'meta-category', 'description', 'template_uuid',
|
||||
'template_version', 'uuid', 'timestamp', 'distribution',
|
||||
'sharing_group_id', 'comment', 'first_seen', 'last_seen',
|
||||
'deleted'}
|
||||
'template_version', 'uuid', 'timestamp', 'comment',
|
||||
'first_seen', 'last_seen', 'deleted'}
|
||||
|
||||
def __init__(self, name: str, strict: bool = False, standalone: bool = True, default_attributes_parameters: Dict = {}, **kwargs):
|
||||
''' Master class representing a generic MISP object
|
||||
|
@ -745,10 +763,17 @@ class MISPObject(AbstractMISP):
|
|||
if not hasattr(self, 'timestamp'):
|
||||
self.timestamp = datetime.timestamp(datetime.now())
|
||||
|
||||
def _to_feed(self) -> Dict:
|
||||
def _to_feed(self, with_distribution = True) -> Dict:
|
||||
if with_distribution:
|
||||
self._fields_for_feed.add('distribution')
|
||||
to_return = super(MISPObject, self)._to_feed()
|
||||
if self.references:
|
||||
to_return['ObjectReference'] = [reference._to_feed() for reference in self.references]
|
||||
if with_distribution:
|
||||
try:
|
||||
to_return['SharingGroup'] = self.SharingGroup._to_feed()
|
||||
except AttributeError:
|
||||
pass
|
||||
return to_return
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
|
@ -1506,10 +1531,11 @@ class MISPEvent(AbstractMISP):
|
|||
to_return += attribute.hash_values(algorithm)
|
||||
return to_return
|
||||
|
||||
def to_feed(self, valid_distributions: List[int] = [0, 1, 2, 3, 4, 5], with_meta: bool = False) -> Dict:
|
||||
def to_feed(self, valid_distributions: List[int] = [0, 1, 2, 3, 4, 5], with_meta: bool = False, with_distribution = True) -> Dict:
|
||||
""" Generate a json output for MISP Feed.
|
||||
|
||||
:param valid_distributions: only makes sense if the distribution key is set; i.e., the event is exported from a MISP instance.
|
||||
:param with_distribution: exports distribution and Sharing Group info; otherwise all SharingGroup information is discarded (protecting privacy)
|
||||
"""
|
||||
required = ['info', 'Orgc']
|
||||
for r in required:
|
||||
|
@ -1521,6 +1547,9 @@ class MISPEvent(AbstractMISP):
|
|||
and int(self.distribution) not in valid_distributions):
|
||||
return {}
|
||||
|
||||
if with_distribution:
|
||||
self._fields_for_feed.add('distribution')
|
||||
|
||||
to_return = super()._to_feed()
|
||||
if with_meta:
|
||||
to_return['_hashes'] = []
|
||||
|
@ -1533,7 +1562,7 @@ class MISPEvent(AbstractMISP):
|
|||
for attribute in self.attributes:
|
||||
if (valid_distributions and attribute.get('distribution') is not None and attribute.distribution not in valid_distributions):
|
||||
continue
|
||||
to_return['Attribute'].append(attribute._to_feed())
|
||||
to_return['Attribute'].append(attribute._to_feed(with_distribution=with_distribution))
|
||||
if with_meta:
|
||||
to_return['_hashes'] += attribute.hash_values('md5')
|
||||
|
||||
|
@ -1542,16 +1571,24 @@ class MISPEvent(AbstractMISP):
|
|||
for obj in self.objects:
|
||||
if (valid_distributions and obj.get('distribution') is not None and obj.distribution not in valid_distributions):
|
||||
continue
|
||||
obj_to_attach = obj._to_feed()
|
||||
if with_distribution:
|
||||
obj._fields_for_feed.add('distribution')
|
||||
obj_to_attach = obj._to_feed(with_distribution=with_distribution)
|
||||
obj_to_attach['Attribute'] = []
|
||||
for attribute in obj.attributes:
|
||||
if (valid_distributions and attribute.get('distribution') is not None and attribute.distribution not in valid_distributions):
|
||||
continue
|
||||
obj_to_attach['Attribute'].append(attribute._to_feed())
|
||||
obj_to_attach['Attribute'].append(attribute._to_feed(with_distribution=with_distribution))
|
||||
if with_meta:
|
||||
to_return['_hashes'] += attribute.hash_values('md5')
|
||||
to_return['Object'].append(obj_to_attach)
|
||||
|
||||
if with_distribution:
|
||||
try:
|
||||
to_return['SharingGroup'] = self.SharingGroup._to_feed()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
return {'Event': to_return}
|
||||
|
||||
@property
|
||||
|
|
Loading…
Reference in New Issue