Merge branch 'master' of github.com:CIRCL/PyMISP

pull/3/head
Alexandre Dulaunoy 2015-11-06 11:29:34 +01:00
commit 7ddfff3f4d
3 changed files with 203 additions and 46 deletions

View File

@ -1,3 +1,3 @@
__version__ = '2.1'
__version__ = '2.1.1'
from .api import PyMISP, PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey

View File

@ -107,8 +107,11 @@ class PyMISP(object):
self.types = ['md5', 'sha1', 'sha256', 'filename', 'filename|md5', 'filename|sha1',
'filename|sha256', 'ip-src', 'ip-dst', 'hostname', 'domain', 'url',
'user-agent', 'http-method', 'regkey', 'regkey|value', 'AS', 'snort',
'pattern-in-file', 'pattern-in-traffic', 'pattern-in-memory', 'named pipe', 'mutex',
'vulnerability', 'attachment', 'malware-sample', 'link', 'comment', 'text', 'other']
'pattern-in-file', 'pattern-in-traffic', 'pattern-in-memory', 'named pipe',
'mutex', 'vulnerability', 'attachment', 'malware-sample', 'link', 'comment',
'text', 'email-src', 'email-dst', 'email-subject', 'email-attachment',
'yara', 'target-user', 'target-email', 'target-machine', 'target-org',
'target-location', 'target-external', 'other']
try:
# Make sure the MISP instance is working and the URL is valid
@ -310,16 +313,19 @@ class PyMISP(object):
# ##### File attributes #####
def _send_attributes(self, event, attributes):
event = self._prepare_update(event)
for a in attributes:
if a.get('distribution') is None:
a['distribution'] = event['Event']['distribution']
event['Event']['Attribute'] = attributes
response = self.update_event(event['Event']['id'], event, 'json')
def _send_attributes(self, event, attributes, proposal=False):
if proposal:
response = self.proposal_add(event['Event']['id'], attributes)
else:
event = self._prepare_update(event)
for a in attributes:
if a.get('distribution') is None:
a['distribution'] = event['Event']['distribution']
event['Event']['Attribute'] = attributes
response = self.update_event(event['Event']['id'], event, 'json')
return self._check_response(response)
def add_hashes(self, event, category='Artifacts dropped', filename=None, md5=None, sha1=None, sha256=None, comment=None, to_ids=True, distribution=None):
def add_hashes(self, event, category='Artifacts dropped', filename=None, md5=None, sha1=None, sha256=None, comment=None, to_ids=True, distribution=None, proposal=False):
categories = ['Payload delivery', 'Artifacts dropped', 'Payload installation', 'External analysis']
if category not in categories:
raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(categories))))
@ -340,9 +346,9 @@ class PyMISP(object):
attributes.append(self._prepare_full_attribute(category, type_value.format('sha256'), value.format(sha256),
to_ids, comment, distribution))
return self._send_attributes(event, attributes)
return self._send_attributes(event, attributes, proposal)
def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None):
def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
type_value = '{}'
value = '{}'
if rvalue:
@ -354,67 +360,129 @@ class PyMISP(object):
attributes = []
attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution))
return self._send_attributes(event, attributes)
return self._send_attributes(event, attributes, proposal)
def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None):
def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
if in_file:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution))
if in_memory:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution))
return self._send_attributes(event, attributes)
return self._send_attributes(event, attributes, proposal)
def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None):
def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
if not named_pipe.startswith('\\.\\pipe\\'):
named_pipe = '\\.\\pipe\\{}'.format(named_pipe)
attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution))
return self._send_attributes(event, attributes)
return self._send_attributes(event, attributes, proposal)
def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None):
def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
if not mutex.startswith('\\BaseNamedObjects\\'):
mutex = '\\BaseNamedObjects\\{}'.format(mutex)
attributes.append(self._prepare_full_attribute(category, 'mutex', mutex, to_ids, comment, distribution))
return self._send_attributes(event, attributes)
return self._send_attributes(event, attributes, proposal)
# ##### Network attributes #####
def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None):
def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'ip-dst', ipdst, to_ids, comment, distribution))
return self._send_attributes(event, attributes)
return self._send_attributes(event, attributes, proposal)
def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None):
def add_ipsrc(self, event, ipsrc, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'ip-src', ipsrc, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'hostname', hostname, to_ids, comment, distribution))
return self._send_attributes(event, attributes)
return self._send_attributes(event, attributes, proposal)
def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None):
def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'domain', domain, to_ids, comment, distribution))
return self._send_attributes(event, attributes)
return self._send_attributes(event, attributes, proposal)
def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None):
def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'url', url, to_ids, comment, distribution))
return self._send_attributes(event, attributes)
return self._send_attributes(event, attributes, proposal)
def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None):
def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'user-agent', useragent, to_ids, comment, distribution))
return self._send_attributes(event, attributes)
return self._send_attributes(event, attributes, proposal)
def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None):
def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'pattern-in-traffic', pattern, to_ids, comment, distribution))
return self._send_attributes(event, attributes)
return self._send_attributes(event, attributes, proposal)
def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None):
def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'snort', snort, to_ids, comment, distribution))
return self._send_attributes(event, attributes)
return self._send_attributes(event, attributes, proposal)
# ##### Email attributes #####
def add_email_src(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-src', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_dst(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False):
categories = ['Payload delivery', 'Network activity']
if category not in categories:
raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(categories))))
attributes = []
attributes.append(self._prepare_full_attribute(category, 'email-dst', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_subject(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-subject', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_attachment(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-attachment', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Target attributes #####
def add_target_email(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-email', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_user(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-user', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_machine(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-machine', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_org(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-org', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_location(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-location', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_external(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##################################################
# ######### Upload samples through the API #########
@ -486,6 +554,51 @@ class PyMISP(object):
response = session.post(url, data=json.dumps(to_post))
return self._check_response(response)
# ############################
# ######## Proposals #########
# ############################
def __query_proposal(self, session, path, id, attribute=None):
path = path.strip('/')
url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id))
query = None
if path in ['add', 'edit']:
query = {'request': {'ShadowAttribute': attribute}}
if path == 'view':
response = session.get(url)
else:
if query is not None:
response = session.post(url, data=json.dumps(query))
else:
response = session.post(url)
return self._check_response(response)
def proposal_view(self, event_id=None, proposal_id=None):
session = self.__prepare_session('json')
if proposal_id is not None and event_id is not None:
return {'error': 'You can only view an event ID or a proposal ID'}
if event_id is not None:
id = event_id
else:
id = proposal_id
return self.__query_proposal(session, 'view', id)
def proposal_add(self, event_id, attribute):
session = self.__prepare_session('json')
return self.__query_proposal(session, 'add', event_id, attribute)
def proposal_edit(self, attribute_id, attribute):
session = self.__prepare_session('json')
return self.__query_proposal(session, 'edit', attribute_id, attribute)
def proposal_accept(self, proposal_id):
session = self.__prepare_session('json')
return self.__query_proposal(session, 'accept', proposal_id)
def proposal_discard(self, proposal_id):
session = self.__prepare_session('json')
return self.__query_proposal(session, 'discard', proposal_id)
# ##############################
# ######## REST Search #########
# ##############################

View File

@ -33,11 +33,12 @@ class TestBasic(unittest.TestCase):
def new_event(self):
event = self.misp.new_event(0, 1, 0, "This is a test")
event_id = self._clean_event(event)
self.assertEqual(event, {u'Event': {u'info': u'This is a test', u'locked': False,
u'attribute_count': u'0', u'analysis': u'0',
u'ShadowAttribute': [], u'published': False,
u'distribution': u'0', u'Attribute': [], u'proposal_email_lock': False,
u'threat_level_id': u'1'}})
to_check = {u'Event': {u'info': u'This is a test', u'locked': False,
u'attribute_count': u'0', u'analysis': u'0',
u'ShadowAttribute': [], u'published': False,
u'distribution': u'0', u'Attribute': [], u'proposal_email_lock': False,
u'threat_level_id': u'1'}},
self.assertEqual(event, to_check, 'Failed at creating a new Event')
return int(event_id)
def add_hashes(self, eventid):
@ -59,7 +60,7 @@ class TestBasic(unittest.TestCase):
u'to_ids': False, u'value': u'dll_installer.dll|003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9',
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha256'}],
u'proposal_email_lock': False, u'threat_level_id': u'1'}}
self.assertEqual(event, to_check)
self.assertEqual(event, to_check, 'Failed at adding hashes')
def publish(self, eventid):
r = self.misp.get_event(eventid)
@ -80,19 +81,62 @@ class TestBasic(unittest.TestCase):
u'to_ids': False, u'value': u'dll_installer.dll|003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9',
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha256'}],
u'proposal_email_lock': False, u'threat_level_id': u'1'}}
self.assertEqual(event, to_check)
self.assertEqual(event, to_check, 'Failed at publishing event')
def delete(self, eventid):
event = self.misp.delete_event(eventid)
event.json()
print event.json()
def test_all(self):
def delete_attr(self, attrid):
event = self.misp.delete_attribute(attrid)
print event.json()
def get(self, eventid):
event = self.misp.get_event(eventid)
print event.json()
def add(self):
event = {u'Event': {u'info': u'This is a test', u'locked': False,
u'attribute_count': u'3', u'analysis': u'0',
u'ShadowAttribute': [], u'published': False, u'distribution': u'0',
u'Attribute': [
{u'category': u'Payload installation', u'comment': u'Fanny modules',
u'to_ids': False, u'value': u'dll_installer.dll|0a209ac0de4ac033f31d6ba9191a8f7a',
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|md5'},
{u'category': u'Payload installation', u'comment': u'Fanny modules',
u'to_ids': False, u'value': u'dll_installer.dll|1f0ae54ac3f10d533013f74f48849de4e65817a7',
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha1'},
{u'category': u'Payload installation', u'comment': u'Fanny modules',
u'to_ids': False, u'value': u'dll_installer.dll|003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9',
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha256'}],
u'proposal_email_lock': False, u'threat_level_id': u'1'}}
event = self.misp.add_event(event)
print event.json()
def test_create_event(self):
eventid = self.new_event()
time.sleep(1)
self.add_hashes(eventid)
time.sleep(1)
self.publish(eventid)
self.delete(eventid)
def test_get_event(self):
eventid = self.new_event()
time.sleep(1)
self.get(eventid)
time.sleep(1)
self.delete(eventid)
def test_add_event(self):
self.add()
time.sleep(1)
self.delete(1)
def test_del_attr(self):
eventid = self.new_event()
time.sleep(1)
self.delete_attr(1)
time.sleep(1)
self.delete(eventid)
if __name__ == '__main__':
unittest.main()