PyMISP/examples/ioc-2-misp/ioc2misp.py

338 lines
11 KiB
Python
Raw Permalink Normal View History

2016-05-12 17:33:13 +02:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Format description
# @variables : camelCase
# @functions : snake_case
from keys import mispUrl, mispKey, csvTaxonomyFile, iocMispMapping
try:
2016-06-13 12:14:32 +02:00
from pymisp import PyMISP
2016-05-12 17:33:13 +02:00
except:
2016-06-13 12:14:32 +02:00
print("you need pymisp form github")
import sys
sys.exit(1)
2016-05-12 17:33:13 +02:00
import os
import argparse
try:
2016-06-13 12:14:32 +02:00
from bs4 import BeautifulSoup
2016-05-12 17:33:13 +02:00
except:
2016-06-13 12:14:32 +02:00
print("install BeautifulSoup : sudo apt-get install python-bs4 python-lxml")
import sys
sys.exit(1)
2016-05-12 17:33:13 +02:00
def misp_init(url, key):
2016-06-13 12:14:32 +02:00
return PyMISP(url, key, False, 'json')
2016-05-12 17:33:13 +02:00
def check_valid_ioc():
2016-06-13 12:14:32 +02:00
(filepath, filename) = os.path.split(iocDescriptions["iocfile"])
(shortname, extension) = os.path.splitext(filename)
if (("ioc" in extension)) and (sum(1 for _ in open(iocDescriptions["iocfile"])) > 1):
iocDescriptions['filename'] = filename
return True
return False
2016-05-12 17:33:13 +02:00
def get_parse_ioc_file():
2016-06-13 12:14:32 +02:00
return BeautifulSoup(open(iocDescriptions["iocfile"]), "lxml")
2016-05-12 17:33:13 +02:00
def parse_ioc_search_content(iocContextSearch):
2016-06-13 12:14:32 +02:00
for k, v in iocMispMapping.items():
if str(k).lower() == str(iocContextSearch).lower():
return v
return False
def create_attribute_json(iocContextSearch, attributeValue, attributeComment, force=False):
#####################################
# force used for description to upload
if force:
parseResult = ("Other", "comment")
else:
parseResult = parse_ioc_search_content(iocContextSearch)
if parseResult is False:
print("/!\ Not implemented :: {0} :: {1} :: Item add as 'Other','Comment'. Add it in your keys.py".format(iocContextSearch, attributeValue))
########################################
# force import to misp
parseResult = ("Other", "comment")
comment = ""
try:
comment = parseResult[2] + attributeComment
except:
comment = attributeComment
attribute = {"category": parseResult[0],
"type": parseResult[1],
"value": attributeValue,
"timestamp": "0",
"to_ids": "0",
"distribution": "0",
"comment": comment
}
return attribute
2016-05-12 17:33:13 +02:00
def create_attributes_from_ioc_json(soup):
2016-06-13 12:14:32 +02:00
attributes = []
2016-05-12 17:33:13 +02:00
2016-06-13 12:14:32 +02:00
IndicatorItemValues = {}
for item in soup.find_all("indicatoritem"):
2016-05-12 17:33:13 +02:00
2016-06-13 12:14:32 +02:00
if item.find('context'):
IndicatorItemValues["context"] = str(item.find('context')['search'])
else:
IndicatorItemValues["context"] = ""
if item.find('content'):
IndicatorItemValues["content"] = str(item.find('content').text)
else:
IndicatorItemValues["content"] = ""
if item.find('comment'):
IndicatorItemValues["comment"] = str(item.find('comment').text)
else:
IndicatorItemValues["comment"] = ""
2016-05-12 17:33:13 +02:00
2016-06-13 12:14:32 +02:00
jsonAttribute = create_attribute_json(IndicatorItemValues["context"], IndicatorItemValues["content"], IndicatorItemValues["comment"])
attributes.append(jsonAttribute)
2016-05-12 17:33:13 +02:00
2016-06-13 12:14:32 +02:00
return attributes
2016-05-12 17:33:13 +02:00
def create_misp_event_json(attributes):
2016-06-13 12:14:32 +02:00
import time
if iocDescriptions["authored_by"]:
attributes.append(create_attribute_json(None, "authored_by", iocDescriptions["authored_by"], True))
if iocDescriptions["authored_date"]:
attributes.append(create_attribute_json(None, "authored_date", iocDescriptions["authored_date"], True))
##################################################
# make short-description in "info field
# if not exist make description
# if "info"="short-description" make descrption as comment
mispInfoFild = ""
if iocDescriptions["short_description"]:
mispInfoFild = iocDescriptions["short_description"]
if iocDescriptions["description"]:
attributes.append(create_attribute_json(None, "description", iocDescriptions["description"], True))
else:
if iocDescriptions["description"]:
mispInfoFild = iocDescriptions["description"]
else:
mispInfoFild = "No description or short_description from IOC find."
eventJson = {"Event": {"info": mispInfoFild,
"timestamp": "1",
"attribute_count": 0,
"analysis": "0",
"date": time.strftime("%Y-%m-%d"),
"org": "",
"distribution": "0",
"Attribute": [],
"proposal_email_lock": False,
"threat_level_id": "4",
}}
eventJson["Event"]["Attribute"] = attributes
return eventJson
2016-05-12 17:33:13 +02:00
def get_descriptions(soup, description):
2016-06-13 12:14:32 +02:00
if soup.find(description.lower()):
return soup.find(description.lower()).text
return ""
2016-05-12 17:33:13 +02:00
def save_ioc_description(soup):
2016-06-13 12:14:32 +02:00
list_description = ["short_description", "authored_by", "authored_date", "description"]
2016-05-12 17:33:13 +02:00
2016-06-13 12:14:32 +02:00
for description in list_description:
iocDescriptions[description] = get_descriptions(soup, description)
2016-05-12 17:33:13 +02:00
2016-06-13 12:14:32 +02:00
return
2016-05-12 17:33:13 +02:00
def get_taxonomy(soup):
2016-06-13 12:14:32 +02:00
import csv
taxonomy = []
reader = csv.reader(open(csvTaxonomyFile, 'rb'), delimiter=';')
#####################################
# save file in a dict
# r[0] = @link from csv
# r[1] = @value from csv
# = value
# r[2] = @keep
# 0 : don't creat tag
# 1 : tag created
# r[3] = @taxonomy
csvdic = {i: r for i, r in enumerate(reader)}
#########################################
# find all link with soup
for n in soup.find_all('link', rel=True):
rel = str(n.attrs['rel'][0]).lower()
##########################
# build special taxo
# special string because link if a html value
relValue = str(n.next_sibling).strip()
if rel == 'family':
if len(relValue) > 0:
taxonomy.append("malware_classification:malware-family='" + relValue + "'")
elif rel == 'threatgroup':
if len(relValue) > 0:
taxonomy.append("malware_classification:malware-threatgroup='" + relValue + "'")
#########################
# build taxo from csv match
else:
taxo = [r[3] for r in {i: r for i, r in csvdic.items() if r[0].lower() == rel and str(r[2]) == "1"}.values() if r[1].lower() == relValue.lower() and str(r[2]) == "1"]
# taxo find in correspondance file
if (len(taxo) > 0 and taxo[0] != ''):
taxonomy.append(taxo[0])
# not find
return taxonomy
2016-05-12 17:33:13 +02:00
def custum_color_tag(tagg):
2016-06-13 12:14:32 +02:00
color = "#00ace6"
if ":amber" in tagg:
color = "#ffc200"
if ":green:" in tagg:
color = "#009933"
if "tlp:green" in tagg:
color = "#009933"
if ":red:" in tagg:
color = "#ff0000"
if "tlp:red" in tagg:
color = "#ff0000"
if "tlp:white" in tagg:
color = "#fafafa"
return color
2016-05-12 17:33:13 +02:00
def push_event_to_misp(jsonEvent):
2016-06-13 12:14:32 +02:00
global misp
####################
# upload json event
event = misp.add_event(jsonEvent)
2016-05-12 17:33:13 +02:00
2016-06-13 12:14:32 +02:00
# save event id for file upload and tagg
iocDescriptions["misp_event_id"] = event["Event"]["id"]
return
2016-05-12 17:33:13 +02:00
def upload_file():
2016-06-13 12:14:32 +02:00
# filename,path, eid, distrib, ids, categ, info, ids, analysis, threat
misp.upload_sample(iocDescriptions['filename'],
iocDescriptions["iocfile"],
iocDescriptions["misp_event_id"],
"0",
False,
"External analysis",
iocDescriptions["short_description"],
None,
"1",
"4",
)
return
2016-05-12 17:33:13 +02:00
def update_tag(listOfTagg):
2016-06-13 12:14:32 +02:00
for tagg in listOfTagg:
color = custum_color_tag(tagg)
2016-05-12 17:33:13 +02:00
2016-06-13 12:14:32 +02:00
#############################
# creatz tag in MISP
2016-05-12 17:33:13 +02:00
2016-06-13 12:14:32 +02:00
misp.new_tag(str(tagg), str(color))
#############################
# link tag to MISP event
toPost = {}
toPost['Event'] = {'id': iocDescriptions["misp_event_id"]}
misp.add_tag(toPost, str(tagg))
return
2016-05-12 17:33:13 +02:00
def main():
2016-06-13 12:14:32 +02:00
global misp
global iocDescriptions
iocDescriptions = {}
################################
# parse for valid argments
parser = argparse.ArgumentParser(description='Get an event from a MISP instance.')
parser.add_argument("-i", "--input", required=True, help="Input file")
parser.add_argument("-t", "--tag", help="Add custom tags 'tlp:red,cossi:tmp=test'")
args = parser.parse_args()
iocDescriptions["iocfile"] = os.path.abspath(args.input)
################################
# check if file have ioc extention and if he is not empty
if check_valid_ioc():
################################
# Try to parse file
iocfileparse = get_parse_ioc_file()
else:
print("/!\ Bad format {0}".format(iocDescriptions["iocfile"]))
return
################################
# save description for create event
save_ioc_description(iocfileparse)
################################
# parse ioc and buid json attributes
jsonAttributes = create_attributes_from_ioc_json(iocfileparse)
################################
# create a json misp event and append attributes
jsonEvent = create_misp_event_json(jsonAttributes)
################################
# try connection
try:
misp = misp_init(mispUrl, mispKey)
except:
print("/!\ Connection fail, bad url ({0}) or API key : {1}".format(mispUrl, mispKey))
return
################################
# Add event to MSIP
push_event_to_misp(jsonEvent)
################################
# Upload the IOC file and close tmpfile
upload_file()
################################
# Update MISP Event with tag from IOC
update_tag(get_taxonomy(iocfileparse))
################################
# Add custom Tag (-t)
if args.tag:
customTag = args.tag
update_tag(customTag.split(","))
2016-05-12 17:33:13 +02:00
if __name__ == '__main__':
2016-06-13 12:14:32 +02:00
main()