** README.md

- LONGTEXT causes SQLAlchemy issues, while LONGBLOB works great! Changed the README accordingly.
- Added documentation on how to configure gunicorn to run the production opentaxii server

** misp_taxii_hooks/hooks.py

- Added auto_publish feature
- Added exception handling
- The previous `MISP.search()` call arguments were not working at all; changed them to the more specific `(controller="attributes", value=str(attrib))`
- Set the default `distribution` to 0 (Your Organization Only), inadvertent sharing of TLP-RED intel could be very bad! This should be configurable in the future
- systemd/journald was taking up lots of RAM thanks to gunicorn's excessive logging. I changed some of the very noisy `log.info` calls to `log.debug`

** scripts/run-taxii-poll.py

- Added rate limiting feature
- Continue polling when individual blocks fail to push to the local server

** config/config.default.yaml

- Added auto_publish setting

** scripts/install-remote-server.sh

- Added rate_limit related config lines
pull/88/head
ag-michael 2020-07-22 06:49:37 +00:00
parent c0ebb38759
commit 20fb5cfa2f
5 changed files with 77 additions and 33 deletions

View File

@ -141,8 +141,8 @@ mysql -u [database user] -p
# Enter Database password
mysql> use taxiipersist;
mysql> alter table `inbox_messages` modify `original_message` LONGTEXT;
mysql> alter table `content_blocks` modify `content` LONGTEXT;
mysql> alter table `inbox_messages` modify `original_message` LONGBLOB;
mysql> alter table `content_blocks` modify `content` LONGBLOB;
mysql> exit;
```

View File

@ -1,6 +1,7 @@
---
domain: "localhost:9000"
support_basic_auth: yes
auto_publish: no
persistence_api:
class: opentaxii.persistence.sqldb.SQLDatabaseAPI

View File

@ -10,7 +10,9 @@ import tempfile
import logging
from pyaml import yaml
from io import StringIO
from lxml.etree import XMLSyntaxError
import time
import json
log = logging.getLogger("__main__")
from opentaxii.signals import (
@ -47,40 +49,60 @@ MISP = pymisp.PyMISP(
ssl = CONFIG["misp"].get("verifySSL", True)
)
if "auto_publish" in CONFIG:
AUTO_PUBLISH = CONFIG["auto_publish"]
def post_stix(manager, content_block, collection_ids, service_id):
'''
Callback function for when our taxii server gets new data
Will convert it to a MISPEvent and push to the server
'''
# Handle exceptions
try:
# Load the package
log.info("Posting STIX...")
block = content_block.content
if isinstance(block, bytes):
block = block.decode()
# Load the package
log.info("Posting STIX...")
block = content_block.content
if isinstance(block, bytes):
block = block.decode()
package = pymisp.tools.stix.load_stix(StringIO(block))
log.info("STIX loaded succesfully.")
values = [x.value for x in package.attributes]
log.info("Extracted %s", values)
for attrib in values:
log.info("Checking for existence of %s", attrib)
search = MISP.search(controller="attributes", value=str(attrib))
if search["Attribute"] != []:
# This means we have it!
log.info("%s is a duplicate, we'll ignore it.", attrib)
package.attributes.pop([x.value for x in package.attributes].index(attrib))
package = pymisp.tools.stix.load_stix(StringIO(block),distribution=0)
log.info("STIX loaded succesfully.")
# Auto-publish the event,if configured to do so.
if AUTO_PUBLISH:
package.publish()
values = [x.value for x in package.attributes]
log.info("Extracted %s", values)
for attrib in values:
try:
log.debug("Checking for existence of %s", attrib)
search = MISP.search(controller="attributes", value=str(attrib))
if search["Attribute"] != []:
# This means we have it!
log.debug("%s is a duplicate, we'll ignore it.", attrib)
package.attributes.pop([x.value for x in package.attributes].index(attrib))
else:
log.info("%s is unique, we'll keep it", attrib)
except Exception:
log.exception("Attribute lookup error:%s",attrib)
continue
# Push the event to MISP
# TODO: There's probably a proper method to do this rather than json_full
# But I don't wanna read docs
if (len(package.attributes) > 0):
log.info("Uploading event to MISP with attributes %s", [x.value for x in package.attributes])
MISP.add_event(package)
else:
log.info("%s is unique, we'll keep it", attrib)
# Push the event to MISP
# TODO: There's probably a proper method to do this rather than json_full
# But I don't wanna read docs
if (len(package.attributes) > 0):
log.info("Uploading event to MISP with attributes %s", [x.value for x in package.attributes])
MISP.add_event(package)
else:
log.info("No attributes, not bothering.")
log.debug("No attributes %s, not bothering.",str(package.attributes))
except json.decoder.JSONDecodeError as e:
log.exception("Json Decoder Error")
log.info("Content: %s",str(block))
except XMLSyntaxError as e:
log.exception("lxml.etree.XMLSyntaxError")
log.info("Content: %s",str(block))
except Exception as e:
log.exception("Exception: %s",str(e))
# Make TAXII call our push function whenever it gets new data
CONTENT_BLOCK_CREATED.connect(post_stix)

View File

@ -37,6 +37,8 @@ cat >> $CONFIGDIR/remote-servers.yml << EOF
discovery_path:
use_https: False
taxii_version: '1.1'
rate_limit: 0
rate_limit_threshold: 0
headers:
auth:
username:

View File

@ -7,6 +7,7 @@ import argparse
import os
import logging
import sys
import time
from datetime import datetime
# Create an argument parser for our program
@ -72,7 +73,6 @@ local_client = create_client(host=local_config["host"],
local_client.username = local_config["auth"]["username"]
local_client.password = local_config["auth"]["password"]
local_inbox = "{}://{}:{}{}".format(
"https" if local_config["use_https"] else "http",
local_config["host"], local_config["port"],
@ -146,10 +146,17 @@ for server in config:
log.debug("Discovering services...")
services = cli.discover_services()
log.debug(services)
rate_limit = 0
if "rate_limit" in server:
rate_limit = server["rate_limit"]
rate_limit_threshold = 5
if "rate_limit_threshold" in server:
rate_limit_threshold = server["rate_limit_threshold"]
log.debug("Auth set.")
for collection in server["collections"]:
log.debug("Polling %s", collection)
log.debug("Polling: %s ; Collection: /%s", server["host"],collection)
server_uri_override = server.get("uri", "")
if not server_uri_override.startswith("http"):
server_uri_override = None
@ -158,12 +165,21 @@ for server in config:
log.debug("Within date range %s - %s",
poll_from or "Beginning of time", poll_to)
poll_count =0
try:
log.debug("Polling %s , URI:%s",collection,server.get("uri",None))
for content_block in cli.poll(collection_name=collection,
subscription_id=subscription_id,
begin_date=poll_from,
end_date=poll_to,
uri=server.get("uri", None)):
time.sleep(0.5) #Prevent hammering the server
poll_count += 1
if poll_count >= rate_limit_threshold:
log.info("Rate limiting,sleeping for %s after %s polls",str(rate_limit),str(rate_limit_threshold))
time.sleep(rate_limit)
poll_count = 0
try:
log.debug("Pushing block %s", content_block)
local_client.push(
@ -175,6 +191,9 @@ for server in config:
log.error("FAILED TO PUSH BLOCK!")
log.error("%s", content_block)
log.exception(ex, exc_info=True)
# Continue polling when individual blocks fail to push to the local server
continue
except Exception as ex:
log.error("FAILED TO POLL %s", collection)
log.exception(ex, exc_info=True)