diff --git a/misp_modules/lib/cof2misp/cof.py b/misp_modules/lib/cof2misp/cof.py index 395569e..d7420a0 100644 --- a/misp_modules/lib/cof2misp/cof.py +++ b/misp_modules/lib/cof2misp/cof.py @@ -27,7 +27,7 @@ def is_valid_ip(ip: str) -> bool: try: ipaddress.ip_address(ip) except Exception as ex: - print("is_valid_ip(%s) returned False. Reason: %s" % (ip, str(ex)), file=sys.stderr) + print("is_valid_ip(%s) returned False. Reason: %s" % (ip, str(ex)), file = sys.stderr) return False return True @@ -39,7 +39,7 @@ def is_cof_valid_strict(d: dict) -> bool: -------- True on success, False on validation failure. """ - return True # FIXME + return True # FIXME def is_cof_valid_simple(d: dict) -> bool: @@ -51,28 +51,29 @@ def is_cof_valid_simple(d: dict) -> bool: """ if "rrname" not in d: - print("Missing MANDATORY field 'rrname'", file=sys.stderr) + print("Missing MANDATORY field 'rrname'", file = sys.stderr) return False if not isinstance(d['rrname'], str): - print("Type error: 'rrname' is not a JSON string", file=sys.stderr) + print("Type error: 'rrname' is not a JSON string", file = sys.stderr) return False if "rrtype" not in d: - print("Missing MANDATORY field 'rrtype'", file=sys.stderr) + print("Missing MANDATORY field 'rrtype'", file = sys.stderr) return False if not isinstance(d['rrtype'], str): - print("Type error: 'rrtype' is not a JSON string", file=sys.stderr) + print("Type error: 'rrtype' is not a JSON string", file = sys.stderr) return False if "rdata" not in d: - print("Missing MANDATORY field 'rdata'", file=sys.stderr) + print("Missing MANDATORY field 'rdata'", file = sys.stderr) return False if "rdata" not in d: - print("Missing MANDATORY field 'rdata'", file=sys.stderr) + print("Missing MANDATORY field 'rdata'", file = sys.stderr) return False if not isinstance(d['rdata'], str) and not isinstance(d['rdata'], list): - print("'rdata' is not a list and not a string.", file=sys.stderr) + print("'rdata' is not a list and not a string.", file = sys.stderr) return False if not ("time_first" in d and "time_last" in d) or ("zone_time_first" in d and "zone_time_last" in d): - print("We are missing EITHER ('first_seen' and 'last_seen') OR ('zone_time_first' and zone_time_last') fields", file=sys.stderr) + print("We are missing EITHER ('first_seen' and 'last_seen') OR ('zone_time_first' and zone_time_last') fields", + file = sys.stderr) return False # currently we don't check the OPTIONAL fields. Sorry... to be done later. return True @@ -94,22 +95,45 @@ def validate_cof(d: dict, strict=True) -> bool: return is_cof_valid_strict(d) +def validate_dnsdbflex(d: dict, strict=True) -> bool: + """ + Validate if dict d is valid dnsdbflex. It should looks like this: + { "rrtype": , "rrname": } + """ + if "rrname" not in d: + print("Missing MANDATORY field 'rrname'", file = sys.stderr) + return False + if not isinstance(d['rrname'], str): + print("Type error: 'rrname' is not a JSON string", file = sys.stderr) + return False + if "rrtype" not in d: + print("Missing MANDATORY field 'rrtype'", file = sys.stderr) + return False + if not isinstance(d['rrtype'], str): + print("Type error: 'rrtype' is not a JSON string", file = sys.stderr) + return False + return True + + if __name__ == "__main__": # simple, poor man's unit tests. - print(80 * "=", file=sys.stderr) - print("Unit Tests:", file=sys.stderr) + print(80 * "=", file = sys.stderr) + print("Unit Tests:", file = sys.stderr) assert not is_valid_ip("a.2.3.4") assert is_valid_ip("99.88.77.6") assert is_valid_ip("2a0c:88:77:6::1") # COF validation + print(80 * "=", file = sys.stderr) + print("COF unit tests....", file = sys.stderr) + mock_input = """{"count":1909,"rdata":["cpa.circl.lu"],"rrname":"www.circl.lu","rrtype":"CNAME","time_first":"1315586409","time_last":"1449566799"} {"count":2560,"rdata":["cpab.circl.lu"],"rrname":"www.circl.lu","rrtype":"CNAME","time_first":"1449584660","time_last":"1617676151"}""" i = 0 for entry in ndjson.loads(mock_input): - retval = validate_cof(entry, strict=False) + retval = validate_cof(entry, strict = False) assert retval print("line %d is valid: %s" % (i, retval)) i += 1 @@ -118,5 +142,24 @@ if __name__ == "__main__": for entry in ndjson.loads(test2): assert validate_cof(entry) - print(80 * "=", file=sys.stderr) - print("Unit Tests DONE", file=sys.stderr) + # dnsdbflex validation + print(80 * "=", file = sys.stderr) + print("dnsdbflex unit tests....", file = sys.stderr) + + mock_input = """{"rrname":"labs.deep-insights.ai.","rrtype":"A"} +{"rrname":"www.deep-insights.ca.","rrtype":"CNAME"} +{"rrname":"mail.deep-insights.ca.","rrtype":"CNAME"} +{"rrname":"cpanel.deep-insights.ca.","rrtype":"A"} +{"rrname":"webdisk.deep-insights.ca.","rrtype":"A"} +{"rrname":"webmail.deep-insights.ca.","rrtype":"A"}""" + + i = 0 + for entry in ndjson.loads(mock_input): + retval = validate_dnsdbflex(entry, strict = False) + assert retval + print("dnsdbflex line %d is valid: %s" % (i, retval)) + i += 1 + + + print(80 * "=", file = sys.stderr) + print("Unit Tests DONE", file = sys.stderr) diff --git a/misp_modules/modules/import_mod/cof2misp.py b/misp_modules/modules/import_mod/cof2misp.py index bca2df5..841da09 100755 --- a/misp_modules/modules/import_mod/cof2misp.py +++ b/misp_modules/modules/import_mod/cof2misp.py @@ -22,7 +22,7 @@ import ndjson # from pymisp import MISPObject, MISPEvent, PyMISP from pymisp import MISPObject -from cof2misp.cof import validate_cof +from cof2misp.cof import validate_cof, validate_dnsdbflex create_specific_attributes = False # this is for https://github.com/MISP/misp-objects/pull/314 @@ -37,7 +37,7 @@ mispattributes = {'inputSource': ['file'], 'output': ['MISP objects'], 'format': 'misp_standard'} -moduleinfo = {'version': '0.2', 'author': 'Aaron Kaplan', +moduleinfo = {'version': '0.3', 'author': 'Aaron Kaplan', 'description': 'Module to import the passive DNS Common Output Format (COF) and merge as a MISP objet into a MISP event.', 'module-type': ['import']} @@ -82,7 +82,7 @@ def parse_and_insert_cof(data: str) -> dict: # o.add_tag('tlp:amber') # FIXME: we'll want to add a tlp: tag to the object if 'bailiwick' in entry: - o.add_attribute('bailiwick', value=entry['bailiwick'].rstrip('.')) + o.add_attribute('bailiwick', value=entry['bailiwick'].rstrip('.'), distribution=0) # # handle the combinations of rrtype (domain, ip) on both left and right side @@ -91,26 +91,26 @@ def parse_and_insert_cof(data: str) -> dict: if create_specific_attributes: if rrtype in ['A', 'AAAA', 'A6']: # address type # address type - o.add_attribute('rrname_domain', value=rrname) + o.add_attribute('rrname_domain', value=rrname, distribution=0) for r in rdata: - o.add_attribute('rdata_ip', value=r) + o.add_attribute('rdata_ip', value=r, distribution=0) elif rrtype in ['CNAME', 'DNAME', 'NS']: # both sides are domains - o.add_attribute('rrname_domain', value=rrname) + o.add_attribute('rrname_domain', value=rrname, distribution=0) for r in rdata: - o.add_attribute('rdata_domain', value=r) + o.add_attribute('rdata_domain', value=r, distribution=0) elif rrtype in ['SOA']: # left side is a domain, right side is text - o.add_attribute('rrname_domain', value=rrname) + o.add_attribute('rrname_domain', value=rrname, distribution=0) # # now do the regular filling up of rrname, rrtype, time_first, etc. # - o.add_attribute('rrname', value=rrname) - o.add_attribute('rrtype', value=rrtype) + o.add_attribute('rrname', value=rrname, distribution=0) + o.add_attribute('rrtype', value=rrtype, distribution=0) for r in rdata: - o.add_attribute('rdata', value=r) - o.add_attribute('raw_rdata', value=json.dumps(rdata)) # FIXME: do we need to hex encode it? - o.add_attribute('time_first', value=entry['time_first']) - o.add_attribute('time_last', value=entry['time_last']) + o.add_attribute('rdata', value=r, distribution=0) + o.add_attribute('raw_rdata', value=json.dumps(rdata), distribution=0) # FIXME: do we need to hex encode it? + o.add_attribute('time_first', value=entry['time_first'], distribution=0) + o.add_attribute('time_last', value=entry['time_last'], distribution=0) o.first_seen = entry['time_first'] # is this redundant? o.last_seen = entry['time_last'] @@ -119,7 +119,7 @@ def parse_and_insert_cof(data: str) -> dict: # for k in ['count', 'sensor_id', 'origin', 'text', 'time_first_ms', 'time_last_ms', 'zone_time_first', 'zone_time_last']: if k in entry and entry[k]: - o.add_attribute(k, value=entry[k]) + o.add_attribute(k, value=entry[k], distribution=0) # # add COF entry to MISP object @@ -148,7 +148,36 @@ def parse_and_insert_dnsdbflex(data: str): -------- none """ - return {"error": "NOT IMPLEMENTED YET"} # XXX FIXME: need a MISP object for dnsdbflex + objects = [] + try: + entries = ndjson.loads(data) + for entry in entries: # iterate over all ndjson lines + # validate here (simple validation or full JSON Schema validation) + if not validate_dnsdbflex(entry): + return {"error": "Could not validate the dnsdbflex input '%s'" % entry} + + # Next, extract some fields + rrtype = entry['rrtype'].upper() + rrname = entry['rrname'].rstrip('.') + + # create a new MISP object, based on the passive-dns object for each nd-JSON line + try: + o = MISPObject(name='passive-dns', standalone=False, distribution=0, comment='DNSDBFLEX import by cof2misp') + o.add_attribute('rrtype', value=rrtype, distribution=0, comment='DNSDBFLEX import by cof2misp') + o.add_attribute('rrname', value=rrname, distribution=0, comment='DNSDBFLEX import by cof2misp') + except Exception as ex: + print("could not create object. Reason: %s" % str(ex)) + + # + # add dnsdbflex entry to MISP object + # + objects.append(o.to_json()) + + r = {'results': {'Object': [json.loads(o) for o in objects]}} + except Exception as ex: + misperrors["error"] = "An error occured during parsing of input: '%s'" % (str(ex),) + return misperrors + return r def is_dnsdbflex(data: str) -> bool: