2021-05-02 18:45:55 +02:00
|
|
|
"""
|
|
|
|
Common Output Format for passive DNS library.
|
|
|
|
|
2021-05-04 09:44:47 +02:00
|
|
|
Copyright 2021: Farsight Security (https://www.farsightsecurity.com/)
|
|
|
|
|
|
|
|
Author: Aaron Kaplan <aaron@lo-res.org>
|
|
|
|
|
|
|
|
Released under the Apache 2.0 license.
|
|
|
|
See: https://www.apache.org/licenses/LICENSE-2.0.txt
|
|
|
|
|
2021-05-02 18:45:55 +02:00
|
|
|
"""
|
|
|
|
|
|
|
|
import ipaddress
|
|
|
|
import sys
|
|
|
|
import ndjson
|
|
|
|
|
|
|
|
|
|
|
|
def is_valid_ip(ip: str) -> bool:
|
|
|
|
"""Check if an IP address given as string would be convertible to
|
|
|
|
an ipaddress object (and thus if it is a valid IP).
|
|
|
|
|
|
|
|
Returns
|
|
|
|
--------
|
|
|
|
True on success, False on validation failure.
|
|
|
|
"""
|
|
|
|
|
|
|
|
try:
|
|
|
|
ipaddress.ip_address(ip)
|
|
|
|
except Exception as ex:
|
2021-05-26 12:38:56 +02:00
|
|
|
print("is_valid_ip(%s) returned False. Reason: %s" % (ip, str(ex)), file = sys.stderr)
|
2021-05-02 18:45:55 +02:00
|
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
2021-05-03 00:01:50 +02:00
|
|
|
def is_cof_valid_strict(d: dict) -> bool:
|
|
|
|
"""Check the COF - do the full JSON schema validation.
|
|
|
|
|
|
|
|
Returns
|
|
|
|
--------
|
|
|
|
True on success, False on validation failure.
|
|
|
|
"""
|
2021-05-26 12:38:56 +02:00
|
|
|
return True # FIXME
|
2021-05-03 00:01:50 +02:00
|
|
|
|
|
|
|
|
2021-05-02 18:45:55 +02:00
|
|
|
def is_cof_valid_simple(d: dict) -> bool:
|
|
|
|
"""Check MANDATORY fields according to COF - simple check, do not do the full JSON schema validation.
|
|
|
|
|
|
|
|
Returns
|
|
|
|
--------
|
|
|
|
True on success, False on validation failure.
|
|
|
|
"""
|
|
|
|
|
|
|
|
if "rrname" not in d:
|
2021-05-26 12:38:56 +02:00
|
|
|
print("Missing MANDATORY field 'rrname'", file = sys.stderr)
|
2021-05-02 18:45:55 +02:00
|
|
|
return False
|
|
|
|
if not isinstance(d['rrname'], str):
|
2021-05-26 12:38:56 +02:00
|
|
|
print("Type error: 'rrname' is not a JSON string", file = sys.stderr)
|
2021-05-02 18:45:55 +02:00
|
|
|
return False
|
|
|
|
if "rrtype" not in d:
|
2021-05-26 12:38:56 +02:00
|
|
|
print("Missing MANDATORY field 'rrtype'", file = sys.stderr)
|
2021-05-02 18:45:55 +02:00
|
|
|
return False
|
|
|
|
if not isinstance(d['rrtype'], str):
|
2021-05-26 12:38:56 +02:00
|
|
|
print("Type error: 'rrtype' is not a JSON string", file = sys.stderr)
|
2021-05-02 18:45:55 +02:00
|
|
|
return False
|
|
|
|
if "rdata" not in d:
|
2021-05-26 12:38:56 +02:00
|
|
|
print("Missing MANDATORY field 'rdata'", file = sys.stderr)
|
2021-05-02 18:45:55 +02:00
|
|
|
return False
|
|
|
|
if "rdata" not in d:
|
2021-05-26 12:38:56 +02:00
|
|
|
print("Missing MANDATORY field 'rdata'", file = sys.stderr)
|
2021-05-02 18:45:55 +02:00
|
|
|
return False
|
|
|
|
if not isinstance(d['rdata'], str) and not isinstance(d['rdata'], list):
|
2021-05-26 12:38:56 +02:00
|
|
|
print("'rdata' is not a list and not a string.", file = sys.stderr)
|
2021-05-02 18:45:55 +02:00
|
|
|
return False
|
2021-05-02 23:11:10 +02:00
|
|
|
if not ("time_first" in d and "time_last" in d) or ("zone_time_first" in d and "zone_time_last" in d):
|
2021-05-26 12:38:56 +02:00
|
|
|
print("We are missing EITHER ('first_seen' and 'last_seen') OR ('zone_time_first' and zone_time_last') fields",
|
|
|
|
file = sys.stderr)
|
2021-05-02 18:45:55 +02:00
|
|
|
return False
|
|
|
|
# currently we don't check the OPTIONAL fields. Sorry... to be done later.
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
2021-05-03 00:01:50 +02:00
|
|
|
def validate_cof(d: dict, strict=True) -> bool:
|
2021-05-02 18:45:55 +02:00
|
|
|
"""Validate an input passive DNS COF (given as dict).
|
|
|
|
strict might be set to False in order to loosen the checking.
|
|
|
|
With strict==True, a full JSON Schema validation will happen.
|
|
|
|
|
|
|
|
|
|
|
|
Returns
|
|
|
|
--------
|
|
|
|
True on success, False on validation failure.
|
|
|
|
"""
|
|
|
|
if not strict:
|
|
|
|
return is_cof_valid_simple(d)
|
2021-05-03 00:01:50 +02:00
|
|
|
else:
|
|
|
|
return is_cof_valid_strict(d)
|
2021-05-02 18:45:55 +02:00
|
|
|
|
2021-05-26 12:38:56 +02:00
|
|
|
|
2021-05-26 12:16:11 +02:00
|
|
|
def validate_dnsdbflex(d: dict, strict=True) -> bool:
|
|
|
|
"""
|
|
|
|
Validate if dict d is valid dnsdbflex. It should looks like this:
|
|
|
|
{ "rrtype": <str>, "rrname": <str> }
|
|
|
|
"""
|
|
|
|
if "rrname" not in d:
|
2021-05-26 12:38:56 +02:00
|
|
|
print("Missing MANDATORY field 'rrname'", file = sys.stderr)
|
2021-05-26 12:16:11 +02:00
|
|
|
return False
|
|
|
|
if not isinstance(d['rrname'], str):
|
2021-05-26 12:38:56 +02:00
|
|
|
print("Type error: 'rrname' is not a JSON string", file = sys.stderr)
|
2021-05-26 12:16:11 +02:00
|
|
|
return False
|
|
|
|
if "rrtype" not in d:
|
2021-05-26 12:38:56 +02:00
|
|
|
print("Missing MANDATORY field 'rrtype'", file = sys.stderr)
|
2021-05-26 12:16:11 +02:00
|
|
|
return False
|
|
|
|
if not isinstance(d['rrtype'], str):
|
2021-05-26 12:38:56 +02:00
|
|
|
print("Type error: 'rrtype' is not a JSON string", file = sys.stderr)
|
2021-05-26 12:16:11 +02:00
|
|
|
return False
|
|
|
|
return True
|
|
|
|
|
2021-05-02 18:45:55 +02:00
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
# simple, poor man's unit tests.
|
|
|
|
|
2021-05-26 12:38:56 +02:00
|
|
|
print(80 * "=", file = sys.stderr)
|
|
|
|
print("Unit Tests:", file = sys.stderr)
|
2021-05-02 18:45:55 +02:00
|
|
|
assert not is_valid_ip("a.2.3.4")
|
|
|
|
assert is_valid_ip("99.88.77.6")
|
|
|
|
assert is_valid_ip("2a0c:88:77:6::1")
|
|
|
|
|
|
|
|
# COF validation
|
2021-06-17 16:33:15 +02:00
|
|
|
print(80 * "=", file = sys.stderr)
|
|
|
|
print("COF unit tests....", file = sys.stderr)
|
|
|
|
|
2021-05-02 18:45:55 +02:00
|
|
|
mock_input = """{"count":1909,"rdata":["cpa.circl.lu"],"rrname":"www.circl.lu","rrtype":"CNAME","time_first":"1315586409","time_last":"1449566799"}
|
|
|
|
{"count":2560,"rdata":["cpab.circl.lu"],"rrname":"www.circl.lu","rrtype":"CNAME","time_first":"1449584660","time_last":"1617676151"}"""
|
|
|
|
|
|
|
|
i = 0
|
2021-05-02 23:22:48 +02:00
|
|
|
for entry in ndjson.loads(mock_input):
|
2021-05-26 12:38:56 +02:00
|
|
|
retval = validate_cof(entry, strict = False)
|
2021-05-02 18:45:55 +02:00
|
|
|
assert retval
|
2021-05-02 23:11:10 +02:00
|
|
|
print("line %d is valid: %s" % (i, retval))
|
2021-05-02 18:45:55 +02:00
|
|
|
i += 1
|
|
|
|
|
2021-05-03 00:01:50 +02:00
|
|
|
test2 = '{"count": 2, "time_first": 1619556027, "time_last": 1619556034, "rrname": "westernunion.com.ph.unblock-all.com.beta.opera-mini.net.", "rrtype": "A", "bailiwick": "beta.opera-mini.net.", "rdata": ["185.26.181.253"]}'
|
|
|
|
for entry in ndjson.loads(test2):
|
|
|
|
assert validate_cof(entry)
|
|
|
|
|
2021-06-17 16:33:15 +02:00
|
|
|
# dnsdbflex validation
|
|
|
|
print(80 * "=", file = sys.stderr)
|
|
|
|
print("dnsdbflex unit tests....", file = sys.stderr)
|
|
|
|
|
|
|
|
mock_input = """{"rrname":"labs.deep-insights.ai.","rrtype":"A"}
|
|
|
|
{"rrname":"www.deep-insights.ca.","rrtype":"CNAME"}
|
|
|
|
{"rrname":"mail.deep-insights.ca.","rrtype":"CNAME"}
|
|
|
|
{"rrname":"cpanel.deep-insights.ca.","rrtype":"A"}
|
|
|
|
{"rrname":"webdisk.deep-insights.ca.","rrtype":"A"}
|
|
|
|
{"rrname":"webmail.deep-insights.ca.","rrtype":"A"}"""
|
|
|
|
|
|
|
|
i = 0
|
|
|
|
for entry in ndjson.loads(mock_input):
|
|
|
|
retval = validate_dnsdbflex(entry, strict = False)
|
|
|
|
assert retval
|
|
|
|
print("dnsdbflex line %d is valid: %s" % (i, retval))
|
|
|
|
i += 1
|
|
|
|
|
|
|
|
|
2021-05-26 12:38:56 +02:00
|
|
|
print(80 * "=", file = sys.stderr)
|
|
|
|
print("Unit Tests DONE", file = sys.stderr)
|