mirror of https://github.com/MISP/misp-modules
commit
354427d173
|
@ -72,7 +72,8 @@ def is_cof_valid_simple(d: dict) -> bool:
|
||||||
print("'rdata' is not a list and not a string.", file = sys.stderr)
|
print("'rdata' is not a list and not a string.", file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
if not ("time_first" in d and "time_last" in d) or ("zone_time_first" in d and "zone_time_last" in d):
|
if not ("time_first" in d and "time_last" in d) or ("zone_time_first" in d and "zone_time_last" in d):
|
||||||
print("We are missing EITHER ('first_seen' and 'last_seen') OR ('zone_time_first' and zone_time_last') fields", file=sys.stderr)
|
print("We are missing EITHER ('first_seen' and 'last_seen') OR ('zone_time_first' and zone_time_last') fields",
|
||||||
|
file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
# currently we don't check the OPTIONAL fields. Sorry... to be done later.
|
# currently we don't check the OPTIONAL fields. Sorry... to be done later.
|
||||||
return True
|
return True
|
||||||
|
@ -94,6 +95,26 @@ def validate_cof(d: dict, strict=True) -> bool:
|
||||||
return is_cof_valid_strict(d)
|
return is_cof_valid_strict(d)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_dnsdbflex(d: dict, strict=True) -> bool:
|
||||||
|
"""
|
||||||
|
Validate if dict d is valid dnsdbflex. It should looks like this:
|
||||||
|
{ "rrtype": <str>, "rrname": <str> }
|
||||||
|
"""
|
||||||
|
if "rrname" not in d:
|
||||||
|
print("Missing MANDATORY field 'rrname'", file = sys.stderr)
|
||||||
|
return False
|
||||||
|
if not isinstance(d['rrname'], str):
|
||||||
|
print("Type error: 'rrname' is not a JSON string", file = sys.stderr)
|
||||||
|
return False
|
||||||
|
if "rrtype" not in d:
|
||||||
|
print("Missing MANDATORY field 'rrtype'", file = sys.stderr)
|
||||||
|
return False
|
||||||
|
if not isinstance(d['rrtype'], str):
|
||||||
|
print("Type error: 'rrtype' is not a JSON string", file = sys.stderr)
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# simple, poor man's unit tests.
|
# simple, poor man's unit tests.
|
||||||
|
|
||||||
|
@ -104,6 +125,9 @@ if __name__ == "__main__":
|
||||||
assert is_valid_ip("2a0c:88:77:6::1")
|
assert is_valid_ip("2a0c:88:77:6::1")
|
||||||
|
|
||||||
# COF validation
|
# COF validation
|
||||||
|
print(80 * "=", file = sys.stderr)
|
||||||
|
print("COF unit tests....", file = sys.stderr)
|
||||||
|
|
||||||
mock_input = """{"count":1909,"rdata":["cpa.circl.lu"],"rrname":"www.circl.lu","rrtype":"CNAME","time_first":"1315586409","time_last":"1449566799"}
|
mock_input = """{"count":1909,"rdata":["cpa.circl.lu"],"rrname":"www.circl.lu","rrtype":"CNAME","time_first":"1315586409","time_last":"1449566799"}
|
||||||
{"count":2560,"rdata":["cpab.circl.lu"],"rrname":"www.circl.lu","rrtype":"CNAME","time_first":"1449584660","time_last":"1617676151"}"""
|
{"count":2560,"rdata":["cpab.circl.lu"],"rrname":"www.circl.lu","rrtype":"CNAME","time_first":"1449584660","time_last":"1617676151"}"""
|
||||||
|
|
||||||
|
@ -118,5 +142,24 @@ if __name__ == "__main__":
|
||||||
for entry in ndjson.loads(test2):
|
for entry in ndjson.loads(test2):
|
||||||
assert validate_cof(entry)
|
assert validate_cof(entry)
|
||||||
|
|
||||||
|
# dnsdbflex validation
|
||||||
|
print(80 * "=", file = sys.stderr)
|
||||||
|
print("dnsdbflex unit tests....", file = sys.stderr)
|
||||||
|
|
||||||
|
mock_input = """{"rrname":"labs.deep-insights.ai.","rrtype":"A"}
|
||||||
|
{"rrname":"www.deep-insights.ca.","rrtype":"CNAME"}
|
||||||
|
{"rrname":"mail.deep-insights.ca.","rrtype":"CNAME"}
|
||||||
|
{"rrname":"cpanel.deep-insights.ca.","rrtype":"A"}
|
||||||
|
{"rrname":"webdisk.deep-insights.ca.","rrtype":"A"}
|
||||||
|
{"rrname":"webmail.deep-insights.ca.","rrtype":"A"}"""
|
||||||
|
|
||||||
|
i = 0
|
||||||
|
for entry in ndjson.loads(mock_input):
|
||||||
|
retval = validate_dnsdbflex(entry, strict = False)
|
||||||
|
assert retval
|
||||||
|
print("dnsdbflex line %d is valid: %s" % (i, retval))
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
|
||||||
print(80 * "=", file = sys.stderr)
|
print(80 * "=", file = sys.stderr)
|
||||||
print("Unit Tests DONE", file = sys.stderr)
|
print("Unit Tests DONE", file = sys.stderr)
|
||||||
|
|
|
@ -22,7 +22,7 @@ import ndjson
|
||||||
# from pymisp import MISPObject, MISPEvent, PyMISP
|
# from pymisp import MISPObject, MISPEvent, PyMISP
|
||||||
from pymisp import MISPObject
|
from pymisp import MISPObject
|
||||||
|
|
||||||
from cof2misp.cof import validate_cof
|
from cof2misp.cof import validate_cof, validate_dnsdbflex
|
||||||
|
|
||||||
|
|
||||||
create_specific_attributes = False # this is for https://github.com/MISP/misp-objects/pull/314
|
create_specific_attributes = False # this is for https://github.com/MISP/misp-objects/pull/314
|
||||||
|
@ -37,7 +37,7 @@ mispattributes = {'inputSource': ['file'], 'output': ['MISP objects'],
|
||||||
'format': 'misp_standard'}
|
'format': 'misp_standard'}
|
||||||
|
|
||||||
|
|
||||||
moduleinfo = {'version': '0.2', 'author': 'Aaron Kaplan',
|
moduleinfo = {'version': '0.3', 'author': 'Aaron Kaplan',
|
||||||
'description': 'Module to import the passive DNS Common Output Format (COF) and merge as a MISP objet into a MISP event.',
|
'description': 'Module to import the passive DNS Common Output Format (COF) and merge as a MISP objet into a MISP event.',
|
||||||
'module-type': ['import']}
|
'module-type': ['import']}
|
||||||
|
|
||||||
|
@ -82,7 +82,7 @@ def parse_and_insert_cof(data: str) -> dict:
|
||||||
|
|
||||||
# o.add_tag('tlp:amber') # FIXME: we'll want to add a tlp: tag to the object
|
# o.add_tag('tlp:amber') # FIXME: we'll want to add a tlp: tag to the object
|
||||||
if 'bailiwick' in entry:
|
if 'bailiwick' in entry:
|
||||||
o.add_attribute('bailiwick', value=entry['bailiwick'].rstrip('.'))
|
o.add_attribute('bailiwick', value=entry['bailiwick'].rstrip('.'), distribution=0)
|
||||||
|
|
||||||
#
|
#
|
||||||
# handle the combinations of rrtype (domain, ip) on both left and right side
|
# handle the combinations of rrtype (domain, ip) on both left and right side
|
||||||
|
@ -91,26 +91,26 @@ def parse_and_insert_cof(data: str) -> dict:
|
||||||
if create_specific_attributes:
|
if create_specific_attributes:
|
||||||
if rrtype in ['A', 'AAAA', 'A6']: # address type
|
if rrtype in ['A', 'AAAA', 'A6']: # address type
|
||||||
# address type
|
# address type
|
||||||
o.add_attribute('rrname_domain', value=rrname)
|
o.add_attribute('rrname_domain', value=rrname, distribution=0)
|
||||||
for r in rdata:
|
for r in rdata:
|
||||||
o.add_attribute('rdata_ip', value=r)
|
o.add_attribute('rdata_ip', value=r, distribution=0)
|
||||||
elif rrtype in ['CNAME', 'DNAME', 'NS']: # both sides are domains
|
elif rrtype in ['CNAME', 'DNAME', 'NS']: # both sides are domains
|
||||||
o.add_attribute('rrname_domain', value=rrname)
|
o.add_attribute('rrname_domain', value=rrname, distribution=0)
|
||||||
for r in rdata:
|
for r in rdata:
|
||||||
o.add_attribute('rdata_domain', value=r)
|
o.add_attribute('rdata_domain', value=r, distribution=0)
|
||||||
elif rrtype in ['SOA']: # left side is a domain, right side is text
|
elif rrtype in ['SOA']: # left side is a domain, right side is text
|
||||||
o.add_attribute('rrname_domain', value=rrname)
|
o.add_attribute('rrname_domain', value=rrname, distribution=0)
|
||||||
|
|
||||||
#
|
#
|
||||||
# now do the regular filling up of rrname, rrtype, time_first, etc.
|
# now do the regular filling up of rrname, rrtype, time_first, etc.
|
||||||
#
|
#
|
||||||
o.add_attribute('rrname', value=rrname)
|
o.add_attribute('rrname', value=rrname, distribution=0)
|
||||||
o.add_attribute('rrtype', value=rrtype)
|
o.add_attribute('rrtype', value=rrtype, distribution=0)
|
||||||
for r in rdata:
|
for r in rdata:
|
||||||
o.add_attribute('rdata', value=r)
|
o.add_attribute('rdata', value=r, distribution=0)
|
||||||
o.add_attribute('raw_rdata', value=json.dumps(rdata)) # FIXME: do we need to hex encode it?
|
o.add_attribute('raw_rdata', value=json.dumps(rdata), distribution=0) # FIXME: do we need to hex encode it?
|
||||||
o.add_attribute('time_first', value=entry['time_first'])
|
o.add_attribute('time_first', value=entry['time_first'], distribution=0)
|
||||||
o.add_attribute('time_last', value=entry['time_last'])
|
o.add_attribute('time_last', value=entry['time_last'], distribution=0)
|
||||||
o.first_seen = entry['time_first'] # is this redundant?
|
o.first_seen = entry['time_first'] # is this redundant?
|
||||||
o.last_seen = entry['time_last']
|
o.last_seen = entry['time_last']
|
||||||
|
|
||||||
|
@ -119,7 +119,7 @@ def parse_and_insert_cof(data: str) -> dict:
|
||||||
#
|
#
|
||||||
for k in ['count', 'sensor_id', 'origin', 'text', 'time_first_ms', 'time_last_ms', 'zone_time_first', 'zone_time_last']:
|
for k in ['count', 'sensor_id', 'origin', 'text', 'time_first_ms', 'time_last_ms', 'zone_time_first', 'zone_time_last']:
|
||||||
if k in entry and entry[k]:
|
if k in entry and entry[k]:
|
||||||
o.add_attribute(k, value=entry[k])
|
o.add_attribute(k, value=entry[k], distribution=0)
|
||||||
|
|
||||||
#
|
#
|
||||||
# add COF entry to MISP object
|
# add COF entry to MISP object
|
||||||
|
@ -148,7 +148,36 @@ def parse_and_insert_dnsdbflex(data: str):
|
||||||
--------
|
--------
|
||||||
none
|
none
|
||||||
"""
|
"""
|
||||||
return {"error": "NOT IMPLEMENTED YET"} # XXX FIXME: need a MISP object for dnsdbflex
|
objects = []
|
||||||
|
try:
|
||||||
|
entries = ndjson.loads(data)
|
||||||
|
for entry in entries: # iterate over all ndjson lines
|
||||||
|
# validate here (simple validation or full JSON Schema validation)
|
||||||
|
if not validate_dnsdbflex(entry):
|
||||||
|
return {"error": "Could not validate the dnsdbflex input '%s'" % entry}
|
||||||
|
|
||||||
|
# Next, extract some fields
|
||||||
|
rrtype = entry['rrtype'].upper()
|
||||||
|
rrname = entry['rrname'].rstrip('.')
|
||||||
|
|
||||||
|
# create a new MISP object, based on the passive-dns object for each nd-JSON line
|
||||||
|
try:
|
||||||
|
o = MISPObject(name='passive-dns', standalone=False, distribution=0, comment='DNSDBFLEX import by cof2misp')
|
||||||
|
o.add_attribute('rrtype', value=rrtype, distribution=0, comment='DNSDBFLEX import by cof2misp')
|
||||||
|
o.add_attribute('rrname', value=rrname, distribution=0, comment='DNSDBFLEX import by cof2misp')
|
||||||
|
except Exception as ex:
|
||||||
|
print("could not create object. Reason: %s" % str(ex))
|
||||||
|
|
||||||
|
#
|
||||||
|
# add dnsdbflex entry to MISP object
|
||||||
|
#
|
||||||
|
objects.append(o.to_json())
|
||||||
|
|
||||||
|
r = {'results': {'Object': [json.loads(o) for o in objects]}}
|
||||||
|
except Exception as ex:
|
||||||
|
misperrors["error"] = "An error occured during parsing of input: '%s'" % (str(ex),)
|
||||||
|
return misperrors
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
def is_dnsdbflex(data: str) -> bool:
|
def is_dnsdbflex(data: str) -> bool:
|
||||||
|
|
Loading…
Reference in New Issue