From 20fb5cfa2f715082dca4b4f2e6fc62484e13ffb7 Mon Sep 17 00:00:00 2001 From: ag-michael Date: Wed, 22 Jul 2020 06:49:37 +0000 Subject: [PATCH] ** README.md - LONGTEXT causes SQLAlchemy issues, LONGBLOB works great! Changed README accordingly. - Added documentation on how to configure gunicorn to run the production opentaxii server ** misp_taxii_hooks/hooks.py - Add auto_publish feature - Add exception handling - The previous MISP.search() call was not working at all; changed it to the more specific `(controller="attributes", value=str(attrib))` - Set the default `distribution` to 0 (Your Organization Only); inadvertent sharing of TLP-RED intel could be very bad! This should be configurable in the future - systemd/journald was taking up lots of RAM thanks to gunicorn's excessive logging. I set some of the very noisy `log.info` to `log.debug` ** scripts/run-taxii-poll.py - Added rate limiting feature - Continue polling when individual blocks fail to push to the local server ** config/config.default.yaml - Added auto_publish setting ** scripts/install-remote-server.sh - Added rate_limit related config lines --- README.md | 4 +- config/config.default.yaml | 1 + misp_taxii_hooks/hooks.py | 80 ++++++++++++++++++++------------ scripts/install-remote-server.sh | 2 + scripts/run-taxii-poll.py | 23 ++++++++- 5 files changed, 77 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 7acacf5..cc87691 100644 --- a/README.md +++ b/README.md @@ -141,8 +141,8 @@ mysql -u [database user] -p # Enter Database password mysql> use taxiipersist; -mysql> alter table `inbox_messages` modify `original_message` LONGTEXT; -mysql> alter table `content_blocks` modify `content` LONGTEXT; +mysql> alter table `inbox_messages` modify `original_message` LONGBLOB; +mysql> alter table `content_blocks` modify `content` LONGBLOB; mysql> exit; ``` diff --git a/config/config.default.yaml b/config/config.default.yaml index d7f3469..e222511 100644 --- 
a/config/config.default.yaml +++ b/config/config.default.yaml @@ -1,6 +1,7 @@ --- domain: "localhost:9000" support_basic_auth: yes +auto_publish: no persistence_api: class: opentaxii.persistence.sqldb.SQLDatabaseAPI diff --git a/misp_taxii_hooks/hooks.py b/misp_taxii_hooks/hooks.py index 1fe0356..cc33674 100644 --- a/misp_taxii_hooks/hooks.py +++ b/misp_taxii_hooks/hooks.py @@ -10,7 +10,9 @@ import tempfile import logging from pyaml import yaml from io import StringIO - +from lxml.etree import XMLSyntaxError +import time +import json log = logging.getLogger("__main__") from opentaxii.signals import ( @@ -47,40 +49,60 @@ MISP = pymisp.PyMISP( ssl = CONFIG["misp"].get("verifySSL", True) ) +if "auto_publish" in CONFIG: + AUTO_PUBLISH = CONFIG["auto_publish"] + def post_stix(manager, content_block, collection_ids, service_id): ''' Callback function for when our taxii server gets new data Will convert it to a MISPEvent and push to the server ''' + # Handle exceptions + try: + # Load the package + log.info("Posting STIX...") + block = content_block.content + if isinstance(block, bytes): + block = block.decode() - # Load the package - log.info("Posting STIX...") - block = content_block.content - if isinstance(block, bytes): - block = block.decode() - - package = pymisp.tools.stix.load_stix(StringIO(block)) - log.info("STIX loaded succesfully.") - values = [x.value for x in package.attributes] - log.info("Extracted %s", values) - for attrib in values: - log.info("Checking for existence of %s", attrib) - search = MISP.search(controller="attributes", value=str(attrib)) - if search["Attribute"] != []: - # This means we have it! - log.info("%s is a duplicate, we'll ignore it.", attrib) - package.attributes.pop([x.value for x in package.attributes].index(attrib)) + package = pymisp.tools.stix.load_stix(StringIO(block),distribution=0) + log.info("STIX loaded succesfully.") + + # Auto-publish the event,if configured to do so. 
+ if AUTO_PUBLISH: + package.publish() + + values = [x.value for x in package.attributes] + log.info("Extracted %s", values) + for attrib in values: + try: + log.debug("Checking for existence of %s", attrib) + search = MISP.search(controller="attributes", value=str(attrib)) + if search["Attribute"] != []: + # This means we have it! + log.debug("%s is a duplicate, we'll ignore it.", attrib) + package.attributes.pop([x.value for x in package.attributes].index(attrib)) + else: + log.info("%s is unique, we'll keep it", attrib) + except Exception: + log.exception("Attribute lookup error:%s",attrib) + continue + # Push the event to MISP + # TODO: There's probably a proper method to do this rather than json_full + # But I don't wanna read docs + if (len(package.attributes) > 0): + log.info("Uploading event to MISP with attributes %s", [x.value for x in package.attributes]) + MISP.add_event(package) else: - log.info("%s is unique, we'll keep it", attrib) - - # Push the event to MISP - # TODO: There's probably a proper method to do this rather than json_full - # But I don't wanna read docs - if (len(package.attributes) > 0): - log.info("Uploading event to MISP with attributes %s", [x.value for x in package.attributes]) - MISP.add_event(package) - else: - log.info("No attributes, not bothering.") - + log.debug("No attributes %s, not bothering.",str(package.attributes)) + except json.decoder.JSONDecodeError as e: + log.exception("Json Decoder Error") + log.info("Content: %s",str(block)) + except XMLSyntaxError as e: + log.exception("lxml.etree.XMLSyntaxError") + log.info("Content: %s",str(block)) + except Exception as e: + log.exception("Exception: %s",str(e)) # Make TAXII call our push function whenever it gets new data CONTENT_BLOCK_CREATED.connect(post_stix) + diff --git a/scripts/install-remote-server.sh b/scripts/install-remote-server.sh index 5568430..27a257e 100644 --- a/scripts/install-remote-server.sh +++ b/scripts/install-remote-server.sh @@ -37,6 +37,8 @@ cat >> 
$CONFIGDIR/remote-servers.yml << EOF discovery_path: use_https: False taxii_version: '1.1' + rate_limit: 0 + rate_limit_threshold: 0 headers: auth: username: diff --git a/scripts/run-taxii-poll.py b/scripts/run-taxii-poll.py index f7e9a21..16bcebe 100644 --- a/scripts/run-taxii-poll.py +++ b/scripts/run-taxii-poll.py @@ -7,6 +7,7 @@ import argparse import os import logging import sys +import time from datetime import datetime # Create an argument parser for our program @@ -72,7 +73,6 @@ local_client = create_client(host=local_config["host"], local_client.username = local_config["auth"]["username"] local_client.password = local_config["auth"]["password"] - local_inbox = "{}://{}:{}{}".format( "https" if local_config["use_https"] else "http", local_config["host"], local_config["port"], @@ -146,10 +146,17 @@ for server in config: log.debug("Discovering services...") services = cli.discover_services() log.debug(services) + rate_limit = 0 + if "rate_limit" in server: + rate_limit = server["rate_limit"] + + rate_limit_threshold = 5 + if "rate_limit_threshold" in server: + rate_limit_threshold = server["rate_limit_threshold"] log.debug("Auth set.") for collection in server["collections"]: - log.debug("Polling %s", collection) + log.debug("Polling: %s ; Collection: /%s", server["host"],collection) server_uri_override = server.get("uri", "") if not server_uri_override.startswith("http"): server_uri_override = None @@ -158,12 +165,21 @@ for server in config: log.debug("Within date range %s - %s", poll_from or "Beginning of time", poll_to) + poll_count =0 try: + log.debug("Polling %s , URI:%s",collection,server.get("uri",None)) for content_block in cli.poll(collection_name=collection, subscription_id=subscription_id, begin_date=poll_from, end_date=poll_to, uri=server.get("uri", None)): + time.sleep(0.5) #Prevent hammering the server + poll_count += 1 + if poll_count >= rate_limit_threshold: + log.info("Rate limiting,sleeping for %s after %s 
polls",str(rate_limit),str(rate_limit_threshold)) + time.sleep(rate_limit) + poll_count = 0 + try: log.debug("Pushing block %s", content_block) local_client.push( @@ -175,6 +191,9 @@ for server in config: log.error("FAILED TO PUSH BLOCK!") log.error("%s", content_block) log.exception(ex, exc_info=True) + # Continue polling when individual blocks fail to push to the local server + continue + except Exception as ex: log.error("FAILED TO POLL %s", collection) log.exception(ex, exc_info=True)