mirror of https://github.com/MISP/PyMISP
More cleanup
parent
e035922949
commit
6482a21834
|
@ -23,9 +23,10 @@ except ImportError:
|
|||
HAVE_REQUESTS = False
|
||||
|
||||
from . import __version__
|
||||
from .exceptions import PyMISPError, NewAttributeError, SearchError, MissingDependency, NoURL, NoKey
|
||||
from .exceptions import PyMISPError, SearchError, MissingDependency, NoURL, NoKey
|
||||
from .mispevent import MISPEvent, MISPAttribute, EncodeUpdate
|
||||
|
||||
|
||||
# Least dirty way to support python 2 and 3
|
||||
try:
|
||||
basestring
|
||||
|
@ -297,10 +298,10 @@ class PyMISP(object):
|
|||
return json.dumps(misp_event, cls=EncodeUpdate)
|
||||
|
||||
def _prepare_full_attribute(self, category, type_value, value, to_ids, comment=None, distribution=5):
|
||||
misp_attribute = MISPAttribute(self.categories, self.types, self.category_type_mapping)
|
||||
misp_attribute = MISPAttribute(self.describe_types['result'])
|
||||
misp_attribute.set_all_values(type=type_value, value=value, category=category,
|
||||
to_ids=to_ids, comment=comment, distribution=distribution)
|
||||
return json.dumps(misp_attribute, cls=EncodeUpdate)
|
||||
return misp_attribute
|
||||
|
||||
def _one_or_more(self, value):
|
||||
"""Returns a list/tuple of one or more items, regardless of input."""
|
||||
|
@ -356,19 +357,14 @@ class PyMISP(object):
|
|||
else:
|
||||
e = MISPEvent(self.describe_types['result'])
|
||||
e.load(event)
|
||||
for a in e.attributes:
|
||||
if a.get('distribution') is None:
|
||||
a['distribution'] = 5
|
||||
e.attributes += attributes
|
||||
response = self.update_event(event['Event']['id'], json.dumps(e, cls=EncodeUpdate))
|
||||
return response
|
||||
|
||||
def add_named_attribute(self, event, category, type_value, value, to_ids=False, comment=None, distribution=None, proposal=False):
|
||||
def add_named_attribute(self, event, type_value, value, category=None, to_ids=False, comment=None, distribution=None, proposal=False):
|
||||
attributes = []
|
||||
if value and category and type:
|
||||
try:
|
||||
attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution))
|
||||
except NewAttributeError as e:
|
||||
return e
|
||||
for value in self._one_or_more(value):
|
||||
attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution))
|
||||
return self._send_attributes(event, attributes, proposal)
|
||||
|
||||
def add_hashes(self, event, category='Artifacts dropped', filename=None, md5=None, sha1=None, sha256=None, ssdeep=None, comment=None, to_ids=True, distribution=None, proposal=False):
|
||||
|
@ -632,8 +628,8 @@ class PyMISP(object):
|
|||
# ######### Upload samples through the API #########
|
||||
# ##################################################
|
||||
|
||||
def prepare_attribute(self, event_id, distribution, to_ids, category, comment, info,
|
||||
analysis, threat_level_id):
|
||||
def _prepare_upload(self, event_id, distribution, to_ids, category, comment, info,
|
||||
analysis, threat_level_id):
|
||||
to_post = {'request': {}}
|
||||
|
||||
if event_id is not None:
|
||||
|
@ -647,12 +643,13 @@ class PyMISP(object):
|
|||
else:
|
||||
to_post['request']['event_id'] = int(event_id)
|
||||
|
||||
if to_ids not in [True, False]:
|
||||
raise NewAttributeError('{} is invalid, to_ids has to be True or False'.format(to_ids))
|
||||
default_values = self.sane_default['malware-sample']
|
||||
if to_ids is None or not isinstance(to_ids, bool):
|
||||
to_ids = bool(int(default_values['to_ids']))
|
||||
to_post['request']['to_ids'] = to_ids
|
||||
|
||||
if category not in self.categories:
|
||||
raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(self.categories))))
|
||||
if category is None or category not in self.categories:
|
||||
category = default_values['default_category']
|
||||
to_post['request']['category'] = category
|
||||
|
||||
to_post['request']['comment'] = comment
|
||||
|
@ -665,16 +662,16 @@ class PyMISP(object):
|
|||
def upload_sample(self, filename, filepath, event_id, distribution=None,
|
||||
to_ids=True, category=None, comment=None, info=None,
|
||||
analysis=None, threat_level_id=None):
|
||||
to_post = self.prepare_attribute(event_id, distribution, to_ids, category,
|
||||
comment, info, analysis, threat_level_id)
|
||||
to_post = self._prepare_upload(event_id, distribution, to_ids, category,
|
||||
comment, info, analysis, threat_level_id)
|
||||
to_post['request']['files'] = [{'filename': filename, 'data': self._encode_file_to_upload(filepath)}]
|
||||
return self._upload_sample(to_post)
|
||||
|
||||
def upload_samplelist(self, filepaths, event_id, distribution=None,
|
||||
to_ids=True, category=None, info=None,
|
||||
analysis=None, threat_level_id=None):
|
||||
to_post = self.prepare_attribute(event_id, distribution, to_ids, category,
|
||||
info, analysis, threat_level_id)
|
||||
to_post = self._prepare_upload(event_id, distribution, to_ids, category,
|
||||
info, analysis, threat_level_id)
|
||||
files = []
|
||||
for path in filepaths:
|
||||
if not os.path.isfile(path):
|
||||
|
@ -694,18 +691,14 @@ class PyMISP(object):
|
|||
# ############################
|
||||
|
||||
def __query_proposal(self, session, path, id, attribute=None):
|
||||
path = path.strip('/')
|
||||
url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id))
|
||||
query = None
|
||||
if path in ['add', 'edit']:
|
||||
query = {'request': {'ShadowAttribute': attribute}}
|
||||
if path == 'view':
|
||||
response = session.post(url, data=json.dumps(query))
|
||||
elif path == 'view':
|
||||
response = session.get(url)
|
||||
else:
|
||||
if query is not None:
|
||||
response = session.post(url, data=json.dumps(query))
|
||||
else:
|
||||
response = session.post(url)
|
||||
else: # accept or discard
|
||||
response = session.post(url)
|
||||
return self._check_response(response)
|
||||
|
||||
def proposal_view(self, event_id=None, proposal_id=None):
|
||||
|
|
|
@ -21,10 +21,11 @@ from .exceptions import PyMISPError, NewEventError, NewAttributeError
|
|||
|
||||
class MISPAttribute(object):
|
||||
|
||||
def __init__(self, categories, types, category_type_mapping):
|
||||
self.categories = categories
|
||||
self.types = types
|
||||
self.category_type_mapping = category_type_mapping
|
||||
def __init__(self, describe_types):
|
||||
self.categories = describe_types['categories']
|
||||
self.types = describe_types['types']
|
||||
self.category_type_mapping = describe_types['category_type_mappings']
|
||||
self.sane_default = describe_types['sane_defaults']
|
||||
self._reinitialize_attribute()
|
||||
|
||||
def _reinitialize_attribute(self):
|
||||
|
@ -46,44 +47,59 @@ class MISPAttribute(object):
|
|||
self.ShadowAttribute = []
|
||||
|
||||
def set_all_values(self, **kwargs):
|
||||
if kwargs.get('type') and kwargs.get('category'):
|
||||
if kwargs['type'] not in self.category_type_mapping[kwargs['category']]:
|
||||
raise NewAttributeError('{} and {} is an invalid combinaison, type for this category has to be in {}'.capitalizeformat(self.type, self.category, (', '.join(self.category_type_mapping[self.category]))))
|
||||
# Required
|
||||
if kwargs.get('type'):
|
||||
self.type = kwargs['type']
|
||||
if self.type not in self.types:
|
||||
raise NewAttributeError('{} is invalid, type has to be in {}'.format(self.type, (', '.join(self.types))))
|
||||
else:
|
||||
raise NewAttributeError('The type of the attribute is required.')
|
||||
|
||||
type_defaults = self.sane_default[self.type]
|
||||
|
||||
if kwargs.get('value'):
|
||||
self.value = kwargs['value']
|
||||
else:
|
||||
raise NewAttributeError('The value of the attribute is required.')
|
||||
|
||||
# Default values
|
||||
if kwargs.get('category', None):
|
||||
if kwargs.get('category'):
|
||||
self.category = kwargs['category']
|
||||
if self.category not in self.categories:
|
||||
raise NewAttributeError('{} is invalid, category has to be in {}'.format(self.category, (', '.join(self.categories))))
|
||||
if kwargs.get('type', None):
|
||||
self.type = kwargs['type']
|
||||
if self.type not in self.types:
|
||||
raise NewAttributeError('{} is invalid, type_value has to be in {}'.format(self.type, (', '.join(self.types))))
|
||||
if self.type not in self.category_type_mapping[self.category]:
|
||||
raise NewAttributeError('{} and {} is an invalid combinaison, type_value for this category has to be in {}'.capitalizeformat(self.type, self.category, (', '.join(self.category_type_mapping[self.category]))))
|
||||
if kwargs.get('value', None):
|
||||
self.value = kwargs['value']
|
||||
if kwargs.get('to_ids', None):
|
||||
else:
|
||||
self.category = type_defaults['default_category']
|
||||
|
||||
if kwargs.get('to_ids'):
|
||||
self.to_ids = kwargs['to_ids']
|
||||
if not isinstance(self.to_ids, bool):
|
||||
raise NewAttributeError('{} is invalid, to_ids has to be True or False'.format(self.to_ids))
|
||||
if kwargs.get('comment', None):
|
||||
else:
|
||||
self.to_ids = bool(int(type_defaults['to_ids']))
|
||||
if kwargs.get('comment'):
|
||||
self.comment = kwargs['comment']
|
||||
if kwargs.get('distribution', None):
|
||||
if kwargs.get('distribution'):
|
||||
self.distribution = int(kwargs['distribution'])
|
||||
if self.distribution not in [0, 1, 2, 3, 5]:
|
||||
raise NewAttributeError('{} is invalid, the distribution has to be in 0, 1, 2, 3, 5'.format(self.distribution))
|
||||
|
||||
# other possible values
|
||||
if kwargs.get('id', None):
|
||||
if kwargs.get('id'):
|
||||
self.id = int(kwargs['id'])
|
||||
if kwargs.get('uuid', None):
|
||||
if kwargs.get('uuid'):
|
||||
self.uuid = kwargs['uuid']
|
||||
if kwargs.get('timestamp', None):
|
||||
if kwargs.get('timestamp'):
|
||||
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs['timestamp']))
|
||||
if kwargs.get('sharing_group_id', None):
|
||||
if kwargs.get('sharing_group_id'):
|
||||
self.sharing_group_id = int(kwargs['sharing_group_id'])
|
||||
if kwargs.get('deleted', None):
|
||||
if kwargs.get('deleted'):
|
||||
self.deleted = kwargs['deleted']
|
||||
if kwargs.get('SharingGroup', None):
|
||||
if kwargs.get('SharingGroup'):
|
||||
self.SharingGroup = kwargs['SharingGroup']
|
||||
if kwargs.get('ShadowAttribute', None):
|
||||
if kwargs.get('ShadowAttribute'):
|
||||
self.ShadowAttribute = kwargs['ShadowAttribute']
|
||||
|
||||
def _json(self):
|
||||
|
@ -136,6 +152,7 @@ class MISPEvent(object):
|
|||
if not describe_types:
|
||||
t = json.load(open(os.path.join(self.ressources_path, 'describeTypes.json'), 'r'))
|
||||
describe_types = t['result']
|
||||
self.describe_types = describe_types
|
||||
self.categories = describe_types['categories']
|
||||
self.types = describe_types['types']
|
||||
self.category_type_mapping = describe_types['category_type_mappings']
|
||||
|
@ -150,7 +167,7 @@ class MISPEvent(object):
|
|||
self.distribution = 3
|
||||
self.threat_level_id = 2
|
||||
self.analysis = 0
|
||||
self.info = ''
|
||||
self.info = None
|
||||
self.published = False
|
||||
self.date = datetime.date.today()
|
||||
self.attributes = []
|
||||
|
@ -191,24 +208,28 @@ class MISPEvent(object):
|
|||
self.set_all_values(**e)
|
||||
|
||||
def set_all_values(self, **kwargs):
|
||||
# Required value
|
||||
if kwargs.get('info'):
|
||||
self.info = kwargs['info']
|
||||
else:
|
||||
raise NewAttributeError('The info field of the new event is required.')
|
||||
|
||||
# Default values for a valid event to send to a MISP instance
|
||||
if kwargs.get('distribution', None) is not None:
|
||||
if kwargs.get('distribution') is not None:
|
||||
self.distribution = int(kwargs['distribution'])
|
||||
if self.distribution not in [0, 1, 2, 3]:
|
||||
raise NewEventError('{} is invalid, the distribution has to be in 0, 1, 2, 3'.format(self.distribution))
|
||||
if kwargs.get('threat_level_id', None) is not None:
|
||||
if kwargs.get('threat_level_id') is not None:
|
||||
self.threat_level_id = int(kwargs['threat_level_id'])
|
||||
if self.threat_level_id not in [1, 2, 3, 4]:
|
||||
raise NewEventError('{} is invalid, the threat_level has to be in 1, 2, 3, 4'.format(self.threat_level_id))
|
||||
if kwargs.get('analysis', None) is not None:
|
||||
if kwargs.get('analysis') is not None:
|
||||
self.analysis = int(kwargs['analysis'])
|
||||
if self.analysis not in [0, 1, 2]:
|
||||
raise NewEventError('{} is invalid, the analysis has to be in 0, 1, 2'.format(self.analysis))
|
||||
if kwargs.get('info', None):
|
||||
self.info = kwargs['info']
|
||||
if kwargs.get('published', None) is not None:
|
||||
if kwargs.get('published') is not None:
|
||||
self.publish()
|
||||
if kwargs.get('date', None):
|
||||
if kwargs.get('date'):
|
||||
if isinstance(kwargs['date'], str):
|
||||
self.date = parse(kwargs['date'])
|
||||
elif isinstance(kwargs['date'], datetime.datetime):
|
||||
|
@ -217,42 +238,42 @@ class MISPEvent(object):
|
|||
self.date = kwargs['date']
|
||||
else:
|
||||
raise NewEventError('Invalid format for the date: {} - {}'.format(kwargs['date'], type(kwargs['date'])))
|
||||
if kwargs.get('Attribute', None):
|
||||
if kwargs.get('Attribute'):
|
||||
for a in kwargs['Attribute']:
|
||||
attribute = MISPAttribute(self.categories, self.types, self.category_type_mapping)
|
||||
attribute = MISPAttribute(self.describe_types)
|
||||
attribute.set_all_values(**a)
|
||||
self.attributes.append(attribute)
|
||||
|
||||
# All other keys
|
||||
if kwargs.get('id', None):
|
||||
if kwargs.get('id'):
|
||||
self.id = int(kwargs['id'])
|
||||
if kwargs.get('orgc_id', None):
|
||||
if kwargs.get('orgc_id'):
|
||||
self.orgc_id = int(kwargs['orgc_id'])
|
||||
if kwargs.get('org_id', None):
|
||||
if kwargs.get('org_id'):
|
||||
self.org_id = int(kwargs['org_id'])
|
||||
if kwargs.get('uuid', None):
|
||||
if kwargs.get('uuid'):
|
||||
self.uuid = kwargs['uuid']
|
||||
if kwargs.get('attribute_count', None):
|
||||
if kwargs.get('attribute_count'):
|
||||
self.attribute_count = int(kwargs['attribute_count'])
|
||||
if kwargs.get('timestamp', None):
|
||||
if kwargs.get('timestamp'):
|
||||
self.timestamp = datetime.datetime.fromtimestamp(int(kwargs['timestamp']))
|
||||
if kwargs.get('proposal_email_lock', None):
|
||||
if kwargs.get('proposal_email_lock'):
|
||||
self.proposal_email_lock = kwargs['proposal_email_lock']
|
||||
if kwargs.get('locked', None):
|
||||
if kwargs.get('locked'):
|
||||
self.locked = kwargs['locked']
|
||||
if kwargs.get('publish_timestamp', None):
|
||||
if kwargs.get('publish_timestamp'):
|
||||
self.publish_timestamp = datetime.datetime.fromtimestamp(int(kwargs['publish_timestamp']))
|
||||
if kwargs.get('sharing_group_id', None):
|
||||
if kwargs.get('sharing_group_id'):
|
||||
self.sharing_group_id = int(kwargs['sharing_group_id'])
|
||||
if kwargs.get('Org', None):
|
||||
if kwargs.get('Org'):
|
||||
self.Org = kwargs['Org']
|
||||
if kwargs.get('Orgc', None):
|
||||
if kwargs.get('Orgc'):
|
||||
self.Orgc = kwargs['Orgc']
|
||||
if kwargs.get('ShadowAttribute', None):
|
||||
if kwargs.get('ShadowAttribute'):
|
||||
self.ShadowAttribute = kwargs['ShadowAttribute']
|
||||
if kwargs.get('RelatedEvent', None):
|
||||
if kwargs.get('RelatedEvent'):
|
||||
self.RelatedEvent = kwargs['RelatedEvent']
|
||||
if kwargs.get('Tag', None):
|
||||
if kwargs.get('Tag'):
|
||||
self.Tag = kwargs['Tag']
|
||||
|
||||
def _json(self):
|
||||
|
@ -309,27 +330,7 @@ class MISPEvent(object):
|
|||
def unpublish(self):
|
||||
self.published = False
|
||||
|
||||
def add_attribute(self, type_value, value, **kwargs):
|
||||
if not self.sane_default.get(type_value):
|
||||
raise NewAttributeError("{} is an invalid type. Can only be one of the following: {}".format(type_value, ', '.join(self.types)))
|
||||
defaults = self.sane_default[type_value]
|
||||
if kwargs.get('category'):
|
||||
category = kwargs.get('category')
|
||||
else:
|
||||
category = defaults['default_category']
|
||||
if kwargs.get('to_ids'):
|
||||
to_ids = bool(int(kwargs.get('to_ids')))
|
||||
else:
|
||||
to_ids = bool(int(defaults['to_ids']))
|
||||
if kwargs.get('comment'):
|
||||
comment = kwargs.get('comment')
|
||||
else:
|
||||
comment = None
|
||||
if kwargs.get('distribution'):
|
||||
distribution = int(kwargs.get('distribution'))
|
||||
else:
|
||||
distribution = 5
|
||||
attribute = MISPAttribute(self.categories, self.types, self.category_type_mapping)
|
||||
attribute.set_all_values(type=type_value, value=value, category=category,
|
||||
to_ids=to_ids, comment=comment, distribution=distribution)
|
||||
def add_attribute(self, type, value, **kwargs):
|
||||
attribute = MISPAttribute(self.describe_types)
|
||||
attribute.set_all_values(type=type, value=value, **kwargs)
|
||||
self.attributes.append(attribute)
|
||||
|
|
|
@ -111,7 +111,8 @@ class TestOffline(unittest.TestCase):
|
|||
self.initURI(m)
|
||||
pymisp = PyMISP(self.domain, self.key)
|
||||
m.register_uri('POST', self.domain + 'events', json=error_empty_info)
|
||||
response = pymisp.new_event(0, 1, 0)
|
||||
# TODO Add test exception if info field isn't set
|
||||
response = pymisp.new_event(0, 1, 0, 'Foo')
|
||||
self.assertEqual(response, error_empty_info_flatten)
|
||||
m.register_uri('POST', self.domain + 'events', json=self.new_misp_event)
|
||||
response = pymisp.new_event(0, 1, 0, "This is a test.", '2016-08-26', False)
|
||||
|
|
Loading…
Reference in New Issue