From 4816844d16dadac5c9e85cfcf101ec8b9607621a Mon Sep 17 00:00:00 2001 From: aaronkaplan Date: Wed, 26 May 2021 12:38:56 +0200 Subject: [PATCH] Add a function to validate dnsdbflex output add dnsdbflex parser. It's rather easy Signed-off-by: aaronkaplan --- misp_modules/lib/cof2misp/cof.py | 40 +++++++++++---------- misp_modules/modules/import_mod/cof2misp.py | 31 ++++++++++++++-- 2 files changed, 50 insertions(+), 21 deletions(-) diff --git a/misp_modules/lib/cof2misp/cof.py b/misp_modules/lib/cof2misp/cof.py index a222a3d..7b8d35c 100644 --- a/misp_modules/lib/cof2misp/cof.py +++ b/misp_modules/lib/cof2misp/cof.py @@ -27,7 +27,7 @@ def is_valid_ip(ip: str) -> bool: try: ipaddress.ip_address(ip) except Exception as ex: - print("is_valid_ip(%s) returned False. Reason: %s" % (ip, str(ex)), file=sys.stderr) + print("is_valid_ip(%s) returned False. Reason: %s" % (ip, str(ex)), file = sys.stderr) return False return True @@ -39,7 +39,7 @@ def is_cof_valid_strict(d: dict) -> bool: -------- True on success, False on validation failure. """ - return True # FIXME + return True # FIXME def is_cof_valid_simple(d: dict) -> bool: @@ -51,28 +51,29 @@ def is_cof_valid_simple(d: dict) -> bool: """ if "rrname" not in d: - print("Missing MANDATORY field 'rrname'", file=sys.stderr) + print("Missing MANDATORY field 'rrname'", file = sys.stderr) return False if not isinstance(d['rrname'], str): - print("Type error: 'rrname' is not a JSON string", file=sys.stderr) + print("Type error: 'rrname' is not a JSON string", file = sys.stderr) return False if "rrtype" not in d: - print("Missing MANDATORY field 'rrtype'", file=sys.stderr) + print("Missing MANDATORY field 'rrtype'", file = sys.stderr) return False if not isinstance(d['rrtype'], str): - print("Type error: 'rrtype' is not a JSON string", file=sys.stderr) + print("Type error: 'rrtype' is not a JSON string", file = sys.stderr) return False if "rdata" not in d: - print("Missing MANDATORY field 'rdata'", file=sys.stderr) + print("Missing MANDATORY field 'rdata'", file = sys.stderr) return False if "rdata" not in d: - print("Missing MANDATORY field 'rdata'", file=sys.stderr) + print("Missing MANDATORY field 'rdata'", file = sys.stderr) return False if not isinstance(d['rdata'], str) and not isinstance(d['rdata'], list): - print("'rdata' is not a list and not a string.", file=sys.stderr) + print("'rdata' is not a list and not a string.", file = sys.stderr) return False if not ("time_first" in d and "time_last" in d) or ("zone_time_first" in d and "zone_time_last" in d): - print("We are missing EITHER ('first_seen' and 'last_seen') OR ('zone_time_first' and zone_time_last') fields", file=sys.stderr) + print("We are missing EITHER ('first_seen' and 'last_seen') OR ('zone_time_first' and zone_time_last') fields", + file = sys.stderr) return False # currently we don't check the OPTIONAL fields. Sorry... to be done later. return True @@ -93,22 +94,23 @@ def validate_cof(d: dict, strict=True) -> bool: else: return is_cof_valid_strict(d) + def validate_dnsdbflex(d: dict, strict=True) -> bool: """ Validate if dict d is valid dnsdbflex. It should looks like this: { "rrtype": , "rrname": } """ if "rrname" not in d: - print("Missing MANDATORY field 'rrname'", file=sys.stderr) + print("Missing MANDATORY field 'rrname'", file = sys.stderr) return False if not isinstance(d['rrname'], str): - print("Type error: 'rrname' is not a JSON string", file=sys.stderr) + print("Type error: 'rrname' is not a JSON string", file = sys.stderr) return False if "rrtype" not in d: - print("Missing MANDATORY field 'rrtype'", file=sys.stderr) + print("Missing MANDATORY field 'rrtype'", file = sys.stderr) return False if not isinstance(d['rrtype'], str): - print("Type error: 'rrtype' is not a JSON string", file=sys.stderr) + print("Type error: 'rrtype' is not a JSON string", file = sys.stderr) return False return True @@ -116,8 +118,8 @@ def validate_dnsdbflex(d: dict, strict=True) -> bool: if __name__ == "__main__": # simple, poor man's unit tests. - print(80 * "=", file=sys.stderr) - print("Unit Tests:", file=sys.stderr) + print(80 * "=", file = sys.stderr) + print("Unit Tests:", file = sys.stderr) assert not is_valid_ip("a.2.3.4") assert is_valid_ip("99.88.77.6") assert is_valid_ip("2a0c:88:77:6::1") @@ -128,7 +130,7 @@ if __name__ == "__main__": i = 0 for entry in ndjson.loads(mock_input): - retval = validate_cof(entry, strict=False) + retval = validate_cof(entry, strict = False) assert retval print("line %d is valid: %s" % (i, retval)) i += 1 @@ -137,5 +139,5 @@ if __name__ == "__main__": for entry in ndjson.loads(test2): assert validate_cof(entry) - print(80 * "=", file=sys.stderr) - print("Unit Tests DONE", file=sys.stderr) + print(80 * "=", file = sys.stderr) + print("Unit Tests DONE", file = sys.stderr) diff --git a/misp_modules/modules/import_mod/cof2misp.py b/misp_modules/modules/import_mod/cof2misp.py index 3fe36fc..abddc0b 100755 --- a/misp_modules/modules/import_mod/cof2misp.py +++ b/misp_modules/modules/import_mod/cof2misp.py @@ -22,7 +22,7 @@ import ndjson # from pymisp import MISPObject, MISPEvent, PyMISP from pymisp import MISPObject -from cof2misp.cof import validate_cof +from cof2misp.cof import validate_cof, validate_dnsdbflex create_specific_attributes = False # this is for https://github.com/MISP/misp-objects/pull/314 @@ -147,7 +147,34 @@ def parse_and_insert_dnsdbflex(data: str): -------- none """ - return {"error": "NOT IMPLEMENTED YET"} # XXX FIXME: need a MISP object for dnsdbflex + objects = [] + try: + entries = ndjson.loads(data) + for entry in entries: # iterate over all ndjson lines + + # validate here (simple validation or full JSON Schema validation) + if not validate_dnsdbflex(entry): + return {"error": "Could not validate the dnsdbflex input '%s'" % entry} + + # Next, extract some fields + rrtype = entry['rrtype'].upper() + rrname = entry['rrname'].rstrip('.') + + # create a new MISP object, based on the passive-dns object for each nd-JSON line + o = MISPObject(name='passive-dns-dnsdbflex', standalone=False, comment='created by cof2misp') + o.add_attribute('rrname', value=rrname) + o.add_attribute('rrtype', value=rrtype) + + # + # add dnsdbflex entry to MISP object + # + objects.append(o.to_json()) + + r = {'results': {'Object': [json.loads(o) for o in objects]}} + except Exception as ex: + misperrors["error"] = "An error occured during parsing of input: '%s'" % (str(ex),) + return misperrors + return r def is_dnsdbflex(data: str) -> bool: