Version 0.2 of the cof2misp import module.

pull/491/head
aaronkaplan 2021-05-02 16:45:55 +00:00
parent 509e5ac979
commit f1da1dd6fa
5 changed files with 325 additions and 1 deletions

View File

@ -59,6 +59,7 @@ isodate==0.6.0
jbxapi==3.14.0 jbxapi==3.14.0
json-log-formatter==0.3.0 json-log-formatter==0.3.0
jsonschema==3.2.0 jsonschema==3.2.0
ndjson==0.3.1
lark-parser==0.11.1 lark-parser==0.11.1
lief==0.11.0 lief==0.11.0
lxml==4.6.2 lxml==4.6.2

View File

@ -1,3 +1,3 @@
from .vt_graph_parser import * # noqa from .vt_graph_parser import * # noqa
all = ['joe_parser', 'lastline_api'] all = ['joe_parser', 'lastline_api', 'cof2misp']

View File

@ -0,0 +1,103 @@
"""
Common Output Format for passive DNS library.
"""
import ipaddress
import sys
import ndjson
def is_valid_ip(ip: str) -> bool:
    """Tell whether a string can be parsed as an IPv4 or IPv6 address.

    The check simply attempts to build an ``ipaddress`` object from the
    input. When that fails, the reason is reported on stderr.

    Returns
    --------
    True on success, False on validation failure.
    """
    try:
        ipaddress.ip_address(ip)
    except Exception as err:
        reason = str(err)
        print("is_valid_ip(%s) returned False. Reason: %s" % (ip, reason), file=sys.stderr)
        return False
    else:
        return True
def is_cof_valid_simple(d: dict) -> bool:
    """Check MANDATORY fields according to COF - simple check, do not do the full JSON schema validation.

    The following is verified, in this order:
      * 'rrname' and 'rrtype' are present and are strings,
      * 'rdata' is present and is either a string or a list,
      * at least one complete time pair is present: either
        ('time_first' and 'time_last') or
        ('zone_time_first' and 'zone_time_last').

    Every failure is reported on stderr.

    Returns
    --------
    True on success, False on validation failure.
    """
    # 'rrname' is checked completely (presence, then type) before 'rrtype'.
    for field in ("rrname", "rrtype"):
        if field not in d:
            print("Missing MANDATORY field '%s'" % field, file=sys.stderr)
            return False
        if not isinstance(d[field], str):
            print("Type error: '%s' is not a JSON string" % field, file=sys.stderr)
            return False

    if "rdata" not in d:
        print("Missing MANDATORY field 'rdata'", file=sys.stderr)
        return False
    if not isinstance(d["rdata"], (str, list)):
        print("'rdata' is not a list and not a string.", file=sys.stderr)
        return False

    # Either time pair is sufficient. The pairs are evaluated separately and
    # combined with an explicit "or" so that a record which only carries the
    # zone_time_* pair (or both pairs) is accepted.
    has_observation_times = "time_first" in d and "time_last" in d
    has_zone_times = "zone_time_first" in d and "zone_time_last" in d
    if not (has_observation_times or has_zone_times):
        print("We are missing EITHER ('time_first' and 'time_last') OR "
              "('zone_time_first' and 'zone_time_last') fields", file=sys.stderr)
        return False

    # currently we don't check the OPTIONAL fields. Sorry... to be done later.
    return True
def validate_cof(d: dict, strict=True) -> bool:
    """Validate an input passive DNS COF (given as dict).

    strict might be set to False in order to loosen the checking.
    With strict==True, a full JSON Schema validation is intended to happen.

    Returns
    --------
    True on success, False on validation failure.
    """
    # The mandatory-field check is the baseline for both modes.
    if not is_cof_valid_simple(d):
        return False
    if strict:
        # TODO: full JSON Schema validation is not implemented yet. Until it
        # is, strict mode performs the same mandatory-field check as the loose
        # mode, so that the function always returns a real bool instead of
        # falling off the end and returning None.
        pass
    return True
if __name__ == "__main__":
    # Poor man's unit tests, executed only when the file is run as a script.
    separator = 80 * "="
    print(separator, file=sys.stderr)
    print("Unit Tests:", file=sys.stderr)

    # IP address validation
    assert not is_valid_ip("a.2.3.4")
    assert is_valid_ip("99.88.77.6")
    assert is_valid_ip("2a0c:88:77:6::1")

    # COF validation: two newline-delimited JSON records
    mock_input = """{"count":1909,"rdata":["cpa.circl.lu"],"rrname":"www.circl.lu","rrtype":"CNAME","time_first":"1315586409","time_last":"1449566799"}
{"count":2560,"rdata":["cpab.circl.lu"],"rrname":"www.circl.lu","rrtype":"CNAME","time_first":"1449584660","time_last":"1617676151"}"""

    for i, record in enumerate(ndjson.loads(mock_input)):
        retval = validate_cof(record, strict=False)
        assert retval
        print("line %d is valid: %s" % (i, retval))

    print(separator, file=sys.stderr)
    print("Unit Tests DONE", file=sys.stderr)

View File

@ -13,5 +13,6 @@ __all__ = [
'openiocimport', 'openiocimport',
'threatanalyzer_import', 'threatanalyzer_import',
'csvimport', 'csvimport',
'cof2misp',
'joe_import', 'joe_import',
] ]

View File

@ -0,0 +1,219 @@
""" PassiveDNS Common Output Format (COF) MISP importer.
Takes as input a valid COF file or the output of the dnsdbflex utility
and creates MISP objects for the input.
Author: Aaron Kaplan
License: see LICENSE
"""
import json
import base64
import pprint
import ndjson
from pymisp import MISPObject, MISPEvent, PyMISP
from cof2misp.cof import is_valid_ip, validate_cof
# Error dict handed back to the caller; parse_and_insert_cof() overwrites
# its 'error' entry with the concrete message.
misperrors = {
    'error': 'Error',
}

userConfig = {}
inputSource = ['file']

# Returned unchanged by introspection().
mispattributes = {
    'inputSource': ['file'],
    'output': ['MISP objects'],
    'format': 'misp_standard',
}

# Returned by version(), which additionally stores moduleconfig under 'config'.
moduleinfo = {
    'version': '0.2',
    'author': 'Aaron Kaplan',
    'description': 'Module to import the passive DNS Common Output Format (COF) and merge as a MISP objet into a MISP event.',
    'module-type': ['import'],
}

moduleconfig = []
# misp = PyMISP()
def parse_and_insert_cof(data: str) -> dict:
    """Parse the COF data and turn every record into a 'passive-dns' MISP object.

    Parameters
    ----------
      data as a string (newline-delimited JSON, one COF record per line)

    Returns
    -------
      A dict with either the error message or the data which may be sent off to the caller of handler().
      On success the dict has the shape {'results': {'Object': [...]}}.

    Raises
    --------
      none. All Exceptions will be handled here. On error, a misperror is returned.
    """
    objects = []
    try:
        entries = ndjson.loads(data)
        for entry in entries:           # iterate over all ndjson lines
            # validate here (simple validation or full JSON Schema validation)
            # FIXME

            # Next, extract some fields
            rrtype = entry['rrtype'].upper()
            rrname = entry['rrname'].rstrip('.')
            # 'rdata' may be a single string or a list of strings. Wrap a bare
            # string so it is not iterated character by character.
            raw_rdata = entry['rdata']
            if isinstance(raw_rdata, str):
                raw_rdata = [raw_rdata]
            rdata = [x.rstrip('.') for x in raw_rdata]

            # create a new MISP object, based on the passive-dns object for each nd-JSON line
            o = MISPObject(name='passive-dns', standalone=False, comment='created by cof2misp')

            # o.add_tag('tlp:amber')    # FIXME: we'll want to add a tlp: tag to the object
            # 'bailiwick' is not a mandatory field, so a record without it
            # must not abort the whole import.
            if 'bailiwick' in entry:
                o.add_attribute('bailiwick', value=entry['bailiwick'].rstrip('.'))

            #
            # handle the combinations of rrtype (domain, ip) on both left and right side
            #
            if rrtype in ['A', 'AAAA', 'A6']:               # address type
                o.add_attribute('rrname_domain', value=rrname)
                for value in rdata:
                    o.add_attribute('rdata_ip', value=value)
            elif rrtype in ['CNAME', 'DNAME', 'NS']:        # both sides are domains
                o.add_attribute('rrname_domain', value=rrname)
                for value in rdata:
                    o.add_attribute('rdata_domain', value=value)
            elif rrtype in ['SOA']:                         # left side is a domain, right side is text
                o.add_attribute('rrname_domain', value=rrname)

            #
            # now do the regular filling up of rrname, rrtype, time_first, etc.
            #
            o.add_attribute('rrname', value=rrname)
            o.add_attribute('rrtype', value=rrtype)
            for value in rdata:
                o.add_attribute('rdata', value=value)
            o.add_attribute('raw_rdata', value=json.dumps(rdata))   # FIXME: do we need to hex encode it?

            # A record may carry the zone_time_* pair instead of
            # time_first/time_last (the zone_time_* keys are handled with the
            # optional values below), so only use the observation times when
            # they are present.
            if 'time_first' in entry:
                o.add_attribute('time_first', value=entry['time_first'])
                o.first_seen = entry['time_first']      # is this redundant?
            if 'time_last' in entry:
                o.add_attribute('time_last', value=entry['time_last'])
                o.last_seen = entry['time_last']

            #
            # Now add the other optional values.
            # Keys that are absent or hold a falsy value are skipped.
            #
            for k in ['count', 'sensor_id', 'origin', 'text', 'time_first_ms', 'time_last_ms', 'zone_time_first', 'zone_time_last']:
                if k in entry and entry[k]:
                    o.add_attribute(k, value=entry[k])

            #
            # add COF entry to MISP object
            #
            objects.append(o.to_json())

        # Each object was serialised with to_json(); load it back so the
        # result is made of plain dicts.
        result = {'results': {'Object': [json.loads(obj) for obj in objects]}}
    except Exception as ex:
        misperrors["error"] = "An error occured during parsing of input: '%s'" % (str(ex),)
        return misperrors
    return result
def parse_and_insert_dnsdbflex(data: str) -> dict:
    """Parse and validate the simpler dnsdbflex output data.

    Not implemented yet: there is no MISP object for dnsdbflex data so far.
    Instead of silently returning None, an explicit error dict is returned
    so that the caller receives the dict this function promises.

    Parameters
    ----------
      data as a string

    Returns
    -------
      A dict with either the error message or the data which may be sent off to the caller of handler().
      Currently always the error dict.

    Raises
    --------
      none
    """
    # XXX FIXME: need a MISP object for dnsdbflex
    return {'error': 'dnsdbflex input was detected, but importing it is not supported yet.'}
def is_dnsdbflex(data: str) -> bool:
    """Decide whether the data looks like dnsdbflex output.

    The data counts as dnsdbflex when every ndjson record has exactly the
    two keys 'rrname' and 'rrtype'.

    Parameters
    ----------
      ndjson data as a string

    Returns
    -------
      True or False

    Raises
    --------
      none. Any exception while parsing or inspecting the data yields False.
    """
    expected_keys = {'rrname', 'rrtype'}
    try:
        records = ndjson.loads(data)
        # all() stops at the first record which does not conform.
        return all(set(record.keys()) == expected_keys for record in records)
    except Exception:
        return False
def is_cof(data: str) -> bool:
    """Placeholder COF detection: the data is not inspected at all.

    Returns
    -------
      Always True.
    """
    return True
def handler(q=False):
    """Entry point of the import module.

    Decodes the request, determines which type of input it carries
    (dnsdbflex or COF) and hands the data to the matching parser.

    Parameters
    ----------
      q: the request as a JSON string. Its 'data' entry holds the
         base64 encoded, UTF-8 input file. False means "no request".

    Returns
    -------
      False if no request was given, otherwise the result of the selected
      parser or a dict with an 'error' entry.

    Raises
    --------
      none. Malformed requests are reported via the returned error dict.
    """
    if q is False:
        return False

    try:
        # Decoding the request happens inside the try block so that malformed
        # JSON is reported as an error dict like every other failure.
        request = json.loads(q)

        # The event id is only used for the debug output below, so a request
        # without it must not abort the import.
        event_id = request.get('event_id')
        pprint.pprint("event_id = %s" % event_id)

        data = base64.b64decode(request["data"]).decode('utf-8')
        if not data:
            # NOTE(review): this branch returns a JSON *string* while all other
            # branches return dicts. Kept as is for backward compatibility -
            # confirm what the caller expects.
            return json.dumps({'success': 0})  # empty file is ok
        if is_dnsdbflex(data):
            return parse_and_insert_dnsdbflex(data)
        elif is_cof(data):
            # check if it's valid COF format
            return parse_and_insert_cof(data)
        else:
            return {'error': 'Could not find any valid COF input nor dnsdbflex input. Please have a loot at: https://datatracker.ietf.org/doc/draft-dulaunoy-dnsop-passive-dns-cof/'}
    except Exception as ex:
        print("oops, got exception %s" % str(ex))
        return {'error': "Got exception %s" % str(ex)}
def introspection():
    """Return the module-level ``mispattributes`` dict (the very same object, not a copy)."""
    attributes = mispattributes
    return attributes
def version():
    """Store ``moduleconfig`` under the 'config' key of ``moduleinfo`` and return ``moduleinfo``."""
    moduleinfo.update(config=moduleconfig)
    return moduleinfo
if __name__ == '__main__':
    # Manual smoke test: feed the request stored in 'test.json' through
    # handler() and print the result as JSON.
    # The context manager makes sure the file handle is closed again.
    with open('test.json', 'r') as x:
        payload = x.read()
    r = handler(q=payload)
    print(json.dumps(r))