From 08e1c40987b321fcf61f06bf9efc73372fc3fc91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= <raphael@vinot.info> Date: Tue, 1 Sep 2015 18:46:10 +0200 Subject: [PATCH] Add helpers to update events with specific attributes. --- examples/create_events.py | 34 +++++++ pymisp/api.py | 195 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 220 insertions(+), 9 deletions(-) create mode 100755 examples/create_events.py diff --git a/examples/create_events.py b/examples/create_events.py new file mode 100755 index 0000000..01685fc --- /dev/null +++ b/examples/create_events.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from pymisp import PyMISP +from keys import url_priv, key_priv +# from keys import url_cert, key_cert +import argparse + +# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one +try: + input = raw_input +except NameError: + pass + + +def init(url, key): + return PyMISP(url, key, True, 'json') + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Send malware sample to MISP.') + parser.add_argument("-d", "--distrib", type=int, help="The distribution setting used for the attributes and for the newly created event, if relevant. [0-3].") + parser.add_argument("-i", "--info", help="Used to populate the event info field if no event ID supplied.") + parser.add_argument("-a", "--analysis", type=int, help="The analysis level of the newly created event, if applicatble. [0-2]") + parser.add_argument("-t", "--threat", type=int, help="The threat level ID of the newly created event, if applicatble. [0-3]") + args = parser.parse_args() + + misp = init(url_priv, key_priv) + # misp = init(url_cert, key_cert) + + event = misp.new_event(args.distrib, args.threat, args.analysis, args.info) + print event + + response = misp.add_mutex(event, 'booh') + print response diff --git a/pymisp/api.py b/pymisp/api.py index 891c33b..62c6f8a 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -5,6 +5,7 @@ import json import datetime +import time import requests import os import base64 @@ -65,6 +66,16 @@ class PyMISP(object): self.ssl = ssl self.out_type = out_type + self.categories = ['Internal reference', 'Targeting data', 'Antivirus detection', + 'Payload delivery', 'Payload installation', 'Artifacts dropped', + 'Persistence mechanism', 'Network activity', 'Payload type', + 'Attribution', 'External analysis', 'Other'] + self.types = ['md5', 'sha1', 'sha256', 'filename', 'filename|md5', 'filename|sha1', + 'filename|sha256', 'ip-src', 'ip-dst', 'hostname', 'domain', 'url', + 'user-agent', 'http-method', 'regkey', 'regkey|value', 'AS', 'snort', + 'pattern-in-file', 'pattern-in-traffic', 'pattern-in-memory', 'named pipe', 'mutex', + 'vulnerability', 'attachment', 'malware-sample', 'link', 'comment', 'text', 'other'] + def __prepare_session(self, force_out=None): """ Prepare the headers of the session @@ -165,6 +176,172 @@ class PyMISP(object): url = urljoin(self.root_url, 'attributes/{}'.format(attribute_id)) return session.delete(url) + # ############################################## + # ######### Event handling (Json only) ######### + # ############################################## + + def _prepare_full_event(self, distribution, threat_level_id, analysis, info, date=None, published=False): + to_return = {'Event': {}} + # Setup details of a new event + if distribution not in [0, 1, 2, 3]: + raise NewEventError('{} is invalid, the distribution has to be in 0, 1, 2, 3'.format(distribution)) + if threat_level_id not in [1, 2, 3, 4]: + raise NewEventError('{} is invalid, the threat_level_id has to be in 1, 2, 3, 4'.format(threat_level_id)) + if analysis not in [0, 1, 2]: + raise NewEventError('{} is invalid, the analysis has to be in 0, 1, 2'.format(analysis)) + if date is None: + date = datetime.date.today().isoformat() + if published not in [True, False]: + raise NewEventError('{} is invalid, published has to be True or False'.format(published)) + to_return['Event'] = {'distribution': distribution, 'info': info, 'date': date, 'published': published, + 'threat_level_id': threat_level_id, 'analysis': analysis} + return to_return + + def _prepare_full_attribute(self, category, type_value, value, to_ids, comment=None, distribution=None): + to_return = {} + if category not in self.categories: + raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(self.categories)))) + to_return['category'] = category + + if type_value not in self.types: + raise NewAttributeError('{} is invalid, type_value has to be in {}'.format(type_value, (', '.join(self.types)))) + to_return['type'] = type_value + + if to_ids not in [True, False]: + raise NewAttributeError('{} is invalid, to_ids has to be True or False'.format(to_ids)) + to_return['to_ids'] = to_ids + + if distribution is not None: + distribution = int(distribution) + # If None: take the default value of the event + if distribution not in [None, 0, 1, 2, 3]: + raise NewAttributeError('{} is invalid, the distribution has to be in 0, 1, 2, 3 or None'.format(distribution)) + if distribution is not None: + to_return['distribution'] = distribution + + to_return['value'] = value + + if comment is not None: + to_return['comment'] = comment + + return to_return + + def _prepare_update(self, event): + # Cleanup the received event to make it publishable + event['Event'].pop('locked', None) + event['Event'].pop('attribute_count', None) + event['Event'].pop('RelatedEvent', None) + event['Event'].pop('orgc', None) + event['Event'].pop('ShadowAttribute', None) + event['Event'].pop('org', None) + event['Event'].pop('proposal_email_lock', None) + event['Event'].pop('publish_timestamp', None) + event['Event'].pop('published', None) + new_timestamp = int(time.time()) + if int(event['Event']['timestamp']) == new_timestamp: + new_timestamp += 1 + event['Event']['timestamp'] = new_timestamp + event['Event']['id'] = int(event['Event']['id']) + return event + + # ########## Helpers ########## + + def new_event(self, distribution=None, threat_level_id=None, analysis=None, info=None, date=None, published=False): + data = self._prepare_full_event(distribution, threat_level_id, analysis, info, date, published) + response = self.add_event(data) + return response.json() + + def add_hashes(self, event, category='Artifacts dropped', filename=None, md5=None, sha1=None, sha256=None, comment=None, to_ids=True, distribution=None): + categories = ['Payload delivery', 'Artifacts dropped', 'Payload Installation', 'External Analysis'] + if category not in categories: + raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(categories)))) + + to_post = self._prepare_update(event) + if distribution is None: + distribution = to_post['Event']['distribution'] + + attributes = [] + type_value = '{}' + value = '{}' + if filename: + type_value = 'filename|{}' + value = filename + '|{}' + if md5: + attributes.append(self._prepare_full_attribute(category, type_value.format('md5'), value.format(md5), + to_ids, comment, distribution)) + if sha1: + attributes.append(self._prepare_full_attribute(category, type_value.format('sha1'), value.format(sha1), + to_ids, comment, distribution)) + if sha256: + attributes.append(self._prepare_full_attribute(category, type_value.format('sha256'), value.format(sha256), + to_ids, comment, distribution)) + to_post['Event']['Attribute'] = attributes + print json.dumps(to_post, indent=2) + response = self.update_event(to_post['Event']['id'], to_post) + return response.json() + + def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=False, comment=None, distribution=None): + to_post = self._prepare_update(event) + if distribution is None: + distribution = to_post['Event']['distribution'] + + type_value = '{}' + value = '{}' + if rvalue: + type_value = 'regkey|value' + value = '{}|{}'.format(regkey, rvalue) + else: + type_value = 'regkey' + value = regkey + + attributes = [] + attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution)) + to_post['Event']['Attribute'] = attributes + print json.dumps(to_post, indent=2) + response = self.update_event(to_post['Event']['id'], to_post) + return response.json() + + def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=False, comment=None, distribution=None): + to_post = self._prepare_update(event) + if distribution is None: + distribution = to_post['Event']['distribution'] + + attributes = [] + if in_file: + attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution)) + if in_memory: + attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution)) + + to_post['Event']['Attribute'] = attributes + response = self.update_event(to_post['Event']['id'], to_post) + return response.json() + + def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=False, comment=None, distribution=None): + to_post = self._prepare_update(event) + if distribution is None: + distribution = to_post['Event']['distribution'] + + attributes = [] + if not named_pipe.startswith('\\.\\pipe\\'): + named_pipe = '\\.\\pipe\\{}'.format(named_pipe) + attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution)) + to_post['Event']['Attribute'] = attributes + response = self.update_event(to_post['Event']['id'], to_post) + return response.json() + + def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=False, comment=None, distribution=None): + to_post = self._prepare_update(event) + if distribution is None: + distribution = to_post['Event']['distribution'] + + attributes = [] + if not mutex.startswith('\\BaseNamedObjects\\'): + mutex = '\\BaseNamedObjects\\{}'.format(mutex) + attributes.append(self._prepare_full_attribute(category, 'mutex', mutex, to_ids, comment, distribution)) + to_post['Event']['Attribute'] = attributes + response = self.update_event(to_post['Event']['id'], to_post) + return response.json() + # ################################################## # ######### Upload samples through the API ######### # ################################################## @@ -183,21 +360,21 @@ class PyMISP(object): def prepare_attribute(self, event_id, distribution, to_ids, category, info, analysis, threat_level_id): to_post = {'request': {}} + authorized_categs = ['Payload delivery', 'Artifacts dropped', 'Payload Installation', 'External Analysis'] + if not isinstance(event_id, int): # New event - to_post['request'].update(self._create_event(distribution, threat_level_id, - analysis, info)) + to_post['request'] = self._create_event(distribution, threat_level_id, analysis, info) else: - to_post['request'].update({'event_id': int(event_id)}) + to_post['request']['event_id'] = int(event_id) if to_ids not in [True, False]: - raise NewAttributeError('{} is invalid, to_ids has to be True or False'.format(analysis)) - to_post['request'].update({'to_ids': to_ids}) + raise NewAttributeError('{} is invalid, to_ids has to be True or False'.format(to_ids)) + to_post['request']['to_ids'] = to_ids - if category not in ['Payload delivery', 'Artifacts dropped', - 'Payload Installation', 'External Analysis']: - raise NewAttributeError('{} is invalid, category has to be in {}'.format(analysis, (', '.join(['Payload delivery', 'Artifacts dropped', 'Payload Installation', 'External Analysis'])))) - to_post['request'].update({'category': category}) + if category not in authorized_categs: + raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(authorized_categs)))) + to_post['request']['category'] = category return to_post