Merge pull request #26 from jbremer/master

Various improvements
pull/30/head
Raphaël Vinot 2016-08-17 14:34:40 +02:00 committed by GitHub
commit c05254a841
2 changed files with 124 additions and 40 deletions

View File

@ -33,6 +33,29 @@ except NameError:
basestring = str
class distributions(object):
"""Enumeration of the available distributions."""
your_organization = 0
this_community = 1
connected_communities = 2
all_communities = 3
class threat_level(object):
"""Enumeration of the available threat levels."""
high = 1
medium = 2
low = 3
undefined = 4
class analysis(object):
"""Enumeration of the available analysis statuses."""
initial = 0
ongoing = 1
completed = 2
class PyMISPError(Exception):
def __init__(self, message):
super(PyMISPError, self).__init__(message)
@ -95,6 +118,11 @@ class PyMISP(object):
:param proxies: Proxy dict as describes here: http://docs.python-requests.org/en/master/user/advanced/#proxies
"""
# So it can may be accessed from the misp object.
distributions = distributions
threat_level = threat_level
analysis = analysis
def __init__(self, url, key, ssl=True, out_type='json', debug=False, proxies=None):
if not url:
raise NoURL('Please provide the URL of your MISP instance.')
@ -349,6 +377,10 @@ class PyMISP(object):
event['Event']['id'] = int(event['Event']['id'])
return event
def _one_or_more(self, value):
"""Returns a list/tuple of one or more items, regardless of input."""
return value if isinstance(value, (tuple, list)) else (value,)
# ########## Helpers ##########
def get(self, eid):
@ -443,22 +475,23 @@ class PyMISP(object):
def av_detection_link(self, event, link, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'link', link, to_ids, comment, distribution))
for link in self._one_or_more(link):
attributes.append(self._prepare_full_attribute(category, 'link', link, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_detection_name(self, event, name, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'text', name, to_ids, comment, distribution))
for name in self._one_or_more(name):
attributes.append(self._prepare_full_attribute(category, 'text', name, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_filename(self, event, filename, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'filename', filename, to_ids, comment, distribution))
for filename in self._one_or_more(filename):
attributes.append(self._prepare_full_attribute(category, 'filename', filename, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
type_value = '{}'
value = '{}'
if rvalue:
type_value = 'regkey|value'
value = '{}|{}'.format(regkey, rvalue)
@ -470,20 +503,36 @@ class PyMISP(object):
attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_regkeys(self, event, regkeys_values, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for regkey, rvalue in regkeys_values.items():
if rvalue:
type_value = 'regkey|value'
value = '{}|{}'.format(regkey, rvalue)
else:
type_value = 'regkey'
value = regkey
attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
if in_file:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution))
if in_memory:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution))
for pattern in self._one_or_more(pattern):
if in_file:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution))
if in_memory:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
if not named_pipe.startswith('\\.\\pipe\\'):
named_pipe = '\\.\\pipe\\{}'.format(named_pipe)
attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution))
for named_pipe in self._one_or_more(named_pipe):
if not named_pipe.startswith('\\.\\pipe\\'):
named_pipe = '\\.\\pipe\\{}'.format(named_pipe)
attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
@ -495,29 +544,34 @@ class PyMISP(object):
def add_yara(self, event, yara, category='Payload delivery', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'yara', yara, to_ids, comment, distribution))
for yara in self._one_or_more(yara):
attributes.append(self._prepare_full_attribute(category, 'yara', yara, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Network attributes #####
def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'ip-dst', ipdst, to_ids, comment, distribution))
for ipdst in self._one_or_more(ipdst):
attributes.append(self._prepare_full_attribute(category, 'ip-dst', ipdst, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_ipsrc(self, event, ipsrc, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'ip-src', ipsrc, to_ids, comment, distribution))
for ipsrc in self._one_or_more(ipsrc):
attributes.append(self._prepare_full_attribute(category, 'ip-src', ipsrc, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'hostname', hostname, to_ids, comment, distribution))
for hostname in self._one_or_more(hostname):
attributes.append(self._prepare_full_attribute(category, 'hostname', hostname, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'domain', domain, to_ids, comment, distribution))
for domain in self._one_or_more(domain):
attributes.append(self._prepare_full_attribute(category, 'domain', domain, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_domain_ip(self, event, domain, ip, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
@ -525,107 +579,132 @@ class PyMISP(object):
attributes.append(self._prepare_full_attribute(category, 'domain|ip', "%s|%s" % (domain, ip), to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_domains_ips(self, event, domain_ips, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for domain, ip in domain_ips.items():
attributes.append(self._prepare_full_attribute(category, 'domain|ip', "%s|%s" % (domain, ip), to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'url', url, to_ids, comment, distribution))
for url in self._one_or_more(url):
attributes.append(self._prepare_full_attribute(category, 'url', url, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'user-agent', useragent, to_ids, comment, distribution))
for useragent in self._one_or_more(useragent):
attributes.append(self._prepare_full_attribute(category, 'user-agent', useragent, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'pattern-in-traffic', pattern, to_ids, comment, distribution))
for pattern in self._one_or_more(pattern):
attributes.append(self._prepare_full_attribute(category, 'pattern-in-traffic', pattern, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'snort', snort, to_ids, comment, distribution))
for snort in self._one_or_more(snort):
attributes.append(self._prepare_full_attribute(category, 'snort', snort, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Email attributes #####
def add_email_src(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-src', email, to_ids, comment, distribution))
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-src', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_dst(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'email-dst', email, to_ids, comment, distribution))
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute(category, 'email-dst', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_subject(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-subject', email, to_ids, comment, distribution))
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-subject', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_attachment(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-attachment', email, to_ids, comment, distribution))
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-attachment', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Target attributes #####
def add_target_email(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-email', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-email', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_user(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-user', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-user', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_machine(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-machine', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-machine', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_org(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-org', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-org', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_location(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-location', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-location', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_external(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Attribution attributes #####
def add_threat_actor(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Attribution', 'threat-actor', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Attribution', 'threat-actor', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Internal reference attributes #####
def add_internal_link(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Internal reference', 'link', reference, to_ids, comment, distribution))
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'link', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_internal_comment(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Internal reference', 'comment', reference, to_ids, comment, distribution))
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'comment', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_internal_text(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Internal reference', 'text', reference, to_ids, comment, distribution))
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'text', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_internal_other(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Internal reference', 'other', reference, to_ids, comment, distribution))
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'other', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##################################################
@ -674,15 +753,17 @@ class PyMISP(object):
with open(path, 'rb') as f:
return str(base64.b64encode(f.read()))
def upload_sample(self, filename, filepath, event_id, distribution, to_ids,
category, comment, info, analysis, threat_level_id):
def upload_sample(self, filename, filepath, event_id, distribution=None,
to_ids=True, category=None, comment=None, info=None,
analysis=None, threat_level_id=None):
to_post = self.prepare_attribute(event_id, distribution, to_ids, category,
comment, info, analysis, threat_level_id)
to_post['request']['files'] = [{'filename': filename, 'data': self._encode_file_to_upload(filepath)}]
return self._upload_sample(to_post)
def upload_samplelist(self, filepaths, event_id, distribution, to_ids, category,
info, analysis, threat_level_id):
def upload_samplelist(self, filepaths, event_id, distribution=None,
to_ids=True, category=None, info=None,
analysis=None, threat_level_id=None):
to_post = self.prepare_attribute(event_id, distribution, to_ids, category,
info, analysis, threat_level_id)
files = []

View File

@ -156,6 +156,9 @@ class TestBasic(unittest.TestCase):
time.sleep(1)
self.delete(eventid)
def test_one_or_more(self):
self.assertEqual(self.misp._one_or_more(1), (1,))
self.assertEqual(self.misp._one_or_more([1]), [1])
if __name__ == '__main__':
unittest.main()