mirror of https://github.com/MISP/PyMISP
Add helpers to update events with specific attributes.
parent
0d112bea47
commit
08e1c40987
|
@ -0,0 +1,34 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from pymisp import PyMISP
|
||||
from keys import url_priv, key_priv
|
||||
# from keys import url_cert, key_cert
|
||||
import argparse
|
||||
|
||||
# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
|
||||
try:
|
||||
input = raw_input
|
||||
except NameError:
|
||||
pass
|
||||
|
||||
|
||||
def init(url, key):
|
||||
return PyMISP(url, key, True, 'json')
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description='Send malware sample to MISP.')
|
||||
parser.add_argument("-d", "--distrib", type=int, help="The distribution setting used for the attributes and for the newly created event, if relevant. [0-3].")
|
||||
parser.add_argument("-i", "--info", help="Used to populate the event info field if no event ID supplied.")
|
||||
parser.add_argument("-a", "--analysis", type=int, help="The analysis level of the newly created event, if applicatble. [0-2]")
|
||||
parser.add_argument("-t", "--threat", type=int, help="The threat level ID of the newly created event, if applicatble. [0-3]")
|
||||
args = parser.parse_args()
|
||||
|
||||
misp = init(url_priv, key_priv)
|
||||
# misp = init(url_cert, key_cert)
|
||||
|
||||
event = misp.new_event(args.distrib, args.threat, args.analysis, args.info)
|
||||
print event
|
||||
|
||||
response = misp.add_mutex(event, 'booh')
|
||||
print response
|
195
pymisp/api.py
195
pymisp/api.py
|
@ -5,6 +5,7 @@
|
|||
|
||||
import json
|
||||
import datetime
|
||||
import time
|
||||
import requests
|
||||
import os
|
||||
import base64
|
||||
|
@ -65,6 +66,16 @@ class PyMISP(object):
|
|||
self.ssl = ssl
|
||||
self.out_type = out_type
|
||||
|
||||
self.categories = ['Internal reference', 'Targeting data', 'Antivirus detection',
|
||||
'Payload delivery', 'Payload installation', 'Artifacts dropped',
|
||||
'Persistence mechanism', 'Network activity', 'Payload type',
|
||||
'Attribution', 'External analysis', 'Other']
|
||||
self.types = ['md5', 'sha1', 'sha256', 'filename', 'filename|md5', 'filename|sha1',
|
||||
'filename|sha256', 'ip-src', 'ip-dst', 'hostname', 'domain', 'url',
|
||||
'user-agent', 'http-method', 'regkey', 'regkey|value', 'AS', 'snort',
|
||||
'pattern-in-file', 'pattern-in-traffic', 'pattern-in-memory', 'named pipe', 'mutex',
|
||||
'vulnerability', 'attachment', 'malware-sample', 'link', 'comment', 'text', 'other']
|
||||
|
||||
def __prepare_session(self, force_out=None):
|
||||
"""
|
||||
Prepare the headers of the session
|
||||
|
@ -165,6 +176,172 @@ class PyMISP(object):
|
|||
url = urljoin(self.root_url, 'attributes/{}'.format(attribute_id))
|
||||
return session.delete(url)
|
||||
|
||||
# ##############################################
|
||||
# ######### Event handling (Json only) #########
|
||||
# ##############################################
|
||||
|
||||
def _prepare_full_event(self, distribution, threat_level_id, analysis, info, date=None, published=False):
|
||||
to_return = {'Event': {}}
|
||||
# Setup details of a new event
|
||||
if distribution not in [0, 1, 2, 3]:
|
||||
raise NewEventError('{} is invalid, the distribution has to be in 0, 1, 2, 3'.format(distribution))
|
||||
if threat_level_id not in [1, 2, 3, 4]:
|
||||
raise NewEventError('{} is invalid, the threat_level_id has to be in 1, 2, 3, 4'.format(threat_level_id))
|
||||
if analysis not in [0, 1, 2]:
|
||||
raise NewEventError('{} is invalid, the analysis has to be in 0, 1, 2'.format(analysis))
|
||||
if date is None:
|
||||
date = datetime.date.today().isoformat()
|
||||
if published not in [True, False]:
|
||||
raise NewEventError('{} is invalid, published has to be True or False'.format(published))
|
||||
to_return['Event'] = {'distribution': distribution, 'info': info, 'date': date, 'published': published,
|
||||
'threat_level_id': threat_level_id, 'analysis': analysis}
|
||||
return to_return
|
||||
|
||||
def _prepare_full_attribute(self, category, type_value, value, to_ids, comment=None, distribution=None):
|
||||
to_return = {}
|
||||
if category not in self.categories:
|
||||
raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(self.categories))))
|
||||
to_return['category'] = category
|
||||
|
||||
if type_value not in self.types:
|
||||
raise NewAttributeError('{} is invalid, type_value has to be in {}'.format(type_value, (', '.join(self.types))))
|
||||
to_return['type'] = type_value
|
||||
|
||||
if to_ids not in [True, False]:
|
||||
raise NewAttributeError('{} is invalid, to_ids has to be True or False'.format(to_ids))
|
||||
to_return['to_ids'] = to_ids
|
||||
|
||||
if distribution is not None:
|
||||
distribution = int(distribution)
|
||||
# If None: take the default value of the event
|
||||
if distribution not in [None, 0, 1, 2, 3]:
|
||||
raise NewAttributeError('{} is invalid, the distribution has to be in 0, 1, 2, 3 or None'.format(distribution))
|
||||
if distribution is not None:
|
||||
to_return['distribution'] = distribution
|
||||
|
||||
to_return['value'] = value
|
||||
|
||||
if comment is not None:
|
||||
to_return['comment'] = comment
|
||||
|
||||
return to_return
|
||||
|
||||
def _prepare_update(self, event):
|
||||
# Cleanup the received event to make it publishable
|
||||
event['Event'].pop('locked', None)
|
||||
event['Event'].pop('attribute_count', None)
|
||||
event['Event'].pop('RelatedEvent', None)
|
||||
event['Event'].pop('orgc', None)
|
||||
event['Event'].pop('ShadowAttribute', None)
|
||||
event['Event'].pop('org', None)
|
||||
event['Event'].pop('proposal_email_lock', None)
|
||||
event['Event'].pop('publish_timestamp', None)
|
||||
event['Event'].pop('published', None)
|
||||
new_timestamp = int(time.time())
|
||||
if int(event['Event']['timestamp']) == new_timestamp:
|
||||
new_timestamp += 1
|
||||
event['Event']['timestamp'] = new_timestamp
|
||||
event['Event']['id'] = int(event['Event']['id'])
|
||||
return event
|
||||
|
||||
# ########## Helpers ##########
|
||||
|
||||
def new_event(self, distribution=None, threat_level_id=None, analysis=None, info=None, date=None, published=False):
|
||||
data = self._prepare_full_event(distribution, threat_level_id, analysis, info, date, published)
|
||||
response = self.add_event(data)
|
||||
return response.json()
|
||||
|
||||
def add_hashes(self, event, category='Artifacts dropped', filename=None, md5=None, sha1=None, sha256=None, comment=None, to_ids=True, distribution=None):
|
||||
categories = ['Payload delivery', 'Artifacts dropped', 'Payload Installation', 'External Analysis']
|
||||
if category not in categories:
|
||||
raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(categories))))
|
||||
|
||||
to_post = self._prepare_update(event)
|
||||
if distribution is None:
|
||||
distribution = to_post['Event']['distribution']
|
||||
|
||||
attributes = []
|
||||
type_value = '{}'
|
||||
value = '{}'
|
||||
if filename:
|
||||
type_value = 'filename|{}'
|
||||
value = filename + '|{}'
|
||||
if md5:
|
||||
attributes.append(self._prepare_full_attribute(category, type_value.format('md5'), value.format(md5),
|
||||
to_ids, comment, distribution))
|
||||
if sha1:
|
||||
attributes.append(self._prepare_full_attribute(category, type_value.format('sha1'), value.format(sha1),
|
||||
to_ids, comment, distribution))
|
||||
if sha256:
|
||||
attributes.append(self._prepare_full_attribute(category, type_value.format('sha256'), value.format(sha256),
|
||||
to_ids, comment, distribution))
|
||||
to_post['Event']['Attribute'] = attributes
|
||||
print json.dumps(to_post, indent=2)
|
||||
response = self.update_event(to_post['Event']['id'], to_post)
|
||||
return response.json()
|
||||
|
||||
def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=False, comment=None, distribution=None):
|
||||
to_post = self._prepare_update(event)
|
||||
if distribution is None:
|
||||
distribution = to_post['Event']['distribution']
|
||||
|
||||
type_value = '{}'
|
||||
value = '{}'
|
||||
if rvalue:
|
||||
type_value = 'regkey|value'
|
||||
value = '{}|{}'.format(regkey, rvalue)
|
||||
else:
|
||||
type_value = 'regkey'
|
||||
value = regkey
|
||||
|
||||
attributes = []
|
||||
attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution))
|
||||
to_post['Event']['Attribute'] = attributes
|
||||
print json.dumps(to_post, indent=2)
|
||||
response = self.update_event(to_post['Event']['id'], to_post)
|
||||
return response.json()
|
||||
|
||||
def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=False, comment=None, distribution=None):
|
||||
to_post = self._prepare_update(event)
|
||||
if distribution is None:
|
||||
distribution = to_post['Event']['distribution']
|
||||
|
||||
attributes = []
|
||||
if in_file:
|
||||
attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution))
|
||||
if in_memory:
|
||||
attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution))
|
||||
|
||||
to_post['Event']['Attribute'] = attributes
|
||||
response = self.update_event(to_post['Event']['id'], to_post)
|
||||
return response.json()
|
||||
|
||||
def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=False, comment=None, distribution=None):
|
||||
to_post = self._prepare_update(event)
|
||||
if distribution is None:
|
||||
distribution = to_post['Event']['distribution']
|
||||
|
||||
attributes = []
|
||||
if not named_pipe.startswith('\\.\\pipe\\'):
|
||||
named_pipe = '\\.\\pipe\\{}'.format(named_pipe)
|
||||
attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution))
|
||||
to_post['Event']['Attribute'] = attributes
|
||||
response = self.update_event(to_post['Event']['id'], to_post)
|
||||
return response.json()
|
||||
|
||||
def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=False, comment=None, distribution=None):
|
||||
to_post = self._prepare_update(event)
|
||||
if distribution is None:
|
||||
distribution = to_post['Event']['distribution']
|
||||
|
||||
attributes = []
|
||||
if not mutex.startswith('\\BaseNamedObjects\\'):
|
||||
mutex = '\\BaseNamedObjects\\{}'.format(mutex)
|
||||
attributes.append(self._prepare_full_attribute(category, 'mutex', mutex, to_ids, comment, distribution))
|
||||
to_post['Event']['Attribute'] = attributes
|
||||
response = self.update_event(to_post['Event']['id'], to_post)
|
||||
return response.json()
|
||||
|
||||
# ##################################################
|
||||
# ######### Upload samples through the API #########
|
||||
# ##################################################
|
||||
|
@ -183,21 +360,21 @@ class PyMISP(object):
|
|||
def prepare_attribute(self, event_id, distribution, to_ids, category, info,
|
||||
analysis, threat_level_id):
|
||||
to_post = {'request': {}}
|
||||
authorized_categs = ['Payload delivery', 'Artifacts dropped', 'Payload Installation', 'External Analysis']
|
||||
|
||||
if not isinstance(event_id, int):
|
||||
# New event
|
||||
to_post['request'].update(self._create_event(distribution, threat_level_id,
|
||||
analysis, info))
|
||||
to_post['request'] = self._create_event(distribution, threat_level_id, analysis, info)
|
||||
else:
|
||||
to_post['request'].update({'event_id': int(event_id)})
|
||||
to_post['request']['event_id'] = int(event_id)
|
||||
|
||||
if to_ids not in [True, False]:
|
||||
raise NewAttributeError('{} is invalid, to_ids has to be True or False'.format(analysis))
|
||||
to_post['request'].update({'to_ids': to_ids})
|
||||
raise NewAttributeError('{} is invalid, to_ids has to be True or False'.format(to_ids))
|
||||
to_post['request']['to_ids'] = to_ids
|
||||
|
||||
if category not in ['Payload delivery', 'Artifacts dropped',
|
||||
'Payload Installation', 'External Analysis']:
|
||||
raise NewAttributeError('{} is invalid, category has to be in {}'.format(analysis, (', '.join(['Payload delivery', 'Artifacts dropped', 'Payload Installation', 'External Analysis']))))
|
||||
to_post['request'].update({'category': category})
|
||||
if category not in authorized_categs:
|
||||
raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(authorized_categs))))
|
||||
to_post['request']['category'] = category
|
||||
|
||||
return to_post
|
||||
|
||||
|
|
Loading…
Reference in New Issue