MISP-Taxii-Server/misp_taxii_hooks/hooks.py

87 lines
2.7 KiB
Python
Raw Normal View History

2016-11-18 10:28:00 +01:00
#!/usr/bin/env python3
######
# TODO: duplicate detection is only done per attribute value (see post_stix);
# detecting whole duplicate events is still open.
######
2016-11-23 13:16:30 +01:00
import os
2016-11-18 10:28:00 +01:00
import pymisp
import tempfile
import logging
2016-11-23 13:16:30 +01:00
from pyaml import yaml
from io import StringIO
2016-11-18 10:28:00 +01:00
# Module-wide logger. It is looked up under the literal name "__main__"
# rather than this module's own name.
# NOTE(review): presumably so records go through the host process's logging
# configuration — confirm before switching to logging.getLogger(__name__).
log = logging.getLogger("__main__")
2016-11-18 10:28:00 +01:00
from opentaxii.signals import (
CONTENT_BLOCK_CREATED, INBOX_MESSAGE_CREATED
)
## CONFIG
# Configuration is read from the YAML file named by OPENTAXII_CONFIG when that
# variable is set; otherwise it is assembled from MISP_URL and MISP_API.
if "OPENTAXII_CONFIG" in os.environ:
    print("Using config from {}".format(os.environ["OPENTAXII_CONFIG"]))
    # safe_load only constructs plain YAML types (no arbitrary Python
    # objects), and the with-block guarantees the file handle is closed.
    with open(os.environ["OPENTAXII_CONFIG"], "r") as config_file:
        CONFIG = yaml.safe_load(config_file)
else:
    print("Trying to use env variables...")
    if "MISP_URL" in os.environ:
        misp_url = os.environ["MISP_URL"]
    else:
        print("Unkown misp URL. Set OPENTAXII_CONFIG or MISP_URL.")
        # Fall back to a placeholder value instead of aborting.
        misp_url = "UNKNOWN"
    if "MISP_API" in os.environ:
        misp_api = os.environ["MISP_API"]
    else:
        print("Unknown misp API key. Set OPENTAXII_CONFIG or MISP_API.")
        # Fall back to a placeholder value instead of aborting.
        misp_api = "UNKNOWN"

    # Same shape as the YAML config: everything lives under the "misp" key.
    CONFIG = {
        "misp": {
            "url": misp_url,
            "api": misp_api,
        }
    }

# Shared PyMISP client used by post_stix. The `ssl` argument defaults to True
# unless the config provides misp.verifySSL.
MISP = pymisp.PyMISP(
    CONFIG["misp"]["url"],
    CONFIG["misp"]["api"],
    ssl=CONFIG["misp"].get("verifySSL", True),
)
def post_stix(manager, content_block, collection_ids, service_id):
    """
    Callback function for when our taxii server gets new data.

    Loads the content block as a STIX package through pymisp, drops every
    attribute whose value MISP already returns a search hit for, and pushes
    the remaining attributes to the server as a new event.

    :param manager: not used by this function.
    :param content_block: object whose ``content`` attribute holds the STIX
        document, either as ``bytes`` or as ``str``.
    :param collection_ids: not used by this function.
    :param service_id: not used by this function.
    :return: None
    """
    # Load the package
    log.info("Posting STIX...")
    block = content_block.content
    if isinstance(block, bytes):
        block = block.decode()

    package = pymisp.tools.stix.load_stix(StringIO(block))
    log.info("STIX loaded succesfully.")

    values = [x.value for x in package.attributes]
    log.info("Extracted %s", values)

    # Ask MISP once per distinct value. The cache is keyed by the exact string
    # sent to the search, so repeated values cost no extra remote calls.
    is_duplicate = {}
    for value in values:
        key = str(value)
        if key in is_duplicate:
            continue
        log.info("Checking for existence of %s", value)
        search = MISP.search("attributes", values=key)
        # Anything other than an empty "response" list counts as a hit.
        is_duplicate[key] = search["response"] != []
        if is_duplicate[key]:
            # This means we have it!
            log.info("%s is a duplicate, we'll ignore it.", value)
        else:
            log.info("%s is unique, we'll keep it", value)

    # Filter in a single pass, preserving order. Slice assignment mutates the
    # package's existing list object instead of rebinding the attribute.
    package.attributes[:] = [
        attr for attr in package.attributes
        if not is_duplicate[str(attr.value)]
    ]

    # Push the event to MISP
    # TODO: check whether pymisp offers a more specific call for pushing a
    # loaded package than add_event.
    if package.attributes:
        log.info(
            "Uploading event to MISP with attributes %s",
            [x.value for x in package.attributes],
        )
        MISP.add_event(package)
    else:
        log.info("No attributes, not bothering.")
2016-11-18 10:28:00 +01:00
# Make TAXII call our push function whenever it gets new data
2016-11-18 10:28:00 +01:00
# This registration is a top-level statement, so it runs as soon as the
# module is imported; importing the module is what installs the hook.
CONTENT_BLOCK_CREATED.connect(post_stix)