mirror of https://github.com/MISP/misp-modules
Add a function to validate dnsdbflex output
add dnsdbflex parser. It's rather easy Signed-off-by: aaronkaplan <aaron@lo-res.org>pull/507/head
parent
bbe0a1efa8
commit
4816844d16
|
@ -27,7 +27,7 @@ def is_valid_ip(ip: str) -> bool:
|
||||||
try:
|
try:
|
||||||
ipaddress.ip_address(ip)
|
ipaddress.ip_address(ip)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
print("is_valid_ip(%s) returned False. Reason: %s" % (ip, str(ex)), file=sys.stderr)
|
print("is_valid_ip(%s) returned False. Reason: %s" % (ip, str(ex)), file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ def is_cof_valid_strict(d: dict) -> bool:
|
||||||
--------
|
--------
|
||||||
True on success, False on validation failure.
|
True on success, False on validation failure.
|
||||||
"""
|
"""
|
||||||
return True # FIXME
|
return True # FIXME
|
||||||
|
|
||||||
|
|
||||||
def is_cof_valid_simple(d: dict) -> bool:
|
def is_cof_valid_simple(d: dict) -> bool:
|
||||||
|
@ -51,28 +51,29 @@ def is_cof_valid_simple(d: dict) -> bool:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if "rrname" not in d:
|
if "rrname" not in d:
|
||||||
print("Missing MANDATORY field 'rrname'", file=sys.stderr)
|
print("Missing MANDATORY field 'rrname'", file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
if not isinstance(d['rrname'], str):
|
if not isinstance(d['rrname'], str):
|
||||||
print("Type error: 'rrname' is not a JSON string", file=sys.stderr)
|
print("Type error: 'rrname' is not a JSON string", file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
if "rrtype" not in d:
|
if "rrtype" not in d:
|
||||||
print("Missing MANDATORY field 'rrtype'", file=sys.stderr)
|
print("Missing MANDATORY field 'rrtype'", file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
if not isinstance(d['rrtype'], str):
|
if not isinstance(d['rrtype'], str):
|
||||||
print("Type error: 'rrtype' is not a JSON string", file=sys.stderr)
|
print("Type error: 'rrtype' is not a JSON string", file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
if "rdata" not in d:
|
if "rdata" not in d:
|
||||||
print("Missing MANDATORY field 'rdata'", file=sys.stderr)
|
print("Missing MANDATORY field 'rdata'", file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
if "rdata" not in d:
|
if "rdata" not in d:
|
||||||
print("Missing MANDATORY field 'rdata'", file=sys.stderr)
|
print("Missing MANDATORY field 'rdata'", file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
if not isinstance(d['rdata'], str) and not isinstance(d['rdata'], list):
|
if not isinstance(d['rdata'], str) and not isinstance(d['rdata'], list):
|
||||||
print("'rdata' is not a list and not a string.", file=sys.stderr)
|
print("'rdata' is not a list and not a string.", file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
if not ("time_first" in d and "time_last" in d) or ("zone_time_first" in d and "zone_time_last" in d):
|
if not ("time_first" in d and "time_last" in d) or ("zone_time_first" in d and "zone_time_last" in d):
|
||||||
print("We are missing EITHER ('first_seen' and 'last_seen') OR ('zone_time_first' and zone_time_last') fields", file=sys.stderr)
|
print("We are missing EITHER ('first_seen' and 'last_seen') OR ('zone_time_first' and zone_time_last') fields",
|
||||||
|
file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
# currently we don't check the OPTIONAL fields. Sorry... to be done later.
|
# currently we don't check the OPTIONAL fields. Sorry... to be done later.
|
||||||
return True
|
return True
|
||||||
|
@ -93,22 +94,23 @@ def validate_cof(d: dict, strict=True) -> bool:
|
||||||
else:
|
else:
|
||||||
return is_cof_valid_strict(d)
|
return is_cof_valid_strict(d)
|
||||||
|
|
||||||
|
|
||||||
def validate_dnsdbflex(d: dict, strict=True) -> bool:
|
def validate_dnsdbflex(d: dict, strict=True) -> bool:
|
||||||
"""
|
"""
|
||||||
Validate if dict d is valid dnsdbflex. It should looks like this:
|
Validate if dict d is valid dnsdbflex. It should looks like this:
|
||||||
{ "rrtype": <str>, "rrname": <str> }
|
{ "rrtype": <str>, "rrname": <str> }
|
||||||
"""
|
"""
|
||||||
if "rrname" not in d:
|
if "rrname" not in d:
|
||||||
print("Missing MANDATORY field 'rrname'", file=sys.stderr)
|
print("Missing MANDATORY field 'rrname'", file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
if not isinstance(d['rrname'], str):
|
if not isinstance(d['rrname'], str):
|
||||||
print("Type error: 'rrname' is not a JSON string", file=sys.stderr)
|
print("Type error: 'rrname' is not a JSON string", file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
if "rrtype" not in d:
|
if "rrtype" not in d:
|
||||||
print("Missing MANDATORY field 'rrtype'", file=sys.stderr)
|
print("Missing MANDATORY field 'rrtype'", file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
if not isinstance(d['rrtype'], str):
|
if not isinstance(d['rrtype'], str):
|
||||||
print("Type error: 'rrtype' is not a JSON string", file=sys.stderr)
|
print("Type error: 'rrtype' is not a JSON string", file = sys.stderr)
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -116,8 +118,8 @@ def validate_dnsdbflex(d: dict, strict=True) -> bool:
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# simple, poor man's unit tests.
|
# simple, poor man's unit tests.
|
||||||
|
|
||||||
print(80 * "=", file=sys.stderr)
|
print(80 * "=", file = sys.stderr)
|
||||||
print("Unit Tests:", file=sys.stderr)
|
print("Unit Tests:", file = sys.stderr)
|
||||||
assert not is_valid_ip("a.2.3.4")
|
assert not is_valid_ip("a.2.3.4")
|
||||||
assert is_valid_ip("99.88.77.6")
|
assert is_valid_ip("99.88.77.6")
|
||||||
assert is_valid_ip("2a0c:88:77:6::1")
|
assert is_valid_ip("2a0c:88:77:6::1")
|
||||||
|
@ -128,7 +130,7 @@ if __name__ == "__main__":
|
||||||
|
|
||||||
i = 0
|
i = 0
|
||||||
for entry in ndjson.loads(mock_input):
|
for entry in ndjson.loads(mock_input):
|
||||||
retval = validate_cof(entry, strict=False)
|
retval = validate_cof(entry, strict = False)
|
||||||
assert retval
|
assert retval
|
||||||
print("line %d is valid: %s" % (i, retval))
|
print("line %d is valid: %s" % (i, retval))
|
||||||
i += 1
|
i += 1
|
||||||
|
@ -137,5 +139,5 @@ if __name__ == "__main__":
|
||||||
for entry in ndjson.loads(test2):
|
for entry in ndjson.loads(test2):
|
||||||
assert validate_cof(entry)
|
assert validate_cof(entry)
|
||||||
|
|
||||||
print(80 * "=", file=sys.stderr)
|
print(80 * "=", file = sys.stderr)
|
||||||
print("Unit Tests DONE", file=sys.stderr)
|
print("Unit Tests DONE", file = sys.stderr)
|
||||||
|
|
|
@ -22,7 +22,7 @@ import ndjson
|
||||||
# from pymisp import MISPObject, MISPEvent, PyMISP
|
# from pymisp import MISPObject, MISPEvent, PyMISP
|
||||||
from pymisp import MISPObject
|
from pymisp import MISPObject
|
||||||
|
|
||||||
from cof2misp.cof import validate_cof
|
from cof2misp.cof import validate_cof, validate_dnsdbflex
|
||||||
|
|
||||||
|
|
||||||
create_specific_attributes = False # this is for https://github.com/MISP/misp-objects/pull/314
|
create_specific_attributes = False # this is for https://github.com/MISP/misp-objects/pull/314
|
||||||
|
@ -147,7 +147,34 @@ def parse_and_insert_dnsdbflex(data: str):
|
||||||
--------
|
--------
|
||||||
none
|
none
|
||||||
"""
|
"""
|
||||||
return {"error": "NOT IMPLEMENTED YET"} # XXX FIXME: need a MISP object for dnsdbflex
|
objects = []
|
||||||
|
try:
|
||||||
|
entries = ndjson.loads(data)
|
||||||
|
for entry in entries: # iterate over all ndjson lines
|
||||||
|
|
||||||
|
# validate here (simple validation or full JSON Schema validation)
|
||||||
|
if not validate_dnsdbflex(entry):
|
||||||
|
return {"error": "Could not validate the dnsdbflex input '%s'" % entry}
|
||||||
|
|
||||||
|
# Next, extract some fields
|
||||||
|
rrtype = entry['rrtype'].upper()
|
||||||
|
rrname = entry['rrname'].rstrip('.')
|
||||||
|
|
||||||
|
# create a new MISP object, based on the passive-dns object for each nd-JSON line
|
||||||
|
o = MISPObject(name='passive-dns-dnsdbflex', standalone=False, comment='created by cof2misp')
|
||||||
|
o.add_attribute('rrname', value=rrname)
|
||||||
|
o.add_attribute('rrtype', value=rrtype)
|
||||||
|
|
||||||
|
#
|
||||||
|
# add dnsdbflex entry to MISP object
|
||||||
|
#
|
||||||
|
objects.append(o.to_json())
|
||||||
|
|
||||||
|
r = {'results': {'Object': [json.loads(o) for o in objects]}}
|
||||||
|
except Exception as ex:
|
||||||
|
misperrors["error"] = "An error occured during parsing of input: '%s'" % (str(ex),)
|
||||||
|
return misperrors
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
def is_dnsdbflex(data: str) -> bool:
|
def is_dnsdbflex(data: str) -> bool:
|
||||||
|
|
Loading…
Reference in New Issue