diff --git a/examples/ioc-2-misp/ioc2misp.py b/examples/ioc-2-misp/ioc2misp.py old mode 100644 new mode 100755 index ecbeb60..5becc74 --- a/examples/ioc-2-misp/ioc2misp.py +++ b/examples/ioc-2-misp/ioc2misp.py @@ -8,341 +8,331 @@ from keys import mispUrl, mispKey, csvTaxonomyFile, iocMispMapping try: - from pymisp import PyMISP + from pymisp import PyMISP except: - print "you need pymisp form github" - import sys - sys.exit(1) - -import json + print("you need pymisp form github") + import sys + sys.exit(1) + import os import argparse try: - from bs4 import BeautifulSoup + from bs4 import BeautifulSoup except: - print "install BeautifulSoup : sudo apt-get install python-bs4 python-lxml" - import sys - sys.exit(1) + print("install BeautifulSoup : sudo apt-get install python-bs4 python-lxml") + import sys + sys.exit(1) + def misp_init(url, key): - return PyMISP(url, key, False, 'json') - -def check_valid_ioc(): - - (filepath, filename) = os.path.split(iocDescriptions["iocfile"]) - (shortname, extension) = os.path.splitext(filename) + return PyMISP(url, key, False, 'json') + + +def check_valid_ioc(): + + (filepath, filename) = os.path.split(iocDescriptions["iocfile"]) + (shortname, extension) = os.path.splitext(filename) + + if (("ioc" in extension)) and (sum(1 for _ in open(iocDescriptions["iocfile"])) > 1): + iocDescriptions['filename'] = filename + return True + return False - if (("ioc" in extension)) and (sum(1 for _ in open(iocDescriptions["iocfile"])) > 1): - iocDescriptions['filename'] = filename - return True - return False def get_parse_ioc_file(): + return BeautifulSoup(open(iocDescriptions["iocfile"]), "lxml") - return BeautifulSoup(open(iocDescriptions["iocfile"]) , "lxml") def parse_ioc_search_content(iocContextSearch): - for k,v in iocMispMapping.items(): - if str(k).lower() == str(iocContextSearch).lower(): - return v - return False - -def create_attribute_json(iocContextSearch, attributeValue, attributeComment,force=False): - ##################################### - # force used for description to upload - if force: - parseResult=("Other","comment") - else: - parseResult = parse_ioc_search_content(iocContextSearch) - - if parseResult is False: - - print "/!\ Not implemented :: {0} :: {1} :: Item add as 'Other','Comment'. Add it in your keys.py".format(iocContextSearch,attributeValue) - ######################################## - # force import to misp - parseResult=("Other","comment") - - comment = "" - try : - comment= parseResult[2]+attributeComment - except: - comment= attributeComment + for k, v in iocMispMapping.items(): + if str(k).lower() == str(iocContextSearch).lower(): + return v + return False + + +def create_attribute_json(iocContextSearch, attributeValue, attributeComment, force=False): + ##################################### + # force used for description to upload + if force: + parseResult = ("Other", "comment") + else: + parseResult = parse_ioc_search_content(iocContextSearch) + + if parseResult is False: + + print("/!\ Not implemented :: {0} :: {1} :: Item add as 'Other','Comment'. Add it in your keys.py".format(iocContextSearch, attributeValue)) + ######################################## + # force import to misp + parseResult = ("Other", "comment") + + comment = "" + try: + comment = parseResult[2] + attributeComment + except: + comment = attributeComment + + attribute = {"category": parseResult[0], + "type": parseResult[1], + "value": attributeValue, + "timestamp": "0", + "to_ids": "0", + "distribution": "0", + "comment": comment + } + return attribute - attribute = { - "category": parseResult[0], - "type": parseResult[1], - "value": attributeValue, - "timestamp": "0", - "to_ids": "0", - "distribution": "0", - "comment": comment, - } - return attribute def create_attributes_from_ioc_json(soup): - attributes = [] + attributes = [] - IndicatorItemValues = {} - for item in soup.find_all("indicatoritem"): + IndicatorItemValues = {} + for item in soup.find_all("indicatoritem"): - if item.find('context'): - IndicatorItemValues["context"]=str(item.find('context')['search']) - else: - IndicatorItemValues["context"]="" - if item.find('content'): - IndicatorItemValues["content"]=str(item.find('content').text) - else: - IndicatorItemValues["content"]="" - if item.find('comment'): - IndicatorItemValues["comment"]=str(item.find('comment').text) - else: - IndicatorItemValues["comment"]="" - + if item.find('context'): + IndicatorItemValues["context"] = str(item.find('context')['search']) + else: + IndicatorItemValues["context"] = "" + if item.find('content'): + IndicatorItemValues["content"] = str(item.find('content').text) + else: + IndicatorItemValues["content"] = "" + if item.find('comment'): + IndicatorItemValues["comment"] = str(item.find('comment').text) + else: + IndicatorItemValues["comment"] = "" - jsonAttribute = create_attribute_json(IndicatorItemValues["context"],IndicatorItemValues["content"],IndicatorItemValues["comment"]) - attributes.append(jsonAttribute) + jsonAttribute = create_attribute_json(IndicatorItemValues["context"], IndicatorItemValues["content"], IndicatorItemValues["comment"]) + attributes.append(jsonAttribute) - return attributes + return attributes def create_misp_event_json(attributes): - import time - if iocDescriptions["authored_by"]: - attributes.append( - create_attribute_json(None,"authored_by",iocDescriptions["authored_by"],True) - ) - if iocDescriptions["authored_date"]: - attributes.append( - create_attribute_json(None,"authored_date",iocDescriptions["authored_date"],True) - ) - - ################################################## - # make short-description in "info field - # if not exist make description - # if "info"="short-description" make descrption as comment - mispInfoFild = "" - if iocDescriptions["short_description"]: - mispInfoFild = iocDescriptions["short_description"] - if iocDescriptions["description"]: - attributes.append( - create_attribute_json(None,"description",iocDescriptions["description"],True) - ) - else: - if iocDescriptions["description"]: - mispInfoFild = iocDescriptions["description"] - else: - mispInfoFild = "No description or short_description from IOC find." - - eventJson = { - "Event": { - "info": mispInfoFild, - "timestamp": "1", - "attribute_count": 0, - "analysis": "0", - "date": time.strftime("%Y-%m-%d"), - "org": "", - "distribution": "0", - "Attribute": [], - "proposal_email_lock": False, - "threat_level_id": "4", - } - } + import time + if iocDescriptions["authored_by"]: + attributes.append(create_attribute_json(None, "authored_by", iocDescriptions["authored_by"], True)) + if iocDescriptions["authored_date"]: + attributes.append(create_attribute_json(None, "authored_date", iocDescriptions["authored_date"], True)) - eventJson["Event"]["Attribute"] = attributes + ################################################## + # make short-description in "info field + # if not exist make description + # if "info"="short-description" make descrption as comment + mispInfoFild = "" + if iocDescriptions["short_description"]: + mispInfoFild = iocDescriptions["short_description"] + if iocDescriptions["description"]: + attributes.append(create_attribute_json(None, "description", iocDescriptions["description"], True)) + else: + if iocDescriptions["description"]: + mispInfoFild = iocDescriptions["description"] + else: + mispInfoFild = "No description or short_description from IOC find." - return eventJson + eventJson = {"Event": {"info": mispInfoFild, + "timestamp": "1", + "attribute_count": 0, + "analysis": "0", + "date": time.strftime("%Y-%m-%d"), + "org": "", + "distribution": "0", + "Attribute": [], + "proposal_email_lock": False, + "threat_level_id": "4", + }} + + eventJson["Event"]["Attribute"] = attributes + + return eventJson def get_descriptions(soup, description): - if soup.find(description.lower()): - return soup.find(description.lower()).text - return "" - + if soup.find(description.lower()): + return soup.find(description.lower()).text + return "" + + def save_ioc_description(soup): - list_description = ["short_description","authored_by","authored_date","description"] - - for description in list_description: - iocDescriptions[description]=get_descriptions(soup, description) + list_description = ["short_description", "authored_by", "authored_date", "description"] - return + for description in list_description: + iocDescriptions[description] = get_descriptions(soup, description) + return def get_taxonomy(soup): - import csv - taxonomy = [] - reader = csv.reader(open(csvTaxonomyFile, 'rb'), delimiter=';') - ##################################### - # save file in a dict - # r[0] = @link from csv - # r[1] = @value from csv - # = value - # r[2] = @keep - # 0 : don't creat tag - # 1 : tag created - # r[3] = @taxonomy + import csv + taxonomy = [] + reader = csv.reader(open(csvTaxonomyFile, 'rb'), delimiter=';') + ##################################### + # save file in a dict + # r[0] = @link from csv + # r[1] = @value from csv + # = value + # r[2] = @keep + # 0 : don't creat tag + # 1 : tag created + # r[3] = @taxonomy - csvdic = {i:r for i,r in enumerate(reader)} + csvdic = {i: r for i, r in enumerate(reader)} + + ######################################### + # find all link with soup + for n in soup.find_all('link', rel=True): + rel = str(n.attrs['rel'][0]).lower() + + ########################## + # build special taxo + # special string because link if a html value + relValue = str(n.next_sibling).strip() + if rel == 'family': + if len(relValue) > 0: + taxonomy.append("malware_classification:malware-family='" + relValue + "'") + elif rel == 'threatgroup': + if len(relValue) > 0: + taxonomy.append("malware_classification:malware-threatgroup='" + relValue + "'") + + ######################### + # build taxo from csv match + else: + taxo = [r[3] for r in {i: r for i, r in csvdic.items() if r[0].lower() == rel and str(r[2]) == "1"}.values() if r[1].lower() == relValue.lower() and str(r[2]) == "1"] + + # taxo find in correspondance file + if (len(taxo) > 0 and taxo[0] != ''): + taxonomy.append(taxo[0]) + # not find + return taxonomy - ######################################### - # find all link with soup - for n in soup.find_all('link', rel=True): - rel = str(n.attrs['rel'][0]).lower() - - ########################## - # build special taxo - # special string because link if a html value - relValue = str(n.next_sibling).strip() - if rel == 'family': - if len(relValue)>0: - taxonomy.append("malware_classification:malware-family='"+relValue+"'") - elif rel == 'threatgroup': - if len(relValue)>0: - taxonomy.append("malware_classification:malware-threatgroup='"+relValue+"'") - - ######################### - # build taxo from csv match - else: - taxo = [r[3] for r in - {i:r for i,r in csvdic.items() - if r[0].lower() == rel and str(r[2])=="1" - }.values() - if r[1].lower() == relValue.lower() and str(r[2])=="1" - ] - - # taxo find in correspondance file - if (len(taxo) > 0 and taxo[0] != '') : - taxonomy.append(taxo[0]) - # not find - return taxonomy def custum_color_tag(tagg): - color="#00ace6" - if ":amber" in tagg :color="#ffc200" - if ":green:" in tagg :color="#009933" - if "tlp:green" in tagg :color="#009933" - if ":red:" in tagg :color="#ff0000" - if "tlp:red" in tagg :color="#ff0000" - if "tlp:white" in tagg :color="#fafafa" - return color + color = "#00ace6" + if ":amber" in tagg: + color = "#ffc200" + if ":green:" in tagg: + color = "#009933" + if "tlp:green" in tagg: + color = "#009933" + if ":red:" in tagg: + color = "#ff0000" + if "tlp:red" in tagg: + color = "#ff0000" + if "tlp:white" in tagg: + color = "#fafafa" + return color + def push_event_to_misp(jsonEvent): - global misp + global misp - #################### - # upload json event - r = misp.add_event(jsonEvent) - event=r.json() - - # save event id for file upload and tagg - iocDescriptions["misp_event_id"]=event["Event"]["id"] + #################### + # upload json event + r = misp.add_event(jsonEvent) + event = r.json() + + # save event id for file upload and tagg + iocDescriptions["misp_event_id"] = event["Event"]["id"] + + return - return def upload_file(): - # filename,path, eid, distrib, ids, categ, info, ids, analysis, threat - misp.upload_sample( - iocDescriptions['filename'], - iocDescriptions["iocfile"], - iocDescriptions["misp_event_id"], - "0", - False, - "External analysis", - iocDescriptions["short_description"], - None, - "1", - "4", - ) - - return + # filename,path, eid, distrib, ids, categ, info, ids, analysis, threat + misp.upload_sample(iocDescriptions['filename'], + iocDescriptions["iocfile"], + iocDescriptions["misp_event_id"], + "0", + False, + "External analysis", + iocDescriptions["short_description"], + None, + "1", + "4", + ) + return + def update_tag(listOfTagg): - for tagg in listOfTagg: - color = custum_color_tag(tagg) + for tagg in listOfTagg: + color = custum_color_tag(tagg) - ############################# - # creatz tag in MISP + ############################# + # creatz tag in MISP - r = misp.new_tag(str(tagg), str(color)) - ############################# - # link tag to MISP event - toPost={} - toPost['Event']={'id':iocDescriptions["misp_event_id"]} - misp.add_tag( - toPost, - str(tagg)) - return + misp.new_tag(str(tagg), str(color)) + ############################# + # link tag to MISP event + toPost = {} + toPost['Event'] = {'id': iocDescriptions["misp_event_id"]} + misp.add_tag(toPost, str(tagg)) + return def main(): - global misp - global iocDescriptions - iocDescriptions = {} - - - ################################ - # parse for valid argments - parser = argparse.ArgumentParser(description='Get an event from a MISP instance.') - parser.add_argument("-i", "--input", required=True, help="Input file") - parser.add_argument("-t", "--tag", help="Add custom tags 'tlp:red,cossi:tmp=test'") - args = parser.parse_args() + global misp + global iocDescriptions + iocDescriptions = {} - iocDescriptions["iocfile"]=os.path.abspath(args.input) - - ################################ - # check if file have ioc extention and if he is not empty - if check_valid_ioc(): - - ################################ - # Try to parse file - iocfileparse = get_parse_ioc_file() - else : - print "/!\ Bad format {0}".format(iocDescriptions["iocfile"]) - return + ################################ + # parse for valid argments + parser = argparse.ArgumentParser(description='Get an event from a MISP instance.') + parser.add_argument("-i", "--input", required=True, help="Input file") + parser.add_argument("-t", "--tag", help="Add custom tags 'tlp:red,cossi:tmp=test'") + args = parser.parse_args() - ################################ - # save description for create event - save_ioc_description(iocfileparse) - - ################################ - # parse ioc and buid json attributes - jsonAttributes = create_attributes_from_ioc_json(iocfileparse) + iocDescriptions["iocfile"] = os.path.abspath(args.input) - ################################ - # create a json misp event and append attributes - jsonEvent = create_misp_event_json(jsonAttributes) - - - ################################ - # try connection - try: - misp = misp_init(mispUrl, mispKey) - except: - print "/!\ Connection fail, bad url ({0}) or API key : {1}".format(mispUrl,mispKey) - return - - ################################ - # Add event to MSIP - push_event_to_misp(jsonEvent) + ################################ + # check if file have ioc extention and if he is not empty + if check_valid_ioc(): - - ################################ - # Upload the IOC file and close tmpfile - upload_file() + ################################ + # Try to parse file + iocfileparse = get_parse_ioc_file() + else: + print("/!\ Bad format {0}".format(iocDescriptions["iocfile"])) + return - ################################ - # Update MISP Event with tag from IOC - update_tag(get_taxonomy(iocfileparse)) - - ################################ - # Add custom Tag (-t) - if args.tag : - customTag = args.tag - update_tag(customTag.split(",")) + ################################ + # save description for create event + save_ioc_description(iocfileparse) + ################################ + # parse ioc and buid json attributes + jsonAttributes = create_attributes_from_ioc_json(iocfileparse) + + ################################ + # create a json misp event and append attributes + jsonEvent = create_misp_event_json(jsonAttributes) + + ################################ + # try connection + try: + misp = misp_init(mispUrl, mispKey) + except: + print("/!\ Connection fail, bad url ({0}) or API key : {1}".format(mispUrl, mispKey)) + return + + ################################ + # Add event to MSIP + push_event_to_misp(jsonEvent) + + ################################ + # Upload the IOC file and close tmpfile + upload_file() + + ################################ + # Update MISP Event with tag from IOC + update_tag(get_taxonomy(iocfileparse)) + + ################################ + # Add custom Tag (-t) + if args.tag: + customTag = args.tag + update_tag(customTag.split(",")) if __name__ == '__main__': - main() + main()