Merge branch 'cof2misp' of github.com:aaronkaplan/misp-modules into cof2misp

pull/491/head
aaronkaplan 2021-05-02 20:51:52 +00:00
commit ff950bc50c
2 changed files with 9 additions and 12 deletions

View File

@ -65,7 +65,7 @@ def is_cof_valid_simple(d: dict) -> bool:
def validate_cof(d: dict, strict=True) -> bool: def validate_cof(d: dict, strict=False) -> bool:
"""Validate an input passive DNS COF (given as dict). """Validate an input passive DNS COF (given as dict).
strict might be set to False in order to loosen the checking. strict might be set to False in order to loosen the checking.
With strict==True, a full JSON Schema validation will happen. With strict==True, a full JSON Schema validation will happen.

View File

@ -15,9 +15,10 @@ import base64
import pprint import pprint
import ndjson import ndjson
from pymisp import MISPObject, MISPEvent, PyMISP # from pymisp import MISPObject, MISPEvent, PyMISP
from pymisp import MISPObject
from cof2misp.cof import is_valid_ip, validate_cof from cof2misp.cof import validate_cof
misperrors = {'error': 'Error'} misperrors = {'error': 'Error'}
@ -58,19 +59,17 @@ def parse_and_insert_cof(data: str) -> dict:
objects = [] objects = []
try: try:
entries = ndjson.loads(data) entries = ndjson.loads(data)
# pprint.pprint(entries)
for l in entries: # iterate over all ndjson lines for l in entries: # iterate over all ndjson lines
# validate here (simple validation or full JSON Schema validation) # validate here (simple validation or full JSON Schema validation)
# FIXME if not validate_cof(l):
return {"error": "Could not validate the COF input '%r'" % l}
# Next, extract some fields # Next, extract some fields
rrtype = l['rrtype'].upper() rrtype = l['rrtype'].upper()
rrname = l['rrname'].rstrip('.') rrname = l['rrname'].rstrip('.')
rdata = [x.rstrip('.') for x in l['rdata']] rdata = [x.rstrip('.') for x in l['rdata']]
# create a new MISP object, based on the passive-dns object for each nd-JSON line # create a new MISP object, based on the passive-dns object for each nd-JSON line
o = MISPObject(name='passive-dns', standalone=False, comment='created by cof2misp') o = MISPObject(name='passive-dns', standalone=False, comment='created by cof2misp')
@ -143,7 +142,6 @@ def parse_and_insert_dnsdbflex(data: str):
pass # XXX FIXME: need a MISP object for dnsdbflex pass # XXX FIXME: need a MISP object for dnsdbflex
def is_dnsdbflex(data: str) -> bool: def is_dnsdbflex(data: str) -> bool:
"""Check if the supplied data conforms to the dnsdbflex output (which only contains rrname and rrtype) """Check if the supplied data conforms to the dnsdbflex output (which only contains rrname and rrtype)
@ -163,12 +161,11 @@ def is_dnsdbflex(data: str) -> bool:
try: try:
j = ndjson.loads(data) j = ndjson.loads(data)
for l in j: for l in j:
if not set(l.keys()) == { 'rrname' , 'rrtype' }: if not set(l.keys()) == {'rrname', 'rrtype'}:
return False # shortcut return False # shortcut
return True return True
except Exception as _ex: except Exception as _ex:
return False return False
def is_cof(data: str) -> bool: def is_cof(data: str) -> bool:
@ -178,7 +175,7 @@ def is_cof(data: str) -> bool:
def handler(q=False): def handler(q=False):
if q is False: if q is False:
return False return False
r = {'results': []}
request = json.loads(q) request = json.loads(q)
# Parse the json, determine which type of JSON it is (dnsdbflex or COF?) # Parse the json, determine which type of JSON it is (dnsdbflex or COF?)
# Validate it # Validate it
@ -201,7 +198,7 @@ def handler(q=False):
return {'error': 'Could not find any valid COF input nor dnsdbflex input. Please have a loot at: https://datatracker.ietf.org/doc/draft-dulaunoy-dnsop-passive-dns-cof/'} return {'error': 'Could not find any valid COF input nor dnsdbflex input. Please have a loot at: https://datatracker.ietf.org/doc/draft-dulaunoy-dnsop-passive-dns-cof/'}
except Exception as ex: except Exception as ex:
print("oops, got exception %s" % str(ex)) print("oops, got exception %s" % str(ex))
return {'error': "Got exception %s" % str(ex) } return {'error': "Got exception %s" % str(ex)}
def introspection(): def introspection():