mirror of https://github.com/MISP/PyMISP
Merge pull request #461 from cudeso/master
Disable to_ids based on false positive sightings reportingpull/469/head
commit
c23375c275
|
@ -0,0 +1,136 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
'''
|
||||||
|
Koen Van Impe
|
||||||
|
|
||||||
|
Disable the to_ids flag of an attribute when there are to many false positives
|
||||||
|
Put this script in crontab to run every /15 or /60
|
||||||
|
*/5 * * * * mispuser /usr/bin/python3 /home/mispuser/PyMISP/examples/falsepositive_disabletoids.py
|
||||||
|
|
||||||
|
Do inline config in "main"
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
from pymisp import ExpandedPyMISP, MISPEvent
|
||||||
|
from keys import misp_url, misp_key, misp_verifycert
|
||||||
|
from datetime import datetime
|
||||||
|
from datetime import date
|
||||||
|
|
||||||
|
import datetime as dt
|
||||||
|
import smtplib
|
||||||
|
import mimetypes
|
||||||
|
from email.mime.multipart import MIMEMultipart
|
||||||
|
from email import encoders
|
||||||
|
from email.mime.base import MIMEBase
|
||||||
|
from email.mime.text import MIMEText
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
|
||||||
|
def init(url, key, verifycert):
|
||||||
|
'''
|
||||||
|
Template to get MISP module started
|
||||||
|
'''
|
||||||
|
return ExpandedPyMISP(url, key, verifycert, 'json')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
minimal_fp = 0
|
||||||
|
threshold_to_ids = .50
|
||||||
|
minimal_date_sighting_date = '1970-01-01 00:00:00'
|
||||||
|
|
||||||
|
smtp_from = 'INSERT_FROM'
|
||||||
|
smtp_to = 'INSERT_TO'
|
||||||
|
smtp_server = 'localhost'
|
||||||
|
report_changes = ''
|
||||||
|
ts_format = '%Y-%m-%d %H:%M:%S'
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description="Disable the to_ids flag of attributes with a certain number of false positives above a threshold.")
|
||||||
|
parser.add_argument('-m', '--mail', action='store_true', help='Mail the report')
|
||||||
|
parser.add_argument('-o', '--mailoptions', action='store', help='mailoptions: \'smtp_from=INSERT_FROM;smtp_to=INSERT_TO;smtp_server=localhost\'')
|
||||||
|
parser.add_argument('-b', '--minimal-fp', default=minimal_fp, type=int, help='Minimal number of false positive (default: %(default)s )')
|
||||||
|
parser.add_argument('-t', '--threshold', default=threshold_to_ids, type=float, help='Threshold false positive/true positive rate (default: %(default)s )')
|
||||||
|
parser.add_argument('-d', '--minimal-date-sighting', default=minimal_date_sighting_date, help='Minimal date for sighting (false positive / true positive) (default: %(default)s )')
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
misp = init(misp_url, misp_key, misp_verifycert)
|
||||||
|
|
||||||
|
minimal_fp = int(args.minimal_fp)
|
||||||
|
threshold_to_ids = args.threshold
|
||||||
|
minimal_date_sighting_date = args.minimal_date_sighting
|
||||||
|
minimal_date_sighting = int(dt.datetime.strptime(minimal_date_sighting_date, '%Y-%m-%d %H:%M:%S').strftime("%s"))
|
||||||
|
|
||||||
|
# Fetch all the attributes
|
||||||
|
result = misp.search('attributes', to_ids=1, include_sightings=1)
|
||||||
|
|
||||||
|
if 'Attribute' in result:
|
||||||
|
for attribute in result['Attribute']:
|
||||||
|
true_positive = 0
|
||||||
|
false_positive = 0
|
||||||
|
compute_threshold = 0
|
||||||
|
attribute_id = attribute['id']
|
||||||
|
attribute_value = attribute['value']
|
||||||
|
attribute_uuid = attribute['uuid']
|
||||||
|
event_id = attribute['event_id']
|
||||||
|
|
||||||
|
# Only do something if there is a sighting
|
||||||
|
if 'Sighting' in attribute:
|
||||||
|
|
||||||
|
for sighting in attribute['Sighting']:
|
||||||
|
if int(sighting['date_sighting']) > minimal_date_sighting:
|
||||||
|
if int(sighting['type']) == 0:
|
||||||
|
true_positive = true_positive + 1
|
||||||
|
elif int(sighting['type']) == 1:
|
||||||
|
false_positive = false_positive + 1
|
||||||
|
|
||||||
|
if false_positive > minimal_fp:
|
||||||
|
compute_threshold = false_positive / (true_positive + false_positive)
|
||||||
|
|
||||||
|
if compute_threshold >= threshold_to_ids:
|
||||||
|
# Fetch event title for report text
|
||||||
|
event_details = misp.get_event(event_id)
|
||||||
|
event_info = event_details['Event']['info']
|
||||||
|
|
||||||
|
misp.update_attribute( { 'uuid': attribute_uuid, 'to_ids': 0})
|
||||||
|
|
||||||
|
report_changes = report_changes + 'Disable to_ids for [%s] (%s) in event [%s] (%s) - FP: %s TP: %s \n' % (attribute_value, attribute_id, event_info, event_id, false_positive, true_positive)
|
||||||
|
|
||||||
|
# Changing the attribute to_ids flag sets the event to unpublished
|
||||||
|
misp.publish(event_id)
|
||||||
|
|
||||||
|
# Only send/print the report if it contains content
|
||||||
|
if report_changes:
|
||||||
|
if args.mail:
|
||||||
|
if args.mailoptions:
|
||||||
|
mailoptions = args.mailoptions.split(';')
|
||||||
|
for s in mailoptions:
|
||||||
|
if s.split('=')[0] == 'smtp_from':
|
||||||
|
smtp_from = s.split('=')[1]
|
||||||
|
if s.split('=')[0] == 'smtp_to':
|
||||||
|
smtp_to = s.split('=')[1]
|
||||||
|
if s.split('=')[0] == 'smtp_server':
|
||||||
|
smtp_server = s.split('=')[1]
|
||||||
|
|
||||||
|
now = datetime.now()
|
||||||
|
current_date = now.strftime(ts_format)
|
||||||
|
report_changes_body = 'MISP Disable to_ids flags for %s on %s\n-------------------------------------------------------------------------------\n\n' % (misp_url, current_date)
|
||||||
|
report_changes_body = report_changes_body + 'Minimal number of false positives before considering threshold: %s\n' % (minimal_fp)
|
||||||
|
report_changes_body = report_changes_body + 'Threshold false positives/true positives to disable to_ids flag: %s\n' % (threshold_to_ids)
|
||||||
|
report_changes_body = report_changes_body + 'Minimal date for sighting false positives: %s\n\n' % (minimal_date_sighting_date)
|
||||||
|
report_changes_body = report_changes_body + report_changes
|
||||||
|
report_changes_body = report_changes_body + '\nEvents that have attributes with changed to_ids flag have been republished, without e-mail notification.'
|
||||||
|
report_changes_body = report_changes_body + '\n\nMISP Disable to_ids Finished\n'
|
||||||
|
|
||||||
|
subject = 'Report of disable to_ids flag for false positives sightings of %s' % (current_date)
|
||||||
|
msg = MIMEMultipart()
|
||||||
|
msg['From'] = smtp_from
|
||||||
|
msg['To'] = smtp_to
|
||||||
|
msg['Subject'] = subject
|
||||||
|
|
||||||
|
msg.attach(MIMEText(report_changes_body, 'text'))
|
||||||
|
print(report_changes_body)
|
||||||
|
server = smtplib.SMTP(smtp_server)
|
||||||
|
server.sendmail(smtp_from, smtp_to, msg.as_string())
|
||||||
|
|
||||||
|
else:
|
||||||
|
print(report_changes)
|
|
@ -0,0 +1,165 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
'''
|
||||||
|
Koen Van Impe
|
||||||
|
|
||||||
|
List all the sightings
|
||||||
|
|
||||||
|
Put this script in crontab to run every day
|
||||||
|
25 4 * * * mispuser /usr/bin/python3 /home/mispuser/PyMISP/examples/show_sightings.py
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
from pymisp import ExpandedPyMISP
|
||||||
|
from keys import misp_url, misp_key, misp_verifycert
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
import smtplib
|
||||||
|
import mimetypes
|
||||||
|
from email.mime.multipart import MIMEMultipart
|
||||||
|
from email import encoders
|
||||||
|
from email.mime.base import MIMEBase
|
||||||
|
from email.mime.text import MIMEText
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
|
||||||
|
def init(url, key, verifycert):
|
||||||
|
'''
|
||||||
|
Template to get MISP module started
|
||||||
|
'''
|
||||||
|
return ExpandedPyMISP(url, key, verifycert, 'json')
|
||||||
|
|
||||||
|
|
||||||
|
def set_drift_timestamp(drift_timestamp, drift_timestamp_path):
|
||||||
|
'''
|
||||||
|
Save the timestamp in a (local) file
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
with open(drift_timestamp_path, 'w+') as f:
|
||||||
|
f.write(str(drift_timestamp))
|
||||||
|
return True
|
||||||
|
except IOError:
|
||||||
|
sys.exit("Unable to write drift_timestamp %s to %s" % (drift_timestamp, drift_timestamp_path))
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def get_drift_timestamp(drift_timestamp_path):
|
||||||
|
'''
|
||||||
|
From when do we start with the sightings?
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
with open(drift_timestamp_path) as f:
|
||||||
|
drift = f.read()
|
||||||
|
if drift:
|
||||||
|
drift = int(float(drift))
|
||||||
|
else:
|
||||||
|
drift = 0
|
||||||
|
except IOError:
|
||||||
|
drift = 0
|
||||||
|
|
||||||
|
return drift
|
||||||
|
|
||||||
|
|
||||||
|
def search_sightings(misp, from_timestamp, end_timestamp):
|
||||||
|
'''
|
||||||
|
Search all the sightings
|
||||||
|
'''
|
||||||
|
completed_sightings = []
|
||||||
|
|
||||||
|
try:
|
||||||
|
found_sightings = misp.search_sightings(date_from=from_timestamp, date_to=end_timestamp)
|
||||||
|
except Exception as e:
|
||||||
|
sys.exit('Unable to search for sightings')
|
||||||
|
|
||||||
|
if found_sightings is not None:
|
||||||
|
for s in found_sightings:
|
||||||
|
if 'Sighting' in s:
|
||||||
|
sighting = s['Sighting']
|
||||||
|
if 'attribute_id' in sighting:
|
||||||
|
attribute_id = sighting['attribute_id']
|
||||||
|
|
||||||
|
# Query the attribute and event to get the details
|
||||||
|
try:
|
||||||
|
attribute = misp.get_attribute(attribute_id)
|
||||||
|
except Exception as e:
|
||||||
|
print("Unable to fetch attribute")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if 'Attribute' in attribute and 'uuid' in attribute['Attribute']:
|
||||||
|
event_details = misp.get_event(attribute['Attribute']['event_id'])
|
||||||
|
event_info = event_details['Event']['info']
|
||||||
|
attribute_uuid = attribute['Attribute']['uuid']
|
||||||
|
completed_sightings.append({'attribute_uuid': attribute_uuid, 'date_sighting': sighting['date_sighting'], 'source': sighting['source'], 'type': sighting['type'], 'uuid': sighting['uuid'], 'event_id': attribute['Attribute']['event_id'], 'value': attribute['Attribute']['value'], 'attribute_id': attribute['Attribute']['id'], 'event_title': event_info})
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
|
||||||
|
return completed_sightings
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
smtp_from = 'INSERT_FROM'
|
||||||
|
smtp_to = 'INSERT_TO'
|
||||||
|
smtp_server = 'localhost'
|
||||||
|
report_sightings = ''
|
||||||
|
ts_format = '%Y-%m-%d %H:%M:%S'
|
||||||
|
drift_timestamp_path = '/home/mispuser/PyMISP/examples/show_sightings.drift'
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description="Show all the sightings.")
|
||||||
|
parser.add_argument('-m', '--mail', action='store_true', help='Mail the report')
|
||||||
|
parser.add_argument('-o', '--mailoptions', action='store', help='mailoptions: \'smtp_from=INSERT_FROM;smtp_to=INSERT_TO;smtp_server=localhost\'')
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
misp = init(misp_url, misp_key, misp_verifycert)
|
||||||
|
|
||||||
|
start_timestamp = get_drift_timestamp(drift_timestamp_path=drift_timestamp_path)
|
||||||
|
end_timestamp = time.time()
|
||||||
|
start_timestamp_s = datetime.fromtimestamp(start_timestamp).strftime(ts_format)
|
||||||
|
end_timestamp_s = datetime.fromtimestamp(end_timestamp).strftime(ts_format)
|
||||||
|
|
||||||
|
# Get all attribute sightings
|
||||||
|
found_sightings = search_sightings(misp, start_timestamp, end_timestamp)
|
||||||
|
if found_sightings:
|
||||||
|
for s in found_sightings:
|
||||||
|
if int(s['type']) == 0:
|
||||||
|
s_type = 'TP'
|
||||||
|
else:
|
||||||
|
s_type = 'FP'
|
||||||
|
date_sighting = datetime.fromtimestamp(int(s['date_sighting'])).strftime(ts_format)
|
||||||
|
source = s['source']
|
||||||
|
if not s['source']:
|
||||||
|
source = 'N/A'
|
||||||
|
report_sightings = report_sightings + '%s for [%s] (%s) in event [%s] (%s) on %s from %s\n' % (s_type, s['value'], s['attribute_id'], s['event_title'], s['event_id'], date_sighting, source)
|
||||||
|
|
||||||
|
set_drift_timestamp(end_timestamp, drift_timestamp_path)
|
||||||
|
else:
|
||||||
|
report_sightings = 'No sightings found'
|
||||||
|
|
||||||
|
# Mail options
|
||||||
|
if args.mail:
|
||||||
|
if args.mailoptions:
|
||||||
|
mailoptions = args.mailoptions.split(';')
|
||||||
|
for s in mailoptions:
|
||||||
|
if s.split('=')[0] == 'smtp_from':
|
||||||
|
smtp_from = s.split('=')[1]
|
||||||
|
if s.split('=')[0] == 'smtp_to':
|
||||||
|
smtp_to = s.split('=')[1]
|
||||||
|
if s.split('=')[0] == 'smtp_server':
|
||||||
|
smtp_server = s.split('=')[1]
|
||||||
|
|
||||||
|
report_sightings_body = 'MISP Sightings report for %s between %s and %s\n-------------------------------------------------------------------------------\n\n' % (misp_url, start_timestamp_s, end_timestamp_s)
|
||||||
|
report_sightings_body = report_sightings_body + report_sightings
|
||||||
|
subject = 'Report of sightings between %s and %s' % (start_timestamp_s, end_timestamp_s)
|
||||||
|
|
||||||
|
msg = MIMEMultipart()
|
||||||
|
msg['From'] = smtp_from
|
||||||
|
msg['To'] = smtp_to
|
||||||
|
msg['Subject'] = subject
|
||||||
|
|
||||||
|
msg.attach(MIMEText(report_sightings_body, 'text'))
|
||||||
|
server = smtplib.SMTP(smtp_server)
|
||||||
|
server.sendmail(smtp_from, smtp_to, msg.as_string())
|
||||||
|
|
||||||
|
else:
|
||||||
|
print(report_sightings)
|
Loading…
Reference in New Issue