MISP/tools/misp_retention.py

99 lines
3.8 KiB
Python
Executable File

#!/usr/bin/env python3
#
# This script requires the MISP retention taxonomy is installed and enabled
# See https://github.com/MISP/misp-taxonomies/tree/master/retention/retention
from pymisp import ExpandedPyMISP, MISPEvent
from datetime import datetime
from dateutil.relativedelta import relativedelta
import re
from keys import misp_url, misp_key
# pip install pymisp python-dateutil
class misphelper(object):
"""Helper class around a MISP object."""
taxonomyId = None
expiredTag = "retention:expired"
def __init__(self):
self.misp = ExpandedPyMISP(url=misp_url,
key=misp_key,
ssl=True)
self.taxonomyId = self.searchTaxonomy()
def searchTaxonomy(self):
res = self.misp.taxonomies()
for tax in res:
if (tax["Taxonomy"]["namespace"] == "retention" and tax["Taxonomy"]["enabled"]):
return tax["Taxonomy"]["id"]
raise Exception("Could not find the 'retention' Taxonomy in MISP. Please enable this first!")
def processEvent(self, event):
mevent = MISPEvent()
mevent.from_dict(Event=event)
changed = False
for attr in mevent.attributes:
if (attr["type"] == "ip-dst" or attr["type"] == "ip-src") and attr["to_ids"]:
print("Removing IDS flag in event '{}' on attr '{}'".format(mevent.id, attr["value"]))
changed = True
attr["to_ids"] = False
self.misp.update_attribute(attr)
for obj in mevent.objects:
for attr in obj.Attribute:
if (attr["type"] == "ip-dst" or attr["type"] == "ip-src") and attr["to_ids"]:
print("Removing IDS flag in event '{}' on attr '{}'".format(mevent.id, attr["value"]))
changed = True
attr["to_ids"] = False
self.misp.update_attribute(attr)
self.misp.tag(mevent, self.expiredTag, True)
if changed:
self.misp.update_event(mevent.id, mevent)
self.misp.publish(mevent)
def findEventsAfterRetention(self, events, retention):
for event in events:
ts = datetime.strptime(event["Event"]["date"], "%Y-%m-%d")
now = datetime.utcnow()
if retention[1] == "d":
delta = relativedelta(days=int(retention[0]))
elif retention[1] == "w":
delta = relativedelta(weeks=int(retention[0]))
elif retention[1] == "m":
delta = relativedelta(months=int(retention[0]))
elif retention[1] == "y":
delta = relativedelta(years=int(retention[0]))
if ts < (now - delta):
self.processEvent(event["Event"])
def queryRetentionTags(self):
res = self.misp.get_taxonomy(self.taxonomyId)
for tag in res['entries']:
m = re.match(r"^retention:([0-9]+)([d,w,m,y])$", tag["tag"])
if m:
tagSearch = self.misp.build_complex_query(and_parameters = [tag["tag"]], not_parameters = [self.expiredTag])
events = self.misp.search(published=True, tags=tagSearch)
self.findEventsAfterRetention(events, (m.group(1), m.group(2)))
else:
# set expiredTag to hidden if it was accidentally enabled by "enable all"
if tag["tag"] == self.expiredTag:
if tag["existing_tag"]["Tag"]["hide_tag"] is False:
tag["existing_tag"]["Tag"]["hide_tag"] = True
self.misp.update_tag(tag["existing_tag"]["Tag"])
else:
raise Exception("Could not parse retention time/unit from tag: '{}'.".format(tag["tag"]))
if __name__ == "__main__":
misp = misphelper()
misp.queryRetentionTags()